## Working with JSON Array Streaming from Kafka
This notebook shows how to handle a stream in which each message is a JSON array. Make sure your <b>Kafka Producer</b> is publishing an array of objects.

<b>Revised by Chee-Ming Ting, 5 May 2022</b>: 
This demo uses LT2-Producer to generate and publish data, where the "Clicks" and "Impressions" values are sent as strings.
Therefore, the schema passed to from_json() must declare these fields as strings.

The from_json() function imposes a constraint when converting a column value into a DataFrame:
<b>Every data type defined in the schema must match the corresponding value in the JSON. If any column's value does not match its declared type, all column values become null.</b>
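
Before choosing the schema types, it can help to look at what a message actually contains. The following is a minimal plain-Python sketch (no Spark involved) that reports the Python types found in each field of a JSON array. The helper name `field_types` and the sample message are hypothetical and only illustrate the string-typed "Clicks" and "Impressions" described above.

```python
import json


def field_types(payload: str) -> dict:
    """Map each field name to the set of Python type names seen in a JSON array of objects."""
    records = json.loads(payload)
    seen = {}
    for record in records:
        for name, value in record.items():
            seen.setdefault(name, set()).add(type(value).__name__)
    return seen


# Hypothetical sample message: Clicks and Impressions arrive as strings, ts as a number
sample = (
    '[{"Clicks": "0", "Impressions": "3", "ts": 1603072527}, '
    '{"Clicks": "1", "Impressions": "11", "ts": 1603072527}]'
)

print(field_types(sample))
```

If a field reports `str` here, declaring it as `StringType()` in the schema and casting afterwards is the approach this notebook takes.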


### Step 1 : Initialize Spark Session 

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql import functions as F
from pyspark.sql.types import *

spark = SparkSession \
    .builder \
    .appName("Clickstream Analysis in Spark") \
    .getOrCreate()


### Step 2 : Read Stream from the Kafka Topic

In [2]:
topic = "clickstream"

#configuration
hostip = "10.156.3.124" #change me

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", f'{hostip}:9092') \
  .option("subscribe", topic) \
  .load()

In [3]:
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

### Defining the Schema and Parsing the data
Since each message is an <strong>array</strong> rather than a single object as in the previous example, we need to wrap the struct in <code>ArrayType</code> when defining the schema.

Before running the next cell, make sure that <strong>LT2-Producer.ipynb</strong> is producing data in the following or a similar format:
<code>
[{'Clicks': 0, 'Impressions': 3, 'ts': 1603072527}, {'Clicks': 0, 'Impressions': 3, 'ts': 1603072527}, {'Clicks': 0, 'Impressions': 3, 'ts': 1603072527}, {'Clicks': 0, 'Impressions': 3, 'ts': 1603072527}, {'Clicks': 0, 'Impressions': 11, 'ts': 1603072527}, {'Clicks': 1, 'Impressions': 11, 'ts': 1603072527}]
</code>
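
As a rough illustration of the producer side, the sketch below serializes a list of records into a single JSON array suitable for use as a Kafka message value. It assumes the producer builds its messages with the standard `json` module. The helper `build_payload` and the record values are hypothetical and are not taken from LT2-Producer.ipynb. The "Clicks" and "Impressions" values are written as strings to match the revision note at the top of this notebook.

```python
import json


def build_payload(records):
    """Serialize a list of dicts into one JSON array, encoded as UTF-8 bytes."""
    if not isinstance(records, list) or not all(isinstance(r, dict) for r in records):
        raise TypeError("records must be a list of dicts")
    return json.dumps(records).encode("utf-8")


# Hypothetical records; one message carries the whole list as a single array
records = [
    {"Clicks": "0", "Impressions": "3", "ts": 1603072527},
    {"Clicks": "1", "Impressions": "11", "ts": 1603072527},
]

payload = build_payload(records)

# Round-trip check: the consumer side should see a list of objects again
decoded = json.loads(payload.decode("utf-8"))
print(type(decoded).__name__, len(decoded))
```

Sending one array per message is what makes `ArrayType` necessary on the consumer side. A producer that sends one object per message would use a plain `StructType` instead.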

In [4]:
#Define the schema for the structured datastream received
# schema = ArrayType(StructType([    
#     StructField('Clicks', IntegerType(), True), 
#     StructField('Impressions', IntegerType(), True),
#     StructField('ts', TimestampType(), True)            
# ]))

# LT2-Producer sends "Clicks" and "Impressions" as strings, so the schema
# must declare them as StringType for from_json to parse them.
# "Clicks" and "Impressions" are converted back to int later.
schema = ArrayType(StructType([    
    StructField('Clicks', StringType(), True), 
    StructField('Impressions', StringType(), True),
    StructField('ts', TimestampType(), True)            
]))

In [5]:
df = df.select(F.from_json(F.col("value").cast("string"), schema).alias('parsed_value'))

In [6]:
df.printSchema()

root
 |-- parsed_value: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Clicks: string (nullable = true)
 |    |    |-- Impressions: string (nullable = true)
 |    |    |-- ts: timestamp (nullable = true)



Notice in the schema above that the <strong>columns</strong> are nested inside an array. We can use the <code>explode</code> function to turn each array element into its own row.

In [7]:
df = df.select(F.explode(F.col("parsed_value")).alias('unnested_value'))      
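
To make the effect of `explode` concrete, here is a plain-Python analogy (not Spark code) in which each row is a dict and the parsed array is a list. The function `explode_rows` and the sample rows are hypothetical. The sketch mirrors one documented behaviour of Spark's `explode`: rows whose array is null or empty produce no output rows.

```python
def explode_rows(rows, column, alias):
    """Emit one output row per element of rows[i][column], skipping null or empty arrays."""
    exploded = []
    for row in rows:
        for element in row[column] or []:
            exploded.append({alias: element})
    return exploded


# Hypothetical parsed rows: one Kafka message per row, each holding an array of structs
parsed_rows = [
    {"parsed_value": [
        {"Clicks": "0", "Impressions": "3", "ts": 1603072527},
        {"Clicks": "1", "Impressions": "11", "ts": 1603072527},
    ]},
    {"parsed_value": [{"Clicks": "0", "Impressions": "5", "ts": 1603072528}]},
    {"parsed_value": None},  # e.g. a message that failed to parse
]

unnested = explode_rows(parsed_rows, "parsed_value", "unnested_value")
print(len(unnested))
for row in unnested:
    print(row["unnested_value"]["Impressions"])
```

Three input rows become three output rows here only by coincidence: two elements come from the first message, one from the second, and none from the third.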

In [8]:
df.printSchema()

root
 |-- unnested_value: struct (nullable = true)
 |    |-- Clicks: string (nullable = true)
 |    |-- Impressions: string (nullable = true)
 |    |-- ts: timestamp (nullable = true)



After applying <strong>explode()</strong>, the array level is gone and each row holds a single struct, so we can now proceed with the rest of the operations.

In [9]:
df_formatted = df.select(
                    F.col("unnested_value.Clicks").alias("Clicks"),
                    F.col("unnested_value.Impressions").alias("Impressions"),
                    F.col("unnested_value.ts").alias("ts")
                )

In [10]:
df_formatted.printSchema()

root
 |-- Clicks: string (nullable = true)
 |-- Impressions: string (nullable = true)
 |-- ts: timestamp (nullable = true)



Convert the "Clicks" and "Impressions" columns back to integer type:

In [11]:
df_formatted = df_formatted \
    .withColumn("Clicks", F.col("Clicks").cast("int")) \
    .withColumn("Impressions", F.col("Impressions").cast("int"))

In [12]:
df_formatted.printSchema()

root
 |-- Clicks: integer (nullable = true)
 |-- Impressions: integer (nullable = true)
 |-- ts: timestamp (nullable = true)



In [13]:
query_formatted = df_formatted \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start()

In [14]:
query_formatted.stop()

In [15]:
# F.minute extracts the minute of the hour (0-59) from ts; group by it and sum the impressions
grouped_by_min = df_formatted.groupBy(F.minute("ts").alias("minute_bin"))\
                    .agg(F.sum("Impressions").alias("Total Impressions"))   
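
One consequence of grouping by the minute of the hour is worth keeping in mind: events from different hours that share the same minute value fall into the same bin. The plain-Python sketch below illustrates this with hypothetical events. It uses UTC for the conversion, whereas Spark applies the session time zone, so the exact minute values may differ in a real run.

```python
from collections import defaultdict
from datetime import datetime, timezone

# Hypothetical events: the second one occurs exactly one hour after the first
events = [
    {"Impressions": 3, "ts": 1603072527},
    {"Impressions": 11, "ts": 1603072527 + 3600},
    {"Impressions": 5, "ts": 1603072527 + 60},
]


def sum_by_minute_of_hour(events):
    """Sum Impressions per minute-of-hour (0-59), ignoring the hour and the date."""
    totals = defaultdict(int)
    for event in events:
        minute = datetime.fromtimestamp(event["ts"], tz=timezone.utc).minute
        totals[minute] += event["Impressions"]
    return dict(totals)


totals = sum_by_minute_of_hour(events)
print(totals)
```

The first two events are an hour apart but are summed together. For a short demo run this is harmless. If distinct calendar minutes are needed, grouping on `F.window("ts", "1 minute")` is one possible alternative.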

In [16]:
grouped_by_min.printSchema()

root
 |-- minute_bin: integer (nullable = true)
 |-- Total Impressions: long (nullable = true)



In [17]:
query_group = grouped_by_min \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("impressions_minute_bin") \
    .trigger(processingTime='5 seconds') \
    .start()

In [20]:
spark.sql("select * from impressions_minute_bin").show()

+----------+-----------------+
|minute_bin|Total Impressions|
+----------+-----------------+
|        12|               20|
+----------+-----------------+



In [38]:
query_group.stop()