
#  Spark Structured Streaming
## Introduction

In this lab, we explore the basics of Spark Structured Streaming.

Consider an infinite stream of events, for example:
* the stream of Wikipedia updates,
* the stream of Twitter messages with a given hashtag,
* the stream of measurements from a sensor network,
* a combination of streams coming from many sources.

Streams raise issues of volume and velocity: a very high volume of data can arrive very quickly.

You may want to compute virtually anything on the stream:
* translate Wikipedia updates from one language to another,
* compute the top 10 contributors to Wikipedia,
* train a model to detect vandalism on Wikipedia,
* analyze tweets about your company to detect positive/negative sentiment,
* detect anomalies in your sensor network to raise an alert,
* the use cases are endless...

The challenge is to allow such computation with high volume and high velocity.

Main actors in Data Stream Processing are:
* Kafka, https://fr.wikipedia.org/wiki/Apache_Kafka 
* Storm, https://fr.wikipedia.org/wiki/Apache_Storm
* Flume, https://flume.apache.org/
* Flink, https://fr.wikipedia.org/wiki/Apache_Flink 
* Kinesis

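Spark offers two streaming models: DStreams (the original Spark Streaming API) and Structured Streaming. Structured Streaming is now the actively maintained model. It provides exactly-once semantics, i.e. each event is processed *exactly once* (end to end, provided the source can be replayed and the sink is idempotent).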

This is essential for fault tolerance: stream processing is distributed, and many kinds of faults may happen. One key challenge for stream processing is to process every event exactly once, even if some part of the processing has to be restarted after a failure.
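One common way to keep results exactly-once despite restarts is to make the sink idempotent: each micro-batch carries an identifier, and a batch that is replayed after a failure is recognized and skipped. The toy class below illustrates this idea in plain Python. `IdempotentSink` is a hypothetical name, not a Spark API (in PySpark, `foreachBatch` passes a batch id to your function that can be used in a similar way).

```python
class IdempotentSink:
    """Toy sink (hypothetical, not a Spark API) that writes each batch at most once."""

    def __init__(self):
        self.committed = set()  # ids of the batches already written
        self.rows = []          # everything written so far

    def write(self, batch_id, rows):
        """Write `rows` unless batch `batch_id` was already committed."""
        if batch_id in self.committed:
            return False        # replayed batch: ignore it
        self.rows.extend(rows)
        self.committed.add(batch_id)
        return True


sink = IdempotentSink()
sink.write(0, ["a", "b"])
sink.write(1, ["c"])
sink.write(1, ["c"])  # batch 1 replayed after a simulated failure
print(sink.rows)      # ['a', 'b', 'c']
```

A real sink would have to store the rows and the batch id atomically (for example in one transaction); otherwise a crash between the two steps could still lose or duplicate data.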

The key idea in Structured Streaming is to treat a live data stream as a table that is continuously appended to. The table does not have to be fully materialized: processed events can be discarded once consumed and replaced, for example, by partial aggregates.

![stream model](https://spark.apache.org/docs/latest/img/structured-streaming-stream-as-a-table.png)

The figure below illustrates the word count example. Words are received incrementally:
* each time new words are received, they are appended to the input table;
* this triggers the word count program, which counts how many times each word has been seen (note that once the aggregation is done, the inputs can be discarded from the input table);
* next, the results are written to the output.

Three output modes are possible:
* "complete": print all the results at each trigger. This makes sense for aggregation queries (groupBy/count);
* "append": print only the new rows appended to the *result* table. This suits selection queries (e.g. `SELECT * ... WHERE age > 12`);
* "update": print only the rows that have been updated in the *result* table.
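The incremental word count and the output modes above can be simulated in plain Python. In this sketch (an illustration of the model, not Spark code), each micro-batch is a list of words, and the running counts play the role of the result table, so the consumed input words themselves are not kept. `run_word_count` and `run_filter_append` are hypothetical helpers.

```python
from collections import Counter


def run_word_count(batches, mode):
    """Return the rows emitted at each trigger for a running word count.

    mode="complete": the whole result table is emitted every time.
    mode="update":   only the rows changed by the current batch are emitted.
    """
    counts = Counter()  # result table: only aggregates are kept, not the input words
    emitted = []
    for batch in batches:
        batch_counts = Counter(batch)
        counts.update(batch_counts)
        if mode == "complete":
            emitted.append(dict(counts))
        elif mode == "update":
            emitted.append({word: counts[word] for word in batch_counts})
        else:
            raise ValueError(f"unsupported mode for an aggregation: {mode}")
    return emitted


def run_filter_append(batches, predicate):
    """Append mode for a selection query: emit only the new matching rows."""
    return [[row for row in batch if predicate(row)] for batch in batches]


batches = [["a", "b", "a"], ["b", "c"]]
print(run_word_count(batches, "complete"))  # [{'a': 2, 'b': 1}, {'a': 2, 'b': 2, 'c': 1}]
print(run_word_count(batches, "update"))    # [{'a': 2, 'b': 1}, {'b': 2, 'c': 1}]
print(run_filter_append([[10, 15], [20, 5]], lambda age: age > 12))  # [[15], [20]]
```

The sketch rejects append mode for the word count because, in Spark, append mode is only allowed on aggregation queries when a watermark is defined.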


![stream model](https://spark.apache.org/docs/latest/img/structured-streaming-example-model.png)




## Installation

Let's install the code and data.
Note that we do not use a cluster here: Spark runs locally, with a single master.
We only want to see the programming model, not the performance.

In [12]:
#install software
# No clustering for this install !!
!pip install pyspark
!pip install -q findspark
import findspark
findspark.init()

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=30d66e76230f17baff2e126f5a0e9f02ab0ead6cb1d101cc00772e714b779ab2
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [11]:
#get data
!wget -O data.txt "https://raw.githubusercontent.com/apache/spark/master/docs/graphx-programming-guide.md" 

# split the data into small files (100 lines each) to simulate words arriving in batches
! split -l 100 --additional-suffix=".txt" data.txt data-
! ls -l



--2022-10-19 19:16:16--  https://raw.githubusercontent.com/apache/spark/master/docs/graphx-programming-guide.md
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 53517 (52K) [text/plain]
Saving to: ‘data.txt’


2022-10-19 19:16:16 (4.64 MB/s) - ‘data.txt’ saved [53517/53517]

total 140
-rw-r--r-- 1 root root  6378 Oct 19 19:16 data-aa.txt
-rw-r--r-- 1 root root  5156 Oct 19 19:16 data-ab.txt
-rw-r--r-- 1 root root  4682 Oct 19 19:16 data-ac.txt
-rw-r--r-- 1 root root  4940 Oct 19 19:16 data-ad.txt
-rw-r--r-- 1 root root  5164 Oct 19 19:16 data-ae.txt
-rw-r--r-- 1 root root  5160 Oct 19 19:16 data-af.txt
-rw-r--r-- 1 root root  5081 Oct 19 19:16 data-ag.txt
-rw-r--r-- 1 root root  5029 Oct 19 19:16 data-ah.txt
-rw-r--r-- 1 root root  6234 Oct 19 19:16 data-ai.t
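If the `split` command is not available (for example outside Linux), the chunking can be reproduced in plain Python. This is a sketch that assumes UTF-8 text and at most 676 chunks (two-letter suffixes `aa` to `zz`, the default of GNU `split`); `split_lines` is a hypothetical helper.

```python
from itertools import product
from string import ascii_lowercase


def split_lines(path, lines_per_file, prefix, suffix=""):
    """Split `path` into files of `lines_per_file` lines: prefix+aa+suffix, prefix+ab+suffix, ...

    Roughly mimics `split -l N --additional-suffix=SUFFIX FILE PREFIX`
    for up to 676 chunks.
    """
    with open(path, encoding="utf-8") as src:
        lines = src.readlines()
    suffixes = ("".join(pair) for pair in product(ascii_lowercase, repeat=2))
    written = []
    for start in range(0, len(lines), lines_per_file):
        out_path = f"{prefix}{next(suffixes)}{suffix}"
        with open(out_path, "w", encoding="utf-8") as dst:
            dst.writelines(lines[start:start + lines_per_file])
        written.append(out_path)
    return written


# Usage, producing the same chunks as the shell command above:
# split_lines("data.txt", 100, "data-", ".txt")  # -> ['data-aa.txt', 'data-ab.txt', ...]
```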

## Programming

Let's start writing programs.

In [13]:
# start the spark session

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

We use the directory `streaming` as a streaming source, i.e. each time a new file appears in this directory, the word count is triggered.


In [17]:
# create an empty streaming directory
# consider the directory "streaming" as a source of streaming
# currently it is empty...
!mkdir -p streaming


In [18]:
# Declare the directory "streaming" as a source (each new file is read as new lines)
lines = spark.readStream.text("streaming")

# and write the "wordcount program"
words = lines.select(
   explode(
       split(lines.value, " ")
   ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()
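Splitting on a single space produces empty strings for blank lines and for consecutive spaces, which is why the empty word `""` tops the counts in the results below. The plain-Python sketch below compares this with splitting on runs of whitespace and dropping empty tokens. `tokenize_naive` and `tokenize_clean` are hypothetical helpers that only mimic the Spark expressions, and the sample line is made up.

```python
import re


def tokenize_naive(line):
    # split on a single space, like split(lines.value, " ") in the cell above
    return line.split(" ")


def tokenize_clean(line):
    # split on runs of whitespace, then drop the empty tokens
    return [w for w in re.split(r"\s+", line) if w]


for line in ["", "  GraphX is  a new component"]:
    print(repr(line), tokenize_naive(line), tokenize_clean(line))
```

In the streaming query, the same fix should correspond to `split(lines.value, r"\s+")` followed by `words.filter(words.word != "")`; treat this as an untested suggestion.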


In [69]:
# stop all active queries
# (you will understand why below ;-)
for q in spark.streams.active:
  print("stopping", q.name)
  q.stop()
spark.sql("DROP TABLE IF EXISTS wordcount")

stopping wordcount


DataFrame[]

In [70]:
# This code works, but the console sink prints to the driver's stdout,
# which is not displayed in the notebook...
#query = wordCounts \
#    .writeStream \
#    .outputMode("complete") \
#    .format("console") \
#    .start()


# So we write the output to an in-memory table named "wordcount" (memory sink),
# which we can query with SQL

query = wordCounts \
    .writeStream \
    .format("memory") \
    .outputMode("complete") \
    .queryName("wordcount") \
    .start()


#from time import sleep
#for x in range(3):
#    spark.sql("select * from wordcount order by count DESC").show(3)
#    spark.sql("select count(*) from wordcount").show()  
#    sleep(2)


# Once running, drop the data-??.txt files one by one into the streaming directory
# and query the wordcount table (as in the next cells)
# stop execution when happy...
#query.awaitTermination()

print(query.isActive)

True
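Instead of copying the files by hand, a small helper can drop them into the source directory one at a time. Spark's documentation asks for files to be placed atomically in the watched directory, so this sketch copies each file under a temporary name starting with a dot (assuming, as Spark's file source does for hidden files, that such names are ignored) and then renames it with `os.replace`. `feed_directory` is a hypothetical helper, not part of the original lab.

```python
import os
import shutil
import time
from pathlib import Path


def feed_directory(files, target_dir, delay_s=5.0):
    """Copy `files` one by one into `target_dir`, waiting `delay_s` seconds after each.

    Each file is first copied under a hidden temporary name, then renamed,
    so a reader of `target_dir` never sees a half-written file.
    """
    target = Path(target_dir)
    target.mkdir(parents=True, exist_ok=True)
    delivered = []
    for name in files:
        src = Path(name)
        tmp = target / f".{src.name}.tmp"
        shutil.copyfile(src, tmp)
        os.replace(tmp, target / src.name)  # rename within the same directory
        delivered.append(src.name)
        time.sleep(delay_s)
    return delivered


# Usage in this notebook, then query the wordcount table to see the counts grow:
# import glob
# feed_directory(sorted(glob.glob("data-a?.txt"))[:3], "streaming", delay_s=5)
```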


In [71]:
spark.sql("select * from wordcount order by count DESC").show(3)
spark.sql("select count(*) from wordcount").show()
!cp data-aa.txt streaming
# and wait a little...
!sleep 5s

+----+-----+
|word|count|
+----+-----+
+----+-----+

+--------+
|count(1)|
+--------+
|       0|
+--------+



In [72]:
spark.sql("select * from wordcount order by count DESC").show(3)
spark.sql("select count(*) from wordcount").show()
!cp data-ab.txt streaming
# and wait a little
!sleep 5s


+----+-----+
|word|count|
+----+-----+
+----+-----+

+--------+
|count(1)|
+--------+
|       0|
+--------+



In [73]:
spark.sql("select * from wordcount order by count DESC").show(3)
spark.sql("select count(*) from wordcount").show()
!cp data-ac.txt streaming
# and wait a little
!sleep 5s

+----+-----+
|word|count|
+----+-----+
|    |   73|
| the|   15|
|   a|    9|
+----+-----+
only showing top 3 rows

+--------+
|count(1)|
+--------+
|     611|
+--------+



In [77]:
spark.sql("select * from wordcount order by count DESC").show(3)
spark.sql("select count(*) from wordcount").show()

+----+-----+
|word|count|
+----+-----+
|    |  269|
| the|   94|
| and|   47|
+----+-----+
only showing top 3 rows

+--------+
|count(1)|
+--------+
|     803|
+--------+



In [78]:
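# Did you get it? The query runs in the background, so each SELECT shows
# the files processed so far, not necessarily the file just copied.

# stop all active queries

for q in spark.streams.active:
  print("stopping", q.name)
  q.stop()

# drop the wordcount table
spark.sql("DROP TABLE IF EXISTS wordcount")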

#clear the streaming directory
!rm streaming/*

stopping wordcount


## CSV Files Now

In [79]:
#from https://che-kulhan.medium.com/how-to-use-pyspark-streaming-with-google-colaboratory-d08ded30cabf
!wget https://raw.githubusercontent.com/lawlesst/vivo-sample-data/master/data/csv/people.csv

--2022-10-19 20:43:19--  https://raw.githubusercontent.com/lawlesst/vivo-sample-data/master/data/csv/people.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4095 (4.0K) [text/plain]
Saving to: ‘people.csv’


2022-10-19 20:43:19 (35.8 MB/s) - ‘people.csv’ saved [4095/4095]



In [80]:
!split -l 10 people.csv people-

In [81]:
from pyspark.sql.types import *
schema = StructType([
    StructField("person_ID",IntegerType(),True), 
    StructField("last", StringType(), True),
    StructField("middle", StringType(), True),
    StructField("email", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("fax", StringType(), True),
    StructField("title", StringType(), True)])

In [91]:

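!mkdir -p people
# the stream reads new files from the "people" directory:
# copy the people-?? chunks into it (e.g. !cp people-aa people) to feed it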
people_df = spark.readStream.format("csv")\
   .schema(schema) \
   .load("people")

print(people_df.isStreaming)

True


In [92]:
for q in spark.streams.active:
  print("stopping",q.name)
  q.stop()


In [93]:
results_df = people_df.select("*")
query2 = results_df.writeStream \
  .format("json") \
  .queryName("selectTable") \
  .option("checkpointLocation", "checkpoint") \
  .option("path", "results") \
  .outputMode("append") \
  .start() 

query2.awaitTermination()


StreamingQueryException: ignored

### Streaming Wikipedia events

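Spark supports only a few kinds of streaming sources out of the box (for example files, sockets and Kafka), and the Wikipedia event stream is not one of them. We therefore relay it through a local socket, following this approach, adapted from:
https://github.com/donaghhorgan/COMP9033/blob/master/lectures/12%20-%20Streaming%20Data%20Analysis.pdf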

1. Connect to the Wikipedia RecentChanges stream using SSEClient.
2. Create a local socket connection on port 50000.
3. When a client (e.g. Spark) connects to the local socket, relay the next available event to it from the event stream.

In [None]:
!pip install sseclient
import json
import socket
import threading
import time
from pyspark.streaming import StreamingContext
from sseclient import SSEClient

In [None]:
import itertools

# the stream is infinite: print only the first 5 events
events = SSEClient('https://stream.wikimedia.org/v2/stream/recentchange')
for e in itertools.islice(events, 5):
  print(e)

In [None]:
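import socket
import threading
from sseclient import SSEClient

def relay():
    events = SSEClient('https://stream.wikimedia.org/v2/stream/recentchange')

    s = socket.socket()
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(('localhost', 50000))
    s.listen(1)
    while True:
        client = None
        try:
            client, address = s.accept()
            # send the next 'message' event to the client as one line of text
            for event in events:
                if event.event == 'message':
                    client.sendall((event.data + '\n').encode('utf-8'))
                    break
        except Exception:
            pass
        finally:
            if client is not None:
                client.close()

# daemon=True: the relay thread does not keep the Python process alive
threading.Thread(target=relay, daemon=True).start()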

## Streaming analysis

Now that our stream relay is set up, we can start to analyse its contents. The DStream API needs a [`SparkContext`](https://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.SparkContext) object, which represents our connection to the Spark cluster. A `SparkContext` is normally created by specifying the URL of the master node to connect to. As we're only running this notebook for demonstration purposes, everything runs locally, and we can simply reuse the `SparkContext` of the `SparkSession` created earlier (`spark.sparkContext`).

In [None]:
## foreach() runs print_row in the Spark worker processes, so rows are printed
## there, not in the notebook: nothing shows up here.
## Do not capture the notebook's sys.stdout in the function: it cannot be
## pickled, and the query then fails with a PicklingError.
def print_row(row):
    print(row)

# Alternative: an object with open/process/close methods, used as .foreach(RowPrinter())
class RowPrinter:
    def open(self, partition_id, epoch_id):
        print("Opened %d, %d" % (partition_id, epoch_id))
        return True
    def process(self, row):
        print(row)
    def close(self, error):
        print("Closed with error: %s" % str(error))

results_df = people_df.select("*")
query = results_df.writeStream \
  .foreach(print_row) \
  .start()

# wait at most 30 seconds instead of blocking forever
query.awaitTermination(30)

In [None]:
## The console sink prints to the driver's stdout, which is not displayed
## in the notebook: the query runs, but nothing appears here.
#results_df = people_df.count()
query = people_df.select("*").groupby("last").count().writeStream \
  .format("console") \
  .outputMode("complete") \
  .start()

query.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.7/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: ignored

In [None]:
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 50000) \
    .load()
lines.writeStream.outputMode("append").format("console").start()

<pyspark.sql.streaming.StreamingQuery at 0x7f42b31d20d0>

Next, we create a [`StreamingContext`](https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext) object, which represents the streaming functionality of our Spark cluster. When we create the context, we must specify a batch duration time (in seconds), to tell Spark how often it should process data from the stream. Let's process the Wikipedia data in batches of one second:

In [None]:
# reuse the SparkContext of the SparkSession created earlier
sc = spark.sparkContext
ssc = StreamingContext(sc, 1)

Using our `StreamingContext` object, we can create a data stream from our local TCP relay socket with the [`socketTextStream`](https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext.socketTextStream) method:

In [None]:
stream = ssc.socketTextStream('localhost', 50000)

Even though we've created a data stream, nothing happens! Before Spark starts to consume the stream, we must first define one or more operations to perform on it. Let's count the number of edits made by different users in the last minute:

In [None]:
import json

users = (
    stream.map(json.loads)                   # Parse the stream data as JSON
          .map(lambda obj: obj['user'])      # Extract the values corresponding to the 'user' key
          .map(lambda user: (user, 1))       # Give each user a count of one
          .window(60)                        # Create a sliding window, sixty seconds in length
          .reduceByKey(lambda a, b: a + b)   # Reduce all key-value pairs in the window by adding values
          .transform(                        # Sort by the largest count
              lambda rdd: rdd.sortBy(lambda kv: kv[1], ascending=False))
)
users.pprint()                               # Print the results of each batch

Again, nothing happens! This is because the `StreamingContext` must be started before the stream is processed by Spark. We can start data streaming using the [`start`](https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext.start) method of the `StreamingContext` and stop it using the [`stop`](https://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext.stop) method. Let's run the stream for two minutes (120 seconds) and then stop:

In [None]:
import time

ssc.start()

time.sleep(120)

# keep the SparkContext (shared with the SparkSession) alive
ssc.stop(stopSparkContext=False)

Spark then counts the number of edits made by each user in the past sixty seconds and prints updated counts once per second (the batch duration of the `StreamingContext`).
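To see what `window(60)` followed by `reduceByKey` computes at each one-second batch, here is a plain-Python sketch: it keeps the per-batch counts of the last `window_batches` batches in a bounded deque, sums them at every batch, then sorts users by decreasing count. It assumes each batch is a list of user names, and `windowed_user_counts` is a hypothetical helper. Spark can compute the same result more efficiently, for example incrementally with `reduceByKeyAndWindow` and an inverse function.

```python
from collections import Counter, deque


def windowed_user_counts(batches, window_batches):
    """For each batch, return (user, edits) pairs over the last `window_batches`
    batches, sorted by decreasing number of edits."""
    window = deque(maxlen=window_batches)  # per-batch counts; the oldest batch drops out
    results = []
    for batch in batches:
        window.append(Counter(batch))
        total = Counter()
        for batch_counts in window:
            total.update(batch_counts)
        results.append(sorted(total.items(), key=lambda kv: kv[1], reverse=True))
    return results


# Three one-second batches, with a window of two batches (instead of sixty)
batches = [["alice", "bob", "alice"], ["bob"], ["carol"]]
for result in windowed_user_counts(batches, window_batches=2):
    print(result)
# [('alice', 2), ('bob', 1)]
# [('alice', 2), ('bob', 2)]
# [('bob', 1), ('carol', 1)]
```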