# <center>Author: Victor Diallen</center>

# Table of Contents :
* [1. Importing Libraries](#section1)
* [2. Creating Spark Session](#section2)
* [3. Kafka Spark Structured Stream Reading](#section3)
* [4. Preparing Dataframe](#section4)
* [5. Analyzing Data in Real Time](#section5)

<a id="section1"></a>
# Importing Libraries

In [None]:
# Locate the local Spark installation and make pyspark importable.
# This has to run before the pyspark imports that follow.
import findspark
findspark.init()

In [None]:
# Import required modules
import pyspark
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import col, from_json

In [None]:
# Kafka connector: ask Spark to pull the spark-sql-kafka package when it is launched.
# The variable is set here, before the SparkSession is created further down.
# NOTE(review): the coordinates pin Scala 2.12 / version 3.3.0 — confirm they match the installed Spark build.
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

<a id="section2"></a>
# Creating Spark Session

In [None]:
# Build the Spark session for this application (an existing session is reused)
spark = (
    SparkSession.builder
    .appName("RealTimeProject")
    .getOrCreate()
)

<a id="section3"></a>
# Kafka Spark Structured Stream Reading

In [None]:
# Subscribe to the Kafka topic that carries the data stream we want to read.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  # local Kafka broker
    .option("subscribe", "victordiallen")                 # topic name
    .load()
)

## Defining Schema

In [None]:
# Schema of the nested measurement used for analysis:
# a nullable "reading" struct holding a nullable double "temperature".
esquema_dados_temp = StructType().add(
    "reading",
    StructType().add("temperature", DoubleType(), True),
    True,
)

In [None]:
# Global schema of each streamed JSON message: four nullable string fields
# plus the nested "pattern" struct described by esquema_dados_temp.
esquema_dados = (
    StructType()
    .add("id_sensor", StringType(), True)
    .add("id_equipment", StringType(), True)
    .add("sensor", StringType(), True)
    .add("data_event", StringType(), True)
    .add("pattern", esquema_dados_temp, True)
)

## Parsing Data

In [None]:
# Keep only the Kafka record value, cast to a string, so it can be parsed as JSON in the next step
df_conversao = df.selectExpr("CAST(value AS STRING)")

In [None]:
# Parse the JSON text with the global schema, then promote the parsed
# fields to top-level dataframe columns.
df_conversao = (
    df_conversao
    .withColumn("jsonData", from_json(col("value"), esquema_dados))
    .select("jsonData.*")
)

In [None]:
# Show the column layout produced by the JSON parsing step
df_conversao.printSchema()

<a id="section4"></a>
# Preparing Dataframe 

This dataframe is in the format we need for the analysis.

In [None]:
# Flatten the nested temperature reading into a plain "temperature" column
# and keep the sensor name alongside it to simplify the analysis.
df_conversao_temp_sensor = df_conversao.select(
    col("pattern.reading.temperature").alias("temperature"),
    col("sensor"),
)

In [None]:
# Check that the dataframe now exposes only the temperature and sensor columns
df_conversao_temp_sensor.printSchema()

<a id="section5"></a>
# Analyzing Data in Real Time

In [None]:
# Average temperature per sensor; the aggregate column is named "avg(temperature)"
df_media_temp_sensor = (
    df_conversao_temp_sensor
    .groupBy("sensor")
    .avg("temperature")
)

In [None]:
# Inspect the schema of the aggregated dataframe
df_media_temp_sensor.printSchema()

In [None]:
# Rename the aggregate column to simplify analysis.
# withColumnRenamed leaves "sensor" untouched and is a no-op when the source
# column is absent, so re-running this cell does not fail.
df_media_temp_sensor = df_media_temp_sensor.withColumnRenamed("avg(temperature)", "temp_mean")

In [None]:
# Inspect the schema again after the rename
df_media_temp_sensor.printSchema()

In [None]:
# Start a streaming query that writes the full aggregated result to the console sink
query = (
    df_media_temp_sensor.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

In [None]:
# Block this cell until the console query terminates (stopped or failed).
# NOTE(review): with no timeout, the cells below will not run until the query
# ends or this cell is interrupted — confirm this is intended.
query.awaitTermination()

In [None]:
# Current status of the console streaming query
query.status

In [None]:
# Most recent progress update reported by the console streaming query
query.lastProgress

In [None]:
# Print the execution plan of the console streaming query
query.explain()

In [None]:
# Start a second streaming query on the memory sink; its query name,
# "project", is the temporary table that SQL can read from.
query_memoria = (
    df_media_temp_sensor.writeStream
    .queryName("project")
    .outputMode("complete")
    .format("memory")
    .start()
)

In [None]:
# List the streaming queries currently active on this Spark session
spark.streams.active

In [None]:
# Keep the memory-sink query running for a while and apply SQL to the
# real-time aggregates stored in the "project" table.
from time import sleep

try:
    # Ten snapshots, three seconds apart
    for _ in range(10):
        spark.sql("select sensor, round(temp_mean, 2) as mean from project where temp_mean > 65").show()
        sleep(3)
finally:
    # Always stop the memory-sink query, even if a snapshot fails or the cell
    # is interrupted, so the stream is not left running in the background.
    query_memoria.stop()

# End
