# Final Project
Yi Ren

## Introduction

The data is modified from the [UCI machine learning repository](https://archive.ics.uci.edu/ml/datasets/Power+consumption+of+Tetouan+city). The study was about relating power consumption from different zones of Tetouan city to various factors like time of day, temperature, and humidity.

This project contains two parts: modeling part and streaming part:

For modeling part, I did some summarization and used pyspark’s MLlib module to set up a pipeline for Elastic Net fit including [PCA](https://statisticsbyjim.com/basics/principal-component-analysis/) transformer. Then I used cross validation to select the best model and evaluate model performace via RMSE metric.

For streaming part, I used model transformer to obtain predictions from the incoming data. Then I created second pipeline that would take all the predictor columns from original data with transforamtion. After combining two dataframes, I wrote those out to the console. Finally, I can monitor the prediction on incoming data by randomly sampling three rows and output those to a .csv file in the folder.

## Fitting model part
### Data Preparation

Read data into a standard pandas data frame

In [1]:
import pandas as pd
power_ml_data = pd.read_csv("power_ml_data.csv", sep = ",")

Convert from pandas to pandas-on-Spark

In [2]:
import pyspark.pandas as ps
ps_power_ml = ps.from_pandas(power_ml_data)

  fields = [
  for column, series in pdf.iteritems():


Convert to spark SQL dataframe

In [3]:
power_ml = ps_power_ml.to_spark()
power_ml.show(5)



+-----------+--------+----------+---------------------+-------------+------------+------------+------------+-----+----+
|Temperature|Humidity|Wind_Speed|General_Diffuse_Flows|Diffuse_Flows|Power_Zone_1|Power_Zone_2|Power_Zone_3|Month|Hour|
+-----------+--------+----------+---------------------+-------------+------------+------------+------------+-----+----+
|      6.559|    73.8|     0.083|                0.051|        0.119|  34055.6962| 16128.87538| 20240.96386|    1|   0|
|      6.414|    74.5|     0.083|                 0.07|        0.085| 29814.68354| 19375.07599| 20131.08434|    1|   0|
|      6.313|    74.5|      0.08|                0.062|          0.1| 29128.10127| 19006.68693| 19668.43373|    1|   0|
|      6.121|    75.0|     0.083|                0.091|        0.096| 28228.86076| 18361.09422| 18899.27711|    1|   0|
|      5.921|    75.7|     0.081|                0.048|        0.085|  27335.6962| 17872.34043| 18442.40964|    1|   0|
+-----------+--------+----------+-------

### Data Summarization
#### Find means, standard deviations, min, max, and median values

In [4]:
ps_power_ml.loc[:, ~ps_power_ml.columns.isin(["Month", "Hour"])] \
           .describe() \
           .loc[["mean", "std", "min", "max", "50%"]]

Unnamed: 0,Temperature,Humidity,Wind_Speed,General_Diffuse_Flows,Diffuse_Flows,Power_Zone_1,Power_Zone_2,Power_Zone_3
mean,18.81322,68.288398,1.961621,182.53118,74.987211,32335.16869,21027.204976,17831.197608
std,5.813341,15.56033,2.349351,264.431856,124.256146,7130.013305,5199.787153,6622.59047
min,3.247,11.34,0.05,0.004,0.011,13895.6962,8560.081466,5935.17407
50%,18.78,69.89,0.086,4.779,4.252,32259.82301,20802.9106,16405.28211
max,40.01,94.8,6.483,1163.0,936.0,52146.85905,37408.86076,47598.32636


- From the numeric summary, we can observe that except humidity, all the other variables are right skewed.   

- Comparing with temperature, humidity and wind speed, general diffuse flows and diffuse flows have the most spread in the distributions.   

- As for 3 power zones, zone 1 has the widest range with highest variance. While zone 2 has the narroest range and zone 3 has the lowest variance. 

#### Find the correlations between all variables except Month and Hour

In [5]:
ps_power_ml.loc[:, ~ps_power_ml.columns.isin(["Month", "Hour"])] \
           .corr()

  fields = [
  for column, series in pdf.iteritems():


Unnamed: 0,Temperature,Humidity,Wind_Speed,General_Diffuse_Flows,Diffuse_Flows,Power_Zone_1,Power_Zone_2,Power_Zone_3
Temperature,1.0,-0.460143,0.476421,0.459602,0.195625,0.441446,0.384301,0.490752
Humidity,-0.460143,1.0,-0.136121,-0.467282,-0.258042,-0.28909,-0.297019,-0.234228
Wind_Speed,0.476421,-0.136121,1.0,0.132304,-0.000727,0.166322,0.146338,0.279112
General_Diffuse_Flows,0.459602,-0.467282,0.132304,1.0,0.56453,0.189994,0.158798,0.064942
Diffuse_Flows,0.195625,-0.258042,-0.000727,0.56453,1.0,0.082885,0.047379,-0.036761
Power_Zone_1,0.441446,-0.28909,0.166322,0.189994,0.082885,1.0,0.834694,0.750656
Power_Zone_2,0.384301,-0.297019,0.146338,0.158798,0.047379,0.834694,1.0,0.572344
Power_Zone_3,0.490752,-0.234228,0.279112,0.064942,-0.036761,0.750656,0.572344,1.0


- From the correlation matrix, we observed the power zone 1 vs. zone 2 has very strong positive correlation (> 0.8). And zone 1 vs. zone 3 has strong positive correaltion (> 0.6). In the contrast, neither general diffuse flows nor diffuse flows is correalted to power zones (< 0.19).

- Highly correlated predictors can lead to collinearity issues and this can greatly increase the model variance, especially in the context of regression. In some cases, there could be relationships between multiple predictor variables and this is called multicollinearity.

- Two highly correlated variables can have nearly the same ability to predict the outcome value for an observation, which will affect the interpretability.  

#### One-way contingency table
##### of the Month variable

In [8]:
ps_power_ml["Month"].value_counts()

3     4057
7     4029
10    4026
1     4014
8     3999
5     3997
9     3913
6     3913
4     3893
11    3877
12    3868
2     3588
Name: Month, dtype: int64

From the table, March has the highest counts while Februray has the least counts among all 12 month.

##### of the Hour variable

In [9]:
ps_power_ml["Hour"].value_counts()

6     1992
4     1986
17    1979
12    1979
9     1976
21    1976
2     1973
1     1973
11    1972
14    1971
23    1968
5     1968
22    1966
3     1966
7     1964
8     1957
13    1956
18    1955
10    1955
19    1950
0     1950
16    1950
15    1947
20    1945
Name: Hour, dtype: int64

From the table, 6am has the highest counts while 8pm has the least counts among all 24 hours.

#### Two-way contingency table for the Month and Hour variables

In [10]:
power_ml.crosstab("Month", "Hour").show()

+----------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|Month_Hour|  0|  1| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19|  2| 20| 21| 22| 23|  3|  4|  5|  6|  7|  8|  9|
+----------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|         5|164|166|172|159|175|166|159|172|167|171|162|160|163|165|174|161|162|171|174|164|172|163|167|168|
|        10|171|168|163|171|166|161|175|165|159|176|165|162|175|170|173|169|167|172|162|163|170|164|168|171|
|         1|161|165|164|174|170|165|167|163|169|170|168|162|168|172|166|166|165|162|167|168|168|168|172|174|
|         6|160|164|159|169|155|167|162|158|161|161|165|162|169|159|156|167|166|165|167|167|163|167|160|164|
|         9|164|161|159|161|174|159|162|168|166|169|162|163|160|158|163|163|167|164|167|158|162|159|162|162|
|         2|142|151|145|141|155|154|153|146|153|152|144|148|147|149|151|151|145|148|156|147|157|153|147|153|
|        12|154|159

From the two-way contingency table, July at 6pm has the highest counts, which is 176. While Feburary at 11am has the lowest counts.

#### Group the data by Month and find the means of the numeric variables

In [6]:
ps_power_ml.loc[:, ps_power_ml.columns != "Hour"] \
           .groupby("Month") \
           .mean()

Unnamed: 0_level_0,Temperature,Humidity,Wind_Speed,General_Diffuse_Flows,Diffuse_Flows,Power_Zone_1,Power_Zone_2,Power_Zone_3
Month,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
1,12.734699,68.258548,0.702223,103.959658,69.798826,31052.984428,19407.916366,17736.351685
3,14.584055,71.115884,1.006017,181.401719,93.155905,31162.869031,18459.612113,16945.4628
2,12.656535,66.490925,1.113977,125.471135,92.330615,30973.86316,18774.586006,17309.70787
7,27.200593,57.599484,4.641782,294.112037,75.410538,35805.530436,24130.028182,28175.034099
6,22.132706,68.76126,1.561346,277.434533,103.227789,34573.227026,20649.03459,20416.130091
5,20.301401,68.609322,2.307473,274.500026,122.765576,32379.460464,19973.085387,17604.282564
4,16.414755,75.408176,0.22299,157.722243,83.494537,31143.206766,17600.306571,18574.918338
9,22.640565,66.868306,2.947096,202.201634,49.070622,33415.103456,20189.459837,14928.41553
10,20.476249,71.524016,2.784221,115.814556,46.628719,32806.992796,21457.890001,13266.437337
8,25.740415,66.022621,4.533251,227.178635,67.105847,36436.261651,24657.024552,24684.368961


- July has the greatest average value for temperature and the greatest general diffuse flows.

- April has the greatest average value for humidity and least average value for wind speed.

- May has the least average value for diffuse flows. While December has the least diffuse flows.

- May has the greatest average value for wind speed.   

- Auguest has the greatest average value for power consumption for zone 1 and zone 2.  

- July has the greatest average value for power consumption for zone 3.  

#### Group the data by Month and find the standard deviations of the numeric variables

In [7]:
ps_power_ml.loc[:, ps_power_ml.columns != "Hour"] \
           .groupby("Month").std()

Unnamed: 0_level_0,Temperature,Humidity,Wind_Speed,General_Diffuse_Flows,Diffuse_Flows,Power_Zone_1,Power_Zone_2,Power_Zone_3
Month,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
1,3.240635,12.15617,1.611795,166.16471,131.459172,7402.323411,4515.295696,4436.997405
3,3.758852,13.918146,1.900982,260.148889,151.167923,6782.136539,4185.117595,4256.765546
2,2.619715,12.411942,1.981157,206.73018,169.155517,6874.584791,4390.391101,4353.975946
7,3.856707,18.849986,1.110539,331.733608,95.044517,6966.074191,4968.511101,6913.958361
6,2.689724,14.972905,2.235411,328.27721,143.497896,7317.808097,4465.664309,5596.702926
5,3.299952,16.436023,2.408328,331.998897,171.585943,6809.332811,4182.543672,4353.394234
4,2.806221,14.312655,0.820343,246.173501,123.912352,6496.700167,3835.629384,4556.263192
9,2.877585,15.992826,2.293412,270.172564,67.523539,6471.3669,4205.504219,3425.85632
10,2.987293,13.980791,2.398703,185.043298,69.421308,6479.133482,4612.671175,3087.805818
8,2.949887,18.482551,1.3009,289.906192,90.66254,7054.722275,5163.442765,6520.955955


- July has the greatest standard deviation for temperature, humidity, general diffuse flows and of course diffuse flows.  
- May has the greatest standard deviaion for wind speed.   
- Janurary has the greatest standard deviation for power consumption for zone 1.  
- December has the greatest standard deviation for power consumption for zone 2.   
- July has the greatest standard deviation for power consumption for zone 3.  

#### Cast Hour as a DoubleType

In [10]:
power_ml = power_ml.withColumn("Hour",power_ml.Hour.cast('double'))
power_ml.printSchema()

root
 |-- Temperature: double (nullable = false)
 |-- Humidity: double (nullable = false)
 |-- Wind_Speed: double (nullable = false)
 |-- General_Diffuse_Flows: double (nullable = false)
 |-- Diffuse_Flows: double (nullable = false)
 |-- Power_Zone_1: double (nullable = false)
 |-- Power_Zone_2: double (nullable = false)
 |-- Power_Zone_3: double (nullable = false)
 |-- Month: long (nullable = false)
 |-- Hour: double (nullable = false)



### MLlib functions to set up pipeline for Elastic Net fit 

One-hot encode the Month column

In [8]:
from pyspark.ml.feature import OneHotEncoder, SQLTransformer, Binarizer, VectorAssembler
onehot = OneHotEncoder(inputCol = "Month", outputCol = "Month_ind")
onehotTrans = onehot.fit(power_ml)
onehotTrans.transform(power_ml)

DataFrame[Temperature: double, Humidity: double, Wind_Speed: double, General_Diffuse_Flows: double, Diffuse_Flows: double, Power_Zone_1: double, Power_Zone_2: double, Power_Zone_3: double, Month: bigint, Hour: bigint, Month_ind: vector]

Binarize the Hour column by 6.5

In [11]:
binaryTrans = Binarizer(threshold = 6.5, inputCol = "Hour", outputCol = "Hour_ind")
binaryTrans.transform(
    onehotTrans.transform(power_ml))

DataFrame[Temperature: double, Humidity: double, Wind_Speed: double, General_Diffuse_Flows: double, Diffuse_Flows: double, Power_Zone_1: double, Power_Zone_2: double, Power_Zone_3: double, Month: bigint, Hour: double, Month_ind: vector, Hour_ind: double]

Put the predictors into features

In [12]:
assembler = VectorAssembler(inputCols = ["Temperature", "Humidity", "Wind_Speed", "General_Diffuse_Flows", "Diffuse_Flows"], outputCol = "features", handleInvalid = "keep")
assembler.transform(
    binaryTrans.transform(
        onehotTrans.transform(power_ml)
    )
)

DataFrame[Temperature: double, Humidity: double, Wind_Speed: double, General_Diffuse_Flows: double, Diffuse_Flows: double, Power_Zone_1: double, Power_Zone_2: double, Power_Zone_3: double, Month: bigint, Hour: double, Month_ind: vector, Hour_ind: double, features: vector]

PCA fit on the Temperature, Humidity, Wind_Speed, General_Diffuse_Flows and Diffuse_Flows columns

In [13]:
from pyspark.ml.feature import PCA
pca = PCA(k = 4, inputCol = "features", outputCol = "pcaFeatures")
pcaTrans = pca.fit(
    assembler.transform(
        binaryTrans.transform(
            onehotTrans.transform(power_ml)
        )
    )
)

Select fitted PCA features, Hour_ind, Power_Zone_1, Power_Zone_2 and Month_ind as predictors

In [14]:
sqlTrans = SQLTransformer(
    statement = """
                SELECT pcaFeatures, Hour_ind, Power_Zone_1, Power_Zone_2, Month_ind, Power_Zone_3 as label FROM __THIS__
                """
)

Put all the predictors into features

In [15]:
assembler2 = VectorAssembler(inputCols = ["pcaFeatures", "Hour_ind", "Power_Zone_1", "Power_Zone_2", "Month_ind"], outputCol = "features2", handleInvalid = "keep")

In [16]:
lr_data = assembler2.transform(
                sqlTrans.transform(
                    pcaTrans.transform(
                        assembler.transform(
                            binaryTrans.transform(
                                onehotTrans.transform(power_ml)
                        )
                    )
                )
        )
)
lr_data.show(10)

+--------------------+--------+------------+------------+--------------+-----------+--------------------+
|         pcaFeatures|Hour_ind|Power_Zone_1|Power_Zone_2|     Month_ind|      label|           features2|
+--------------------+--------+------------+------------+--------------+-----------+--------------------+
|[1.79440486365694...|     0.0|  34055.6962| 16128.87538|(12,[1],[1.0])|20240.96386|(19,[0,1,2,3,5,6,...|
|[1.80604083009822...|     0.0| 29814.68354| 19375.07599|(12,[1],[1.0])|20131.08434|(19,[0,1,2,3,5,6,...|
|[1.81022976305638...|     0.0| 29128.10127| 19006.68693|(12,[1],[1.0])|19668.43373|(19,[0,1,2,3,5,6,...|
|[1.79866765174087...|     0.0| 28228.86076| 18361.09422|(12,[1],[1.0])|18899.27711|(19,[0,1,2,3,5,6,...|
|[1.86328720163796...|     0.0|  27335.6962| 17872.34043|(12,[1],[1.0])|18442.40964|(19,[0,1,2,3,5,6,...|
|[1.87820674500460...|     0.0| 26624.81013| 17416.41337|(12,[1],[1.0])|18130.12048|(19,[0,1,2,3,5,6,...|
|[1.91529298717954...|     0.0| 25998.98734| 1

Configure a piepline

In [17]:
from pyspark.ml.regression import LinearRegression
en = LinearRegression(featuresCol = "features2", labelCol = "label", elasticNetParam = 0.5)

In [18]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages = [onehotTrans, binaryTrans, assembler, pcaTrans, sqlTrans, assembler2, en])

Create ParamGrid for cross validation

In [19]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
paramGrid = ParamGridBuilder() \
    .addGrid(en.regParam, [0, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.98, 0.99, 1]) \
    .addGrid(en.elasticNetParam, [0, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.98, 0.99, 1]) \
    .build()

In [20]:
from pyspark.ml.evaluation import RegressionEvaluator
crossval = CrossValidator(estimator = pipeline,
                          estimatorParamMaps = paramGrid,
                          evaluator = RegressionEvaluator(metricName = "rmse"),
                          numFolds = 5) 

Fit the model

In [21]:
cvModel = crossval.fit(power_ml)

In [22]:
predsCV = cvModel.transform(power_ml)

Find the training set RMSE

In [23]:
RegressionEvaluator().evaluate(cvModel.transform(power_ml))

2124.2926308738606

Create a residual column

In [24]:
predsCV.withColumn("residuals", predsCV.label - predsCV.prediction) \
       .select("label", "prediction", "residuals") \
       .show(5)

+-----------+------------------+------------------+
|      label|        prediction|         residuals|
+-----------+------------------+------------------+
|20240.96386|20206.777266530193| 34.18659346980712|
|20131.08434| 18021.43345775746| 2109.650882242542|
|19668.43373|17568.233859829135|2100.1998701708653|
|18899.27711|16948.979778817422|1950.2973311825772|
|18442.40964|16347.521708381111|2094.8879316188904|
+-----------+------------------+------------------+
only showing top 5 rows



## Streaming Part
### Reading a Stream

Create spark session

In [29]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

Setup the schema for the stream

In [25]:
from pyspark.sql.types import StructType
myschema = StructType() \
                .add("Temperature", "double") \
                .add("Humidity", "double") \
                .add("Wind_Speed", "double") \
                .add("General_Diffuse_Flows", "double") \
                .add("Diffuse_Flows", "double") \
                .add("Power_Zone_1", "double") \
                .add("Power_Zone_2", "double") \
                .add("Power_Zone_3", "double") \
                .add("Month", "double") \
                .add("Hour", "double")

Read in a stream

In [38]:
sourceStream = spark.readStream.schema(myschema).csv("csv_power", header = True)

### Transform/Aggregation Step

Use model transformer to obtain predictions and create a residual column

In [39]:
from pyspark.sql.functions import col
residuals = cvModel.transform(sourceStream).withColumn("residuals", col("label") - col("prediction"))
modelDF = residuals.select("label", "prediction", "residuals")

Create pipeline2 to perform all the transformation

In [40]:
sqlTrans2 = SQLTransformer(
    statement = """
                SELECT Temperature, Humidity, Wind_Speed, General_Diffuse_Flows, Diffuse_Flows, pcaFeatures, Hour_ind, Power_Zone_1, Power_Zone_2, Month_ind, Hour_ind, pcaFeatures, Power_Zone_3 as label FROM __THIS__
                """
)

In [41]:
assembler3 = VectorAssembler(inputCols = ["pcaFeatures", "Hour_ind", "Power_Zone_1", "Power_Zone_2", "Month_ind"], outputCol = "features3", handleInvalid = "keep")

In [42]:
pipeline2 = Pipeline(stages = [onehotTrans, binaryTrans, assembler, pcaTrans, sqlTrans2, assembler3])

Create dataframe from the same stream using pipeline2

In [43]:
transDF = pipeline2.fit(sourceStream).transform(sourceStream)

### Writing Step
Join two data frames together based on the label and write stream to the console using the append

In [58]:
joinquery = modelDF \
                .join(transDF, "label", "inner") \
                .writeStream.outputMode("append") \
                .format("console") \
                .start()

In [59]:
joinquery.stop()

Now we are ready to monitor the output! 

Simply copy everything from .py file to the console and check the terminal window. The demonstration of streaming analysis is shown as well.