# Exercises

In this notebook, we go through a simple example of how to use Spark Structured Streaming with Kafka.

In [1]:
from time import sleep

import pyspark
from IPython.display import clear_output, display
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.streaming import StreamingQuery
from pyspark.sql.types import DateType, IntegerType, StringType, StructType

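1. We create a SparkSession with a local master (`local[1]`, a single worker thread). We add the Kafka connector package through `spark.jars.packages` (its Scala version, 2.12, and Spark version, 3.0.1, should match the installed Spark) and check that we can access the Spark Context.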

In [2]:
# Create Spark Session
spark = (
    SparkSession.builder.master("local[1]")
    .appName("Tutorial App")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1")
    .getOrCreate()
)

# Get Spark Context
print(spark.sparkContext)

<SparkContext master=local[1] appName=Tutorial App>
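The connector coordinate in `spark.jars.packages` encodes both the Scala and the Spark version, so it has to be adjusted when either changes. The hypothetical helper below is a small sketch that assumes the usual `org.apache.spark:spark-sql-kafka-0-10_<scala>:<spark>` naming and simply assembles that string:

```python
def kafka_connector_package(spark_version: str, scala_version: str) -> str:
    """Build the Maven coordinate of the Kafka connector for spark.jars.packages.

    Hypothetical helper; assumes the connector is published as
    org.apache.spark:spark-sql-kafka-0-10_<scala version>:<spark version>.
    """
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


# The coordinate used in the cell above (Spark 3.0.1 built with Scala 2.12)
print(kafka_connector_package("3.0.1", "2.12"))
```

In a running notebook, `pyspark.__version__` reports the installed Spark version to plug in; the Scala version depends on how that Spark build was compiled.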


2. Using the Kafka console producer, we can send the following messages to the `events` topic. Later, in Spark, we will parse the data and apply simple transformations. Note that the second message has no `birth_date` field.

```bash
{"id":1,"name":"Simon","birth_date":"1998-07-01"}
{"id":2,"name":"Anne"}
{"id":3,"name":"John","birth_date":"2000-04-23"}
```
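If you prefer not to type the messages by hand, they can be generated as JSON Lines, one compact object per line, since the console producer sends each input line as a separate message. A minimal sketch (the `events` list and the `to_json_lines` helper are illustrative names, not part of the exercise):

```python
import json
from typing import Any, Iterable, Mapping

# The three sample events from above; Anne deliberately has no birth_date
events = [
    {"id": 1, "name": "Simon", "birth_date": "1998-07-01"},
    {"id": 2, "name": "Anne"},
    {"id": 3, "name": "John", "birth_date": "2000-04-23"},
]


def to_json_lines(records: Iterable[Mapping[str, Any]]) -> str:
    """Serialize each record as one compact JSON object per line (one Kafka message per line)."""
    return "\n".join(json.dumps(record, separators=(",", ":")) for record in records)


print(to_json_lines(events))
```

Saved as a script (say, a hypothetical `make_events.py`), its output could be piped into the producer, e.g. `python make_events.py | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic events`; older Kafka releases use `--broker-list` instead of `--bootstrap-server`.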

3. Next, we read the data stream with `spark.readStream`, which returns a `DataStreamReader`.
    * `format` defines the input source type. There are a few built-in sources:
        * Kafka source - reads records from Kafka topics; this is the source we use here
        * File source - reads files written to a given directory (by default, files are processed in order of their modification time)
        * Socket source
        * Rate source
        * Rate Micro-Batch source
        
        For more information on input sources, see: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources
        
    * Now we have to define options for reading data from Kafka.
        * We are running our broker on localhost with port 9092
        * We want to subscribe to the topic `events` that we created earlier
        * We set `startingOffsets`: `earliest` reads the topic from the first available offset, while `latest` (the default for streaming queries) reads only messages that arrive after the query starts
        
        For more information on Spark Structured Streaming and Kafka integration, see: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
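To make the difference between `earliest` and `latest` concrete, here is a toy, pure-Python model of a single partition. The `records_seen` function is hypothetical and ignores details such as multiple partitions and log retention:

```python
from typing import List


def records_seen(before_start: List[str], after_start: List[str], starting_offsets: str) -> List[str]:
    """Messages a brand-new query reads from a single partition.

    Toy model: `before_start` were produced before the query started,
    `after_start` afterwards. Not Spark's implementation.
    """
    log = before_start + after_start  # the partition, indexed by offset
    if starting_offsets == "earliest":
        first_offset = 0  # first available offset in the partition
    elif starting_offsets == "latest":
        first_offset = len(before_start)  # end of the partition when the query started
    else:
        raise ValueError(f"unsupported startingOffsets value: {starting_offsets!r}")
    return log[first_offset:]


old, new = ["Simon", "Anne"], ["John"]
print(records_seen(old, new, "earliest"))  # ['Simon', 'Anne', 'John']
print(records_seen(old, new, "latest"))    # ['John']
```

Note that `startingOffsets` only matters when a query starts for the first time; a query restarted from a checkpoint resumes from the offsets recorded there. The option also accepts a JSON string with explicit offsets per topic partition.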

In [3]:
# Reading data stream
df_stream = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "earliest")
    .load()
)

4. After loading the data, we can print its schema. Which columns should we consider metadata, and which contain our actual data?
    * `value` holds our actual data in binary form; the other columns (`key`, `topic`, `partition`, `offset`, `timestamp`, `timestampType`) can be considered metadata

In [4]:
# Printing schema
df_stream.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
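The split between payload and metadata can be pictured with a plain dictionary standing in for one Kafka row. This is only an illustration: the values are made up, and `split_row` is a hypothetical helper that mimics what `CAST(value AS STRING)` does for UTF-8 text:

```python
from typing import Any, Dict, Tuple

# Toy stand-in for one row of the Kafka source (column names from the schema above;
# the values are made up, and the timestamp column is omitted for brevity)
row = {
    "key": None,  # the console producer sends messages without a key by default
    "value": b'{"id":1,"name":"Simon","birth_date":"1998-07-01"}',
    "topic": "events",
    "partition": 0,
    "offset": 0,
    "timestampType": 0,
}


def split_row(kafka_row: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Decode the binary payload and return it together with the metadata columns."""
    payload = kafka_row["value"].decode("utf-8")  # like CAST(value AS STRING) for UTF-8 text
    metadata = {name: v for name, v in kafka_row.items() if name != "value"}
    return payload, metadata


payload, metadata = split_row(row)
print(payload)
print(sorted(metadata))
```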



5. We can extract information from the binary data by defining a data schema. In this example, we read JSON messages: the `value` column is cast to a string and parsed with the `from_json` function. Our custom schema is based on the sample messages that we sent to the topic. We also keep the Kafka `timestamp` column in the final DataFrame; it records when each message was created by the producer (or appended by the broker, depending on the topic's timestamp type), so we can see when each event was produced.

In [5]:
# Defining new custom data schema
schema = (
    StructType()
    .add("id", IntegerType())
    .add("name", StringType())
    .add("birth_date", DateType())
)

# Cast the binary value to a string, parse it as JSON, and keep the Kafka timestamp
df_stream = df_stream.selectExpr("CAST(value AS STRING)", "timestamp").select(
    from_json(col("value"), schema).alias("sample"), "timestamp"
)

# Selecting samples
df_stream = df_stream.select("sample.*", "timestamp")
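What `from_json` followed by `select("sample.*")` does for each message can be approximated in plain Python. The sketch below is an assumption-laden stand-in, not Spark's parser: `SCHEMA` and `parse_event` are hypothetical names, and only the behavior relevant here is modelled (missing fields become null, extra fields are ignored):

```python
import json
from datetime import date
from typing import Any, Callable, Dict

# Hand-written mirror of the Spark schema above: column name -> converter for the JSON value
SCHEMA: Dict[str, Callable[[Any], Any]] = {
    "id": int,
    "name": str,
    "birth_date": date.fromisoformat,  # assumes yyyy-MM-dd strings, as in our samples
}


def parse_event(text: str) -> Dict[str, Any]:
    """Approximate from_json(...).alias("sample") followed by select("sample.*")."""
    raw = json.loads(text)
    parsed: Dict[str, Any] = {}
    for column, convert in SCHEMA.items():
        value = raw.get(column)  # fields absent from the message stay None (null)
        parsed[column] = None if value is None else convert(value)
    return parsed  # JSON fields not listed in SCHEMA are simply ignored


print(parse_event('{"id":2,"name":"Anne"}'))
# {'id': 2, 'name': 'Anne', 'birth_date': None}
```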

6. We can print the schema again to show the changes in the DataFrame structure.

In [6]:
# Printing schema
df_stream.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- birth_date: date (nullable = true)
 |-- timestamp: timestamp (nullable = true)



7. Now we can apply some simple transformations or filters. Remember that the transformations defined in this step must be compatible with the output mode that you will use later; for example, a streaming aggregation without a watermark cannot be written in `append` mode. For more information see https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes.

In [7]:
df_stream = df_stream.where(col("name") != "John")
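One subtlety of this filter: in SQL, a comparison with null yields null, and `where` keeps only rows whose condition is true, so rows with a null `name` are dropped as well. A pure-Python sketch of that three-valued logic (`sql_not_equal` and `where` are illustrative helpers, not PySpark functions):

```python
from typing import Any, Callable, Dict, List, Optional


def sql_not_equal(left: Optional[str], right: Optional[str]) -> Optional[bool]:
    """Three-valued `!=`: comparing with NULL (None) yields NULL, not True."""
    if left is None or right is None:
        return None
    return left != right


def where(rows: List[Dict[str, Any]], predicate: Callable[[Dict[str, Any]], Optional[bool]]) -> List[Dict[str, Any]]:
    """Keep only rows whose predicate is exactly True, as SQL WHERE does."""
    return [r for r in rows if predicate(r) is True]


rows = [{"name": "Simon"}, {"name": "John"}, {"name": "Johny"}, {"name": None}]
kept = where(rows, lambda r: sql_not_equal(r["name"], "John"))
print([r["name"] for r in kept])  # ['Simon', 'Johny']
```

To keep rows without a name, the condition could instead be written as `~col("name").eqNullSafe("John")` or `col("name").isNull() | (col("name") != "John")`.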

8. To see the result of the transformations, we have to use the `writeStream` property of the DataFrame. We can use various output sinks such as files or Kafka; for debugging purposes, we will use the memory sink, which stores the output in an in-memory table named after the query. For more information about output sinks, see https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks. It is also essential to define a proper output mode:
    * `append` mode (default) - only rows added to the result table since the last trigger are written to the sink; use it when rows, once added, never change, so each row is written only once;
    * `complete` mode - the whole result table is written to the sink after every trigger;
    * `update` mode - only rows that were updated since the last trigger are written to the sink.
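The difference between `complete` and `update` is easiest to see with an aggregation, whose rows change over time. The following pure-Python simulation of a running count per name is a sketch (the `run_counts` helper is hypothetical); it also refuses `append`, mirroring the fact that Spark does not allow `append` for a streaming aggregation without a watermark. Our filter-only query has no such problem, since its rows never change once written.

```python
from collections import Counter
from typing import Dict, List


def run_counts(batches: List[List[str]], mode: str) -> List[Dict[str, int]]:
    """Simulate a streaming count of names; return what each trigger writes to the sink."""
    totals: Counter = Counter()
    written: List[Dict[str, int]] = []
    for batch in batches:
        changed = Counter(batch)
        totals.update(changed)
        if mode == "complete":
            written.append(dict(totals))  # the whole result table, every trigger
        elif mode == "update":
            written.append({name: totals[name] for name in changed})  # only rows this batch touched
        else:
            raise ValueError(f"{mode!r} is not supported for an aggregation without a watermark")
    return written


batches = [["Simon", "Anne"], ["Simon"]]
print(run_counts(batches, "complete"))  # [{'Simon': 1, 'Anne': 1}, {'Simon': 2, 'Anne': 1}]
print(run_counts(batches, "update"))    # [{'Simon': 1, 'Anne': 1}, {'Simon': 2}]
```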

In [8]:
# Creating write stream
query = (
    df_stream.writeStream.outputMode("append")
    .format("memory")
    .queryName("query_name")
    .start()
)

The trigger is an additional setting that we did not use in the `writeStream` definition, but it is important in stream processing: it defines when the query executes. It is set with `.trigger(...)`; without it, Spark runs micro-batches back to back, starting each one as soon as the previous one finishes. Queries can be executed as micro-batches or as a continuous process (an experimental feature).

For more info about triggers see: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
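In PySpark the trigger is set with `DataStreamWriter.trigger`, e.g. `.trigger(processingTime="10 seconds")`, `.trigger(once=True)` or `.trigger(continuous="1 second")`. The schedule of a processing-time trigger can be sketched in pure Python; `batch_start_times` is a hypothetical helper and a simplified model that assumes trigger times are aligned to multiples of the interval:

```python
from typing import List


def batch_start_times(interval: int, durations: List[int]) -> List[int]:
    """Start times of consecutive micro-batches under a fixed processing-time trigger.

    Simplified model: triggers are aligned to multiples of `interval`; a batch that
    finishes early waits for the next boundary, one that overruns is followed immediately.
    """
    starts: List[int] = []
    t = 0
    for duration in durations:
        starts.append(t)
        finished = t + duration
        next_boundary = (t // interval + 1) * interval  # next multiple of interval after t
        t = max(finished, next_boundary)
    return starts


print(batch_start_times(10, [4, 13, 2]))  # [0, 10, 23]
```

The second batch overruns its 10-second slot, so the third one starts right when it finishes (at 23) instead of waiting for 30.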

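9. Right now, our query is running in the background. Because we are dealing with a data stream, we can't treat it as a batch job whose computations end after some time. To check the status of our computations in a Jupyter notebook, we can use this handy `check_query` function, which repeatedly prints the query status and the contents of the in-memory table (the memory sink registers it under the query name, so it can be read with `spark.sql`).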

In [9]:
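def check_query(query: StreamingQuery, sleep_time: int = 5) -> None:
    """Print the query status and the current contents of its memory table until interrupted."""
    try:
        while True:
            clear_output(wait=True)
            display(query.status)
            # show() prints the table itself and returns None, so it is not wrapped in display()
            spark.sql(f"SELECT * FROM {query.name}").show(100)
            sleep(sleep_time)
    except KeyboardInterrupt:
        # Interrupting the cell only stops this polling loop; the streaming query keeps running
        pass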
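If you would rather not interrupt the kernel, a bounded variant can run a fixed number of checks and then return. The sketch below is hypothetical (`poll` is not part of the exercise) and takes the two display steps as callables, so it does not depend on a live Spark session:

```python
from time import sleep
from typing import Callable


def poll(show_status: Callable[[], None], show_table: Callable[[], None],
         checks: int = 3, sleep_time: float = 5.0) -> None:
    """Run the two display callbacks `checks` times, pausing between checks, then return."""
    for i in range(checks):
        show_status()
        show_table()
        if i < checks - 1:  # no need to wait after the last check
            sleep(sleep_time)
```

In the notebook it could be called as `poll(lambda: display(query.status), lambda: spark.sql(f"SELECT * FROM {query.name}").show(100), checks=10)`; when you are done with the stream, `query.stop()` ends the streaming query itself.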

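10. Now you can try sending your own messages with the Kafka console producer. Because `startingOffsets` is set to `earliest`, the output below also contains messages produced to `events` in earlier sessions. Note that John is filtered out and that Anne's missing `birth_date` is shown as `null`.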

**Remember to stop the execution of the last cell before moving to the next exercises (the `check_query` function is running in an infinite loop).**

In [10]:
# Checking results in an infinite loop
check_query(query)

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

+---+-----+----------+--------------------+
| id| name|birth_date|           timestamp|
+---+-----+----------+--------------------+
|  1|Simon|2019-07-01|2022-12-28 10:11:...|
|  2| Anne|2015-12-15|2022-12-28 10:11:...|
|  1|Simon|2019-07-01|2022-12-28 10:11:...|
|  1|Simon|2019-07-01|2022-12-28 10:23:...|
|  1|Simon|2019-07-01|2022-12-28 10:35:...|
|  1|Simon|2019-07-01|2022-12-28 16:13:...|
|  1|Simon|2019-07-01|2022-12-28 16:13:...|
|  1|Simon|1998-07-01|2022-12-29 07:07:...|
|  2| Anne|      null|2022-12-29 07:07:...|
|  1|Johny|      null|2022-12-29 07:43:...|
+---+-----+----------+--------------------+


