# Apache Spark intro


Apache Spark is a general-purpose, in-memory computing engine. It can run alongside Hadoop, YARN, and other Big Data components to improve the performance of your applications. It provides high-level APIs in Scala, Java, Python, R, and SQL.

**Spark Architecture**

Apache Spark uses a master-slave architecture in which the master is called the “Driver” and the slaves are called “Workers”. The starting point of a Spark application is `sc`, an instance of the `SparkContext` class, which runs inside the driver.

![](https://miro.medium.com/v2/resize:fit:1192/0*XzNeTtwEgIy5yWR_)


![](https://miro.medium.com/v2/resize:fit:1266/0*-PltnPR9row8iUDo)

**Apache Spark**: 
Sometimes also called Spark Core. Its core abstraction is the RDD (Resilient Distributed Dataset), a collection of data distributed across the nodes of the cluster and processed in parallel.


**Spark SQL**: 
The core abstraction here is the DataFrame, a relational representation of the data. It provides functions with SQL-like capabilities, and we can also write SQL-like queries for our data analysis.


**Spark Streaming**: 
The abstraction provided by this library is the DStream (Discretized Stream). This library provides capabilities to process and transform data in near real time.


**MLlib**: 
This is a Machine Learning library with commonly used algorithms including collaborative filtering, classification, clustering, and regression.

**GraphX**: 
This library helps us process graphs and solve various problems (like PageRank, Connected Components, etc.) using graph theory.


Let’s dig a little deeper into Apache Spark (Spark Core), starting with RDD.




In [None]:
from pyspark import SparkContext

In [None]:
sc = SparkContext(appName="myAppName")

In [None]:
sc

### RDD

- Resilient Distributed Dataset
- The basic abstraction and the core of Spark
- Supports two kinds of operations:
    - Actions:
        - operations that trigger execution of the transformations on an RDD
        - take an RDD as input and return a result that is NOT an RDD
    - Transformations:
        - lazy operations
        - take an RDD and return an RDD

- In-memory - RDD data is kept in memory
- Immutable 
- Lazily evaluated
- Parallel - processed in parallel
- Partitioned - distributed across the cluster
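
The split between lazy transformations and actions can be illustrated with a plain-Python analogy. This is not Spark code: generator expressions stand in for transformations, `list()` stands in for an action such as `collect()`, and the `log` list and the `double` helper are hypothetical names used only to show when the work actually happens.

```python
log = []  # records which elements have actually been processed


def double(x):
    log.append(x)
    return x * 2


data = [1, 2, 3, 4]

# "Transformations": building the pipeline processes no elements yet
evens = (x for x in data if x % 2 == 0)
doubled = (double(x) for x in evens)
work_before_action = list(log)  # snapshot taken before any action

# "Action": materializing the result runs the whole pipeline
result = list(doubled)
```

In Spark the same idea holds at cluster scale: a chain of `filter` and `map` calls only describes the computation, and nothing runs until an action asks for a result.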

## IMPORTANT information!

Key terms for understanding how Spark works:

Term                   |Definition
----                   |-------
RDD                    |Resilient Distributed Dataset
Transformation         |Spark operation that produces an RDD
Action                 |Spark operation that produces a local object
Spark Job              |Sequence of transformations on data with a final action


Two basic ways to create an RDD:

Method                                   |Result
----------                               |-------
`sc.parallelize(array)`                  |Create RDD of elements of array (or list)
`sc.textFile("path/to/file")`            |Create RDD of lines from file

Basic transformations

Transformation Example                          |Result
----------                               |-------
`filter(lambda x: x % 2 == 0)`           |Discard non-even elements
`map(lambda x: x * 2)`                   |Multiply each RDD element by `2`
`map(lambda x: x.split())`               |Split each string into words
`flatMap(lambda x: x.split())`           |Split each string into words and flatten sequence
`sample(withReplacement=True, fraction=0.25)` |Create sample of about 25% of elements with replacement
`union(rdd)`                             |Append `rdd` to existing RDD
`distinct()`                             |Remove duplicates in RDD
`sortBy(lambda x: x, ascending=False)`   |Sort elements in descending order
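
The difference between `map` and `flatMap` in the table above is easy to miss, so here is a small plain-Python sketch of the two behaviours. It uses list comprehensions rather than Spark, and the sample `lines` are made up for illustration.

```python
lines = ["to be or", "not to be"]

# map: exactly one output element per input element
# (here each line becomes a list of words, so the result is a list of lists)
mapped = [line.split() for line in lines]

# flatMap: the per-element lists are concatenated into one flat sequence
flat_mapped = [word for line in lines for word in line.split()]
```

With `map` the result has as many elements as there are lines; with `flatMap` it has as many elements as there are words.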

Basic actions

Action                             |Result
----------                             |-------
`collect()`                            |Convert RDD to in-memory list 
`take(3)`                              |First 3 elements of RDD 
`top(3)`                               |Largest 3 elements of RDD
`takeSample(withReplacement=True, num=3)` |Create sample of 3 elements with replacement
`sum()`                                |Find element sum (assumes numeric elements)
`mean()`                               |Find element mean (assumes numeric elements)
`stdev()`                              |Find element standard deviation (assumes numeric elements)

In [None]:
keywords = ['Books', 'DVD', 'CD', 'PenDrive']
key_rdd = sc.parallelize(keywords)
key_rdd.collect()

In [None]:
key_small = key_rdd.map(lambda x: x.lower()) # transformation

In [None]:
key_small.collect() # action

In [None]:
sc.stop()

**Spark’s core data structure**

✅: A low level object that lets Spark work its magic by splitting data across multiple nodes in the cluster.

❌: However, RDDs are hard to work with directly, so we’ll be using the Spark DataFrame abstraction built on top of RDDs.

## MAP REDUCE

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("new").getOrCreate()

# get the SparkContext object
sc = spark.sparkContext

In [None]:
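import re

# Word count on an RDD
(sc.textFile("MobyDick.txt")
   # split each line into lowercase words
   .flatMap(lambda line: re.findall(r"[a-z']+", line.lower()))
   # pair each word with a count of 1
   .map(lambda word: (word, 1))
   # sum the counts for each word
   .reduceByKey(lambda x, y: x + y)
   .take(5))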
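
To make the map and reduce steps of the word count concrete, here is a plain-Python sketch of the same logic on two made-up lines. It does not use Spark; the `text` sample and the variable names are assumptions for illustration, and the loop only imitates what `reduceByKey(lambda x, y: x + y)` does for each key.

```python
import re
from collections import defaultdict

text = ["Call me Ishmael.", "Call me again, Ishmael"]  # made-up sample lines

# map phase: emit a (word, 1) pair for every word in every line
pairs = [(w, 1) for line in text for w in re.findall(r"[a-z']+", line.lower())]

# reduce phase: combine all values that share a key using x + y
counts = defaultdict(int)
for word, one in pairs:
    counts[word] = counts[word] + one

word_counts = dict(counts)
```

In Spark the pairs are spread over partitions, and the reduce function is applied within each partition and then across partitions, which is why it has to be associative and commutative.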

## SPARK STREAMING

The part of Spark responsible for processing data in near real time. 


<img src="https://spark.apache.org/docs/latest/img/streaming-arch.png"/>

Data can come from various sources, e.g. TCP sockets, Kafka, etc. 
Using the already familiar methods `map`, `reduce`, `join`, and `window`, you can easily process the stream as if it were an infinite sequence of RDDs. 
You can also apply ML operations or plotting to the stream. 

The whole procedure looks as follows: 

<img src="https://spark.apache.org/docs/latest/img/streaming-flow.png"/>

In this version, Spark Streaming introduces an abstraction called a `discretized stream`, or *DStream*, which represents a sequence of RDDs.

Operations on DStreams are available in the Java, Scala, and Python APIs (not all features are available in Python). 


When run locally with a receiver, Spark Streaming needs at least 2 cores: one for the receiver and one for processing the data.

----
- **StreamingContext(sparkContext, batchDuration)** - represents the connection to the cluster and is used to create DStreams; `batchDuration` sets the batch interval (in seconds)
- **socketTextStream(hostname, port)** - creates a DStream from data arriving from the given TCP source
- **flatMap(f), map(f), reduceByKey(f)** - work as they do for RDDs, except that they produce new DStreams
- **pprint(n)** - prints the first `n` (default 10) elements of each RDD generated in the DStream
- **StreamingContext.start()** - starts the stream computation
- **StreamingContext.awaitTermination(timeout)** - waits for the stream computation to finish
- **StreamingContext.stop(stopSparkContext, stopGraceFully)** - stops the stream computation

A StreamingContext object can be created from a SparkContext object.
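
The idea of a DStream as a sequence of small batches can be sketched without Spark. The plain-Python simulation below groups made-up timestamped lines into batches of `batch_duration` seconds and counts the words in each batch separately. The `arrivals` data and all names are hypothetical; a real DStream receives its data from a source such as `socketTextStream`.

```python
from collections import Counter

# hypothetical arrivals: (arrival time in seconds, line of text)
arrivals = [(0.2, "a b"), (1.5, "b c"), (2.1, "c c"), (3.9, "a"), (4.0, "b")]
batch_duration = 2  # seconds; assumed batch interval

# group the lines into batches by arrival time
batches = {}
for t, line in arrivals:
    batches.setdefault(int(t // batch_duration), []).append(line)

# word count per batch: the analogue of flatMap -> map -> reduceByKey on a DStream
counts_per_batch = {
    b: dict(Counter(w for line in lines for w in line.split()))
    for b, lines in sorted(batches.items())
}
```

Each batch is counted independently, which mirrors how `pprint()` shows separate counts for every batch interval instead of a running total.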


<img src="https://spark.apache.org/docs/latest/img/streaming-dstream.png"/>

<img src="https://spark.apache.org/docs/latest/img/streaming-dstream-ops.png"/>

In [None]:
import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two worker threads
# and a batch interval of 2 seconds

sc = SparkContext("local[2]", "NetworkWordCount2")
ssc = StreamingContext(sc, 2)

# DStream of lines read from the TCP socket
lines = ssc.socketTextStream("localhost", 9998)

# split each line into words
# (the DStream is mapped to another DStream)
# words = lines.flatMap(lambda line: line.split(" "))

words = lines.flatMap(lambda x: re.findall(r"[a-z']+", x.lower()))

# count each word in each batch
# (each step maps the DStream to another DStream)
# pairs = words.map(lambda word: (word, 1))
# wordCounts = pairs.reduceByKey(lambda x, y: x + y)

wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)
# print the first 10 elements of each batch
wordCounts.pprint()

In [None]:
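# start the data source (socket server) before running this cell
ssc.start()             # start the computation
ssc.awaitTermination()  # block until the computation is stopped
ssc.stop()              # stop the StreamingContext (and, by default, the SparkContext)
sc.stop()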

In [None]:
# in a Linux console use netcat; on Windows, use Ncat from Nmap
!nc -lk 9998

In [None]:
%%file start_stream.py

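from socket import socket, AF_INET, SOCK_STREAM
import time

# read the whole text into memory, one list element per line
lines = []
with open("MobyDick_full.txt", 'r') as ad:
    for line in ad:
        lines.append(line)

HOST = 'localhost'
PORT = 9998
ADDR = (HOST, PORT)
tcpSock = socket(AF_INET, SOCK_STREAM)
tcpSock.bind(ADDR)
tcpSock.listen(5)


while True:
    # wait for a client (e.g. the Spark receiver) to connect
    c, addr = tcpSock.accept()
    print('got connection')
    for line in lines:
        try:
            # send one line per second
            c.send(line.encode())
            time.sleep(1)
        except OSError:
            # the client disconnected
            break
    c.close()
    print('disconnected')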


## New approach

The Spark DataFrame is designed to behave a lot like a SQL table.

✅:
- easier to understand,
- operations on DataFrames are automatically optimized,
- with RDDs, it’s up to the data scientist to figure out the right way to optimize the query, whereas the DataFrame implementation has much of this optimization built in.

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.getOrCreate()

## Exploratory Data Analysis

In [36]:
# check the data schema
parsed.printSchema()

root
 |-- id_1: integer (nullable = true)
 |-- id_2: integer (nullable = true)
 |-- cmp_fname_c1: double (nullable = true)
 |-- cmp_fname_c2: double (nullable = true)
 |-- cmp_lname_c1: double (nullable = true)
 |-- cmp_lname_c2: double (nullable = true)
 |-- cmp_sex: integer (nullable = true)
 |-- cmp_bd: integer (nullable = true)
 |-- cmp_bm: integer (nullable = true)
 |-- cmp_by: integer (nullable = true)
 |-- cmp_plz: integer (nullable = true)
 |-- is_match: boolean (nullable = true)



In [37]:
# inspect the values in the first row
parsed.first()

Row(id_1=3148, id_2=8326, cmp_fname_c1=1.0, cmp_fname_c2=None, cmp_lname_c1=1.0, cmp_lname_c2=None, cmp_sex=1, cmp_bd=1, cmp_bm=1, cmp_by=1, cmp_plz=1, is_match=True)

In [38]:
# number of records
parsed.count()

5749132

In [None]:
# cache in memory on the cluster (here: 1 machine)
parsed.cache()

In [39]:
# target "is_match": number of matching and non-matching records
from pyspark.sql.functions import col

parsed.groupBy("is_match").count().orderBy(col("count").desc()).show()

+--------+-------+
|is_match|  count|
+--------+-------+
|   false|5728201|
|    true|  20931|
+--------+-------+



In [40]:
# other aggregates computed with agg
from pyspark.sql.functions import avg, stddev, stddev_pop

parsed.agg(avg("cmp_sex"), stddev("cmp_sex"), stddev_pop("cmp_sex")).show()

+-----------------+--------------------+-------------------+
|     avg(cmp_sex)|stddev_samp(cmp_sex)|stddev_pop(cmp_sex)|
+-----------------+--------------------+-------------------+
|0.955001381078048| 0.20730111116897781|0.20730109314007186|
+-----------------+--------------------+-------------------+
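
The two standard deviations printed above differ only in the divisor: `stddev` (shown as `stddev_samp`) divides by `n - 1`, while `stddev_pop` divides by `n`. The following sketch checks that relation on the printed values, assuming `cmp_sex` has no nulls so that `n` equals the row count 5749132 reported by `parsed.count()`.

```python
import math

n = 5749132                        # row count; assumed equal to the non-null count of cmp_sex
stddev_pop = 0.20730109314007186   # value printed above
stddev_samp = 0.20730111116897781  # value printed above

# sample variance = population variance * n / (n - 1)
reconstructed = stddev_pop * math.sqrt(n / (n - 1))
relative_gap = abs(reconstructed - stddev_samp) / stddev_samp
```

With millions of rows the correction factor is so close to 1 that the two estimates agree to about seven significant digits.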



In [41]:
# SQL queries: register a name for the SQL engine (temporary view)
parsed.createOrReplaceTempView("dane")

In [42]:
spark.sql(""" SELECT is_match, COUNT(*) cnt FROM dane group by is_match order by cnt DESC""").show()

+--------+-------+
|is_match|    cnt|
+--------+-------+
|   false|5728201|
|    true|  20931|
+--------+-------+



In [43]:
# summary statistics
summary = parsed.describe()
summary.show()

+-------+------------------+------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+
|summary|              id_1|              id_2|       cmp_fname_c1|      cmp_fname_c2|      cmp_lname_c1|       cmp_lname_c2|            cmp_sex|             cmp_bd|             cmp_bm|            cmp_by|            cmp_plz|
+-------+------------------+------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+
|  count|           5749132|           5749132|            5748125|            103698|           5749132|               2464|            5749132|            5748337|            5748337|           5748337|            5736289|
|   mean| 33324.48559643438| 66587.43558331935| 0.7129024704437266|0.9000176718903189|0.315627819308

In [44]:
summary.select("summary", "cmp_fname_c1", "cmp_fname_c2").show()

+-------+-------------------+------------------+
|summary|       cmp_fname_c1|      cmp_fname_c2|
+-------+-------------------+------------------+
|  count|            5748125|            103698|
|   mean| 0.7129024704437266|0.9000176718903189|
| stddev|0.38875835961628014|0.2713176105782334|
|    min|                0.0|               0.0|
|    max|                1.0|               1.0|
+-------+-------------------+------------------+



> Which variable describes the data better: c1 or c2?
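
One way to approach this question (a sketch, not the only criterion) is to compare how often each column is actually populated. The counts below are copied from the `count` row of the summary above and from `parsed.count()`; the `coverage` dictionary is a hypothetical helper name.

```python
total_rows = 5749132  # parsed.count()

# non-null counts taken from the 'count' row of the summary
non_null = {"cmp_fname_c1": 5748125, "cmp_fname_c2": 103698}

# fraction of records in which each column has a value
coverage = {name: cnt / total_rows for name, cnt in non_null.items()}
```

Under this reading, `cmp_fname_c1` is present in almost every record while `cmp_fname_c2` is present in fewer than 2% of them, so a column that is mostly missing carries information for only a small part of the data.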

In [45]:
# statistics for each class

# filtering with a SQL expression
matches = parsed.where("is_match = true")


# filtering with the PySpark Column API
misses = parsed.filter(col("is_match") == False)

match_summary = matches.describe()
miss_summary = misses.describe()

In [46]:
match_summary.show()

+-------+-----------------+-----------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+
|summary|             id_1|             id_2|       cmp_fname_c1|       cmp_fname_c2|        cmp_lname_c1|       cmp_lname_c2|            cmp_sex|             cmp_bd|              cmp_bm|             cmp_by|            cmp_plz|
+-------+-----------------+-----------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+
|  count|            20931|            20931|              20922|               1333|               20931|                475|              20931|              20925|               20925|              20925|              20902|
|   mean|34575.72117911232|51259.95939037791| 0.9973163859635038| 0.9898900320318176|  0

In [47]:
miss_summary.show()

+-------+------------------+------------------+-------------------+-------------------+------------------+-------------------+-------------------+------------------+------------------+------------------+--------------------+
|summary|              id_1|              id_2|       cmp_fname_c1|       cmp_fname_c2|      cmp_lname_c1|       cmp_lname_c2|            cmp_sex|            cmp_bd|            cmp_bm|            cmp_by|             cmp_plz|
+-------+------------------+------------------+-------------------+-------------------+------------------+-------------------+-------------------+------------------+------------------+------------------+--------------------+
|  count|           5728201|           5728201|            5727203|             102365|           5728201|               1989|            5728201|           5727412|           5727412|           5727412|             5715387|
|   mean|33319.913548075565| 66643.44259218557| 0.7118634802175091| 0.8988473514090158|0.31313801133

## Pivot tables in Spark

In [48]:
summary_p = summary.toPandas()

In [49]:
summary_p.head()

| | summary | id_1 | id_2 | cmp_fname_c1 | cmp_fname_c2 | cmp_lname_c1 | cmp_lname_c2 | cmp_sex | cmp_bd | cmp_bm | cmp_by | cmp_plz |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | count | 5749132.0 | 5749132.0 | 5748125.0 | 103698.0 | 5749132.0 | 2464.0 | 5749132.0 | 5748337.0 | 5748337.0 | 5748337.0 | 5736289.0 |
| 1 | mean | 33324.48559643438 | 66587.43558331935 | 0.7129024704437266 | 0.9000176718903189 | 0.3156278193080383 | 0.3184128315317443 | 0.955001381078048 | 0.2244652670850717 | 0.488855298497635 | 0.2227485966810923 | 0.0055286614743434 |
| 2 | stddev | 23659.859374488064 | 23620.48761326969 | 0.3887583596162801 | 0.2713176105782334 | 0.3342336339615828 | 0.3685670662006653 | 0.2073011111689778 | 0.4172297223846263 | 0.4998758236779031 | 0.4160909629831756 | 0.0741491492542004 |
| 3 | min | 1.0 | 6.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 4 | max | 99980.0 | 100000.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 | 1.0 |


In [None]:
summary_p.shape

In [50]:
summary_p = summary_p.set_index('summary').transpose().reset_index()
summary_p = summary_p.rename(columns={'index':'field'})
summary_p = summary_p.rename_axis(None, axis=1)

In [51]:
summaryT = spark.createDataFrame(summary_p)
summaryT.show()

+------------+-------+-------------------+-------------------+---+------+
|       field|  count|               mean|             stddev|min|   max|
+------------+-------+-------------------+-------------------+---+------+
|        id_1|5749132|  33324.48559643438| 23659.859374488064|  1| 99980|
|        id_2|5749132|  66587.43558331935| 23620.487613269695|  6|100000|
|cmp_fname_c1|5748125| 0.7129024704437266|0.38875835961628014|0.0|   1.0|
|cmp_fname_c2| 103698| 0.9000176718903189| 0.2713176105782334|0.0|   1.0|
|cmp_lname_c1|5749132| 0.3156278193080383| 0.3342336339615828|0.0|   1.0|
|cmp_lname_c2|   2464| 0.3184128315317443|0.36856706620066537|0.0|   1.0|
|     cmp_sex|5749132|  0.955001381078048|0.20730111116897781|  0|     1|
|      cmp_bd|5748337|0.22446526708507172|0.41722972238462636|  0|     1|
|      cmp_bm|5748337|0.48885529849763504| 0.4998758236779031|  0|     1|
|      cmp_by|5748337| 0.2227485966810923| 0.4160909629831756|  0|     1|
|     cmp_plz|5736289|0.00552866147434

In [52]:
summaryT.printSchema() # are the data types right?

root
 |-- field: string (nullable = true)
 |-- count: string (nullable = true)
 |-- mean: string (nullable = true)
 |-- stddev: string (nullable = true)
 |-- min: string (nullable = true)
 |-- max: string (nullable = true)



In [53]:
from pyspark.sql.types import DoubleType

for c in summaryT.columns:
    if c == 'field':
        continue
    summaryT = summaryT.withColumn(c, summaryT[c].cast(DoubleType()))

In [54]:
summaryT.printSchema() # better now

root
 |-- field: string (nullable = true)
 |-- count: double (nullable = true)
 |-- mean: double (nullable = true)
 |-- stddev: double (nullable = true)
 |-- min: double (nullable = true)
 |-- max: double (nullable = true)



In [None]:
# do the same for the match and miss tables

In [55]:
def pivot_summary(desc):
    """Transpose a describe() result: one row per field, one column per statistic."""
    desc_p = desc.toPandas()
    desc_p = desc_p.set_index('summary').transpose().reset_index()
    desc_p = desc_p.rename(columns={'index':'field'})
    desc_p = desc_p.rename_axis(None, axis=1)
    descT = spark.createDataFrame(desc_p)
    # describe() returns strings, so cast every statistic column to double
    for c in descT.columns:
        if c != 'field':
            descT = descT.withColumn(c, descT[c].cast(DoubleType()))
    return descT

In [56]:
match_summaryT = pivot_summary(match_summary)
miss_summaryT = pivot_summary(miss_summary)

## Joins

In [57]:
match_summaryT.createOrReplaceTempView("match_s")
miss_summaryT.createOrReplaceTempView("miss_s")

In [58]:
spark.sql("""
Select a.field, a.count + b.count total, a.mean - b.mean delta
from match_s a inner join miss_s b on a.field = b.field 
where a.field not in ("id_1", "id_2")
order by delta DESC, total DESC
""").show()

+------------+---------+--------------------+
|       field|    total|               delta|
+------------+---------+--------------------+
|     cmp_plz|5736289.0|  0.9563812499852176|
|cmp_lname_c2|   2464.0|  0.8064147192926266|
|      cmp_by|5748337.0|  0.7762059675300512|
|      cmp_bd|5748337.0|   0.775442311783404|
|cmp_lname_c1|5749132.0|  0.6838772482594513|
|      cmp_bm|5748337.0|  0.5109496938298685|
|cmp_fname_c1|5748125.0|  0.2854529057459947|
|cmp_fname_c2| 103698.0| 0.09104268062280174|
|     cmp_sex|5749132.0|0.032408185250332844|
+------------+---------+--------------------+



> Features for the model: `cmp_plz`, `cmp_by`, `cmp_bd`, `cmp_lname_c1`, `cmp_bm`  

In [59]:
# score = sum of the selected variables
zmienne = ['cmp_plz','cmp_by','cmp_bd','cmp_lname_c1','cmp_bm']
suma = " + ".join(zmienne)

In [60]:
suma

'cmp_plz + cmp_by + cmp_bd + cmp_lname_c1 + cmp_bm'

In [62]:
from pyspark.sql.functions import expr

In [63]:
scored = parsed.fillna(0, subset=zmienne)\
.withColumn('score', expr(suma))\
.select('score','is_match')

In [64]:
scored.show()

+-----+--------+
|score|is_match|
+-----+--------+
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  4.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
|  4.0|    true|
|  5.0|    true|
|  5.0|    true|
|  5.0|    true|
+-----+--------+
only showing top 20 rows



In [65]:
# evaluate a threshold value
def crossTabs(scored, t):
    return scored.selectExpr(f"score >= {t} as above", "is_match")\
    .groupBy("above").pivot("is_match",("true","false"))\
    .count()

In [66]:
crossTabs(scored, 4.0).show()

+-----+-----+-------+
|above| true|  false|
+-----+-----+-------+
| true|20871|    637|
|false|   60|5727564|
+-----+-----+-------+



In [67]:
crossTabs(scored, 2.0).show()

+-----+-----+-------+
|above| true|  false|
+-----+-----+-------+
| true|20931| 596414|
|false| null|5131787|
+-----+-----+-------+
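
One common way to compare the two thresholds is to turn each cross-tabulation into precision and recall. This is a sketch added for illustration: the `precision_recall` helper is hypothetical, the counts are copied from the tables above, and the `null` cell at threshold 2.0 is assumed to mean zero records.

```python
def precision_recall(tp, fp, fn):
    """Precision and recall from confusion-matrix counts."""
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return precision, recall


# threshold 4.0: above & true = 20871, above & false = 637, below & true = 60
p4, r4 = precision_recall(tp=20871, fp=637, fn=60)

# threshold 2.0: above & true = 20931, above & false = 596414, below & true = null (taken as 0)
p2, r2 = precision_recall(tp=20931, fp=596414, fn=0)
```

At threshold 4.0 both values are high (about 0.97 precision and 0.997 recall). Lowering the threshold to 2.0 captures every true match, but precision drops to roughly 0.03 because of the large number of false positives.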

