# Spark Preparation
First, we check whether the notebook is running in Google Colab. If it is, we install all the necessary packages.

To run Spark in Colab, we first need to install its dependencies in the Colab environment: Apache Spark 3.2.1 with Hadoop 3.2, Java 8, and findspark, which locates the Spark installation on the system. The installation can be carried out from within the Colab notebook itself.
Learn more from [A Must-Read Guide on How to Work with PySpark on Google Colab for Data Scientists!](https://www.analyticsvidhya.com/blog/2020/11/a-must-read-guide-on-how-to-work-with-pyspark-on-google-colab-for-data-scientists/)

In [None]:
try:
  import google.colab
  IN_COLAB = True
except ImportError:
  IN_COLAB = False

In [None]:
if IN_COLAB:
    !apt-get install openjdk-8-jdk-headless -qq > /dev/null
    !wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
    !tar xf spark-3.2.1-bin-hadoop3.2.tgz
    !mv spark-3.2.1-bin-hadoop3.2 spark
    !pip install -q kafka-python
    !pip install -q findspark

In [None]:
if IN_COLAB:
  import os
  os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
  os.environ["SPARK_HOME"] = "/content/spark"

# Start a Local Cluster
Use findspark.init() to locate the Spark installation and make pyspark importable for a local cluster. If you plan to use a remote cluster, skip findspark.init() and change cluster_url accordingly.

A special environment variable must be set before starting the cluster to allow Spark to use Kafka as a streaming data source.

In [None]:
# tell pyspark-shell to include spark-streaming-kafka package
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 pyspark-shell'

# then locate the Spark installation so that pyspark can be imported
import findspark
findspark.init()
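
The package coordinates in `PYSPARK_SUBMIT_ARGS` embed both the Scala version (2.12) and the Spark version (3.2.1), and they should match the Spark build installed earlier. As a minimal sketch, the same string can be assembled from those two values so that they only need to be changed in one place. The helper name `kafka_submit_args` is hypothetical and not part of PySpark or findspark.

```python
def kafka_submit_args(spark_version="3.2.1", scala_version="2.12"):
    """Build the PYSPARK_SUBMIT_ARGS value used above (hypothetical helper)."""
    # The two Kafka connector artifacts listed in the cell above.
    artifacts = ["spark-streaming-kafka-0-10", "spark-sql-kafka-0-10"]
    # Maven coordinates have the form group:artifact_scalaVersion:version.
    packages = ",".join(
        f"org.apache.spark:{name}_{scala_version}:{spark_version}"
        for name in artifacts
    )
    return f"--packages {packages} pyspark-shell"


submit_args = kafka_submit_args()
```

The result could then be assigned to `os.environ['PYSPARK_SUBMIT_ARGS']` before `findspark.init()` is called.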

Spark Streaming needs **at least 2 cores**: one to receive data (socket, Kafka, etc.) and one to process it.  We therefore use **'local[2]'** for our local cluster.

In [None]:
cluster_url = 'local[2]'

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master(cluster_url)\
        .appName("Spark Streaming")\
        .config('spark.ui.port', '4040')\
        .getOrCreate()
sc = spark.sparkContext

# Basic Structured Streaming Commands

Structured Streaming supports Kafka as a data source, configured with two important parameters: the bootstrap server URL and the topic to subscribe to.  The DataFrame returned by the source contains several columns, which can be inspected with the printSchema method.

In this example, we receive sensor data from a Kafka broker.  Each record is a JSON document stored in the value field.

In [None]:
kafka_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "moon:9092") \
    .option("subscribe", "data") \
    .load()

In [None]:
kafka_df.printSchema()

## Deserialize JSON data

DataFrames support JSON deserialization.  We first define the structure of the data and then use the from_json function to parse the raw data.  In this example, each sensor record contains 3 fields: id, sensor_timestamp, and value.

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col, from_json, avg, count, window

In [None]:
schema = StructType([
    StructField('id', StringType(), True), 
    StructField('sensor_timestamp', StringType(), True), 
    StructField('value', IntegerType(), True)
])
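
To make the schema concrete, here is a sketch of what a single message might look like on the producer side, using only the standard library. The field names follow the schema above; the values and the timestamp format are made up for illustration, since the actual payload of the `data` topic is not shown in this notebook.

```python
import json

# Hypothetical sensor reading: field names follow the schema above,
# the values and the timestamp format are assumptions for illustration.
reading = {
    "id": "sensor-1",
    "sensor_timestamp": "2022-03-01 10:00:05",
    "value": 42,
}

# Kafka carries the record as raw bytes in the message's value field.
payload = json.dumps(reading).encode("utf-8")

# Decoding the bytes and parsing the JSON recovers the three fields.
decoded = json.loads(payload.decode("utf-8"))
```

Because the value arrives as bytes, the Spark code casts the `value` column to a string before passing it to from_json.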

To make this easier to follow, we split deserialization into multiple steps: first parse the JSON, then select only the necessary columns.  In practice, these steps can be combined.

In [None]:
parse_data_df = kafka_df.withColumn("jsonData", from_json(col("value").cast('string'),schema))
parse_data_df.printSchema()

In [None]:
data_df = parse_data_df.select('timestamp', 'jsonData.*')
data_df.printSchema()

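Calculate statistics, including the count and average of value per id, using a window operation on the data.  The window is 40 seconds long, slides every 20 seconds, and is based on the Kafka message timestamp column.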

In [None]:
window_stats = data_df.groupBy('id', window(data_df.timestamp, "40 seconds", "20 seconds")).agg(avg('value').alias('mean'), count('value').alias('count'))
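
Since the window length (40 seconds) is twice the slide interval (20 seconds), each record is counted in two overlapping windows. The following pure-Python sketch illustrates the assignment rule only; it is not Spark's implementation. It assumes event times are given in whole seconds and that window starts are aligned to multiples of the slide interval. The function name `sliding_windows` is hypothetical.

```python
def sliding_windows(event_time, window=40, slide=20):
    """Return the [start, end) windows, in seconds, that contain event_time."""
    # Latest window start at or before the event, aligned to the slide.
    start = event_time - (event_time % slide)
    windows = []
    # Step back one slide at a time while the window still covers the event.
    while start > event_time - window:
        windows.append((start, start + window))
        start -= slide
    return sorted(windows)


windows_for_25 = sliding_windows(25)  # [(0, 40), (20, 60)]
windows_for_40 = sliding_windows(40)  # [(20, 60), (40, 80)]
```

An event at second 40 falls outside the window starting at 0 because the end of each window is exclusive.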

## Trigger the stream processing

In [None]:
# Start the query that prints the updated window statistics to the console
query_window = window_stats \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .trigger(processingTime='20 seconds') \
    .start(truncate=False)


# Block for up to 100 seconds while the query runs
query_window.awaitTermination(100)

In [None]:
query_window.stop()
spark.stop()