<a href="https://colab.research.google.com/github/mekkUr13/BigData/blob/main/sparkgyak1-2/sparkgyak1-2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark



In [2]:
import os

os.getcwd()

'/content'

In [4]:
from pyspark import SparkConf, SparkContext

conf = SparkConf()
sc = SparkContext(conf=conf)


In [5]:
x = "MWűGkyösdwinko Qau fSJpDaTrykv!w"

rdd = sc.parallelize([x[i:i+2] for i in range(0, len(x), 2)])
res = rdd.map(lambda x: x[0]).collect()

print("".join(res))

Működik a Spark!


In [6]:
# RDD = resilient distributed dataset
# Az RDD objektumok immutábilis elosztott kollekciója
# RDD létrehozható bármilyen kollekcióból az sc.parallelize() függvénnyel.
# Az sc.parallelize() második argumentumában megadhatjuk hány partíción legyenek elosztva az adatok

tomb = ['a', 'b', 'c', 'd', 'e', 'f']

rdd = sc.parallelize(tomb, 5)
rdd

ParallelCollectionRDD[2] at readRDDFromFile at PythonRDD.scala:289

In [7]:
# Az alábbi függvény visszaadja az RDD partícióinak a számát.

rdd.getNumPartitions()

5

In [8]:
# Egy RDD újrapraticionálható, azonban ez a klaszter gépei közötti kommunikációt igényel

rdd2 = rdd.repartition(3)
rdd2.getNumPartitions()

3

In [9]:
# Az RDD-ken alkalmazhatunk transzformációkat és akciókat
# A transzformáció RDD-ből új RDD-t hoz létre
# Az akciókkat RDD-re alkalmazzuk és egyszerű értékkel térnek vissza

# Az alábbi kód létrehoz egy RDD-t, amely 1000 számot tartalmaz
# A filter() transzformáció szűr a hárommal osztható számokra
# A count() akicó megszámolja az RDD-ben található értékeket

szamok = sc.parallelize(range(1000)).filter(lambda x: x % 3 == 0)
szamok.count()

334

In [10]:
# Transzformációk: filter, map, flatMap, sort, sample, union, intersection, distinct
# Akciók: count, take, collect, max, min, reduce, foreach
# A nevük elárulja mit csinálnak, de érdemes kipróbálni őket!

# Példa: számok összeadása
# A reduce() akció aggregálja az elemeket egy függvénnyel, amely két argumentumot kap és egy értékkel tér vissza
# Kommutatív és asszociatív függvénynek kell lennie, hogy párhuzamosan végrehajtható legyen

szamok = sc.parallelize(range(1000))\
.filter(lambda x: x % 3 == 0)\
.reduce(lambda a, b: a + b)

print(szamok)

166833


In [15]:
# Lemezen lévő fájlból is készíthetünk RDD-t
# Az alábbi kód kírja a szöveg első két sorát:

lines = sc.textFile('alkotmany.txt').take(2)

print(lines)

['Mi, a magyar nemzet tagjai, az új évezred kezdetén, felelősséggel minden magyarért, kinyilvánítjuk az alábbiakat:', 'Büszkék vagyunk arra, hogy Szent István királyunk ezer évvel ezelőtt szilárd alapokra helyezte a magyar államot, és hazánkat a keresztény Európa részévé tette.']


In [16]:
# Feladat: Grep külön függvénnyel

def filter_row(line):
    grepWord = 'magyar'
    if grepWord in line:
        return True
    else:
        return False


lines = sc.textFile('alkotmany.txt')\
.filter(filter_row)

lines.take(2)

['Mi, a magyar nemzet tagjai, az új évezred kezdetén, felelősséggel minden magyarért, kinyilvánítjuk az alábbiakat:',
 'Büszkék vagyunk arra, hogy Szent István királyunk ezer évvel ezelőtt szilárd alapokra helyezte a magyar államot, és hazánkat a keresztény Európa részévé tette.']

In [17]:
# Feladat: Grep

grepWord = 'magyar'

lines = sc.textFile('alkotmany.txt')\
.filter(lambda line: grepWord in line)

lines.take(2)

['Mi, a magyar nemzet tagjai, az új évezred kezdetén, felelősséggel minden magyarért, kinyilvánítjuk az alábbiakat:',
 'Büszkék vagyunk arra, hogy Szent István királyunk ezer évvel ezelőtt szilárd alapokra helyezte a magyar államot, és hazánkat a keresztény Európa részévé tette.']

In [18]:
lines = sc.textFile('alkotmany.txt')\
.map(lambda l: l.split(" "))

lines.take(2)

[['Mi,',
  'a',
  'magyar',
  'nemzet',
  'tagjai,',
  'az',
  'új',
  'évezred',
  'kezdetén,',
  'felelősséggel',
  'minden',
  'magyarért,',
  'kinyilvánítjuk',
  'az',
  'alábbiakat:'],
 ['Büszkék',
  'vagyunk',
  'arra,',
  'hogy',
  'Szent',
  'István',
  'királyunk',
  'ezer',
  'évvel',
  'ezelőtt',
  'szilárd',
  'alapokra',
  'helyezte',
  'a',
  'magyar',
  'államot,',
  'és',
  'hazánkat',
  'a',
  'keresztény',
  'Európa',
  'részévé',
  'tette.']]

In [19]:
lines = sc.textFile('alkotmany.txt')\
.flatMap(lambda l: l.split(" "))\
.sortBy(lambda word: word)

lines.take(10)

['1944.',
 '1949.',
 '1956-os',
 '1990.',
 'Alaptörvényünk',
 'Becsüljük',
 'Bízunk',
 'Büszkék',
 'Büszkék',
 'Büszkék']

In [20]:
# Kulcs-érték párok készítése
# Műveletek kulcs-érték párokon: reduceByKey, groupByKey, countByKey

lines = sc.textFile('alkotmany.txt')\
.flatMap(lambda l: l.split(" "))\
.map(lambda w: (w.lower()[0], w.lower()))

lines.take(10)

[('m', 'mi,'),
 ('a', 'a'),
 ('m', 'magyar'),
 ('n', 'nemzet'),
 ('t', 'tagjai,'),
 ('a', 'az'),
 ('ú', 'új'),
 ('é', 'évezred'),
 ('k', 'kezdetén,'),
 ('f', 'felelősséggel')]

In [21]:
# Feladat: WordCount
# Kulcs alapján aggregáló függvények: reduceByKey, aggregateByKey, groupByKey, sortByKey

lines = sc.textFile('alkotmany.txt')\
.flatMap(lambda l: l.split(" "))\
.map(lambda l: l.replace(':',"").replace(",","").replace(".", "").replace("!",""))\
.map(lambda w: (w.lower(), 1))\
.reduceByKey(lambda a, b: a + b)\
.sortBy(lambda x: x[1], False)

lines.take(5)

[('a', 41), ('és', 24), ('az', 19), ('hogy', 17), ('valljuk', 9)]

In [22]:
# Feladat: WordCount más megoldással

lines = sc.textFile('alkotmany.txt')\
.flatMap(lambda l: l.split(" "))\
.map(lambda l: l.replace(':',"").replace(",","").replace(".", "").replace("!",""))\
.map(lambda w: (w.lower(), 1))\
.groupByKey()\
.mapValues(sum)\
.sortBy(lambda t: t[1], False)

lines.take(5)

[('a', 41), ('és', 24), ('az', 19), ('hogy', 17), ('valljuk', 9)]

Gyakorló Feladatok

In [23]:
# 1. feladat: Milyen hosszú a leghosszabb szó?

lines = sc.textFile('alkotmany.txt')\
.flatMap(lambda l: l.split(" "))\
.map(lambda l: l.replace(':',"").replace(",","").replace(".", "").replace("!",""))\
.map(lambda w: (w.lower(), len(w)))\
.sortBy(lambda t: t[1], False)

lines.take(1)[0][1]

# Egyszerűbben
lines = sc.textFile('alkotmany.txt')\
.flatMap(lambda l: l.split(" "))\
.map(lambda l: l.replace(':',"").replace(",","").replace(".", "").replace("!",""))\
.map(lambda w: len(w))\
.sortBy(lambda t: t, False)

lines.take(1)[0]

#Pancser vagyok, ennyire egyszerű
lines = sc.textFile("alkotmany.txt")\
.flatMap(lambda l : l.split(" "))\
.map(lambda w : len(w))

lines.max()

18

In [34]:
# 2. feladat: Adjuk meg a szavak átlagos hosszát!

avg = sc.textFile('alkotmany.txt')\
.flatMap(lambda l: l.split(" "))\
.map(lambda l: l.replace(':',"").replace(",","").replace(".", "").replace("!",""))\
.map(lambda w: len(w))\
.reduce(lambda a, b: (a+b)/2)

print(avg)

# Megoldás

lines = sc.textFile("alkotmany.txt")\
.flatMap(lambda l : l.split(" "))\
.map(lambda l: l.replace(':',"").replace(",","").replace(".", "").replace("!",""))\
.map(lambda w: (len(w), 1) )\
.reduce(lambda a, b: (a[0]+b[0], a[1]+b[1]) )

print(lines[0]/lines[1])

9.616566583863989
6.782881002087683


In [25]:
# 3. feladat: Melyik a leghosszabb szó és milyen hosszú?

lines = sc.textFile('alkotmany.txt')\
.flatMap(lambda l: l.split(" "))\
.map(lambda l: l.replace(':',"").replace(",","").replace(".", "").replace("!",""))\
.map(lambda w: (w.lower(), len(w)))\
.sortBy(lambda t: t[1], False)

lines.take(1)

# Megoldás

maxl = sc.textFile("alkotmany.txt")\
.flatMap(lambda l : l.split(" "))\
.map(lambda l: l.replace(':',"").replace(",","").replace(".", "").replace("!",""))\
.map(lambda w : (w,len(w)))\
.reduce(lambda a, b: a if a[1] > b[1] else b)

print(maxl)

# Pancser nő megoldása
maxl = sc.textFile("alkotmany.txt")\
.flatMap(lambda l : l.split(" "))\
.map(lambda l: l.replace(':',"").replace(",","").replace(".", "").replace("!",""))\
.map(lambda w : (w,len(w)))\
.max(key=lambda x: x[1])

print(maxl)

# Saját vagány
maxl = sc.textFile("alkotmany.txt")\
.flatMap(lambda l : l.split(" "))\
.map(lambda l: l.replace(':',"").replace(",","").replace(".", "").replace("!",""))\
.map(lambda w : (w,len(w)))\
.reduce(lambda a, b: max(a,b, key = lambda x: x[1]))

print(maxl)

('nemzetiszocialista', 18)
('nemzetiszocialista', 18)
('nemzetiszocialista', 18)


In [26]:
# 4. feladat: Adjuk meg kezdőbetűnként a leghosszabb szó hosszát!

lines = sc.textFile('alkotmany.txt')\
.flatMap(lambda l: l.split(" "))\
.map(lambda l: l.replace(':',"").replace(",","").replace(".", "").replace("!",""))\
.map(lambda w: (w.lower()[0], len(w)))\
.groupByKey()\
.mapValues(max)\
.sortBy(lambda t: t[1], False)

a = lines.take(10)

# Megoldás

lines = sc.textFile("alkotmany.txt")\
.flatMap(lambda l : l.split(" "))\
.map(lambda l: l.replace(':',"").replace(",","").replace(".", "").replace("!",""))\
.map(lambda w: (w[0].lower(), len(w)))\
.reduceByKey(lambda a,b: a if a > b else b)\
.sortBy(lambda t: t[1], False)

b = lines.take(10)
a == b

True

In [27]:
import random
import math

num_sample = 1000

def inside(p):
  x, y = random.random(), random.random()
  return math.sqrt(x **2 + y ** 2) <= 1

count = sc.parallelize(range(0, num_sample))\
.filter(inside)\
.count()

pi = 4 * (count / num_sample)

print(pi)

3.072


In [37]:
def isfloat(s:str) -> bool:
    try:
        float(s)
        return True
    except ValueError:
        return False

'''lines = sc.textFile("tempBudapestMeteoBlue.csv")\
.flatMap(lambda l: l.split(","))\
.filter(lambda x: isfloat(x[1]) and len(x[0]) > 10)\
.max(key=lambda x: float(x[1]))'''

print(lines)

# Megoldás
data = sc.textFile('tempBudapestMeteoBlue.csv')

data\
.filter(lambda l: l.startswith('20'))\
.map(lambda l: l.split(','))\
.map(lambda l: (l[0].split('T'), l[1]))\
.map(lambda l: (l[0][0], (l[0][1], float(l[1]))))\
.reduceByKey(lambda a,b: a if a[1] > b[1] else b)\
.collect()


(3249, 479)


[('20200916', ('1600', 31.431458)),
 ('20200917', ('1500', 30.111458)),
 ('20200918', ('1600', 22.861458)),
 ('20200919', ('1600', 23.311459)),
 ('20200920', ('1700', 25.321459)),
 ('20200921', ('1300', 27.171457)),
 ('20200922', ('1600', 27.831459)),
 ('20200923', ('1400', 27.131458)),
 ('20200924', ('1500', 26.421457)),
 ('20200926', ('0000', 17.621458)),
 ('20200927', ('1500', 18.401459)),
 ('20200928', ('1500', 19.731459)),
 ('20200914', ('1700', 30.651459)),
 ('20200915', ('1600', 32.06146)),
 ('20200925', ('1800', 22.981459))]

In [30]:
#Akkumulátor
keresendo = sc.broadcast(['MAGYAR'])
elofordulas = sc.accumulator(0)

def countWord(w):
  if keresendo.value[0] in w.upper():
    elofordulas.add(1)

lines = sc.textFile('alkotmany.txt')\
.flatMap(lambda l: l.split(" "))\
.foreach(countWord)

print(elofordulas)

12


In [38]:
#Maxtemp feladat úgy, hogy megszámoljuk a 30 foknál melegebb napok számát
hotterthan = sc.accumulator(0)
degree = sc.broadcast([30])
def countHot(d):
  if degree.value[0] < d[1][1]:
    hotterthan.add(1)
  return d

data = sc.textFile('tempBudapestMeteoBlue.csv')

datas = data\
.filter(lambda l: l.startswith('20'))\
.map(lambda l: l.split(','))\
.map(lambda l: (l[0].split('T'), l[1]))\
.map(lambda l: (l[0][0], (l[0][1], float(l[1]))))\
.reduceByKey(lambda a,b: a if a[1] > b[1] else b)\
.map(lambda l: countHot(l))\
.collect()

print(hotterthan)
print(*datas, sep='\n')

4
('20200916', ('1600', 31.431458))
('20200917', ('1500', 30.111458))
('20200918', ('1600', 22.861458))
('20200919', ('1600', 23.311459))
('20200920', ('1700', 25.321459))
('20200921', ('1300', 27.171457))
('20200922', ('1600', 27.831459))
('20200923', ('1400', 27.131458))
('20200924', ('1500', 26.421457))
('20200926', ('0000', 17.621458))
('20200927', ('1500', 18.401459))
('20200928', ('1500', 19.731459))
('20200914', ('1700', 30.651459))
('20200915', ('1600', 32.06146))
('20200925', ('1800', 22.981459))


In [83]:
# Spark gyakorló feladatok #2

letters = ['A','C','G','T']

three_mer = {}
for i in range(4):
  for j in range(4):
    for k in range(4):
        three_mer[(letters[i], letters[j],letters[k])] =  sc.accumulator(0)
#print(three_mer)
data = sc.textFile('kmer_input1.txt')

kmer = data.flatMap(lambda l: [l[i:i+3] for i in range(len(l) -2)])\
.filter(lambda l: 'T' in l.upper())\
.map(lambda kmer: (kmer, 1))\
.reduceByKey(lambda a,b: a +b)\
.filter(lambda x: x[1] > 30)



kmer.collect()

[('GCT', 31),
 ('TTC', 32),
 ('ATT', 40),
 ('AAT', 35),
 ('TGG', 47),
 ('GTT', 33),
 ('CGT', 32),
 ('ATC', 32),
 ('TTT', 37),
 ('TCA', 31),
 ('CAT', 32),
 ('CTG', 58),
 ('TGA', 35),
 ('TGC', 53),
 ('ATG', 35),
 ('GTG', 39),
 ('GAT', 39),
 ('TTA', 31),
 ('GGT', 38),
 ('TTG', 32)]