# Distribuované počítanie - Spark

Spark je v súčasnosti prvý framework, po ktorom ľudia idú ak chcú robiť nejaké distribuovane in-memory spracovanie údajov.

Existuje ale veľmi veľa ďalších možností, nástrojov a technológii:
* **Hadoop (MapReduce + HDFS)** - po tomto idú ľudia ak majú fakt veľa dát, ktoré nevlezú do pamäti ani na veľkom clustri a/alebo chcú pracovať na disku
* Storm
* Flink


<img src="http://mattturck.com/wp-content/uploads/2019/07/2019_Matt_Turck_Big_Data_Landscape_Final_Fullsize.png" alt="BigData landscape 2019"/>
zdroj: https://mattturck.com/data2019/

# Spark - zopár vlastností
Budem používať neznáme termíny, ktoré ste ale možno počuli na iných predmetoch. Ak by ste narazili na nejaký termín, ktorému nerozumiete, tak ma zastavte.

* In-memory spracovanie dát
* Jednotný prístup k dátam a výpočtovým prostriedkom (rovnako píšem kód ak pracujem na mojom notebooku a na celom klastri)
* Na pozadí Scala a JVM, ale má veľmi dobré API pre Python a aj R
* v podstate MapReduce ale v pamäti a s možnosťou microbatchov na spracovanie prúdov dát
* Základ je RDD (Resilient distributed dataset) - kolekcia dát distribuovaná na jednotlivé uzly. Základ práce sú transformácie nad dátami reprezentovanými ako RDD. Jednoduchá podpora pre základné operácie ako map, filter, collect
* Transformácie sú lenivé (lazy). Nevykonávajú sa dokedy ich nepotrebujete.


In [1]:
sc

In [2]:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
distData # toto je prklad RDD

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:480

In [3]:
f = distData.map(lambda x: x % 2 == 0)
f.take(3)

[False, True, False]

In [4]:
f = distData.filter(lambda x: x % 2 == 0)
f.take(5)

[2, 4]

# Hračkársky príklad s hľadaním prvočísel

Mám funkciu, ktorá overuje či je číslo prvočíslo a ja ju chcem distribuovať na veľa dát


In [5]:
# prevzane z https://districtdatalabs.silvrback.com/getting-started-with-spark-in-python

def isprime(n):
    n = abs(int(n))
    if n < 2:
        return False
    if n == 2:
        return True
    if not n & 1:
        return False
    for x in range(3, int(n**0.5)+1, 2):
        if n % x == 0:
            return False
    return True

In [6]:
import time

In [7]:
nums = sc.parallelize(range(10**6))

In [8]:
start = time.time()
print(nums.filter(isprime).count())
end = time.time()
print("Elapsed time: {} s".format(end - start))

78498
Elapsed time: 2.577939033508301 s


# Oneskorený výpočet

In [9]:
start = time.time()
print(nums.filter(isprime))
end = time.time()
print("Elapsed time: {} s".format(end - start))

PythonRDD[7] at RDD at PythonRDD.scala:48
Elapsed time: 0.019066810607910156 s


Nezavolal som funkciu, ktorá by vracala výsledky, tak sa ešte nič nevykonalo. Pripravil sa len RDD s transformáciami, ktoré sa vykonajú vtedy, keď to bude treba

In [10]:
start = time.time()
print(nums.filter(isprime).take(5))
end = time.time()
print("Elapsed time: {} s".format(end - start))

[2, 3, 5, 7, 11]
Elapsed time: 0.07054328918457031 s


Až teraz som zavolal funkciu, ktorá vyžaduje aby sa niečo aj spočítalo. Stačilo mi ale spočítať prvých pár čísel, takže sa vykonala len časť výpočtu. Ďalšie funkcie, ktoré vracajú dáta sú napr. collect, count, ...

Opatrne s týmito funkciami (hlavne s collect). Vracajú vám všetky dáta, ktoré sú výsledkom výpočtu. Ak by ich bolo veľa, tak sa ich aj tak pokúsia vrátiť a počítač, z ktorého pristupujete k sparku to nemusí zvládnuť (ak teda pristupujem k nejakému väčšiemu stroju).


In [11]:
start = time.time()
print(nums.filter(isprime).takeOrdered(5, key = lambda x: -x))
end = time.time()
print("Elapsed time: {} s".format(end - start))

[999983, 999979, 999961, 999959, 999953]
Elapsed time: 2.7241902351379395 s


Tu som už potreboval spočítať všetko na to aby som to usporiadal, tak sa to muselo vykonať všetko a chvíľu to teda trvalo

# Skúsme trochu reálnejší príklad

zdroje:
* https://github.com/jadianes/spark-py-notebooks
* https://www.codementor.io/jadianes/python-spark-sql-dataframes-du107w74i


## Stiahnem si dáta o útokoch na počítačovú sieť

Sú to dáta charakterizujúce spojenia v sieti


In [20]:
import urllib.request
# f = urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", "data/kddcup.data.gz")
# f = urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "data/kddcup.data_10_percent.gz")
# f = urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz", "data/corrected.gz")

In [21]:
%%bash
ls -lh data

total 21M
-rw-r--r-- 1 jakub.sevcech anomaly 1.4M Dec 11 23:39 corrected.gz
-rw-r--r-- 1 jakub.sevcech anomaly 2.1M Dec 11 23:32 kddcup.data_10_percent.gz
-rw-r--r-- 1 jakub.sevcech anomaly  18M Dec 11 23:32 kddcup.data.gz


# Vytvorím si RDD

In [14]:
data_file = "data/kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file).cache()

Všimnite si, že pracujem s komprimovanými dátami. Skutočný objem je cca. 10 násobný.

# Definujem schému - hlavičku

In [15]:
from pyspark.sql import Row

# nacitam data a nastavim im schemu
csv_data = raw_data.map(lambda l: l.split(","))
row_data = csv_data.map(lambda p: Row(
    duration=int(p[0]), 
    protocol_type=p[1],
    service=p[2],
    flag=p[3],
    src_bytes=int(p[4]),
    dst_bytes=int(p[5])
    )
)

## Vytvorím si DataFrame veľmi podobný tomu v Pandas

In [16]:
interactions_df = sqlContext.createDataFrame(row_data)
type(interactions_df)

pyspark.sql.dataframe.DataFrame

## Viem robiť podobné operácie ako s Pandas, akurát distribuovane

In [17]:
interactions_df.groupBy("protocol_type").count().show()

+-------------+------+
|protocol_type| count|
+-------------+------+
|          tcp|190065|
|          udp| 20354|
|         icmp|283602|
+-------------+------+



# Čo keď by som chcel k tým dátam pristupovať cez SQL?

In [24]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [18]:
interactions_df.registerTempTable("interactions")

In [27]:
tcp_interactions = sqlContext.sql("""
    SELECT duration, dst_bytes FROM interactions WHERE protocol_type = 'tcp' AND duration > 1000 AND dst_bytes = 0
""")
tcp_interactions.show()

+--------+---------+
|duration|dst_bytes|
+--------+---------+
|    5057|        0|
|    5059|        0|
|    5051|        0|
|    5056|        0|
|    5051|        0|
|    5039|        0|
|    5062|        0|
|    5041|        0|
|    5056|        0|
|    5064|        0|
|    5043|        0|
|    5061|        0|
|    5049|        0|
|    5061|        0|
|    5048|        0|
|    5047|        0|
|    5044|        0|
|    5063|        0|
|    5068|        0|
|    5062|        0|
+--------+---------+
only showing top 20 rows



## Čiže by sme vedeli Spark použiť napríklad na explikatívnu analýzu celkom veľkých objemov dát

## Máme tiež knižnice, na to aby sme natrénovali aj nejaké modely


# Zoberme si trénovacie a testovacie dáta

In [23]:
data_file = "data/kddcup.data.gz"
raw_data = sc.textFile(data_file)

print("Train data size is {}".format(raw_data.count()))

Train data size is 4898431


In [24]:
test_data_file = "data/corrected.gz"
test_raw_data = sc.textFile(test_data_file)

print("Test data size is {}".format(test_raw_data.count()))

Test data size is 311029


## Načítajme si ich ako zoznamy riadkov = pozorovania s atribútmi

In [25]:
from pyspark.mllib.regression import LabeledPoint
from numpy import array

csv_data = raw_data.map(lambda x: x.split(","))
test_csv_data = test_raw_data.map(lambda x: x.split(","))

## Pozorovanie vyzerá nejak takto

In [30]:
csv_data.take(1)

[['0',
  'tcp',
  'http',
  'SF',
  '215',
  '45076',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '1',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '0',
  '0',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  'normal.']]

## Opis dát je dostupný tu
http://kdd.ics.uci.edu/databases/kddcup99/task.html

## Takéto hodnoty nadobúdajú niektoré atribúty

In [26]:
protocols = csv_data.map(lambda x: x[1]).distinct().collect()
protocols

['icmp', 'udp', 'tcp']

In [27]:
services = csv_data.map(lambda x: x[2]).distinct().collect()
services

['finger',
 'netbios_dgm',
 'name',
 'X11',
 'hostnames',
 'vmnet',
 'systat',
 'shell',
 'netstat',
 'netbios_ssn',
 'urh_i',
 'pop_3',
 'ldap',
 'domain',
 'mtp',
 'remote_job',
 'exec',
 'supdup',
 'courier',
 'urp_i',
 'pop_2',
 'csnet_ns',
 'smtp',
 'whois',
 'daytime',
 'bgp',
 'imap4',
 'nntp',
 'http_443',
 'klogin',
 'rje',
 'IRC',
 'link',
 'http_8001',
 'uucp',
 'tftp_u',
 'iso_tsap',
 'uucp_path',
 'auth',
 'ecr_i',
 'other',
 'domain_u',
 'ssh',
 'discard',
 'ctf',
 'red_i',
 'tim_i',
 'time',
 'login',
 'Z39_50',
 'ftp',
 'telnet',
 'ntp_u',
 'sql_net',
 'aol',
 'private',
 'gopher',
 'efs',
 'http_2784',
 'ftp_data',
 'nnsp',
 'http',
 'sunrpc',
 'eco_i',
 'harvest',
 'kshell',
 'echo',
 'netbios_ns',
 'pm_dump',
 'printer']

In [28]:
flags = csv_data.map(lambda x: x[3]).distinct().collect()
flags

['S0', 'RSTR', 'SH', 'S1', 'S2', 'RSTOS0', 'REJ', 'OTH', 'SF', 'S3', 'RSTO']

# Zakódujme si kategorické dáta

Pre jednoduchosť to teraz môžeme spraviť ako keby to boli ordinálne premenné. V modeli explicitne povieme, že sú to kategorické premenné aby to nebral ako čísla.


In [31]:
def create_labeled_point(line_split):
    # leave_out = [41] (label)
    clean_line_split = line_split[0:41]
    
    # convert protocol to numeric categorical variable
    try: 
        clean_line_split[1] = protocols.index(clean_line_split[1])
    except:
        clean_line_split[1] = len(protocols)
        
    # convert service to numeric categorical variable
    try:
        clean_line_split[2] = services.index(clean_line_split[2])
    except:
        clean_line_split[2] = len(services)
    
    # convert flag to numeric categorical variable
    try:
        clean_line_split[3] = flags.index(clean_line_split[3])
    except:
        clean_line_split[3] = len(flags)
    
    # convert label to binary label
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
        
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data = csv_data.map(create_labeled_point)
test_data = test_csv_data.map(create_labeled_point)

## A natrénujme rozhodovací strom

Používame knižnicu **mllib**


In [34]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel

t0 = time.time()
tree_model = DecisionTree.trainClassifier(training_data, numClasses=2, 
                                          categoricalFeaturesInfo={1: len(protocols), 2: len(services), 3: len(flags)},
                                          impurity='gini', maxDepth=4, maxBins=100)

print("Classifier trained in {} seconds".format(round(time.time() - t0,3)))

Classifier trained in 272.709 seconds


## Predikcie sa dajú spočítať takto

In [35]:
t0 = time.time()
predictions = tree_model.predict(test_data.map(lambda p: p.features))

print("Predictions generated in {} seconds".format(round(time.time() - t0,3)))

Predictions generated in 0.049 seconds


## A úspešnosť môžeme vyhodnotiť takto

In [38]:
labels_and_preds = test_data.map(lambda p: p.label).zip(predictions)

t0 = time.time()
test_accuracy = labels_and_preds.filter(lambda x: x[0] == x[1]).count() / float(test_data.count())

print("Prediction made in {} seconds. Test accuracy is {}".format(round(time.time() - t0,3), round(test_accuracy,4)))

Prediction made in 23.226 seconds. Test accuracy is 0.916


# Pravidla zo stromu si vieme jednoducho vypísať

In [39]:
print("Learned classification tree model:")
print(tree_model.toDebugString())

Learned classification tree model:
DecisionTreeModel classifier of depth 4 with 27 nodes
  If (feature 22 <= 33.0)
   If (feature 25 <= 0.5)
    If (feature 36 <= 0.48)
     If (feature 34 <= 0.91)
      Predict: 0.0
     Else (feature 34 > 0.91)
      Predict: 1.0
    Else (feature 36 > 0.48)
     If (feature 2 in {0.0,56.0,42.0,52.0,14.0,61.0,38.0,13.0,41.0,2.0,32.0,22.0,44.0,50.0,11.0,23.0,30.0,51.0,19.0,47.0,15.0})
      Predict: 0.0
     Else (feature 2 not in {0.0,56.0,42.0,52.0,14.0,61.0,38.0,13.0,41.0,2.0,32.0,22.0,44.0,50.0,11.0,23.0,30.0,51.0,19.0,47.0,15.0})
      Predict: 1.0
   Else (feature 25 > 0.5)
    If (feature 3 in {5.0,6.0,9.0,3.0,8.0,4.0})
     If (feature 2 in {0.0,61.0,38.0,22.0,59.0,7.0,3.0,50.0,31.0,11.0,40.0,51.0,47.0})
      Predict: 0.0
     Else (feature 2 not in {0.0,61.0,38.0,22.0,59.0,7.0,3.0,50.0,31.0,11.0,40.0,51.0,47.0})
      Predict: 1.0
    Else (feature 3 not in {5.0,6.0,9.0,3.0,8.0,4.0})
     If (feature 38 <= 0.07)
      Predict: 0.0
     Else 

Teraz by som vedel interpretovať celý strom. Stačí mi pozrieť sa čo mám v jednotlivých stĺpcoch.  Popis dát je dostupný tu: http://kdd.ics.uci.edu/databases/kddcup99/task.html