In [2]:
# na te zajęcia potrzebować będziemy wersji lokalnej SPARKA (może być docker !)
# git clone 
# docker build -t spark .
# docker run -p 8888:8888 spark


from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working thread
# and batch interval of 1 second
spark = SparkSession\
        .builder\
        .master("local[2]") \
        .appName("Stream_Socket")\
        .getOrCreate()


sc = spark.sparkContext

In [3]:
# biblioteka re - regular expression 
import re
# =================
# RDD - czyli podstawowy obiekt do obsługi danych dla SPARKA
rdd = sc.parallelize([1,2,3])
rdd.take(2)

[1, 2]

In [4]:
# ===================
# Prosty Word Count 
# r'[a-z'] - oznacza jeden znak dla liter od a do z (male) + apostrof
# []+ - oznacza, że może pojawić się jeden lub więcej znaków
# pamiętajcie, że przetwarzamy x.lower()
# Licznik ten jest wygenerowany na podstawie map-reduce
# ===================

sc.textFile("RDD_input") \
.map(lambda x: re.findall(r"[a-z']+", x.lower())) \
.flatMap(lambda x: [(y, 1) for y in x]) \
.reduceByKey(lambda x,y: x + y) \
.collect()

[('is', 12),
 ('engine', 1),
 ('compatible', 1),
 ('hadoop', 3),
 ('run', 1),
 ('in', 3),
 ('clusters', 1),
 ('yarn', 1),
 ("spark's", 1),
 ('mode', 1),
 ('process', 1),
 ('hdfs', 1),
 ('cassandra', 1),
 ('hive', 1),
 ('designed', 1),
 ('perform', 1),
 ('both', 1),
 ('similar', 1),
 ('new', 1),
 ('like', 1),
 ('streaming', 1),
 ('machine', 1),
 ('learning', 1),
 ('than', 8),
 ('implicit', 1),
 ('simple', 1),
 ('complicated', 1),
 ('flat', 1),
 ('nested', 1),
 ('sparse', 1),
 ('readability', 1),
 ('counts', 1),
 ('cases', 1),
 ("aren't", 1),
 ('rules', 1),
 ('although', 3),
 ('errors', 1),
 ('never', 3),
 ('pass', 1),
 ('explicitly', 1),
 ('silenced', 1),
 ('of', 2),
 ('refuse', 1),
 ('temptation', 1),
 ('guess', 1),
 ('there', 1),
 ('only', 1),
 ('way', 2),
 ('do', 2),
 ('may', 2),
 ('at', 1),
 ("you're", 1),
 ('dutch', 1),
 ('now', 2),
 ('right', 1),
 ('idea', 3),
 ('good', 1),
 ('are', 1),
 ('honking', 1),
 ("let's", 1),
 ('more', 1),
 ('spark', 1),
 ('a', 3),
 ('fast', 1),
 ('and', 

## SPARK STREAMING

Część Sparka odpowiedzialna za przetwarzanie danych w czasie rzeczywistym. 


<img src="https://spark.apache.org/docs/latest/img/streaming-arch.png"/>

Dane mogą pochodzić z różnych źródeł np. sokety TCP, Kafka, etc. 
Korzystając z poznanych już metod `map, reduce, join, oraz window` można w łatwy sposób generować przetwarzanie strumienia tak jaby był to nieskończony ciąg RDD. 
Ponadto nie ma problemu aby wywołać na strumieniu operacje ML czy wykresy. 

Cała procedura przedstawia się następująco: 

<img src="https://spark.apache.org/docs/latest/img/streaming-flow.png"/>

SPARK STREAMING w tej wersji wprowadza abstrakcje zwaną `discretized stream` *DStream* (reprezentuje sekwencję RDD).

Operacje na DStream można wykonywać w API JAVA, SCALA, Python, R (nie wszystkie możliwości są dostępne dla Pythona). 

## Spark Streaming potrzebuje minium 2 rdzenie !

----
- **StreamingContext(sparkContext, batchDuration)** - reprezentuje połączenie z klastrem i służy do tworzenia DStreamów, `batchDuration` wskazuje na granularność batch'y (w sekundach)
- **socketTextStream(hostname, port)** - tworzy DStream na podstawie danych napływających ze wskazanego źródła TCP
- **flatMap(f), map(f), reduceByKey(f)** - działają analogicznie jak w przypadku RDD z tym że tworzą nowe DStream'y
- **pprint(n)** - printuje pierwsze `n` (domyślnie 10) elementów z każdego RDD wygenerowanego w DStream'ie
- **StreamingContext.start()** - rozpoczyna działania na strumieniach
- **StreamingContext.awaitTermination(timeout)** - oczekuje na zakończenie działań na strumieniach
- **StreamingContext.stop(stopSparkContext, stopGraceFully)** - kończy działania na strumieniach

Obiekt StreamingContext można wygenerować za pomocą obiektu SparkContext.

In [6]:
ssc = StreamingContext(sc, 5) # ustawiłem na 5 sekund aby był czas na wpisywanie

Po wygenerowaniu obiektu `ssc` musisz wskazać źródło i utworzyć na jego podstawie DStream. Określić wszystkie transformacje. Uruchomić metodę `start()`, która powoduje nasłuchiwanie. Włączyć oczekiwanie na zakończenie procesu `awaitTermination()` bądź zatrzymać nasłuch ręcznie `stop()`. 

- po rozpoczęciu nasłuchu nie można już ustawić nowych przekształceń !
- po zatrzymaniu nie można zrestartować
- tylko jeden StreamingContext aktywny na JVM 

In [7]:
# DStream - dane pobierane z socketu TCP - na porcie 9999

lines = ssc.socketTextStream("localhost", 9999)

In [8]:
words = lines.flatMap(lambda x: re.findall(r"[a-z']+", x.lower()))
wordCounts = words.map(lambda word: (word,1)).reduceByKey(lambda x,y: x+y)
wordCounts.pprint()

In [9]:
ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate
ssc.stop(True,True)

-------------------------------------------
Time: 2021-06-03 10:01:35
-------------------------------------------

-------------------------------------------
Time: 2021-06-03 10:01:40
-------------------------------------------

-------------------------------------------
Time: 2021-06-03 10:01:45
-------------------------------------------



KeyboardInterrupt: 

In [11]:
# pierwsza wersja nadawania na TCP 9999 (unix)
# w konsoli linuxowej netcat Nmap for windows
!Nmap -lk 9999

# wpisujesz tekst
# jeśli się nie uda to generuj plik poniżej (Możecie też zmienić plik źródłowy)
# zweryfikujcie żeby ścieżka była odpowiednia

/usr/bin/sh: 1: Nmap: not found
-------------------------------------------
Time: 2021-06-03 10:16:30
-------------------------------------------

-------------------------------------------
Time: 2021-06-03 10:16:35
-------------------------------------------

-------------------------------------------
Time: 2021-06-03 10:16:40
-------------------------------------------

-------------------------------------------
Time: 2021-06-03 10:16:45
-------------------------------------------

-------------------------------------------
Time: 2021-06-03 10:16:50
-------------------------------------------

-------------------------------------------
Time: 2021-06-03 10:16:55
-------------------------------------------

-------------------------------------------
Time: 2021-06-03 10:17:00
-------------------------------------------

-------------------------------------------
Time: 2021-06-03 10:17:05
-------------------------------------------

-------------------------------------------
Time

In [None]:
%%file stream.py

from socket import *
import time

rdd = list()
with open("RDD_input", 'r') as ad:
    for line in ad:
        rdd.append(line)

HOST = 'localhost'
PORT = 9999
ADDR = (HOST, PORT)
tcpSock = socket(AF_INET, SOCK_STREAM)
tcpSock.bind(ADDR)
tcpSock.listen(5)


while True:
    c, addr = tcpSock.accept()
    print('got connection')
    for line in rdd:
        try:
            c.send(line.encode())
            time.sleep(1)
        except:
            break
    c.close()
    print('disconnected')

In [None]:
# w osobnej konsoli (możesz uruchomić ją w jupyter notebook)
! python stream.py 

<img src="https://spark.apache.org/docs/latest/img/streaming-dstream.png"/>

<img src="https://spark.apache.org/docs/latest/img/streaming-dstream-ops.png"/>

In [None]:
## dane do strumienia można wygenerować też jako listę i wykorzystać tzw kolejkę 
## kolejka to tak jak w sklepie obsługujemy pierwszego (ostatni dochodzi na koniec kolejki)
## po obsłużeniu wypada pierwszy i kolejka się zmniejsza

In [None]:
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

spark = SparkSession\
        .builder\
        .master("local[2]") \ # TUTAJ WAŻNE ABY BYŁY PRZYNAJMNIEJ DWA PROCESORY
        .appName("Stream_Kolejka")\
        .getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 1) # ustawiłem na 5 sekund aby był czas na wpisywanie

# 10 pakietów po 1000 liczb
rddQueue = []
for i in range(10):
        rddQueue += [sc.parallelize(
            [j for j in range(1, 1001)], 10)]

In [None]:
inputStream = ssc.queueStream(rddQueue)

mappedStream = inputStream.map(lambda x: (x % 10, 1))
reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)

reducedStream.pprint()

In [None]:
# tym razem trzeba trochę inaczej uruchomić i wprowadzić czas między
import time
ssc.start()
time.sleep(10)
ssc.stop(stopSparkContext=True, stopGraceFully=True)

## Stateful Wordcount 

Operacja `updateStateByKey` pozwala łączyć ze sobą wyniki otrzymywane na poszczególbych DStreamach. Dzięki tej operacji możesz w sposób ciągły uzupełniać informacje !

Aby Spark Streaming mógł łączyć dane z wielu batchy (stateful transformations) konieczne jest wskazanie lokalizacji gdzie zapisywane będą checkpointy.

1. Zdefiniuj stan podstawowy
2. wskaż funkcję łączącą 

----
- **checkpoint(directory)** - wskazuje gdzie zapisywane będą checkpointy z operacji na DStream'ach
- **updateStateByKey(updateFunc)** - zwraca nowy DStream zawierający informację o bieżącym stanie poszczególnych kluczy, stan każdego klucza odświeżany jest przy pomocy `updateFunc`

In [None]:
def updateFunc(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)

In [None]:
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

spark = SparkSession\
        .builder\
        .master("local[2]") \ # TUTAJ WAŻNE ABY BYŁY PRZYNAJMNIEJ DWA PROCESORY
        .appName("Stream_stateful")\
        .getOrCreate()

sc = spark.sparkContext
ssc = StreamingContext(sc, 1)

In [None]:
# tutaj zdecydujcie czy puszczacie z pliku czy z konsoli (z konsoli dobrze widac jak dziala)
lines = ssc.socketTextStream("localhost", 9999)

words = lines.flatMap(lambda x: re.findall(r"[a-z']+", x.lower()))
pairs = words.map(lambda word: (word, 1))

runningCounts = pairs.updateStateByKey(updateFunc)

runningCounts.pprint()

In [None]:
ssc.start()
ssc.awaitTermination()

ssc.stop(True,True)

## Redukcja w oknach 

----
- **reduceByKeyAndWindow(func, invFunc, windowDuration, slideDuration)** - zwraca nowy DStream powstały w wyniku stosowania przyrostowo reduceByKey wewnątrz zdefiniowanego okna. Zredukowane wartości dla nowego okna obliczane są z wykorzystaniem wartości starego okna poprzez: 
1. zredukowanie (dodanie) nowych wartości, 
2. "odwrotne zredukowanie" (odjęcie) wartości które opuściły już okno

In [None]:
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

spark = SparkSession\
        .builder\
        .master("local[2]") \ # TUTAJ WAŻNE ABY BYŁY PRZYNAJMNIEJ DWA PROCESORY
        .appName("Stream_windows")\
        .getOrCreate()

sc = spark.sparkContext
ssc = StreamingContext(sc, 2)
ssc.checkpoint("tmp")

In [None]:
lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda x: re.findall(r"[a-z']+", x.lower()))
pairs = words.map(lambda word: (word,1))
# window length - długość trwania okna
# sliding interval - czas w którym wykonywana jest funkcja okna 
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)

windowedWordCounts.pprint()

In [None]:
ssc.start()
ssc.awaitTermination()
ssc.stop(True,True)

### Stream do DataFrame

lines = DataFrame reprezentujący nieograniczoną tabelę zawierającą dane strumieniowe. 
Zawiera ona jedną kolumnę o nazwie `value`. Każda nowa linia to wiersz w tabeli. 

<img src="https://spark.apache.org/docs/latest/img/structured-streaming-stream-as-a-table.png"/>

## SPARK Structured Streaming

----
- **SparkSession.readStream.format(source).option(key, value).load()** - tworzy strumieniowy DataFrame
- **DataFrame.writeStream.outputMode(mode).format(source).option(key, value).start()** - wysyła dane ze strumieniowego DataFrame'u "na zewnątrz"; `complete` mode - outputem jest cała zaktualizowana tabela, `append` mode - outputem są jedynie nowe wiersze, `update` mode - outputem są jedynie zaktualizowane wiersze

In [None]:
%%file spark2.py

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .master("local[2]") \
        .appName("Stream_DF")\
        .getOrCreate()
    print("="*50)
    print("Zaczynamy DataFrame")
    print("="*50)
    spark.sparkContext.setLogLevel("ERROR")
    lines = spark.readStream\
        .format("socket")\
        .option("host", "localhost")\
        .option("port", 9999)\
        .load()
    words = lines.select(f.explode(f.split(lines.value, " ")).alias("word"))
    wordCounts = words.groupBy("word").count()
    query = wordCounts.writeStream.outputMode("complete").format("console").start()
    query.awaitTermination()
    query.stop()

In [None]:
! spark-submit spark2.py