# EX9-STREAM: Spark Structured Streaming + Kafka

Your assignment: complete the `TODO`s and include the **output of each cell**.

#### You may need to read the [Structured Streaming API Documentation](https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/index.html) to complete this lab.

### Before starting this exercise: (1) start the `kafka` stack; (2) start the `kafkafakestream` stack.
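
Before running the cells, it can help to confirm that the broker is reachable. The following is a minimal sketch, assuming the notebook runs in a container that can resolve the host `kafka` on port `9092` (the address used in Step 2); the helper name `is_port_open` is hypothetical and not part of the lab material.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        # create_connection resolves the host name and opens a TCP socket
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures
        return False


# Hypothetical usage from the notebook container:
# is_port_open("kafka", 9092)
```

A `True` result only shows that something is listening on that port; it does not prove that the `pizza` topic exists or that the fake stream is producing messages.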

### Step 1: **[PLAN A]** Start Spark Session

In [1]:
from pyspark.sql import SparkSession

try:
    spark.stop()
except NameError:
    print("SparkContext not defined")

# local mode (MinIO as the S3-compatible object store)
spark = SparkSession.builder \
            .appName("Spark SQL basic example") \
            .master("local[*]") \
            .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.4") \
            .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
            .config("spark.hadoop.fs.s3a.access.key", "pdm_minio") \
            .config("spark.hadoop.fs.s3a.secret.key", "pdm_minio") \
            .config("spark.hadoop.fs.s3a.path.style.access", "true") \
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
            .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
            .getOrCreate()

SparkContext not defined
:: loading settings :: url = jar:file:/app/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c7e477e8-605e-4f97-9abb-3dfa35f353e9;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.4 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.4 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.5 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-

### Step 2: **[PLAN A]** Create stream of pizza orders from Kafka

In [4]:
# Read from Kafka
df_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "pizza") \
    .option("startingOffsets", "earliest") \
    .load()

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import (
    ArrayType, IntegerType, LongType, StringType, StructField, StructType,
)

schema = StructType([
    StructField("id", IntegerType()),
    StructField("shop", StringType()),
    StructField("name", StringType()),
    StructField("phoneNumber", StringType()),
    StructField("address", StringType()),
    StructField("pizzas", ArrayType(
        StructType([
            StructField("pizzaName", StringType()),
            StructField("additionalToppings", ArrayType(StringType()))
        ])
    )),
    StructField("timestamp", LongType())  # This is in epoch millis
])


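# Kafka delivers the payload as bytes: cast it to a string, then parse the JSON into columns
df_stream = df_stream.selectExpr("CAST(value AS STRING) as json_str")
df_stream = df_stream.select(from_json(col("json_str"), schema).alias("data")).select("data.*")

# Print each micro-batch to the console, triggering once per second
df_stream_writer = df_stream.writeStream.format("console").outputMode("append")
df_stream_writer = df_stream_writer.trigger(processingTime="1 second")
df_stream_query = df_stream_writer.start()
# Block for up to 10 seconds; returns False if the query is still running after the timeout
df_stream_query.awaitTermination(10)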

25/05/20 03:02:15 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-ef44f5fd-93e4-408e-97b2-1573dbc3b1f0. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/05/20 03:02:15 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/05/20 03:02:15 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+---+--------------------+---------------+--------------------+--------------------+--------------------+-------------+
| id|                shop|           name|         phoneNumber|             address|              pizzas|    timestamp|
+---+--------------------+---------------+--------------------+--------------------+--------------------+-------------+
|  0|        Marios Pizza|    Jason Brown|       (851)502-9074|2701 Samuel Summi...|[{Marinara, [🍓 s...|1747710107388|
|  1|        Marios Pizza|     Edward Liu|   (366)611-5493x353|561 Lester Points...|[{Diavola, [🥓 ba...|1747710109268|
|  2|        Marios Pizza|Joshua Peterson|        933-884-1198|90464 Amanda Port...|[{Marinara, [🍓 s...|1747710114098|
|  3|        Marios Pizza|    Maria Lopez|  783-390-4640x10333|0329 James Drive ...|[{Salami, [🌶️ ho...|1747710114833|
|  4|Circular Pi Pizzeria|    Chris Lyons|001-371-729-3812x...|3856

25/05/20 03:02:19 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 3700 milliseconds


False
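
The `timestamp` column above holds epoch milliseconds. As a plain-Python sketch of what one message looks like once decoded, and how that timestamp maps to a UTC time, consider the snippet below. The payload is hypothetical: only the `timestamp` value is taken from the first row of Batch 0, and every other field value, as well as the helper name `epoch_millis_to_utc`, is made up for illustration.

```python
import json
from datetime import datetime, timedelta, timezone

# Hypothetical message value shaped like the schema in Step 2.
# Only the timestamp comes from the Batch 0 output; the rest is placeholder data.
sample_value = json.dumps({
    "id": 0,
    "shop": "Example Pizza Shop",
    "name": "Jane Doe",
    "phoneNumber": "000-000-0000",
    "address": "1 Example Street",
    "pizzas": [
        {"pizzaName": "Marinara", "additionalToppings": ["olives", "onion"]},
    ],
    "timestamp": 1747710107388,
}).encode("utf-8")


def epoch_millis_to_utc(millis: int) -> datetime:
    """Convert epoch milliseconds to a timezone-aware UTC datetime without float rounding."""
    seconds, ms = divmod(millis, 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(milliseconds=ms)


# Kafka values arrive as bytes, so decode first, then parse the JSON
order = json.loads(sample_value.decode("utf-8"))
order_time = epoch_millis_to_utc(order["timestamp"])
print(order["pizzas"][0]["pizzaName"], order_time.isoformat())
```

On the Spark side, one common way to get the same result is to divide the column by 1000 and cast it, for example `(col("timestamp") / 1000).cast("timestamp")`; treat that as a suggestion to verify against the API documentation rather than as part of the original exercise.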

### Step 3: Explore the example above, change parameters, see the results

This is an open exercise: show your work and explain the output. The fake Kafka stream has other options, such as the subject, the number of messages, and the waiting time.
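
One way to explore different parameters is to collect the Kafka source options in a small helper and vary them between runs. The sketch below assumes the same broker address and topic as Step 2; the function names `kafka_source_options` and `read_kafka_stream` are hypothetical. `startingOffsets` and `maxOffsetsPerTrigger` are options of the Structured Streaming Kafka source; the second caps how many offsets each micro-batch reads, which may help when batches fall behind the trigger interval.

```python
def kafka_source_options(topic, starting_offsets="earliest",
                         max_offsets_per_trigger=None,
                         bootstrap_servers="kafka:9092"):
    """Build the option dictionary for a Kafka streaming source."""
    options = {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        # "earliest" replays the whole topic; "latest" reads only new messages
        "startingOffsets": starting_offsets,
    }
    if max_offsets_per_trigger is not None:
        # Spark options are passed as strings
        options["maxOffsetsPerTrigger"] = str(max_offsets_per_trigger)
    return options


def read_kafka_stream(spark, topic, **kwargs):
    """Return a streaming DataFrame for `topic` using an existing SparkSession."""
    options = kafka_source_options(topic, **kwargs)
    return spark.readStream.format("kafka").options(**options).load()


# Hypothetical usage inside the notebook, with the session from Step 1:
# df_latest = read_kafka_stream(spark, "pizza", starting_offsets="latest",
#                               max_offsets_per_trigger=50)
```

Changing one option at a time, and noting how the batch contents and the "falling behind" warning change, makes it easier to explain each result.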