<a href="https://colab.research.google.com/github/jalorenzo/SparkNotebookColab/blob/master/BDF_09_Spark_Streaming.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 00 - Configuration of Apache Spark on Colaboratory


### Installing Java, Spark, and Findspark


---


This code installs Apache Spark 3.5.0, Java 8, and [Findspark](https://github.com/minrk/findspark), a library that makes it easy for Python to find Spark.

In [None]:
import os

os.environ["SPARK_VERSION"] = "spark-3.5.0"
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget  http://apache.osuosl.org/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!echo $SPARK_VERSION-bin-hadoop3.tgz
!rm $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

### Set Environment Variables
Set the locations where Spark and Java are installed.

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark/"
os.environ["DRIVE_DATA"] = "/content/gdrive/My Drive/Enseignement/2023-2024/ING3/HPDA/BigDataFrameworks/data/"

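# Remove any previous link (-f: no error if it does not exist), then link the unpacked distribution
!rm -f /content/spark
!ln -s /content/$SPARK_VERSION-bin-hadoop3 /content/spark
# `!export` only changes a temporary subshell, so extend PATH through os.environ instead
os.environ["PATH"] = os.pathsep.join([os.environ["PATH"],
                                      os.path.join(os.environ["SPARK_HOME"], "bin"),
                                      os.path.join(os.environ["SPARK_HOME"], "sbin")])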
!echo $SPARK_HOME
!env |grep  "DRIVE_DATA"

### Start a SparkSession
This will start a local Spark session.

In [None]:
!python -V

import findspark
findspark.init()

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

# Example: shows the PySpark version
print("PySpark version {0}".format(sc.version))

# Example: parallelise a list and show its first 2 elements
sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)

In [None]:
from pyspark.sql import SparkSession
# We create a SparkSession object (or we retrieve it if it is already created)
spark = SparkSession \
    .builder \
    .appName("My application") \
    .config("spark.some.config.option", "some-value") \
    .master("local[4]") \
    .getOrCreate()
# We get the SparkContext
sc = spark.sparkContext

In [None]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/gdrive')


---


# 09 - Spark Streaming

-   Scalable, *high-throughput*, fault-tolerant stream processing

<img src="https://4.bp.blogspot.com/-HJ8x45gN3kE/WNCULL6J6eI/AAAAAAAAAZ0/1LfYt5IwE3sINSxWunqTrbqyrm7irZTCwCEw/s1600/spark.JPG" alt="Spark Streaming flow" style="width: 900px;"/>

-   Input from multiple sources: Kafka, Flume, Twitter, ZeroMQ, Kinesis or TCP sockets


## Spark Streaming APIs

- DStream API
    - Original API, based on RDDs
- Structured Streaming
    - Introduced in Spark 2.0 and stable since version 2.2, based on DataFrames


Spark Streaming page: <https://spark.apache.org/streaming/>

Main documentation (latest version): <https://spark.apache.org/docs/latest/streaming-programming-guide.html>


## DStream API

Main abstraction: DStream (*discretized stream*).

-   Represents a continuous data stream

![dstreams](http://persoal.citius.usc.es/tf.pena/TCDM/figs/dstreams.png)

*Micro-batch* architecture

-   Received data are grouped into batches
-   Batches are created at regular intervals (batch interval)
-   Every batch has the form of an RDD, which can be processed by Spark
-   In addition, stateful transformations are available through
    -   Window operations
    -   Per-key state tracking
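
The batching described above can be illustrated without Spark. The following is a minimal pure-Python sketch, not Spark's implementation: the function name `micro_batches` and the `(arrival_time, record)` event format are assumptions made only for illustration.

```python
def micro_batches(events, batch_interval):
    """Group (arrival_time, record) pairs into consecutive batches.

    Hypothetical helper: each batch covers `batch_interval` seconds,
    mimicking how a DStream turns a continuous stream into a sequence
    of small collections (RDDs in Spark).
    """
    batches = {}
    for arrival_time, record in events:
        # The batch index is the number of whole intervals elapsed so far
        index = int(arrival_time // batch_interval)
        batches.setdefault(index, []).append(record)
    if not batches:
        return []
    # Intervals with no data still produce an (empty) batch
    return [batches.get(i, []) for i in range(max(batches) + 1)]
```

With a batch interval of 5 seconds, records arriving at 0.5 s and 1.2 s fall in the first batch, and a record arriving at 5.1 s opens the second one.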

### DStream example: stateless online WordCount

To run the example:

   1. In a terminal, access the Docker container with `docker exec -ti container_id /bin/bash` (run `docker ps` to find the container_id)
   2. Once in the container's terminal, start netcat as a server on port 9999

    `$ nc -lk 9999`

   3. Run the PySpark code below in this notebook

   4. Type lines of text in netcat's terminal. They will be picked up and processed by the script
    - Repeat some words to check that they are counted correctly
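
When no interactive terminal is available to type into netcat, a small Python server can play the same role. The sketch below is one possible stand-in, not part of Spark: the helper name `start_line_server` and its parameters are hypothetical. It sends a fixed list of lines to the first client that connects, which is what `socketTextStream` does when the streaming context starts.

```python
import socket
import threading
import time


def start_line_server(lines, host="localhost", port=9999, delay=0.5):
    """Send `lines` to the first client that connects, one every `delay` seconds.

    Hypothetical helper for testing. Returns the background thread
    and the port the server is actually bound to.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    bound_port = server.getsockname()[1]

    def serve():
        conn, _ = server.accept()  # Blocks until a client connects
        with conn:
            for line in lines:
                # socketTextStream expects newline-terminated UTF-8 text
                conn.sendall((line + "\n").encode("utf-8"))
                time.sleep(delay)
        server.close()

    thread = threading.Thread(target=serve, daemon=True)
    thread.start()
    return thread, bound_port

# Possible usage before starting the streaming context (not executed here):
# start_line_server(["spark streaming spark", "hello streaming"], port=9999)
```

Only one process can listen on a given port, so this server would replace the netcat process rather than run alongside it.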

In [None]:
!sudo apt-get update && sudo apt-get install -y netcat

In [None]:
!nohup nc -lk 9999 &

In [None]:
from pyspark.streaming import StreamingContext
from operator import add

sc.setLogLevel("WARN")

# Streaming context with a batch interval of 5 sec.
ssc = StreamingContext(sc, 5)

# DStream that connects to localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

# Run a WordCount
counts = lines.flatMap(lambda line: line.split(" "))\
              .map(lambda word: (word, 1))\
              .reduceByKey(add)

counts.pprint()

ssc.start()                        # Start the computation
ssc.awaitTerminationOrTimeout(60)  # Block for up to 60 seconds while batches are processed
ssc.stop(stopSparkContext=False)   # Stop streaming but keep the SparkContext for later cells

### DStream example: stateful online WordCount

Repeat the previous steps with the following code

 - Check that the word counts accumulate across batches instead of restarting at every batch
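
To see what accumulating state per key means, the update logic can be simulated in plain Python. This is only a sketch under stated assumptions: `run_stateful_word_count` is a hypothetical driver that imitates the behaviour of `updateStateByKey`, and each batch is represented as a list of text lines.

```python
from collections import defaultdict


def update_func(new_values, last_sum):
    # Same logic as the update function in the cell below:
    # add the new occurrences to the previous total (None for a new key)
    return sum(new_values) + (last_sum or 0)


def run_stateful_word_count(batches):
    """Apply update_func batch after batch and return the state after each one."""
    state = {}
    snapshots = []
    for batch in batches:
        # Equivalent of flatMap + map: collect a list of 1s per word
        grouped = defaultdict(list)
        for line in batch:
            for word in line.split(" "):
                grouped[word].append(1)
        # The update function runs for every key with new values or existing state
        for key in set(state) | set(grouped):
            state[key] = update_func(grouped.get(key, []), state.get(key))
        snapshots.append(dict(state))
    return snapshots
```

A word seen in an earlier batch keeps its total even when a later batch does not contain it, which is the difference from the stateless version.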

In [None]:
from pyspark.streaming import StreamingContext
from operator import add

sc.setLogLevel("WARN")

# Streaming context with a batch interval of 5 sec.
ssc = StreamingContext(sc, 5)

# DStream that connects to localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

ssc.checkpoint("/tmp/cpdir") # Enables checkpoint

def updateFunc(new_values, last_sum):
    return sum(new_values) + (last_sum or 0)

counts = lines.flatMap(lambda line: line.split(" "))\
              .map(lambda word: (word, 1))\
              .updateStateByKey(updateFunc)

counts.pprint()

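ssc.start()                        # Start the computation
ssc.awaitTerminationOrTimeout(60)  # Block for up to 60 seconds while batches are processed
ssc.stop(stopSparkContext=False)   # Stop streaming but keep the SparkContext for later cells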

## Structured Streaming

Uses the structured API (DataFrames, Datasets and SQL)

- As data arrive in the system, they are read, processed and appended to a DataFrame

Input sources:

- [Apache Kafka](https://kafka.apache.org/)
- Files (by continuously reading files from a directory)
- Sockets

Sinks (data destination):

- Apache Kafka
- Files
- Arbitrary computations (`foreach` and `foreachBatch`)
- Memory (for debugging and testing)

Still under active development: sources, sinks and operators continue to be added between releases

### Example: Process files in the `$DRIVE_DATA/by-day/` directory

In [None]:
!ls "$DRIVE_DATA/by-day"

In [None]:
# Check the format of a file
!head "$DRIVE_DATA"/by-day/2010-12-01.csv

In [None]:
# Create a DataFrame containing the data from one of the files
dfStatic = spark.read\
                  .format("csv")\
                  .option("header", "true")\
                  .option("inferSchema", "true")\
                  .load(os.environ["DRIVE_DATA"]+"/by-day/2010-12-01.csv")
dfStatic.printSchema()

In [None]:
# Obtain a DataFrame containing the purchases per hour and per client throughout that day
from pyspark.sql.functions import window, col, desc
purchasePerClientPerHourStatic =\
             dfStatic.select(
                                col("CustomerId"),
                                (col("UnitPrice")*col("Quantity")).alias("total_cost"),
                                col("InvoiceDate"))\
                       .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 hour"))\
                       .sum("total_cost")

purchasePerClientPerHourStatic.show(15, False)
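
The grouping performed by `window(col("InvoiceDate"), "1 hour")` can be pictured as assigning each row to the hour-aligned interval that contains its timestamp. The following pure-Python sketch illustrates that idea only; it is not how Spark implements it, the function names are hypothetical, and time zone handling is ignored.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def hourly_window(timestamp):
    """Return the (start, end) of the one-hour tumbling window containing timestamp."""
    start = timestamp.replace(minute=0, second=0, microsecond=0)
    return start, start + timedelta(hours=1)


def purchases_per_client_per_hour(rows):
    """Sum UnitPrice * Quantity per (CustomerId, window).

    `rows` is an iterable of (customer_id, unit_price, quantity, invoice_date)
    tuples, an assumed stand-in for the DataFrame columns used above.
    """
    totals = defaultdict(float)
    for customer_id, unit_price, quantity, invoice_date in rows:
        key = (customer_id, hourly_window(invoice_date))
        totals[key] += unit_price * quantity
    return dict(totals)
```

The start of each window is inclusive and the end exclusive, so a purchase made exactly on the hour belongs to the window that starts at that time.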

In [None]:
# After the shuffle, the partition count follows spark.sql.shuffle.partitions
# (200 by default; adaptive query execution may coalesce them)
print(purchasePerClientPerHourStatic.rdd.getNumPartitions())

In [None]:
# Change the number of partitions to use and create the DataFrame again
spark.conf.set("spark.sql.shuffle.partitions", "4")
purchasePerClientPerHourStatic =\
                            dfStatic.select(
                                col("CustomerId"),
                                (col("UnitPrice")*col("Quantity")).alias("total_cost"),
                                col("InvoiceDate"))\
                            .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 hour"))\
                            .sum("total_cost")

In [None]:
print(purchasePerClientPerHourStatic.rdd.getNumPartitions())

In [None]:
# Define a DataFrame in Streaming that takes as a data source
# the files in the $DRIVE_DATA/by-day/ directory.
# Set it to read 1 file each time it is triggered
dfStreaming = spark.readStream\
                   .schema(dfStatic.schema)\
                   .option("maxFilesPerTrigger", 1)\
                   .format("csv")\
                   .option("header", "true")\
                   .load(os.environ["DRIVE_DATA"]+"/by-day/*.csv")

In [None]:
# From the previous Streaming DataFrame, we obtain the purchases per hour and per client
purchasePerClientPerHourStreaming = \
            dfStreaming.select(
                               col("CustomerId"),
                              (col("UnitPrice")*col("Quantity")).alias("total_cost"),
                               col("InvoiceDate"))\
                       .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 hour"))\
                       .sum("total_cost")

In [None]:
# Create a DataStreamWriter object to write the values of the previous DataFrame
# Values are written to an in-memory table
# The output mode is "complete": the whole result table is rewritten on every trigger
# Data can be queried through the purchases_per_hour table
# The trigger fires every second to check the input for new data
lookupPurchases = purchasePerClientPerHourStreaming\
                    .writeStream\
                    .format("memory")\
                    .queryName("purchases_per_hour")\
                    .outputMode("complete")\
                    .trigger(processingTime='1 seconds')
print(type(lookupPurchases))
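
The effect of the "complete" output mode can be contrasted with the "update" mode using a small simulation. This is a hedged pure-Python sketch, not Spark code: `run_triggers` is a hypothetical function, and each trigger is modelled as a list of `(key, amount)` rows, similar to reading one file per trigger.

```python
def run_triggers(trigger_batches, output_mode="complete"):
    """Return what a running sum per key would emit after each trigger.

    "complete": the whole result table is emitted every time.
    "update":   only the keys changed by the latest trigger are emitted.
    """
    totals = {}
    emitted = []
    for rows in trigger_batches:
        changed = set()
        for key, amount in rows:
            totals[key] = totals.get(key, 0) + amount
            changed.add(key)
        if output_mode == "complete":
            emitted.append(dict(totals))
        elif output_mode == "update":
            emitted.append({key: totals[key] for key in changed})
        else:
            raise ValueError("unsupported output mode: " + output_mode)
    return emitted
```

In "complete" mode the output grows with the number of distinct keys, which is why it suits small aggregated results such as the in-memory table used here.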

In [None]:
# Methods defined for a DataStreamWriter
[method_name for method_name in dir(lookupPurchases)
 if callable(getattr(lookupPurchases, method_name))]

In [None]:
# Start the streaming query and keep its handle (a StreamingQuery)
# so that it can be monitored or stopped later
query = lookupPurchases.start()

In [None]:
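# Show the content of the table once per second for 20 seconds
from time import sleep
for x in range(20):
    spark.sql("""
            SELECT *
            FROM purchases_per_hour
            ORDER BY `sum(total_cost)` DESC
            """).show(15, truncate=False)
    sleep(1)

# Stop every active streaming query once the demonstration is over
for active_query in spark.streams.active:
    active_query.stop()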