# Consumer notebook

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StringType, IntegerType, StructType, IntegerType, FloatType
import numpy as np 

spark = SparkSession\
    .builder\
    .master("local")\
    .getOrCreate()

sc = spark.sparkContext

Precise the schema for the DataFrame (**structured data streaming**)

In [2]:
schema = StructType([
    StructField("tgap", IntegerType(), True),
    StructField("Count", FloatType(), True),
    StructField("Speed", FloatType(), True),
    StructField("DeviceName", StringType(), True),
])

The following piece of code computes step by step the top 5 correlated sensors in DataFrame `df`, using the same methods and lines of code as **Task 1**.
- Compute the moving average & std for each timegap
- Compute the possible pairs & their covariance for each timegap
- Compute the pearson correlation coefficient of each pair for each timegap

In [3]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col, rank
from pyspark.sql import Window

def compute_analytics(df, epoch_id) :

    # Compute the average
    windowval = (Window.orderBy('tgap').partitionBy("DeviceName")
             .rangeBetween(Window.unboundedPreceding, 0))

    df = df.withColumn('cum_sum', F.sum('Count').over(windowval))
    df_analytics = df.withColumn('avg', df["cum_sum"]/(df["tgap"]))


    # Compute the std
    current_row_window = Window.partitionBy("deviceName").orderBy('tgap').rowsBetween(0, 0)

    avg_current_row = F.first(F.col("avg")).over(current_row_window)

    df_analytics = df_analytics.withColumn("stddev",
        F.sqrt(F.sum(
        F.pow(df_analytics["Count"]-avg_current_row, 2)
    ).over(window=windowval)))

    # Compute the pairs
    pairs_df = df_analytics.alias("sensor1").join(df_analytics.alias("sensor2"),\
                                   col("sensor1.tgap") == col("sensor2.tgap"))\
                                   .select(col("sensor1.tgap"),\
                                           col("sensor1.DeviceName").alias("s1_Name"),\
                                           col("sensor1.Count").alias("s1_Count"),\
                                           col("sensor1.avg").alias("s1_avg"),\
                                           col("sensor1.stddev").alias("s1_std"),\
                                           col("sensor2.DeviceName").alias("s2_Name"),\
                                           col("sensor2.Count").alias("s2_Count"),\
                                           col("sensor2.avg").alias("s2_avg"),\
                                           col("sensor2.stddev").alias("s2_std")\
                                   )\
                                   .where(col("s1_Name") < col("s2_Name"))\
                                   .orderBy("tgap")
    
    # Compute the covariance of each pair at each time gap
    current_row_window = Window.partitionBy("s1_Name", "s2_Name").orderBy('tgap').rowsBetween(0, 0)

    s1_avg_current_row = F.first(F.col("s1_avg")).over(current_row_window)
    s2_avg_current_row = F.first(F.col("s2_avg")).over(current_row_window)

    windowval = Window.partitionBy("s1_Name", "s2_Name").orderBy('tgap').rangeBetween(Window.unboundedPreceding, 0)

    pairs_covariance = pairs_df.withColumn("covariance",
                                        F.sum(
                                        (F.col("s1_Count") - s1_avg_current_row) * (F.col("s2_Count") - s2_avg_current_row)
                                       ).over(windowval))
    pairs_correl = pairs_covariance.withColumn("pearsonCoeff",
                                           col("covariance")/(col("s1_std")*col("s2_std"))
                                           )
    
    # Compute the top 5 pairs 
    window = Window.partitionBy("tgap").orderBy(col("pearsonCoeff").desc())

    # Add a new column with the rank of each row within the window
    ranked_df = pairs_correl.withColumn("rank", rank().over(window))

    # Filter for the top-5 pairs of sensors for each time gap
    top_5_pairs = ranked_df.filter(col("rank") <= 5).orderBy(F.desc("tgap"), F.desc("pearsonCoeff")) \
        .select("tgap", "s1_Name", "s2_Name", "pearsonCoeff")
    
    top_5_pairs.show()
                                           

In the following block of code :
- `columns_dict` is a dictionary to map the index of the column to the name that we want to give to the column
  - It is important to match with the name of the columns in the schema above
- `df` is a **Streaming DataFrame** that reads what is sent on port `9999` of `localhost`
- Initially, `df` contains only one column named `value`. As the data that is sent is structured and separed by `','`, we split the `value` columns by this character and we name each column as said above.
-  The last line casts the DataFrame to the above schema

Write the analytics each 20 seconds.
- Those analytics print the top 20 correlated pairs at each timestamp
- As a result, the top 5 correlated pairs are printed each 5 seconds.

In [4]:
columns_dict = {0: "tgap", 1: "Count", 2: "Speed", 3: "DeviceName"}
# reset the df object
df = None
df = spark.readStream\
.format("socket")\
.option("host", "localhost")\
.option("port", "9997")\
.load()


split_cols = F.split(df["value"], ",").alias("split_cols")
df = df.select("*", *[split_cols[i].alias(columns_dict[i]) for i in range(4)])
df = df.drop("value")

df = df.select([F.col(col).cast(schema[col].dataType) for col in schema.fieldNames()])


delta = 20 # seconds
query = df.writeStream \
    .foreachBatch(compute_analytics)\
    .trigger(processingTime=f"{delta} seconds")\
    .start()

query.awaitTermination()

+----+-------+-------+------------+
|tgap|s1_Name|s2_Name|pearsonCoeff|
+----+-------+-------+------------+
+----+-------+-------+------------+

+----+-------+-------+------------------+
|tgap|s1_Name|s2_Name|      pearsonCoeff|
+----+-------+-------+------------------+
| 479|CB02411| COM205| 0.958078823171938|
| 479|CB02411| CEK049|0.9421520774217793|
| 479|CB02411|  CJM90| 0.936941010568009|
| 479| CB2105| COM205| 0.922960860059877|
| 479| CEK049|  CJM90|0.9195009569801788|
| 478|CB02411| COM205|0.9580772442822112|
| 478|CB02411| CEK049|0.9421499259999186|
| 478|CB02411|  CJM90|0.9373907221525857|
| 478| CB2105| COM205|0.9255241663746389|
| 478| CEK049|  CJM90|0.9199400732754083|
| 477|CB02411| COM205|0.9580756586596386|
| 477|CB02411| CEK049|0.9421477654057824|
| 477|CB02411|  CJM90|0.9391747269513185|
| 477| CB2105| COM205|0.9255216360302251|
| 477| CEK049|  CJM90|0.9216870248580247|
| 476|CB02411| COM205|0.9580740662610626|
| 476|CB02411| CEK049|0.9421455955805882|
| 476|CB02411| 

KeyboardInterrupt: 