# Hands-on with Spark (Structured Streaming)

![spark](https://cdn-images-1.medium.com/max/300/1*c8CtvqKJDVUnMoPGujF5fA.png)

In the previous lesson, we learnt about Spark SQL, DataFrames and the Pandas API. In this lesson, we continue with Structured Streaming.

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive.

You can use the Dataset/DataFrame API to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs. In short, Structured Streaming provides **fast, scalable, fault-tolerant, end-to-end exactly-once** stream processing without the user having to reason about streaming.

Internally, by default, Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small batch jobs, thereby achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees.
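To make the micro-batch idea concrete, here is a minimal pure-Python sketch. It is not Spark's implementation, only an illustration of the principle: the stream is cut into small batches, and each batch is handled as an ordinary batch job that updates a running result. The function names `micro_batches` and `run_query`, the batch size, and the sample records are all hypothetical.

```python
from itertools import islice


def micro_batches(stream, batch_size):
    """Cut a (possibly unbounded) iterable of records into small lists."""
    iterator = iter(stream)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def run_query(stream, batch_size=2):
    """Process each micro-batch as a small batch job and keep a running total."""
    total = 0.0
    snapshots = []
    for batch_id, batch in enumerate(micro_batches(stream, batch_size)):
        total += sum(record["cost"] for record in batch)  # the per-batch "job"
        snapshots.append((batch_id, round(total, 2)))     # result after this batch
    return snapshots


# Hypothetical order records standing in for a live stream.
orders = [{"cost": 3.3}, {"cost": 45.29}, {"cost": 24.15}]
snapshots = run_query(orders)
print(snapshots)
```

Each entry in `snapshots` is the state of the result after one micro-batch, which is the same shape of output a streaming query produces trigger by trigger.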

## Installing and Initializing Spark

As in the previous lesson, we first need to install Spark and its dependencies:

1.   Java 8
2.   Apache Spark with Hadoop
3.   Findspark (used to locate Spark on the system)


In [1]:
import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 pyspark-shell'

# set the options to connect to our Kafka cluster
options = {
    # "kafka.sasl.jaas.config": 'org.apache.kafka.common.security.scram.ScramLoginModule required username="<your-username>" password="<your-password>";',
    # "kafka.sasl.mechanism": "SCRAM-SHA-256",
    # "kafka.security.protocol" : "SASL_SSL",
    "kafka.bootstrap.servers": 'localhost:9092',
    "subscribe": 'pizza-orders',
}

In [None]:
# # findspark is only required if you are using a standalone Spark installation (downloaded tar.gz)
# import findspark
# findspark.init()

The cell below may take a few minutes to run the first time, for these reasons:
- Package download: `PYSPARK_SUBMIT_ARGS` includes a Kafka package that must be downloaded from Maven repositories on the first run.
- JVM cold start: the initial JVM startup is slow.
- Jupyter overhead: running in a notebook adds some initialization overhead.

In [None]:
# We will only use 2 cores below to speed up creation of the spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("LearnSparkStreaming").master("local[2]").getOrCreate()

:: loading settings :: url = jar:file:/opt/homebrew/Caskroom/miniconda/base/envs/kafka/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/germayne/.ivy2/cache
The jars for the packages stored in: /Users/germayne/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e6aab70d-b453-41bd-8361-ba03fcce1386;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
downloading https://repo1.maven.org/maven2/org/apache/sp

25/03/18 21:54:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
spark

## Read and Analyze Kafka stream in "Batch" Mode

Let's start by reading and analyzing our `pizza-orders` Kafka topic in the usual "batch" mode of Spark SQL and DataFrames. This is akin to the batch queries we ran in the previous lesson. In this case, we take all messages in the topic, from the earliest to the latest offset, as a single "batch".

In [23]:
pizza_df = spark.read.format('kafka')\
    .options(**options)\
    .load()

In [5]:
pizza_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [25]:
pizza_df.show()

+--------------------+--------------------+------------+---------+------+--------------------+-------------+
|                 key|               value|       topic|partition|offset|           timestamp|timestampType|
+--------------------+--------------------+------------+---------+------+--------------------+-------------+
|[7B 22 73 68 6F 7...|[7B 22 69 64 22 3...|pizza-orders|        0|     0|2025-03-18 21:52:...|            0|
|[7B 22 73 68 6F 7...|[7B 22 69 64 22 3...|pizza-orders|        0|     1|2025-03-18 21:52:...|            0|
|[7B 22 73 68 6F 7...|[7B 22 69 64 22 3...|pizza-orders|        0|     2|2025-03-18 21:52:...|            0|
|[7B 22 73 68 6F 7...|[7B 22 69 64 22 3...|pizza-orders|        0|     3|2025-03-18 21:53:...|            0|
|[7B 22 73 68 6F 7...|[7B 22 69 64 22 3...|pizza-orders|        0|     4|2025-03-18 21:53:...|            0|
|[7B 22 73 68 6F 7...|[7B 22 69 64 22 3...|pizza-orders|        0|     5|2025-03-18 21:53:...|            0|
|[7B 22 73 68 6F 7.

In [24]:
pizza_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show()

+--------------------+--------------------+
|                 key|               value|
+--------------------+--------------------+
|{"shop": "Circula...|{"id": 1, "shop":...|
|{"shop": "Luigis ...|{"id": 2, "shop":...|
|{"shop": "Luigis ...|{"id": 3, "shop":...|
|{"shop": "Circula...|{"id": 4, "shop":...|
|{"shop": "Its-a m...|{"id": 5, "shop":...|
|{"shop": "Circula...|{"id": 6, "shop":...|
|{"shop": "Marios ...|{"id": 7, "shop":...|
|{"shop": "Ill Mak...|{"id": 8, "shop":...|
|{"shop": "Mammami...|{"id": 9, "shop":...|
+--------------------+--------------------+
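The raw `key` and `value` columns are binary, which is why `show()` first printed them as hex bytes such as `[7B 22 69 64 22 3...`. The short sketch below, in plain Python rather than Spark, decodes that visible prefix to show that the bytes are simply UTF-8 encoded JSON text, which is what `CAST(value AS STRING)` recovers. The complete message in `raw_value` is a hypothetical example, not an actual record from the topic.

```python
import json

# The first bytes of a `value` cell as printed by pizza_df.show() (truncated there).
hex_prefix = "7B 22 69 64 22"
decoded_prefix = bytes.fromhex(hex_prefix).decode("utf-8")
print(decoded_prefix)

# A hypothetical complete message, as it would arrive from Kafka in binary form.
raw_value = b'{"id": 1, "shop": "Luigis Pizza", "cost": 3.3}'

# Decoding the bytes gives a JSON string; parsing it gives structured fields.
order = json.loads(raw_value.decode("utf-8"))
print(order["shop"], order["cost"])
```

Parsing the decoded string into named fields is the plain-Python counterpart of applying `from_json` with a schema, which the next cells do.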



In [7]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StringType, IntegerType, LongType, DoubleType, StructType, ArrayType, StructField

In [8]:
pizza_schema = StructType([
  StructField("pizzaName", StringType()),
  StructField("additionalToppings", ArrayType(StringType())),
])

order_schema = StructType([
  StructField("address", StringType()),
  StructField("id", IntegerType()),
  StructField("name", StringType()),
  StructField("phoneNumber", StringType()),
  StructField("shop", StringType()),
  StructField("cost", DoubleType()),
  StructField("pizzas", ArrayType(pizza_schema)),
  StructField("timestamp", LongType()),
])

In [9]:
parsed_df = pizza_df.select("timestamp", from_json(col("value").cast("string"), order_schema).alias("value"))

In [10]:
parsed_df.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- value: struct (nullable = true)
 |    |-- address: string (nullable = true)
 |    |-- id: integer (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- phoneNumber: string (nullable = true)
 |    |-- shop: string (nullable = true)
 |    |-- cost: double (nullable = true)
 |    |-- pizzas: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- pizzaName: string (nullable = true)
 |    |    |    |-- additionalToppings: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |-- timestamp: long (nullable = true)



In [11]:
parsed_df.show(truncate=False)

+-----------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|timestamp              |value                                                                                                                                                                                                                                                                                                                        |
+-----------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

We can use _dot notation_ to select a field within a struct column:

In [12]:
parsed_df.select("value.cost").show()

+-----+
| cost|
+-----+
|  3.3|
|45.29|
|24.15|
|12.82|
| 4.53|
|24.47|
|27.58|
|39.86|
|38.72|
+-----+



                                                                                

Computing the "total revenue" per shop:

In [13]:
parsed_df.groupBy("value.shop").sum("value.cost").show(truncate=False)

+------------------------------------+-----------------------+
|shop                                |sum(value.cost AS cost)|
+------------------------------------+-----------------------+
|Ill Make You a Pizza You Cant Refuse|39.86                  |
|Luigis Pizza                        |69.44                  |
|Mammamia Pizza                      |38.72                  |
|Its-a me! Mario Pizza!              |4.53                   |
|Marios Pizza                        |27.58                  |
|Circular Pi Pizzeria                |40.59                  |
+------------------------------------+-----------------------+
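For readers who want to check what the `groupBy(...).sum(...)` call computes, here is a plain-Python sketch of the same aggregation. The `(shop, cost)` pairs are an assumption: they are inferred by lining up the truncated `key` column with the `value.cost` column from the outputs above, assuming both were printed in the same row order. The function name `revenue_per_shop` is hypothetical.

```python
from collections import defaultdict

# Assumed (shop, cost) pairs, reconstructed from the outputs shown earlier.
orders = [
    ("Circular Pi Pizzeria", 3.3),
    ("Luigis Pizza", 45.29),
    ("Luigis Pizza", 24.15),
    ("Circular Pi Pizzeria", 12.82),
    ("Its-a me! Mario Pizza!", 4.53),
    ("Circular Pi Pizzeria", 24.47),
    ("Marios Pizza", 27.58),
    ("Ill Make You a Pizza You Cant Refuse", 39.86),
    ("Mammamia Pizza", 38.72),
]


def revenue_per_shop(rows):
    """Group rows by shop and sum the cost, like groupBy("shop").sum("cost")."""
    totals = defaultdict(float)
    for shop, cost in rows:
        totals[shop] += cost
    # Round only for display, to hide floating-point noise.
    return {shop: round(total, 2) for shop, total in totals.items()}


totals = revenue_per_shop(orders)
for shop, total in totals.items():
    print(f"{shop}: {total}")
```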



                                                                                

> 1. Count the number of orders by shop.
> 2. Compute the average revenue by shop and sort from highest to lowest.

In [14]:
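# Aliased so that Spark's column functions do not shadow Python's built-in min/max
from pyspark.sql.functions import min as spark_min, max as spark_max

In [15]:
parsed_df.select(spark_min("timestamp"), spark_max("timestamp")).show(truncate=False)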

+-----------------------+-----------------------+
|min(timestamp)         |max(timestamp)         |
+-----------------------+-----------------------+
|2025-03-18 21:52:55.545|2025-03-18 21:53:09.599|
+-----------------------+-----------------------+



## Read and Analyze Kafka stream in "Streaming" Mode

The key idea in Structured Streaming is to treat a live data stream as a table that is being continuously appended. This leads to a new stream processing model that is very similar to a batch processing model. You express your streaming computation as a standard batch-like query, as you would on a static table, and Spark runs it as an incremental query on the *unbounded input table*. Let’s understand this model in more detail.

Consider the input data stream as the “Input Table”. Every data item that is arriving on the stream is like a new row being appended to the Input Table.

![concept](https://spark.apache.org/docs/latest/img/structured-streaming-stream-as-a-table.png)

A query on the input generates the “Result Table”. At every trigger interval (say, every 1 second), new rows are appended to the Input Table, which in turn updates the Result Table. Whenever the Result Table is updated, we want to write the changed result rows to an external sink.

![result table](https://spark.apache.org/docs/latest/img/structured-streaming-model.png)

To illustrate the use of this model, consider a word count example that reads lines of text from a socket connection. The `lines` DataFrame is the input table, and the final `wordCounts` DataFrame is the result table. Note that the query on the streaming `lines` DataFrame that generates `wordCounts` is exactly the same as it would be on a static DataFrame. However, when this query is started, Spark continuously checks for new data from the socket connection. If there is new data, Spark runs an “incremental” query that combines the previous running counts with the new data to compute updated counts, as shown below.

![example](https://spark.apache.org/docs/latest/img/structured-streaming-example-model.png)
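The incremental word count can be mimicked in a few lines of plain Python. This is only a sketch of the idea, not how Spark stores state: a running `Counter` plays the role of the Result Table, and each call to the hypothetical `update_counts` function plays the role of one trigger that folds newly arrived lines into the previous counts. The sample lines are illustrative.

```python
from collections import Counter


def update_counts(running, new_lines):
    """Fold newly arrived lines into the running word counts (one trigger)."""
    for line in new_lines:
        running.update(line.split())
    return running


word_counts = Counter()  # the "Result Table", initially empty

# Trigger 1: two lines have arrived on the stream.
update_counts(word_counts, ["cat dog", "dog dog"])
after_first = dict(word_counts)

# Trigger 2: only the new line is processed; earlier counts are reused, not recomputed.
update_counts(word_counts, ["owl cat"])
after_second = dict(word_counts)

print(after_first)
print(after_second)
```

The important point is that the second trigger never re-reads the first two lines; it only combines the new data with the state kept from before.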




Event-time is the time embedded in the data itself. For many applications, you may want to operate on this event-time.

For example, if you want to get the number of events generated by IoT devices every minute, then you probably want to use the time when the data was generated (that is, event-time in the data), rather than the time Spark receives them.

This event-time is very naturally expressed in this model – each event from the devices is a row in the table, and event-time is a column value in the row. This allows window-based aggregations (e.g. number of events every minute) to be just a special type of grouping and aggregation on the event-time column – each time window is a group and each row can belong to multiple windows/groups. Therefore, such event-time-window-based aggregation queries can be defined consistently on both a static dataset (e.g. from collected device events logs) as well as on a data stream, making the life of the user much easier.
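The statement that each row can belong to multiple windows is easiest to see with overlapping (sliding) windows. The sketch below is plain Python, assuming 10-minute windows that start every 5 minutes; the helper `windows_for` and the sample event times are hypothetical. In PySpark, the corresponding grouping key would be built with `pyspark.sql.functions.window` on the event-time column.

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def windows_for(event_time, window=timedelta(minutes=10), slide=timedelta(minutes=5)):
    """Return every [start, end) window that contains event_time."""
    # Latest window start at or before the event, aligned to the slide interval.
    start = event_time - (event_time - EPOCH) % slide
    found = []
    while start + window > event_time:
        found.append((start, start + window))
        start -= slide
    return sorted(found)


# Hypothetical event times embedded in the data (not arrival times).
events = [
    datetime(2025, 3, 18, 12, 2),
    datetime(2025, 3, 18, 12, 7),
    datetime(2025, 3, 18, 12, 11),
]

# Windowed count: each window is a group, and an event is counted in every window it falls in.
counts = Counter()
for event_time in events:
    for window_start, _window_end in windows_for(event_time):
        counts[window_start] += 1

for window_start in sorted(counts):
    print(window_start.time(), counts[window_start])
```

With these settings every event lands in exactly two windows, so the aggregation is an ordinary group-and-count once the window membership is known.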

Furthermore, this model naturally handles data that has arrived later than expected based on its event-time. Since Spark is updating the Result Table, it has full control over updating old aggregates when there is late data, as well as cleaning up old aggregates to limit the size of intermediate state data.
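The following plain-Python sketch illustrates both halves of that statement: a late event still updates the aggregate of an old window, and state for windows that are too old is eventually discarded. It is a deliberately simplified model of the mechanism Spark calls watermarking, not Spark's implementation. The class `WindowedCounter`, the 10-minute window, and the 10-minute lateness threshold are all assumptions chosen for illustration.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=10)  # tumbling window length (assumed)
DELAY = timedelta(minutes=10)   # how late an event may arrive (assumed)
EPOCH = datetime(1970, 1, 1)


class WindowedCounter:
    """Counts events per event-time window and prunes old state."""

    def __init__(self):
        self.state = {}             # window start -> running count
        self.max_event_time = None  # newest event time seen so far

    def watermark(self):
        """Events in windows ending at or before this time are considered too late."""
        if self.max_event_time is None:
            return None
        return self.max_event_time - DELAY

    def process_batch(self, event_times):
        watermark = self.watermark()  # based on earlier batches only
        for t in event_times:
            if self.max_event_time is None or t > self.max_event_time:
                self.max_event_time = t
            start = t - (t - EPOCH) % WINDOW
            if watermark is not None and start + WINDOW <= watermark:
                continue  # too late: this window's state was already dropped
            self.state[start] = self.state.get(start, 0) + 1  # may update an old window
        # Clean up windows that can no longer receive accepted events.
        watermark = self.watermark()
        if watermark is not None:
            self.state = {s: c for s, c in self.state.items() if s + WINDOW > watermark}
        return dict(self.state)


def at(hour, minute):
    return datetime(2025, 3, 18, hour, minute)


counter = WindowedCounter()
print(counter.process_batch([at(12, 2), at(12, 14)]))  # two windows open
print(counter.process_batch([at(12, 7)]))              # late event updates the 12:00 window
print(counter.process_batch([at(12, 25)]))             # 12:00 window is now old enough to drop
print(counter.process_batch([at(12, 3)]))              # too late: ignored
```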

In [34]:
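# Unlike the batch read above, a streaming query on the Kafka source starts from the
# latest offsets by default, so a newly started query only sees messages produced after it starts.
pizza_df = spark.readStream.format('kafka')\
    .options(**options)\
    .load()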

In [35]:
pizza_df.isStreaming

True

In [36]:
pizza_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [54]:
parsed_df = pizza_df.select("timestamp", from_json(col("value").cast("string"), order_schema).alias("value"))

In [41]:
parsed_df.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- value: struct (nullable = true)
 |    |-- address: string (nullable = true)
 |    |-- id: integer (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- phoneNumber: string (nullable = true)
 |    |-- shop: string (nullable = true)
 |    |-- cost: double (nullable = true)
 |    |-- pizzas: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- pizzaName: string (nullable = true)
 |    |    |    |-- additionalToppings: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |-- timestamp: long (nullable = true)



In [None]:
query = parsed_df \
    .writeStream \
    .format("console") \
    .start()

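# awaitTermination() blocks until the query is stopped or fails. In a standalone script this
# keeps the process alive; in a notebook it blocks the cell, so interrupt the cell to regain control.
query.awaitTermination()
# Other useful handles on a running query:
# query.isActive        # True while the query is running
# query.recentProgress  # the most recent progress updates
# query.stop()          # stop the query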

25/03/18 22:08:40 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/z9/_l65135549l_v6k4d3tz369r0000gn/T/temporary-5fa09916-1c31-4cf2-bb84-be9dae426409. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/03/18 22:08:40 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+



In [None]:
query.stop()

In [None]:
query.recentProgress

[{'id': '590d73ca-0eaa-4c3b-a9d7-47794ea5a5cd',
  'runId': '88fc1108-a1e5-43f6-8593-a3f277ef2afe',
  'name': None,
  'timestamp': '2025-03-18T14:08:40.658Z',
  'batchId': 0,
  'numInputRows': 0,
  'inputRowsPerSecond': 0.0,
  'processedRowsPerSecond': 0.0,
  'durationMs': {'addBatch': 23,
   'getBatch': 1,
   'latestOffset': 3254,
   'queryPlanning': 7,
   'triggerExecution': 3482,
   'walCommit': 90},
  'stateOperators': [],
  'sources': [{'description': 'KafkaV2[Subscribe[pizza-orders]]',
    'startOffset': None,
    'endOffset': {'pizza-orders': {'0': 9}},
    'latestOffset': {'pizza-orders': {'0': 9}},
    'numInputRows': 0,
    'inputRowsPerSecond': 0.0,
    'processedRowsPerSecond': 0.0,
    'metrics': {'avgOffsetsBehindLatest': '0.0',
     'maxOffsetsBehindLatest': '0',
     'minOffsetsBehindLatest': '0'}}],
  'sink': {'description': 'org.apache.spark.sql.execution.streaming.ConsoleTable$@6f418af6',
   'numOutputRows': 0}},
 {'id': '590d73ca-0eaa-4c3b-a9d7-47794ea5a5cd',
  'runI

-------------------------------------------
Batch: 19
-------------------------------------------
+--------------------+--------------------+
|           timestamp|               value|
+--------------------+--------------------+
|2025-03-18 22:15:...|{4298 Schroeder C...|
+--------------------+--------------------+

-------------------------------------------
Batch: 22
-------------------------------------------
+--------------------+--------------------+
|           timestamp|               value|
+--------------------+--------------------+
|2025-03-18 22:15:...|{94556 Chaney Cam...|
+--------------------+--------------------+

-------------------------------------------
Batch: 23
-------------------------------------------
+--------------------+--------------------+
|           timestamp|               value|
+--------------------+--------------------+
|2025-03-18 22:15:...|{3903 Jose Fords\...|
+--------------------+--------------------+

-------------------------------------------
Batch: 24
-------------------------------------------
+--------------------+--------------------+
|           timestamp|               value|
+--------------------+--------------------+
|2025-03-18 22:15:...|{06867 Sean Sprin...|
+--------------------+--------------------+

### Complete example: counting orders per shop as the orders stream in

In [None]:
# First check if any streams are active
spark.streams.active

In [None]:
# If there are any active streams, stop them
for q in spark.streams.active:
    print(f"Stopping query: {q.name}")
    q.stop()

In [None]:
# Complete example: count the orders per shop as they arrive
pizza_df = spark.readStream.format('kafka')\
    .options(**options)\
    .load()

parsed_df = pizza_df.select("timestamp", from_json(col("value").cast("string"), order_schema).alias("value"))

shop_counts = parsed_df.groupBy("value.shop").count()

# outputMode("complete"): the entire Result Table is written on every trigger (a full snapshot).
# outputMode("update"): only the rows that changed since the last trigger are written.
query = shop_counts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# Blocks until the query is stopped or fails
query.awaitTermination()
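The difference between the `complete` and `update` output modes can be sketched in plain Python. This is an illustration of what each mode sends to the sink, not Spark code: the function `run_trigger`, the `Counter` used as the Result Table, and the sample orders are all hypothetical.

```python
from collections import Counter


def run_trigger(result_table, new_orders, output_mode):
    """Apply one micro-batch of orders and return the rows written to the sink."""
    changed = set()
    for order in new_orders:
        result_table[order["shop"]] += 1
        changed.add(order["shop"])
    if output_mode == "complete":
        # The whole Result Table is written, changed or not.
        return dict(result_table)
    if output_mode == "update":
        # Only rows touched by this micro-batch are written.
        return {shop: result_table[shop] for shop in changed}
    raise ValueError(f"unsupported output mode: {output_mode}")


shop_counts = Counter()
first = run_trigger(
    shop_counts,
    [{"shop": "Luigis Pizza"}, {"shop": "Marios Pizza"}],
    "complete",
)
second = run_trigger(shop_counts, [{"shop": "Luigis Pizza"}], "update")
print(first)
print(second)
```

With `complete`, the sink always receives every shop; with `update`, the second trigger writes only the row for the shop that received a new order.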

In [None]:
query.stop()

You have come to the end of this exercise.

To delete the Kafka topic `pizza-orders`, use the command below in your terminal:

`./kafka-topics.sh --delete --topic pizza-orders --bootstrap-server localhost:9092`

To exit Kafka in your terminal, type `exit`.