# Apache Kafka PySpark streaming

Welcome to the Kafka streaming lab! In this class we will use Apache Kafka together with PySpark Structured Streaming to process and analyze real-time data streams. Kafka is a distributed event streaming platform for building real-time data pipelines and streaming applications. It scales by partitioning topics across brokers and tolerates failures by replicating those partitions, which makes it a good fit for high volumes of data. Let's get started!


In [8]:
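import pyspark
from pyspark.sql import SparkSession

# spark.jars.packages needs full Maven coordinates (groupId:artifactId:version);
# the _2.12 suffix assumes a Scala 2.12 build of Spark.
package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark.__version__}"
spark = SparkSession.builder.config("spark.jars.packages", package).getOrCreate()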

In [9]:
df = spark.readStream.format("kafka") \
      .option("kafka.bootstrap.servers", "kafka:9092") \
      .option("subscribe", "page_view_events") \
      .load()

df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
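The Kafka source always produces the fixed schema above, and `key` and `value` arrive as raw bytes. As a plain-Python sketch (no Spark involved; `KafkaRow`, `decode`, and the sample record are hypothetical, made up for illustration), this is roughly what `CAST(value AS STRING)` does in the next exercise: it reads the bytes as UTF-8 and leaves nulls as nulls.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class KafkaRow:
    """Hypothetical model of one row produced by the Spark Kafka source."""
    key: Optional[bytes]
    value: Optional[bytes]
    topic: str
    partition: int
    offset: int
    timestamp: datetime
    timestampType: int


def decode(raw: Optional[bytes]) -> Optional[str]:
    # Mirrors CAST(... AS STRING): bytes are read as UTF-8, null stays null.
    return None if raw is None else raw.decode("utf-8")


# Made-up sample record; the real payload shape is an assumption.
row = KafkaRow(
    key=None,
    value=b'{"ts": 1673943350000, "state": "CA", "city": "Los Angeles"}',
    topic="page_view_events",
    partition=0,
    offset=42,
    timestamp=datetime(2023, 1, 17, 8, 15, 50),
    timestampType=0,
)
print(decode(row.key), decode(row.value))
```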



## Exercise 2
By default, the Kafka source (`spark-sql-kafka`) returns the `key` and `value` columns as binary.
1. Cast both of them to `string`.
2. Create a query named `q1` with the output mode set to `append` and the format set to `memory`.
3. Define a `StructType` that contains the following fields: `ts: long`, `state: string`, `city: string`.
4. Create a DataFrame with the `ts`, `state`, and `city` fields, which are encoded as JSON inside the `value` field.

In [10]:
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.printSchema()

In [None]:
query = df.writeStream.outputMode("append").format("memory").queryName("q1").start()
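In `append` output mode, each micro-batch emits only the rows that are new since the previous trigger, and the `memory` sink appends them to an in-memory table registered under the query name (`q1` here), which `spark.sql` can then query. Because that table lives in driver memory, the memory sink is meant for debugging on small data volumes. The toy class below is a hypothetical, pure-Python model of that behaviour, not Spark's implementation.

```python
from typing import Dict, List


class ToyMemorySink:
    """Hypothetical stand-in for Spark's memory sink in append mode."""

    def __init__(self) -> None:
        self.tables: Dict[str, List[dict]] = {}

    def add_batch(self, query_name: str, new_rows: List[dict]) -> None:
        # Append mode: only the rows produced by this micro-batch are added;
        # earlier rows are never updated or removed.
        self.tables.setdefault(query_name, []).extend(new_rows)

    def select_all(self, query_name: str) -> List[dict]:
        # Like "SELECT * FROM q1": returns everything accumulated so far.
        return list(self.tables.get(query_name, []))


sink = ToyMemorySink()
sink.add_batch("q1", [{"key": None, "value": '{"state": "KY"}'}])
sink.add_batch("q1", [{"key": None, "value": '{"state": "IL"}'}])
print(len(sink.select_all("q1")))  # 2
```

This is why the DataFrame built from `SELECT value FROM q1` keeps growing between refreshes in the exercises below.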

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, LongType

schema = StructType(
    [
        StructField("ts", LongType(), True),
        StructField("state", StringType(), True),
        StructField("city", StringType(), True)
    ]
)
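`from_json` turns the `value` string into a struct that follows this schema, and JSON fields that are absent from a record come back as null. The sketch below is a plain-Python approximation of that step, not Spark's implementation: `SCHEMA` and `parse_value` are hypothetical names, and mapping wrong-typed values and unparseable payloads to `None` is a simplifying assumption, since Spark's exact handling of malformed records depends on the parser mode and version.

```python
import json
from typing import Any, Dict, Optional

# Hypothetical pure-Python mirror of the StructType above: field name -> type.
SCHEMA = {"ts": int, "state": str, "city": str}


def parse_value(value: Optional[str]) -> Dict[str, Any]:
    """Parse a JSON payload into the schema's fields, using None for gaps."""
    empty = {name: None for name in SCHEMA}
    if value is None:
        return empty
    try:
        obj = json.loads(value)
    except json.JSONDecodeError:
        # Assumption: treat unparseable payloads as all-null rows.
        return empty
    if not isinstance(obj, dict):
        return empty
    row = {}
    for name, typ in SCHEMA.items():
        field = obj.get(name)
        # Missing fields, or values of the wrong type, become None.
        # bool is excluded because it is a subclass of int in Python.
        ok = isinstance(field, typ) and not isinstance(field, bool)
        row[name] = field if ok else None
    return row


print(parse_value('{"ts": 1673943350000, "state": "CA"}'))
# {'ts': 1673943350000, 'state': 'CA', 'city': None}
```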

In [12]:
from pyspark.sql.functions import col, from_json

df = spark.sql("SELECT value FROM q1") \
    .withColumn("value", from_json("value", schema)) \
    .select(col("value.*"))

IllegalArgumentException: Cannot start query with name q1 as a query with that name is already active in this SparkSession
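An `IllegalArgumentException` like the one above is raised when a query named `q1` is started while another query with that name is still active in the same `SparkSession`, for example after re-running the cell that calls `.start()`. A small guard, sketched below, stops any active query with that name before a new one is started. It relies only on `spark.streams.active`, `StreamingQuery.name`, and `StreamingQuery.stop()`; the helper name `stop_active_query` is hypothetical.

```python
def stop_active_query(streams, name):
    """Stop every active streaming query called `name`; return how many were stopped.

    `streams` is expected to be `spark.streams` (a StreamingQueryManager).
    Any object exposing an `active` list of queries with `.name` and
    `.stop()` works, which keeps the helper easy to test without Spark.
    """
    stopped = 0
    for q in streams.active:
        if q.name == name:
            q.stop()
            stopped += 1
    return stopped


# Usage in the notebook (requires the running SparkSession):
# stop_active_query(spark.streams, "q1")
# query = df.writeStream.outputMode("append").format("memory").queryName("q1").start()
```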

## Exercise 3

Every 10 seconds, print the number of events per state, sorted by count in descending order. Then restart the Kafka instance and find the point in the output where PySpark reconnects to Kafka.
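`df.groupBy("state").count().sort(col("count").desc())` counts the events per state and orders the result by count, highest first. A pure-Python equivalent over a list of parsed events (the sample data and the `counts_by_state` name are hypothetical) uses `collections.Counter`. Spark's sort on the count column alone does not fix the order among states with equal counts, which is why tied states such as `KY`, `IL`, and `AL` change places between refreshes in the output below; the sketch breaks ties alphabetically so its result is deterministic.

```python
from collections import Counter
from typing import Iterable, List, Tuple


def counts_by_state(events: Iterable[dict]) -> List[Tuple[str, int]]:
    """Count events per state, highest count first, ties broken by state name."""
    counts = Counter(e["state"] for e in events)
    # str() keeps the tie-break safe if a state is None (a null group in Spark).
    return sorted(counts.items(), key=lambda kv: (-kv[1], str(kv[0])))


events = [{"state": s} for s in ["CA", "NY", "CA", "KY", "NY", "CA", "IL"]]
print(counts_by_state(events))
# [('CA', 3), ('NY', 2), ('IL', 1), ('KY', 1)]
```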

In [6]:
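from IPython.display import display, clear_output
from pyspark.sql.functions import col
from time import sleep

while True:
    # clear_output(wait=True)  # uncomment to redraw the cell output in place
    display(query.status)
    # show() prints the table itself and returns None, so it is not wrapped in display()
    df.groupBy("state").count().sort(col("count").desc()).show()
    display(df.count())  # total number of events received so far
    sleep(10)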

{'message': 'Getting offsets from KafkaV2[Subscribe[page_view_events]]',
 'isDataAvailable': False,
 'isTriggerActive': True}

+-----+-----+
|state|count|
+-----+-----+
+-----+-----+



None

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

+-----+-----+
|state|count|
+-----+-----+
|   KY|    1|
|   IL|    1|
+-----+-----+



None

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

+-----+-----+
|state|count|
+-----+-----+
|   KY|    1|
|   IL|    1|
|   AL|    1|
|   CA|    1|
|   AZ|    1|
|   NY|    1|
+-----+-----+



None

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

+-----+-----+
|state|count|
+-----+-----+
|   CA|    3|
|   NY|    3|
|   NC|    2|
|   KY|    1|
|   IL|    1|
|   AL|    1|
|   NM|    1|
|   AZ|    1|
|   WI|    1|
+-----+-----+



None

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

+-----+-----+
|state|count|
+-----+-----+
|   CA|    4|
|   NY|    4|
|   NC|    2|
|   WI|    2|
|   GA|    2|
|   KY|    1|
|   AL|    1|
|   IL|    1|
|   AZ|    1|
|   NM|    1|
|   MI|    1|
+-----+-----+



None

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

+-----+-----+
|state|count|
+-----+-----+
|   CA|    4|
|   NY|    4|
|   NC|    2|
|   WI|    2|
|   GA|    2|
|   KY|    1|
|   AL|    1|
|   IL|    1|
|   AZ|    1|
|   NM|    1|
|   MI|    1|
+-----+-----+



None

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

+-----+-----+
|state|count|
+-----+-----+
|   CA|    4|
|   NY|    4|
|   NC|    2|
|   WI|    2|
|   GA|    2|
|   KY|    1|
|   AL|    1|
|   IL|    1|
|   AZ|    1|
|   NM|    1|
|   MI|    1|
+-----+-----+



None

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

+-----+-----+
|state|count|
+-----+-----+
|   CA|    4|
|   NY|    4|
|   NC|    2|
|   WI|    2|
|   GA|    2|
|   KY|    1|
|   AL|    1|
|   IL|    1|
|   AZ|    1|
|   NM|    1|
|   MI|    1|
+-----+-----+



None

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

+-----+-----+
|state|count|
+-----+-----+
|   CA|    7|
|   NY|    5|
|   WI|    4|
|   TX|    3|
|   NC|    2|
|   GA|    2|
|   PA|    2|
|   FL|    2|
|   MI|    2|
|   AZ|    2|
|   VA|    2|
|   MO|    2|
|   KY|    1|
|   NM|    1|
|   IL|    1|
|   NV|    1|
|   MD|    1|
|   AL|    1|
|   IN|    1|
|   KS|    1|
+-----+-----+
only showing top 20 rows



None

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

+-----+-----+
|state|count|
+-----+-----+
|   CA|    7|
|   NY|    5|
|   WI|    4|
|   TX|    3|
|   PA|    2|
|   AZ|    2|
|   MI|    2|
|   MO|    2|
|   FL|    2|
|   AL|    2|
|   NC|    2|
|   GA|    2|
|   VA|    2|
|   KY|    1|
|   NM|    1|
|   IL|    1|
|   IN|    1|
|   HI|    1|
|   NV|    1|
|   LA|    1|
+-----+-----+
only showing top 20 rows



None

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

+-----+-----+
|state|count|
+-----+-----+
|   CA|    7|
|   NY|    5|
|   WI|    4|
|   NC|    3|
|   VA|    3|
|   MI|    3|
|   AL|    3|
|   TX|    3|
|   MO|    2|
|   GA|    2|
|   FL|    2|
|   KY|    2|
|   PA|    2|
|   AZ|    2|
|   IL|    2|
|   NM|    1|
|   IN|    1|
|   NV|    1|
|   KS|    1|
|   MD|    1|
+-----+-----+
only showing top 20 rows



None

{'message': 'Getting offsets from KafkaV2[Subscribe[page_view_events]]',
 'isDataAvailable': False,
 'isTriggerActive': True}

+-----+-----+
|state|count|
+-----+-----+
|   CA|    7|
|   NY|    5|
|   WI|    4|
|   NC|    4|
|   KY|    3|
|   FL|    3|
|   AL|    3|
|   VA|    3|
|   TX|    3|
|   MI|    3|
|   IL|    2|
|   MO|    2|
|   GA|    2|
|   PA|    2|
|   AZ|    2|
|   NM|    1|
|   IN|    1|
|   NV|    1|
|   KS|    1|
|   MD|    1|
+-----+-----+
only showing top 20 rows



None

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

+-----+-----+
|state|count|
+-----+-----+
|   CA|    7|
|   NY|    6|
|   WI|    4|
|   NC|    4|
|   KY|    3|
|   MI|    3|
|   PA|    3|
|   AL|    3|
|   VA|    3|
|   TX|    3|
|   FL|    3|
|   IL|    2|
|   MO|    2|
|   GA|    2|
|   AZ|    2|
|   NM|    1|
|   IN|    1|
|   NV|    1|
|   KS|    1|
|   MD|    1|
+-----+-----+
only showing top 20 rows



None

KeyboardInterrupt: 
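When looking for the point where PySpark reconnected after the Kafka restart, it can help to compare consecutive snapshots instead of reading full tables: in the output above, for example, the table stays the same across several refreshes while the status reports `'Processing new data'`, and then the counts start growing again. The hypothetical helper below takes two snapshots as `{state: count}` dictionaries (the sample numbers are taken from the tables above) and returns the per-state increase.

```python
from typing import Dict


def count_increase(before: Dict[str, int], after: Dict[str, int]) -> Dict[str, int]:
    """Return how much each state's count changed between two snapshots.

    States whose count did not change are omitted, so an empty result means
    no new events were counted between the two refreshes.
    """
    growth = {}
    for state, n in after.items():
        delta = n - before.get(state, 0)
        if delta:
            growth[state] = delta
    return growth


print(count_increase({"CA": 4, "NY": 4}, {"CA": 7, "NY": 5, "TX": 3}))
# {'CA': 3, 'NY': 1, 'TX': 3}
```

Because `show()` prints only the top 20 rows, a state missing from a later snapshot is simply skipped here rather than reported.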

## Exercise 4

Create an expression that groups the events into 10-second time windows. Print the last few rows, then restart Kafka and show that no data was lost.
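`window("ts", "10 seconds")` assigns each event to a tumbling window. With the default start time, windows are aligned to the Unix epoch, so an event at `t` seconds falls into the half-open interval that starts at `t - t % 10` and ends 10 seconds later. Since `ts` is in milliseconds (the code divides it by 1000), a plain-Python model of the bucketing looks like this; `window_bounds` and `count_per_window` are hypothetical helpers, and the sketch reports times in UTC whereas Spark renders them in the session time zone.

```python
from collections import Counter
from datetime import datetime, timezone
from typing import Iterable, List, Tuple

WINDOW_SECONDS = 10


def window_bounds(ts_ms: int, size: int = WINDOW_SECONDS) -> Tuple[datetime, datetime]:
    """Return the [start, end) tumbling window containing a millisecond timestamp."""
    seconds = ts_ms // 1000           # like CAST(ts/1000 AS BIGINT) for positive ts
    start = seconds - seconds % size  # windows are aligned to the Unix epoch
    return (datetime.fromtimestamp(start, tz=timezone.utc),
            datetime.fromtimestamp(start + size, tz=timezone.utc))


def count_per_window(ts_values: Iterable[int]) -> List[Tuple[Tuple[datetime, datetime], int]]:
    """Count events per window, ordered by window start (like .sort("window"))."""
    counts = Counter(window_bounds(ts) for ts in ts_values)
    return sorted(counts.items())


start, end = window_bounds(1673943355123)
print(start.isoformat(), end.isoformat())
# 2023-01-17T08:15:50+00:00 2023-01-17T08:16:00+00:00
```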

In [7]:
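from IPython.display import display
from pyspark.sql.functions import window
import pandas as pd
from time import sleep

while True:
    # clear_output(wait=True)
    # display(query.status)
    # ts is in milliseconds: truncate to whole seconds, then make it a proper timestamp
    windows = (df
        .selectExpr("CAST(from_unixtime(CAST(ts/1000 AS BIGINT)) AS TIMESTAMP) AS ts")
        .groupBy(window("ts", "10 seconds"))  # 10-second tumbling windows
        .count()
        .sort("window")
        .tail(15))                             # last 15 windows, collected to the driver
    display(pd.DataFrame(windows, columns=["window", "count"]))
    sleep(10)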

|   | window | count |
|---|--------|-------|
| 0 | (2023-01-17 08:15:50, 2023-01-17 08:16:00) | 3 |
| 1 | (2023-01-17 08:16:00, 2023-01-17 08:16:10) | 3 |
| 2 | (2023-01-17 08:16:10, 2023-01-17 08:16:20) | 7 |
| 3 | (2023-01-17 08:16:20, 2023-01-17 08:16:30) | 7 |
| 4 | (2023-01-17 08:16:30, 2023-01-17 08:16:40) | 6 |
| 5 | (2023-01-17 08:16:40, 2023-01-17 08:16:50) | 4 |
| 6 | (2023-01-17 08:16:50, 2023-01-17 08:17:00) | 6 |
| 7 | (2023-01-17 08:17:00, 2023-01-17 08:17:10) | 7 |
| 8 | (2023-01-17 08:17:10, 2023-01-17 08:17:20) | 5 |
| 9 | (2023-01-17 08:17:20, 2023-01-17 08:17:30) | 4 |


|   | window | count |
|---|--------|-------|
| 0 | (2023-01-17 08:15:50, 2023-01-17 08:16:00) | 3 |
| 1 | (2023-01-17 08:16:00, 2023-01-17 08:16:10) | 3 |
| 2 | (2023-01-17 08:16:10, 2023-01-17 08:16:20) | 7 |
| 3 | (2023-01-17 08:16:20, 2023-01-17 08:16:30) | 7 |
| 4 | (2023-01-17 08:16:30, 2023-01-17 08:16:40) | 6 |
| 5 | (2023-01-17 08:16:40, 2023-01-17 08:16:50) | 4 |
| 6 | (2023-01-17 08:16:50, 2023-01-17 08:17:00) | 6 |
| 7 | (2023-01-17 08:17:00, 2023-01-17 08:17:10) | 7 |
| 8 | (2023-01-17 08:17:10, 2023-01-17 08:17:20) | 5 |
| 9 | (2023-01-17 08:17:20, 2023-01-17 08:17:30) | 4 |


|   | window | count |
|---|--------|-------|
| 0 | (2023-01-17 08:16:00, 2023-01-17 08:16:10) | 3 |
| 1 | (2023-01-17 08:16:10, 2023-01-17 08:16:20) | 7 |
| 2 | (2023-01-17 08:16:20, 2023-01-17 08:16:30) | 7 |
| 3 | (2023-01-17 08:16:30, 2023-01-17 08:16:40) | 6 |
| 4 | (2023-01-17 08:16:40, 2023-01-17 08:16:50) | 4 |
| 5 | (2023-01-17 08:16:50, 2023-01-17 08:17:00) | 6 |
| 6 | (2023-01-17 08:17:00, 2023-01-17 08:17:10) | 7 |
| 7 | (2023-01-17 08:17:10, 2023-01-17 08:17:20) | 5 |
| 8 | (2023-01-17 08:17:20, 2023-01-17 08:17:30) | 4 |
| 9 | (2023-01-17 08:17:30, 2023-01-17 08:17:40) | 4 |


|   | window | count |
|---|--------|-------|
| 0 | (2023-01-17 08:16:00, 2023-01-17 08:16:10) | 3 |
| 1 | (2023-01-17 08:16:10, 2023-01-17 08:16:20) | 7 |
| 2 | (2023-01-17 08:16:20, 2023-01-17 08:16:30) | 7 |
| 3 | (2023-01-17 08:16:30, 2023-01-17 08:16:40) | 6 |
| 4 | (2023-01-17 08:16:40, 2023-01-17 08:16:50) | 4 |
| 5 | (2023-01-17 08:16:50, 2023-01-17 08:17:00) | 6 |
| 6 | (2023-01-17 08:17:00, 2023-01-17 08:17:10) | 7 |
| 7 | (2023-01-17 08:17:10, 2023-01-17 08:17:20) | 5 |
| 8 | (2023-01-17 08:17:20, 2023-01-17 08:17:30) | 4 |
| 9 | (2023-01-17 08:17:30, 2023-01-17 08:17:40) | 4 |


|   | window | count |
|---|--------|-------|
| 0 | (2023-01-17 08:16:00, 2023-01-17 08:16:10) | 3 |
| 1 | (2023-01-17 08:16:10, 2023-01-17 08:16:20) | 7 |
| 2 | (2023-01-17 08:16:20, 2023-01-17 08:16:30) | 7 |
| 3 | (2023-01-17 08:16:30, 2023-01-17 08:16:40) | 6 |
| 4 | (2023-01-17 08:16:40, 2023-01-17 08:16:50) | 4 |
| 5 | (2023-01-17 08:16:50, 2023-01-17 08:17:00) | 6 |
| 6 | (2023-01-17 08:17:00, 2023-01-17 08:17:10) | 7 |
| 7 | (2023-01-17 08:17:10, 2023-01-17 08:17:20) | 5 |
| 8 | (2023-01-17 08:17:20, 2023-01-17 08:17:30) | 4 |
| 9 | (2023-01-17 08:17:30, 2023-01-17 08:17:40) | 4 |


|   | window | count |
|---|--------|-------|
| 0 | (2023-01-17 08:16:30, 2023-01-17 08:16:40) | 6 |
| 1 | (2023-01-17 08:16:40, 2023-01-17 08:16:50) | 4 |
| 2 | (2023-01-17 08:16:50, 2023-01-17 08:17:00) | 6 |
| 3 | (2023-01-17 08:17:00, 2023-01-17 08:17:10) | 7 |
| 4 | (2023-01-17 08:17:10, 2023-01-17 08:17:20) | 5 |
| 5 | (2023-01-17 08:17:20, 2023-01-17 08:17:30) | 4 |
| 6 | (2023-01-17 08:17:30, 2023-01-17 08:17:40) | 4 |
| 7 | (2023-01-17 08:17:40, 2023-01-17 08:17:50) | 3 |
| 8 | (2023-01-17 08:17:50, 2023-01-17 08:18:00) | 5 |
| 9 | (2023-01-17 08:18:00, 2023-01-17 08:18:10) | 6 |


|   | window | count |
|---|--------|-------|
| 0 | (2023-01-17 08:16:40, 2023-01-17 08:16:50) | 4 |
| 1 | (2023-01-17 08:16:50, 2023-01-17 08:17:00) | 6 |
| 2 | (2023-01-17 08:17:00, 2023-01-17 08:17:10) | 7 |
| 3 | (2023-01-17 08:17:10, 2023-01-17 08:17:20) | 5 |
| 4 | (2023-01-17 08:17:20, 2023-01-17 08:17:30) | 4 |
| 5 | (2023-01-17 08:17:30, 2023-01-17 08:17:40) | 4 |
| 6 | (2023-01-17 08:17:40, 2023-01-17 08:17:50) | 3 |
| 7 | (2023-01-17 08:17:50, 2023-01-17 08:18:00) | 5 |
| 8 | (2023-01-17 08:18:00, 2023-01-17 08:18:10) | 6 |
| 9 | (2023-01-17 08:18:10, 2023-01-17 08:18:20) | 8 |


KeyboardInterrupt: 
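One way to support the claim that no data was lost across the Kafka restart is to compare successive snapshots of the window counts: a window that appears in both snapshots should never have a smaller count in the later one. In the tables above, every window shared between consecutive refreshes keeps its count (for example, the window from 08:17:00 to 08:17:10 stays at 7). The hypothetical helper below automates that check on `{window_start: count}` dictionaries. Because `.tail(...)` returns only the most recent windows, old windows dropping off the front of the table are expected and are not treated as loss. Passing this check is a necessary condition, not a proof: it cannot detect events that never reached Spark at all.

```python
from typing import Dict, List


def decreased_windows(earlier: Dict[str, int], later: Dict[str, int]) -> List[str]:
    """Return the window starts whose count went down between two snapshots.

    Only windows present in both snapshots are compared, because tail()
    drops the oldest windows as new ones arrive.
    """
    return sorted(w for w in earlier.keys() & later.keys() if later[w] < earlier[w])


print(decreased_windows({"08:17:20": 4, "08:17:30": 4}, {"08:17:30": 4, "08:17:40": 3}))
# []
```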