# ST 590 (Final Project) Sneha Rani

In this project, we work with a dataset downloaded from the UCI Machine Learning Repository and then modified for the purposes of this project. The data relate power consumption in three zones of Tetouan city to factors such as time of day, temperature, and humidity. Here is an overview of the dataset:

| Column name | Description | Data Type |
| --- | --- | --- |
| Temperature | Weather Temperature of Tetouan city | (Double) |
| Humidity | Weather Humidity of Tetouan city | (Double) |
| Wind_Speed | Wind Speed of Tetouan city | (Double) |
| General_Diffuse_Flows | General diffuse flows | (Double) |
| Diffuse_Flows | Diffuse flows | (Double) |
| Power_Zone_1 | power consumption of zone 1 of Tetouan city | (Double) |
| Power_Zone_2 | power consumption of zone 2 of Tetouan city | (Double) |
| Power_Zone_3 | power consumption of zone 3 of Tetouan city | (Double) |
| Month | Month of the year (1 to 12) | (Integer) |
| Hour | Hour of the day (0 to 23) | (Double) |

This project is divided into two parts.
- The first part deals with batch (static) data. We first download the data and read it in as a Spark SQL dataframe. We then summarize it with numerical summaries such as correlations, means, and standard deviations, which gives us some basic information about the data. Next, we create a pipeline for a series of transformations and fit an Elastic Net model, choosing the regularization parameters by cross-validation. Finally, we compute the training RMSE on this data.
- The second part is based on streaming data. The streaming data is produced by repeatedly sampling three rows at random from the dataset. The fitted model is then applied to this stream to make predictions.

## Part-1: Model Fitting

We will begin by importing the required libraries, starting a Spark session, and reading the dataset.

In [1]:
# Import required libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sn

# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

data = pd.read_csv("power_ml_data.csv") #read csv file
data=data.astype({'Hour':'float'}) #change integer to float type
data.head()

Unnamed: 0,Temperature,Humidity,Wind_Speed,General_Diffuse_Flows,Diffuse_Flows,Power_Zone_1,Power_Zone_2,Power_Zone_3,Month,Hour
0,6.559,73.8,0.083,0.051,0.119,34055.6962,16128.87538,20240.96386,1,0.0
1,6.414,74.5,0.083,0.07,0.085,29814.68354,19375.07599,20131.08434,1,0.0
2,6.313,74.5,0.08,0.062,0.1,29128.10127,19006.68693,19668.43373,1,0.0
3,6.121,75.0,0.083,0.091,0.096,28228.86076,18361.09422,18899.27711,1,0.0
4,5.921,75.7,0.081,0.048,0.085,27335.6962,17872.34043,18442.40964,1,0.0


Now that the data is loaded, we define a schema for the dataset with data types that are compatible with Spark.

In [2]:
# Import required Spark libraries
from pyspark.sql.types import StringType, StructType, StructField, IntegerType, DoubleType, FloatType

# Setup the expected schema
expected_schema = StructType([
    StructField("Temperature", DoubleType(), True),
    StructField("Humidity", DoubleType(), True),
    StructField("Wind_Speed", DoubleType(), True),
    StructField("General_Diffuse_Flows", DoubleType(), True),
    StructField("Diffuse_Flows", DoubleType(), True),
    StructField("Power_Zone_1", DoubleType(), True),
    StructField("Power_Zone_2", DoubleType(), True),
    StructField("Power_Zone_3", DoubleType(), True),
    StructField("Month", IntegerType(), True),
    StructField("Hour", DoubleType(), True),
])

Now that we have our schema ready, we can create a spark SQL dataframe with this schema.

In [3]:
# Convert pandas dataframe to Spark SQL dataframe 
spark_data = spark.createDataFrame(data, schema=expected_schema)
spark_data.show(5)


+-----------+--------+----------+---------------------+-------------+------------+------------+------------+-----+----+
|Temperature|Humidity|Wind_Speed|General_Diffuse_Flows|Diffuse_Flows|Power_Zone_1|Power_Zone_2|Power_Zone_3|Month|Hour|
+-----------+--------+----------+---------------------+-------------+------------+------------+------------+-----+----+
|      6.559|    73.8|     0.083|                0.051|        0.119|  34055.6962| 16128.87538| 20240.96386|    1| 0.0|
|      6.414|    74.5|     0.083|                 0.07|        0.085| 29814.68354| 19375.07599| 20131.08434|    1| 0.0|
|      6.313|    74.5|      0.08|                0.062|          0.1| 29128.10127| 19006.68693| 19668.43373|    1| 0.0|
|      6.121|    75.0|     0.083|                0.091|        0.096| 28228.86076| 18361.09422| 18899.27711|    1| 0.0|
|      5.921|    75.7|     0.081|                0.048|        0.085|  27335.6962| 17872.34043| 18442.40964|    1| 0.0|
+-----------+--------+----------+-------

Our Spark SQL dataframe is ready!

Now let's look at a summary of the dataset to get an overview of it.

### Data Summary

Here is the summary of the dataset.
- The rows of the summary table are `count`, `mean`, `stddev` (standard deviation), `min`, and `max`.
- The columns of the summary table are the variables of the dataset.

(The summary is created in two parts. This is done only for readability.)
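As a small illustration of what the `mean` and `stddev` rows report, the sketch below recomputes them in plain Python for the five `Temperature` values shown in the preview earlier. It assumes that Spark's `stddev` is the sample standard deviation (dividing by n - 1), which is what `statistics.stdev` computes. The variable names are illustrative only.

```python
import statistics

# Temperature values from the first five rows shown earlier (illustrative subset)
temperature = [6.559, 6.414, 6.313, 6.121, 5.921]

summary = {
    "count": len(temperature),
    "mean": statistics.mean(temperature),
    # Sample standard deviation (n - 1 in the denominator)
    "stddev": statistics.stdev(temperature),
    "min": min(temperature),
    "max": max(temperature),
}

for name, value in summary.items():
    print(f"{name:>6}: {value}")
```

The numbers differ from the full-data summary, since only five rows are used here.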

In [4]:
spark_data.select("Temperature", "Humidity", "Wind_Speed","General_Diffuse_Flows","Diffuse_Flows",\
                 "Power_Zone_1").describe().show()

+-------+------------------+------------------+------------------+---------------------+------------------+------------------+
|summary|       Temperature|          Humidity|        Wind_Speed|General_Diffuse_Flows|     Diffuse_Flows|      Power_Zone_1|
+-------+------------------+------------------+------------------+---------------------+------------------+------------------+
|  count|             47174|             47174|             47174|                47174|             47174|             47174|
|   mean|18.813219803281438| 68.28839827023359|1.9616214016194993|   182.53118043837654| 74.98721140882624|32335.168690495833|
| stddev| 5.813341359553941|15.560330479134192|2.3493511404671956|   264.43185588803766|124.25614632084834| 7130.013305333635|
|    min|             3.247|             11.34|              0.05|                0.004|             0.011|        13895.6962|
|    max|             40.01|              94.8|             6.483|               1163.0|             936.0|    

In [5]:
spark_data.select("Power_Zone_2", "Power_Zone_3","Month","Hour").describe().show()

+-------+------------------+------------------+-----------------+------------------+
|summary|      Power_Zone_2|      Power_Zone_3|            Month|              Hour|
+-------+------------------+------------------+-----------------+------------------+
|  count|             47174|             47174|            47174|             47174|
|   mean| 21027.20497598019|17831.197607816728|6.510599058803578|11.488383431551279|
| stddev|5199.7871528123005|6622.5904698693585|3.437367004368705| 6.921920450693015|
|    min|       8560.081466|        5935.17407|                1|               0.0|
|    max|       37408.86076|       47598.32636|               12|              23.0|
+-------+------------------+------------------+-----------------+------------------+



### Correlation Matrix

Now let us look at the correlation between all the numeric variables. For this purpose, we combine all the variables into a single vector column using `VectorAssembler()`. This vector column is then used to compute the correlation. Since the output is a dense matrix, we then convert it into a dataframe.

In [6]:
from pyspark.ml.stat import Correlation
from pyspark.ml.linalg import DenseMatrix, Vectors
from pyspark.ml.feature import VectorAssembler
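# Import only the functions used in this notebook (a wildcard import shadows builtins such as sum, min, and max)
from pyspark.sql.functions import col, lit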

# Create vector for all the columns using VectorAssembler
assembler = VectorAssembler(inputCols=spark_data.columns, outputCol="features",handleInvalid='keep')
df = assembler.transform(spark_data).select("features")

# correlation will be in Dense Matrix
correlation = Correlation.corr(df,"features","pearson").collect()[0][0]

# To convert Dense Matrix into DataFrame
rows = correlation.toArray().tolist()
df = spark.createDataFrame(rows,spark_data.columns)
df.show()

+-------------------+--------------------+--------------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|        Temperature|            Humidity|          Wind_Speed|General_Diffuse_Flows|       Diffuse_Flows|        Power_Zone_1|        Power_Zone_2|        Power_Zone_3|               Month|                Hour|
+-------------------+--------------------+--------------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                1.0| -0.4601429412885585|  0.4764210955803574|   0.4596021000162658| 0.19562514523387217|  0.4414462233573754|  0.3843011135514114| 0.49075247369111336|  0.2848137789728194|  0.1991337035789466|
|-0.4601429412885585|                 1.0|-0.13612147371171135|  -0.4672824842880118|-0.25804180556209755|-0.28909012471237544| -0.2970193920319654|-0.2

Now we have the correlation matrix. Its rows are in the same order as its columns.<br><br>
We can see that `Power_Zone_1`, `Power_Zone_2`, and `Power_Zone_3` are relatively strongly correlated with one another. `Wind_Speed` and `Diffuse_Flows` have a relatively strong negative correlation. Power zones 1 and 2 are relatively strongly correlated with `Hour`. Apart from these, the remaining variables do not seem to be strongly correlated.
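For reference, each entry of the matrix is a Pearson correlation coefficient between two columns. The sketch below computes that coefficient in plain Python for the five `Temperature` and `Humidity` values from the data preview. The `pearson` helper is a hypothetical name, and five rows are far too few to reproduce the full-data value of about -0.46.

```python
import math

def pearson(x, y):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(x)
    mean_x = sum(x) / n
    mean_y = sum(y) / n
    cov = sum((a - mean_x) * (b - mean_y) for a, b in zip(x, y))
    var_x = sum((a - mean_x) ** 2 for a in x)
    var_y = sum((b - mean_y) ** 2 for b in y)
    return cov / math.sqrt(var_x * var_y)

# First five rows of the dataset, as shown in the preview
temperature = [6.559, 6.414, 6.313, 6.121, 5.921]
humidity = [73.8, 74.5, 74.5, 75.0, 75.7]

r = pearson(temperature, humidity)
print(f"Temperature vs. Humidity (5 rows): {r:.3f}")
```

The sign is negative here as well, because temperature falls while humidity rises across these rows.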


### One-way contingency tables for `Month` & `Hour`

Now, let us look at the contingency tables for `Month` and `Hour`, both individually (one-way contingency tables) and together (two-way contingency table).

In [7]:
# create dummy column with all zeros
spark_data1=spark_data.withColumn("dummy", lit(0))

# create one-way contingency table for the 12 months 
spark_data1.crosstab("dummy","Month").show()

+-----------+----+----+----+----+----+----+----+----+----+----+----+----+
|dummy_Month|   1|  10|  11|  12|   2|   3|   4|   5|   6|   7|   8|   9|
+-----------+----+----+----+----+----+----+----+----+----+----+----+----+
|          0|4014|4026|3877|3868|3588|4057|3893|3997|3913|4029|3999|3913|
+-----------+----+----+----+----+----+----+----+----+----+----+----+----+



The one-way contingency table above for `Month` shows the number of observations in each of the 12 months.
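A one-way contingency table is simply a count per category. The plain-Python sketch below uses a short, made-up list of month values (not the real data) to show the counting. It also shows why the columns above appear as 1, 10, 11, 12, 2, ...: that order is what we get when the month labels are sorted as strings rather than as integers.

```python
from collections import Counter

# Hypothetical month values, for illustration only
months = [1, 10, 2, 12, 1, 11, 2, 1, 10, 3]

counts = Counter(months)

# Sorting the labels as strings reproduces the 1, 10, 11, 12, 2, ... order
string_order = sorted(counts, key=str)
# Sorting them as integers gives the calendar order
calendar_order = sorted(counts)

print("string order:  ", [(m, counts[m]) for m in string_order])
print("calendar order:", [(m, counts[m]) for m in calendar_order])
```

In Spark, `spark_data.groupBy("Month").count()` should give the same counts in long form, without the need for a dummy column.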

In [8]:
# create one-way contingency table for 24 hours
spark_data1.crosstab("dummy","Hour").show()

+----------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|dummy_Hour| 0.0| 1.0|10.0|11.0|12.0|13.0|14.0|15.0|16.0|17.0|18.0|19.0| 2.0|20.0|21.0|22.0|23.0| 3.0| 4.0| 5.0| 6.0| 7.0| 8.0| 9.0|
+----------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|         0|1950|1973|1955|1972|1979|1956|1971|1947|1950|1979|1955|1950|1973|1945|1976|1966|1968|1966|1986|1968|1992|1964|1957|1976|
+----------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+



The one-way contingency table above for `Hour` shows the number of observations in each of the 24 hours.

### Two-way contingency table for `Month` & `Hour`

In [9]:
spark_data.crosstab("Month","Hour").show()

+----------+---+---+----+----+----+----+----+----+----+----+----+----+---+----+----+----+----+---+---+---+---+---+---+---+
|Month_Hour|0.0|1.0|10.0|11.0|12.0|13.0|14.0|15.0|16.0|17.0|18.0|19.0|2.0|20.0|21.0|22.0|23.0|3.0|4.0|5.0|6.0|7.0|8.0|9.0|
+----------+---+---+----+----+----+----+----+----+----+----+----+----+---+----+----+----+----+---+---+---+---+---+---+---+
|         5|164|166| 172| 159| 175| 166| 159| 172| 167| 171| 162| 160|163| 165| 174| 161| 162|171|174|164|172|163|167|168|
|        10|171|168| 163| 171| 166| 161| 175| 165| 159| 176| 165| 162|175| 170| 173| 169| 167|172|162|163|170|164|168|171|
|         1|161|165| 164| 174| 170| 165| 167| 163| 169| 170| 168| 162|168| 172| 166| 166| 165|162|167|168|168|168|172|174|
|         6|160|164| 159| 169| 155| 167| 162| 158| 161| 161| 165| 162|169| 159| 156| 167| 166|165|167|167|163|167|160|164|
|         9|164|161| 159| 161| 174| 159| 162| 168| 166| 169| 162| 163|160| 158| 163| 163| 167|164|167|158|162|159|162|162|
|         2|142|

The two-way contingency table above shows the number of observations for each combination of month and hour.

### Means of all numeric variables grouped by `Month`

Now, we will look at the means of all the numeric variables for each of the twelve months.

(The summary is again created in two parts. This is done only for readability.)
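The grouped mean is a split-apply-combine operation: rows are split by month, and the values in each group are averaged. A minimal plain-Python sketch on a few made-up rows is shown below. The result is sorted by month here, whereas the Spark output that follows is not guaranteed to come back in calendar order.

```python
from collections import defaultdict

# Made-up (month, temperature) pairs, for illustration only
rows = [(1, 6.5), (1, 6.1), (2, 12.0), (2, 13.0), (2, 14.0)]

totals = defaultdict(lambda: [0.0, 0])  # month -> [running sum, row count]
for month, temperature in rows:
    totals[month][0] += temperature
    totals[month][1] += 1

# Sort by month so the result reads in calendar order
monthly_mean = {month: s / n for month, (s, n) in sorted(totals.items())}
print(monthly_mean)
```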

In [10]:
spark_data.groupBy("Month").avg("Temperature", "Humidity", "Wind_Speed","General_Diffuse_Flows","Diffuse_Flows").show()

+-----+------------------+-----------------+-------------------+--------------------------+------------------+
|Month|  avg(Temperature)|    avg(Humidity)|    avg(Wind_Speed)|avg(General_Diffuse_Flows)|avg(Diffuse_Flows)|
+-----+------------------+-----------------+-------------------+--------------------------+------------------+
|    1|12.734699053313424|68.25854758345791| 0.7022234678624802|        103.95965819631209| 69.79882635774754|
|    2|12.656535117056842|66.49092530657751| 1.1139765886287718|        125.47113545150515|  92.3306151059088|
|    3|14.584054720236646|71.11588365787539| 1.0060172541286598|        181.40171949716375| 93.15590460931698|
|    5|20.301401050788073|68.60932199149357|   2.30747285464099|        274.50002601951394|122.76557593194893|
|    4|16.414754687901336|75.40817621371694|  0.222989725147702|         157.7222427433853| 83.49453686103197|
|    6| 22.13270636340399|68.76125990288782| 1.5613462816253483|         277.4345330948132|  103.227789164323|
|

In [11]:
spark_data.groupBy("Month").avg("Power_Zone_1","Power_Zone_2", "Power_Zone_3", "Hour").show()

+-----+------------------+------------------+------------------+------------------+
|Month| avg(Power_Zone_1)| avg(Power_Zone_2)| avg(Power_Zone_3)|         avg(Hour)|
+-----+------------------+------------------+------------------+------------------+
|    1| 31052.98442787007|19407.916365649475| 17736.35168477324| 11.51270553064275|
|    2|30973.863159715755|18774.586005972385|17309.707870419465|11.497491638795987|
|    3|31162.869031165836|18459.612112786035|16945.462800258043|11.479171801824007|
|    5| 32379.46046425318|19973.085386786082|17604.282564152116|11.466599949962472|
|    4|31143.206765884937|17600.306571434827|18574.918338348314| 11.48214744413049|
|    6|34573.227025573666|20649.034589747287| 20416.13009093538|11.456682852031689|
|    7| 35805.53043592462| 24130.02818170255|   28175.034099032| 11.49565649044428|
|    8| 36436.26165143788|24657.024551832943|24684.368960552885|11.506626656664166|
|    9|33415.103455862554|20189.459836739075|14928.415530158953|  11.5323281

### Standard Deviation of all numeric variables grouped by `Month`

Now, we will look at the standard deviations of all the numeric variables for each of the twelve months.

(The summary is again created in two parts. This is done only for readability.)

In [12]:
spark_data.groupBy("Month").agg({"Temperature":"stddev", "Humidity": "stddev", 
                                 "Wind_Speed": "stddev","General_Diffuse_Flows": "stddev",
                                 "Diffuse_Flows":"stddev"}).show()               

+-----+------------------+-----------------------------+------------------+---------------------+-------------------+
|Month|stddev(Wind_Speed)|stddev(General_Diffuse_Flows)|  stddev(Humidity)|stddev(Diffuse_Flows)|stddev(Temperature)|
+-----+------------------+-----------------------------+------------------+---------------------+-------------------+
|    1|1.6117952014385526|           166.16470976772894| 12.15616997395505|     131.459171585262| 3.2406352202253177|
|    2|1.9811568615798605|           206.73018017500877|12.411941927246787|   169.15551710161583|  2.619715133289512|
|    3|1.9009817904056068|           260.14888917166206|13.918146099336553|   151.16792344997845| 3.7588517619095954|
|    5|2.4083281081182726|           331.99889739474145|16.436022749116013|   171.58594340324657| 3.2999517830264136|
|    4|0.8203433573472342|           246.17350131285014|14.312654855624078|   123.91235156483745|  2.806220593833569|
|    6|2.2354105856986264|             328.277209820488|

In [13]:
spark_data.groupBy("Month").agg({"Power_Zone_1":"stddev", "Power_Zone_2": "stddev", 
                                 "Power_Zone_3": "stddev","Hour": "stddev"}).show()

+-----+--------------------+--------------------+--------------------+------------------+
|Month|stddev(Power_Zone_3)|stddev(Power_Zone_1)|stddev(Power_Zone_2)|      stddev(Hour)|
+-----+--------------------+--------------------+--------------------+------------------+
|    1|   4436.997404934671|   7402.323410683091|   4515.295696366688|6.8933864667013784|
|    2|  4353.9759463094815|   6874.584790780019|   4390.391100519847| 6.900280948950495|
|    3|    4256.76554626955|   6782.136538982632|   4185.117594550106| 6.934685602294895|
|    5|   4353.394233957315|   6809.332811001886|  4182.5436720014095|6.9047355594893345|
|    4|   4556.263192196545|   6496.700166744174|  3835.6293841116035|  6.91394782479513|
|    6|   5596.702925763702|   7317.808097492525|   4465.664309494708| 6.933494099313091|
|    7|    6913.95836073139|   6966.074191037386|    4968.51110111139| 6.927292532034068|
|    8|   6520.955954895705|   7054.722275396859|   5163.442764556622| 6.950722974866449|
|    9|  3

We have looked at the summary of the dataset.

Now we will move on to performing some transformations in Spark and using `MLlib` to make predictions. The transformations are needed before the data is passed to the model.

Here, we fit an Elastic Net model. Elastic net is a regularized regression method that linearly combines the L1 penalty of the lasso and the L2 penalty of ridge regression. We will tune the regularization parameters using 5-fold cross-validation.
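To make the penalty concrete, the sketch below evaluates it for a small coefficient vector. It assumes the parameterization documented for Spark's `LinearRegression`, where `regParam` (λ) scales the whole penalty and `elasticNetParam` (α) mixes the L1 and L2 terms: λ(α‖w‖₁ + (1 - α)/2 ‖w‖₂²). The function name and the example coefficients are made up.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Penalty = reg_param * (alpha * L1 + (1 - alpha) / 2 * L2^2)."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    alpha = elastic_net_param
    return reg_param * (alpha * l1 + (1.0 - alpha) / 2.0 * l2_squared)

weights = [3.0, -4.0]  # hypothetical coefficients

ridge_only = elastic_net_penalty(weights, 0.5, 0.0)  # alpha = 0: pure L2
lasso_only = elastic_net_penalty(weights, 0.5, 1.0)  # alpha = 1: pure L1
equal_mix = elastic_net_penalty(weights, 0.5, 0.5)   # alpha = 0.5

print(ridge_only, lasso_only, equal_mix)
```

Under this parameterization, a `regParam` of 0 removes the penalty entirely, so every grid point with `regParam = 0` corresponds to ordinary least squares whatever the value of `elasticNetParam`.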

First, let us start with importing required libraries.

In [14]:
# Import required libraries
from pyspark.ml.feature import SQLTransformer, PCA, Binarizer, VectorAssembler, OneHotEncoder
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

Now that we have imported the required libraries, we will define a series of transformations on the dataset and then combine them into a pipeline. The stages are as follows:

1) `SQLTransformer`: First, we use `SQLTransformer` to select the variables used in the analysis. We use `Temperature`, `Humidity`, `Wind_Speed`, `General_Diffuse_Flows`, `Diffuse_Flows`, `Power_Zone_1`, `Power_Zone_2`, `Month`, and `Hour` as predictors and `Power_Zone_3` as the response variable. To fit a model with `MLlib`, the predictors and the response need to be provided as `features` and `label` columns respectively, so we rename `Power_Zone_3` to `label` here.

2) `Binarizer`: Since `Hour` denotes a time of day and its value does not indicate a magnitude, we transform it with `Binarizer`. We set a threshold of 6.5, so hours greater than 6.5 (7 through 23) become 1.0 and the remaining hours (0 through 6) become 0.0.

3) `OneHotEncoder`: `Month` denotes a time period and its value does not indicate a magnitude, so we transform it into a one-hot encoded vector. One-hot encoding represents each category as a binary vector with a single "1" in the position corresponding to the category index and "0" elsewhere. With the encoder's default settings, the last category is dropped and is represented by an all-zero vector.

4) `VectorAssembler`: This stage prepares the input for the next stage (the PCA transformation), which requires the variables in vector form. The input variables are `Temperature`, `Humidity`, `Wind_Speed`, `General_Diffuse_Flows`, and `Diffuse_Flows`, which are combined into a single vector.

5) `PCA`: This transformation fits a model that projects the vectors onto a lower-dimensional space spanned by the top k principal components. We use k = 2.

6) `VectorAssembler`: This stage is used again to create the `features` vector required by `MLlib`. The vector is built from `PCA_features` (obtained from `PCA`), the binarized `Hour`, `Power_Zone_1`, `Power_Zone_2`, and the encoded `Month`.

7) `LinearRegression`: Finally, we create an instance of the linear regression model that will be fit to the data. We also create a parameter grid with multiple values of the regularization parameter (`regParam`) and the elastic net mixing parameter (`elasticNetParam`). These are tuned by cross-validation, and the best combination is used for predictions.

Lastly, we will create a pipeline that incorporates the above transformation stages.
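The `Binarizer` and `OneHotEncoder` stages can be sketched in plain Python to show what they do to individual values. The helper names are hypothetical. The sketch assumes that `Binarizer` maps values strictly greater than the threshold to 1.0, that `OneHotEncoder` drops the last category by default, and that the month values 1 to 12 are used directly as category indices (so index 0 is never used).

```python
def binarize(value, threshold=6.5):
    """Return 1.0 if value is strictly greater than threshold, else 0.0."""
    return 1.0 if value > threshold else 0.0

def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot vector for a category index; the last category is
    encoded as all zeros when drop_last is True."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector

# Hours 0-6 map to 0.0 and hours 7-23 map to 1.0
print([binarize(h) for h in (0.0, 6.0, 7.0, 23.0)])

# Indices 0..12 give 13 positions; dropping the last one leaves a length-12 vector
print(one_hot(1, num_categories=13))   # January: 1.0 at position 1
print(one_hot(12, num_categories=13))  # December: all zeros
```

This is consistent with the `Month_encode` value `(12,[1],[1.0])` that appears for a January row in the prediction output later in this part: a length-12 vector with a single 1.0 at position 1.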

In [15]:
# SQLTransformer to get select variables from dataset
sqlTrans = SQLTransformer(statement="SELECT Temperature, Humidity, Wind_Speed, General_Diffuse_Flows, \
                            Diffuse_Flows, Power_Zone_1, Power_Zone_2, Month, Hour, Power_Zone_3 as label FROM __THIS__")

# Binarizer for hour column
binaryTrans = Binarizer(threshold = 6.5, inputCol = "Hour", outputCol = "Hour_binarized")

# OneHotEncoder to transform Month variable into a one-hot encoded vector
encoder = OneHotEncoder(inputCol="Month", outputCol="Month_encode")

# VectorAssembler for vector being created for PCA transformation
assembler1 = VectorAssembler(inputCols=["Temperature", "Humidity", "Wind_Speed", "General_Diffuse_Flows", "Diffuse_Flows"], outputCol = "Features1")

# PCA transformer on Features1
pca = PCA(k=2, inputCol="Features1", outputCol="PCA_features")

# VectorAssembler to get MLlib requisite 'features' column
assembler2 = VectorAssembler(inputCols=["PCA_features","Hour_binarized","Power_Zone_1","Power_Zone_2","Month_encode"], outputCol="features")

# Create Linear Regression model instance
lr = LinearRegression(featuresCol="features", labelCol="label")

# Define parameter grid for elastic net 
lr_paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.98, 0.99, 1])\
                                .addGrid(lr.elasticNetParam, [0, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.98, 0.99, 1]).build() 

Here is the pipeline with the above-mentioned transformation stages.

In [16]:
# Create the pipeline with the stages
lr_pipeline = Pipeline(stages=[sqlTrans, binaryTrans, encoder, assembler1, pca, assembler2, lr ])

We have the pipeline ready!

Now we will use this linear regression pipeline within the cross-validation. For this purpose, we first define an evaluator using `RegressionEvaluator` with the `rmse` metric. The cross-validator is then fit on the full dataset, and predictions are made on that same dataset, so the RMSE reported below is a training RMSE.
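It helps to know how much work the cross-validation involves before running it. The sketch below counts the model fits implied by the parameter grid defined earlier and 5 folds. It assumes that the cross-validator fits every grid combination once per fold and then refits the best combination once on the full dataset.

```python
from itertools import product

# Same values as in the parameter grid defined earlier
reg_params = [0, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.98, 0.99, 1]
elastic_net_params = [0, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.98, 0.99, 1]
num_folds = 5

grid = list(product(reg_params, elastic_net_params))

# Each combination is fit once per fold, plus one final refit of the best
# combination on the full dataset
fits_during_cv = len(grid) * num_folds
total_fits = fits_during_cv + 1

print(len(grid), fits_during_cv, total_fits)
```

With 121 combinations and 5 folds, the cell below can take a while to run. A coarser grid would reduce the run time proportionally.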

In [17]:
# RegressionEvaluator for cross-validation
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")

# Create the CrossValidator with 5 folds
crossval = CrossValidator(estimator=lr_pipeline,
                                estimatorParamMaps=lr_paramGrid,
                                evaluator=evaluator,
                                numFolds=5)

# Fit the CrossValidator on the training data & make predictions on the same data
cvModel = crossval.fit(spark_data)
predictions = cvModel.transform(spark_data)

# Calculate the training RMSE (reusing the evaluator defined above)
rmse = evaluator.evaluate(predictions)
print(f"Training RMSE for our Elastic Net model: {rmse}")

Training RMSE for our Elastic Net model: 2147.097345981367


We now have the training RMSE for the model. Since we have no other models to compare against, it is difficult to judge how well the model performs. However, the goal of this project is to use streaming data to make predictions, so we do not evaluate model performance further here.
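One rough point of reference is still available from the summary tables above: a model that always predicts the mean of `Power_Zone_3` would have an RMSE close to that column's standard deviation. The sketch below compares the two numbers. It is only an approximation (the summary's `stddev` is assumed to be a sample standard deviation), and because the RMSE is measured on the training data, it is likely to be optimistic.

```python
# Values copied from the outputs above
training_rmse = 2147.097345981367
label_stddev = 6622.5904698693585  # stddev of Power_Zone_3 from the summary table

# A model that always predicts the mean has an RMSE close to the label's
# standard deviation, so this ratio compares the fit against that baseline
ratio = training_rmse / label_stddev
approx_r2 = 1.0 - ratio ** 2

print(f"RMSE / stddev: {ratio:.3f}")
print(f"approximate training R^2: {approx_r2:.3f}")
```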

Now let us create a `residual` column by subtracting the `prediction` column from the `label` column (the `Power_Zone_3` values).
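The residual and the RMSE are closely related: the RMSE is the square root of the mean of the squared residuals. The sketch below shows both for two (label, prediction) pairs. The first pair is taken from the first row of the output of the cell below, and the second pair is made up.

```python
import math

# (label, prediction) pairs
pairs = [
    (20240.96386, 20880.29948217905),  # first row of the batch predictions
    (19000.0, 18500.0),                # hypothetical
]

# residual = label - prediction; a negative value means the model over-predicted
residuals = [label - prediction for label, prediction in pairs]

# RMSE is the square root of the mean squared residual
rmse_two_rows = math.sqrt(sum(r * r for r in residuals) / len(residuals))

print(residuals)
print(rmse_two_rows)
```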

In [18]:
# Create a residual column
predictions.withColumn("residual", col("label") - col("prediction")).show(1)

+-----------+--------+----------+---------------------+-------------+------------+------------+-----+----+-----------+--------------+--------------+--------------------+--------------------+--------------------+-----------------+------------------+
|Temperature|Humidity|Wind_Speed|General_Diffuse_Flows|Diffuse_Flows|Power_Zone_1|Power_Zone_2|Month|Hour|      label|Hour_binarized|  Month_encode|           Features1|        PCA_features|            features|       prediction|          residual|
+-----------+--------+----------+---------------------+-------------+------------+------------+-----+----+-----------+--------------+--------------+--------------------+--------------------+--------------------+-----------------+------------------+
|      6.559|    73.8|     0.083|                0.051|        0.119|  34055.6962| 16128.87538|    1| 0.0|20240.96386|           0.0|(12,[1],[1.0])|[6.559,73.8,0.083...|[1.79440486365695...|(17,[0,1,3,4,6],[...|20880.29948217905|-639.3356221790491|
+---

This concludes the first part of our project, where we made predictions on the batch data. Now we will apply the fitted model to streaming data that is generated randomly from the existing data and fed to the model iteratively.

Let's move on to making predictions on streaming data!

## Part-2: Streaming Part

In this part of the project, we will use streaming queries to run the functions that we used for the batch data. Let's start by importing the required libraries.

In [19]:
# Import required libraries
from pyspark.sql.types import StringType, StructType, StructField, IntegerType, DoubleType, FloatType
from pyspark.sql.functions import explode, split, col

#### Reading a stream

Our first task is to initialize the Spark session. (A session was already started in the first part, so this step is not strictly necessary: `getOrCreate()` returns the existing session if there is one. We repeat it so that this part is self-contained.) Next, we define the same schema as in the first part. Finally, we create a stream object that reads the streamed data, in the form of .csv files, from a folder named `csv_files_folder`.

In [20]:
#Reading a stream

# Initialize SparkSession
spark = SparkSession.builder.appName("Power Consumption Streaming").getOrCreate()

# Setup my schema (same as model fitting part)
my_schema = StructType([
    StructField("Temperature", DoubleType(), True),
    StructField("Humidity", DoubleType(), True),
    StructField("Wind_Speed", DoubleType(), True),
    StructField("General_Diffuse_Flows", DoubleType(), True),
    StructField("Diffuse_Flows", DoubleType(), True),
    StructField("Power_Zone_1", DoubleType(), True),
    StructField("Power_Zone_2", DoubleType(), True),
    StructField("Power_Zone_3", DoubleType(), True),
    StructField("Month", IntegerType(), True),
    StructField("Hour", DoubleType(), True),
])

# Setup the readStream to read the stream from a folder
stream_df = spark.readStream.schema(my_schema).option("header","true").csv("csv_files_folder")

##### Transform/Aggregation

Now that we have set up our schema and readStream, we will perform our transformation/aggregation in two steps:

- First, we use the model transformer (`cvModel`) from the first part on the data being read from the stream. In addition, we create a `residual` column (`label` minus `prediction`). This step gives us our first dataframe.

- Second, we create a second pipeline that includes all the stages (`SQLTransformer`, `Binarizer`, `OneHotEncoder`, `VectorAssembler`, & `PCA`) used above except for the linear regression. The difference here is that we return all of the columns from the original data along with the transformed columns (including the `label` and `features` columns). This step gives us our second dataframe. For this, we fit the pipeline on the original batch data from the first part and then transform the streaming data, which is produced separately.

The two resulting dataframes are then joined using the `.join()` method. We use an inner join, which selects the records that have matching values of `label` in both dataframes.
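One property of an inner join is worth keeping in mind here: every pair of rows with equal key values is kept, so repeated `label` values multiply the number of output rows. The plain-Python sketch below illustrates this with made-up rows and a hypothetical `inner_join` helper. It shows only the join semantics and is not how Spark implements the join.

```python
def inner_join(left, right, key):
    """Inner join of two lists of dicts on `key` (every matching pair is kept)."""
    index = {}
    for row in right:
        index.setdefault(row[key], []).append(row)
    joined = []
    for row in left:
        for match in index.get(row[key], []):
            merged = dict(match)
            merged.update(row)
            joined.append(merged)
    return joined

# Made-up rows; label 100.0 appears twice on each side
left = [{"label": 100.0, "residual": -5.0}, {"label": 200.0, "residual": 3.0},
        {"label": 100.0, "residual": 7.0}]
right = [{"label": 100.0, "Month": 1}, {"label": 100.0, "Month": 2},
         {"label": 300.0, "Month": 3}]

result = inner_join(left, right, "label")
print(len(result))
for row in result:
    print(row)
```

Labels 200.0 and 300.0 have no partner and are dropped, while the two rows on each side with label 100.0 produce four joined rows. Since the streaming sample may draw the same row more than once, repeated labels are possible in our stream as well.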

Now let's do the first step to get our first dataframe.

In [21]:
# Transform/Aggregation Step-1

# Use the model transformer to obtain predictions from the incoming data
stream_predictions = cvModel.transform(stream_df)

# Create a residual column (label minus prediction)
stream_residuals = stream_predictions.withColumn("residual", col("label") - col("prediction"))

We have created our first dataframe `stream_residuals`!

Let's do the second step!

In [32]:
# Transform/Aggregation Step-2

# SQL Transformer to get all the vector and non-vector variables from dataset
sqlTrans2= SQLTransformer(statement="SELECT features, Month_encode, Hour_binarized, Temperature, Humidity, Wind_Speed, General_Diffuse_Flows, \
                            Diffuse_Flows, Power_Zone_1, Power_Zone_2, Month, Hour, label FROM __THIS__")

# Second pipeline excluding model fitting
pipeline2 = Pipeline(stages=[sqlTrans, binaryTrans, encoder, assembler1, pca, assembler2, sqlTrans2])

# Train the pipeline on the original data that is our batch data from part 1
pipeline2_model = pipeline2.fit(spark_data)

# Transform the streaming data with the second pipeline model
transformed_stream = pipeline2_model.transform(stream_df)

We have the second dataframe `transformed_stream`!

Let's join these two dataframes using `inner` join!

In [33]:
# Join the two streaming DataFrames on the 'label' column
joined_stream = stream_residuals.join(transformed_stream, on="label", how="inner")

Now we have our final dataframe from the streaming data.

We need to write this final stream to the console to start the query.

In [None]:
# Write the joined_stream DataFrame to the console
query = joined_stream.writeStream.outputMode("append").format("console").start()

# The query is already running after start(); block here until it is stopped or fails
query.awaitTermination()

The above query runs on the streaming data that is produced iteratively from `power_streaming_data.csv`. The output of the query is shown in the console.

In [None]:
# We need to stop the query once we are done executing
query.stop()

### Produce Data

**The code below is run in a Python console in another window to simulate new data coming in.**

Here we produce our streaming data by repeatedly sampling three rows from the data in `power_streaming_data.csv`, writing those rows to a .csv file, and storing the file in `csv_files_folder`. These files are then picked up by the stream, transformed, and used to make predictions, which appear on the console.

In [None]:
import pandas as pd
import os
import time
from pathlib import Path

# Read the streaming data into a pandas DataFrame
streaming_data = pd.read_csv("power_streaming_data.csv")

# Create the output directory if it doesn't exist
output_dir = "csv_files_folder"
Path(output_dir).mkdir(parents=True, exist_ok=True)

for i in range(50):
    # Randomly sample three rows from the DataFrame
    sample_data = streaming_data.sample(n=3)

    # Write the sampled rows to a .csv file in csv_files_folder without indices
    sample_data.to_csv(os.path.join(output_dir, f"sample_{i}.csv"), index=False)

    # Pause for 10 seconds before the next iteration
    time.sleep(10)
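
One caveat with writing directly into a watched folder is that a file source generally expects each file to appear all at once, so that it never reads a half-written file. A common way to arrange this is to write the file somewhere else on the same filesystem and then move it into place. The sketch below shows the idea with the standard library only. The helper name, the staging folder, and the example values are all hypothetical.

```python
import csv
import os
import tempfile

def write_csv_atomically(rows, header, output_dir, filename, staging_dir):
    """Write a CSV into staging_dir, then move it into output_dir in one step."""
    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(staging_dir, exist_ok=True)

    staged_path = os.path.join(staging_dir, filename)
    with open(staged_path, "w", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow(header)
        writer.writerows(rows)

    final_path = os.path.join(output_dir, filename)
    # os.replace is a rename, which is atomic when both paths are on the same filesystem
    os.replace(staged_path, final_path)
    return final_path

# Demo in a throwaway directory with made-up values
base = tempfile.mkdtemp()
path = write_csv_atomically(
    rows=[[6.559, 73.8], [6.414, 74.5]],
    header=["Temperature", "Humidity"],
    output_dir=os.path.join(base, "csv_files_folder"),
    filename="sample_0.csv",
    staging_dir=os.path.join(base, "staging"),
)
print(path)
```

In the loop above, the same pattern would mean having `to_csv` write to a staging path and then calling `os.replace` to move the file into `csv_files_folder`.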

This concludes the streaming part of our project!