In [None]:
import findspark
findspark.init('/Users/air/Desktop/spark')

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder\
        .appName("new")\
        .getOrCreate()
# otrzymanie obiektu SparkContext
sc = spark.sparkContext

In [None]:
sc

In [None]:
import re
# Word Count on RDD 
sc.textFile("RDD_input") \
.map(lambda x: re.findall(r"[a-z']+", x.lower())) \
.flatMap(lambda x: [(y, 1) for y in x]) \
.reduceByKey(lambda x,y: x + y) \
.collect()

## SPARK STREAMING

Część Sparka odpowiedzialna za przetwarzanie danych w czasie rzeczywistym. 


<img src="https://spark.apache.org/docs/latest/img/streaming-arch.png"/>

Dane mogą pochodzić z różnych źródeł np. sokety TCP, Kafka, etc. 
Korzystając z poznanych już metod `map, reduce, join, oraz window` można w łatwy sposób generować przetwarzanie strumienia tak jaby był to nieskończony ciąg RDD. 
Ponadto nie ma problemu aby wywołać na strumieniu operacje ML czy wykresy. 

Cała procedura przedstawia się następująco: 

<img src="https://spark.apache.org/docs/latest/img/streaming-flow.png"/>

SPARK STREAMING w tej wersji wprowadza abstrakcje zwaną `discretized stream` *DStream* (reprezentuje sekwencję RDD).

Operacje na DStream można wykonywać w API JAVA, SCALA, Python, R (nie wszystkie możliwości są dostępne dla Pythona). 


Spark Streaming potrzebuje minium 2 rdzenie.

----
- **StreamingContext(sparkContext, batchDuration)** - reprezentuje połączenie z klastrem i służy do tworzenia DStreamów, `batchDuration` wskazuje na granularność batch'y (w sekundach)
- **socketTextStream(hostname, port)** - tworzy DStream na podstawie danych napływających ze wskazanego źródła TCP
- **flatMap(f), map(f), reduceByKey(f)** - działają analogicznie jak w przypadku RDD z tym że tworzą nowe DStream'y
- **pprint(n)** - printuje pierwsze `n` (domyślnie 10) elementów z każdego RDD wygenerowanego w DStream'ie
- **StreamingContext.start()** - rozpoczyna działania na strumieniach
- **StreamingContext.awaitTermination(timeout)** - oczekuje na zakończenie działań na strumieniach
- **StreamingContext.stop(stopSparkContext, stopGraceFully)** - kończy działania na strumieniach

Obiekt StreamingContext można wygenerować za pomocą obiektu SparkContext.

In [None]:
import findspark
findspark.init('/Users/air/Desktop/spark')

import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working thread
# and batch interval of 1 second

sc = SparkContext("local[2]", "NetworkWordCount")

ssc = StreamingContext(sc, 5)

Po wygenerowaniu obiektu `ssc` musisz wskazaź źródło i utworzyć na jego podstawie DStream. Określić wszystkie transformacje. Uruchomić metodę `start()`, która powoduje nasłuchiwanie. Włączyć oczekiwanie na zakończenie procesu `awaitTermination()` bądź zatrzymać nasłuch ręcznie `stop()`. 

- po rozpoczęciu nasłuchu nie można już ustawić nowych przekształceń !
- po zatrzymaniu nie można zrestartować
- tylko jeden StreamingContext aktywny na JVM 

In [None]:
# DStream
lines = ssc.socketTextStream("localhost", 9999)
# podziel każdą linię na wyrazy
# DStream jest mapowany na kolejny DStream
# words = lines.flatMap(lambda line: line.split(" "))

words = lines.flatMap(lambda x: re.findall(r"[a-z']+", x.lower()))

# zliczmy każdy wyraz w każdym batchu
# DStream jest mapowany na kolejny DStream
# pairs = words.map(lambda word: (word, 1))

# DStream jest mapowany na kolejny DStream                  
# wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts = words.map(lambda word: (word,1)).reduceByKey(lambda x,y: x+y)
# wydrukuj pierwszy elemnet
wordCounts.pprint()

In [None]:
ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate
ssc.stop(True,True)

In [None]:
# w konsoli linuxowej netcat Nmap for windows
!nc -lk 9999

In [None]:
import findspark
findspark.init('/Users/air/Desktop/spark')

import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working thread
# and batch interval of 1 second

sc = SparkContext("local[2]", "NetworkWordCount2")
ssc = StreamingContext(sc, 2)

In [None]:
%%file stream.py

from socket import *
import time

rdd = list()
with open("RDD_input", 'r') as ad:
    for line in ad:
        rdd.append(line)

HOST = 'localhost'
PORT = 9999
ADDR = (HOST, PORT)
tcpSock = socket(AF_INET, SOCK_STREAM)
tcpSock.bind(ADDR)
tcpSock.listen(5)


while True:
    c, addr = tcpSock.accept()
    print('got connection')
    for line in rdd:
        try:
            c.send(line.encode())
            time.sleep(1)
        except:
            break
    c.close()
    print('disconnected')


In [None]:
# uruchom w konsoli 
!python stream.py

In [None]:
lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda x: re.findall(r"[a-z']+", x.lower()))
wordCounts = words.map(lambda word: (word,1)).reduceByKey(lambda x,y: x+y)
# wydrukuj pierwszy elemnet
wordCounts.pprint()

In [None]:
ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate
ssc.stop(True,True)

<img src="https://spark.apache.org/docs/latest/img/streaming-dstream.png"/>

<img src="https://spark.apache.org/docs/latest/img/streaming-dstream-ops.png"/>

W celach ćwiczeniowych i testowania przetwarzania strumieniowego warto wykorzystać Obiekt Kolejki wygenerowany z obiektów RDD.
Każdy RDD wepchany (pushed) do kolejki traktowany jest jako batch w DStream i przetwarzany jest jako strumień. 

In [None]:
import findspark
findspark.init('/Users/air/Desktop/spark')

import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working thread
# and batch interval of 1 second

sc = SparkContext("local[2]", "NetworkWordCount3")
ssc = StreamingContext(sc, 2)

In [None]:
[j for j in range(1, 1001)]

In [None]:
rddQueue = []
for i in range(10):
        rddQueue += [sc.parallelize(
            [j for j in range(1, 1001)], 10)]


In [None]:
rddQueue

In [None]:
inputStream = ssc.queueStream(rddQueue)

mappedStream = inputStream.map(lambda x: (x % 10, 1))
reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)

reducedStream.pprint()
 

In [None]:
# run  
import time
ssc.start()
time.sleep(10)
ssc.stop(stopSparkContext=True, stopGraceFully=True)

## Stateful Wordcount 

Operacja `updateStateByKey` pozwala łączyć ze sobą wyniki otrzymywane na poszczególbych DStreamach. Dzięki tej operacji możesz w sposób ciągły uzupełniać informacje !

Aby Spark Streaming mógł łączyć dane z wielu batchy (stateful transformations) konieczne jest wskazanie lokalizacji gdzie zapisywane będą checkpointy.

1. Zdefiniuj stan podstawowy
2. wskaż funkcję łączącą 

----
- **checkpoint(directory)** - wskazuje gdzie zapisywane będą checkpointy z operacji na DStream'ach
- **updateStateByKey(updateFunc)** - zwraca nowy DStream zawierający informację o bieżącym stanie poszczególnych kluczy, stan każdego klucza odświeżany jest przy pomocy `updateFunc`

In [None]:
def updateFunc(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)

In [None]:
import findspark
findspark.init('/Users/air/Desktop/spark')

import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working thread
# and batch interval of 1 second

sc = SparkContext("local[2]", "NetworkWC3")
ssc = StreamingContext(sc, 5)
ssc.checkpoint("tmp")

In [None]:
lines = ssc.socketTextStream("localhost", 9999)

words = lines.flatMap(lambda x: re.findall(r"[a-z']+", x.lower()))
pairs = words.map(lambda word: (word, 1))

runningCounts = pairs.updateStateByKey(updateFunc)

runningCounts.pprint()

In [None]:
ssc.start()
ssc.awaitTermination()

ssc.stop(True,True)

> Zadanie - Korzystając z danych kolejki rddQueue dodaj wszystkie elementy do siebie 

## Redukcja w oknach 

----
- **reduceByKeyAndWindow(func, invFunc, windowDuration, slideDuration)** - zwraca nowy DStream powstały w wyniku stosowania przyrostowo reduceByKey wewnątrz zdefiniowanego okna. Zredukowane wartości dla nowego okna obliczane są z wykorzystaniem wartości starego okna poprzez: 
1. zredukowanie (dodanie) nowych wartości, 
2. "odwrotne zredukowanie" (odjęcie) wartości które opuściły już okno

In [1]:
import findspark
findspark.init('/Users/air/Desktop/spark')

import re
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "NetworkWC4")
ssc = StreamingContext(sc, 2)
ssc.checkpoint("tmp")

In [2]:
lines = ssc.socketTextStream("localhost", 9999)

In [3]:
words = lines.flatMap(lambda x: re.findall(r"[a-z']+", x.lower()))
pairs = words.map(lambda word: (word,1))

In [4]:
# window length - długość trwania okna
# sliding interval - czas w którym wykonywana jest funkcja okna  

windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)

windowedWordCounts.pprint()

In [5]:
ssc.start()
ssc.awaitTermination()
ssc.stop(True,True)

-------------------------------------------
Time: 2021-05-11 20:18:10
-------------------------------------------

-------------------------------------------
Time: 2021-05-11 20:18:20
-------------------------------------------

-------------------------------------------
Time: 2021-05-11 20:18:30
-------------------------------------------
('is', 5)
('engine', 1)
('compatible', 1)
('hadoop', 3)
('run', 1)
('in', 2)
('clusters', 1)
('yarn', 1)
("spark's", 1)
('mode', 1)
...

-------------------------------------------
Time: 2021-05-11 20:18:40
-------------------------------------------
('is', 8)
('engine', 1)
('compatible', 1)
('hadoop', 3)
('run', 1)
('in', 3)
('clusters', 1)
('yarn', 1)
("spark's", 1)
('mode', 1)
...

-------------------------------------------
Time: 2021-05-11 20:18:50
-------------------------------------------
('is', 13)
('engine', 2)
('compatible', 2)
('hadoop', 5)
('run', 2)
('in', 4)
('clusters', 2)
('yarn', 2)
("spark's", 2)
('mode', 2)
...

----------------

KeyboardInterrupt: 