# **Assignment 4: PySpark Structured Streaming Using Kafka Source**

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("pyspark-kafka-streaming").\
        master("spark://spark-master:7077").\
        config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0"). \
        config("spark.executor.memory", "512m").\
        getOrCreate()

## ==== Q2 ==============================================

#### **2.1**: Create a streaming DataFrame in Spark that reads data from a Kafka topic named "topic_test" and starts processing from the beginning of the topic's log using the earliest available offset. Consult the [documentation](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#reading-data-from-kafka). [10 points]

In [None]:
#Answer to 2.1
df_streamed_raw = (spark
  .readStream
  # Add your code here
  .load())

In [None]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import col

# convert byte stream to string
df_streamed_kv = (df_streamed_raw
    .withColumn("key", df_streamed_raw["key"].cast(StringType()))
    .withColumn("value", df_streamed_raw["value"].cast(StringType())))

test_query = (df_streamed_kv 
              .writeStream \
              .format("memory") # output to memory \
              .outputMode("update") # only write updated rows to the sink \
              .queryName("test_query_table")  # Name of the in memory table \
              .start())

#### If all goes well, the following cell should display a table populated with values being streamed from you Kafka producer. NOTE: If you recently ran the producer, it may take a while before the table is populated. Keep rerunning the cell to check for updates:

In [None]:
spark.sql("select * from test_query_table").show()

In [None]:
test_query.stop()

In [None]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, BooleanType, LongType, IntegerType

event_schema = StructType([
    StructField("ip_address", StringType()),
    StructField("date_time", StringType()),
    StructField("request_type", StringType()),
    StructField("request_arg", StringType()),
    StructField("status_code", StringType()),
    StructField("response_size", StringType()),
    StructField("referrer", StringType()),
    StructField("user_agent", StringType())
])

# Parse the events from JSON format
df_parsed = (df_streamed_kv
           # Sets schema for event data
           .withColumn("value", from_json("value", event_schema))
          )

In [None]:
df_formatted = (df_parsed.select(
    col("key").alias("event_key")
    ,col("topic").alias("event_topic")
    ,col("timestamp").alias("event_timestamp")
    ,col("value.ip_address").alias("ip_address")
    ,col("value.date_time").alias("date_time")
    ,col("value.request_type").alias("request_type")
    ,col("value.request_arg").alias("request_arg")
    ,col("value.status_code").alias("status_code")
    ,col("value.response_size").cast(IntegerType()).alias("response_size")
    ,col("value.referrer").alias("referrer")
    ,col("value.user_agent").alias("user_agent")
))

#### **Q2.2**: Read up on output sinks in the documentation. Select the appropriate Sink and write the streaming data received from the Kafka topic "topic_test" to an in-memory table named "access_log_table" and configure the output mode to append the table with only the changed rows. [5 points]spark.sql("select count(1) from query_sma").show()

In [None]:
# Answer to 2.2
query = (df_formatted
 .writeStream
 # Add your code here
 .start())

In [None]:
# Given
for s in spark.streams.active:
    print(f"ID:{s.id} | NAME:{s.name}")

## ==== Q3 ==============================================

#### **Q3.1**: Write code to continuously count the number of rows in access_log_table while the streaming query is active. Update this output every 5 seconds. HINT: You can use the clear_output function from the IPython.display module to clear the output console. [15 points]

In [None]:
from IPython.display import clear_output
from time import sleep

try:
    #=========== 3.1 =============================
    pass
    #=========== 3.1 =============================
        
except KeyboardInterrupt:
#     test_query.stop()
    print("stream process interrupted")

#### **Q3.2**: Count the number of requests per type (i.e GET, POST etc) so far. [15 points]

#### **Q3.3**: Compute the simple moving average of request size with a window size of the latest 10 requests. [20 Points]

#### **Q3.4**: Find the sizes of the request that are more than one moving standard deviation above the moving average [20 Points]