# Final Project

## Vindhya Hegde

### ST 590

# Purpose of the report

The purpose of this report is to build a machine learning model with PySpark's MLlib module and use it to make predictions on incoming streaming data. The dataset comes from the UCI Machine Learning Repository. It contains the month and hour, temperature, humidity, wind speed, diffuse flows, and electricity consumption for three zones of Tetouan city. The project demonstrates how PySpark's tools and libraries can be used to build a machine learning model and deploy it in a streaming context.

# Fitting Your Model

The data (https://www4.stat.ncsu.edu/~online/datasets/power_ml_data.csv) was read into a pandas data frame with `pd.read_csv()` and then converted to a Spark data frame with `spark.createDataFrame()`. The necessary libraries were imported first.

In [1]:
import pandas as pd
from pyspark.sql import SparkSession

# Read data into a pandas dataframe
df_pandas = pd.read_csv("https://www4.stat.ncsu.edu/~online/datasets/power_ml_data.csv")

# Create a Spark session
spark = SparkSession.builder.appName("PowerData").getOrCreate()

# Convert pandas dataframe to a Spark dataframe
df_spark = spark.createDataFrame(df_pandas)



Computed the count, mean, standard deviation, minimum, quartiles (the 50% row is the median), and maximum of every variable with `.summary()`.

In [2]:
df_spark.summary().show()

+-------+------------------+------------------+------------------+---------------------+------------------+------------------+------------------+------------------+-----------------+------------------+
|summary|       Temperature|          Humidity|        Wind_Speed|General_Diffuse_Flows|     Diffuse_Flows|      Power_Zone_1|      Power_Zone_2|      Power_Zone_3|            Month|              Hour|
+-------+------------------+------------------+------------------+---------------------+------------------+------------------+------------------+------------------+-----------------+------------------+
|  count|             47174|             47174|             47174|                47174|             47174|             47174|             47174|             47174|            47174|             47174|
|   mean|18.813219803281438| 68.28839827023359|1.9616214016194993|   182.53118043837654| 74.98721140882624|32335.168690495833| 21027.20497598019|17831.197607816728|6.510599058803578|11.4883834
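To make the rows of that table concrete, the sketch below computes the same statistics for one small, hypothetical column using only the Python standard library. Two hedges apply. Spark's `stddev` is the sample standard deviation (divisor n - 1), which `statistics.stdev` matches. Spark computes the 25%/50%/75% rows as approximate percentiles, whereas this sketch computes exact quartiles, so the values can differ slightly on real data.

```python
import statistics

def summarize(values):
    """Return the statistics that DataFrame.summary() reports for one column."""
    q1, q2, q3 = statistics.quantiles(values, n=4)  # exact quartiles (Spark's are approximate)
    return {
        "count": len(values),
        "mean": statistics.mean(values),
        "stddev": statistics.stdev(values),  # sample standard deviation, divisor n - 1
        "min": min(values),
        "25%": q1,
        "50%": q2,
        "75%": q3,
        "max": max(values),
    }

# Hypothetical temperature readings, not taken from the dataset
temps = [10.0, 12.0, 14.0, 16.0, 18.0]
for name, value in summarize(temps).items():
    print(f"{name:>6}: {value}")
```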

Computed correlations between selected pairs of variables with the `corr()` function from `pyspark.sql.functions`.

In [3]:
from pyspark.sql.functions import corr
df_spark.select(corr("Temperature", "Humidity"),
                corr("Temperature", "Wind_Speed"),
                corr("Humidity", "Wind_Speed"),
                corr("General_Diffuse_Flows", "Diffuse_Flows"),
                corr("Power_Zone_1", "Power_Zone_2"),
                corr("Power_Zone_1", "Power_Zone_3"),
                corr("Power_Zone_2", "Power_Zone_3"),
                corr("Month", "Hour")).show()


+---------------------------+-----------------------------+--------------------------+------------------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------+
|corr(Temperature, Humidity)|corr(Temperature, Wind_Speed)|corr(Humidity, Wind_Speed)|corr(General_Diffuse_Flows, Diffuse_Flows)|corr(Power_Zone_1, Power_Zone_2)|corr(Power_Zone_1, Power_Zone_3)|corr(Power_Zone_2, Power_Zone_3)|   corr(Month, Hour)|
+---------------------------+-----------------------------+--------------------------+------------------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------+
|        -0.4601429412885557|           0.4764210955803555|      -0.13612147371170583|                        0.5645303710201623|              0.8346940579474785|              0.7506555678809356|              0.5723439832303366|-4.19363425066390...|
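`corr()` returns the Pearson correlation coefficient: the covariance of two columns divided by the product of their standard deviations. The plain-Python sketch below applies that formula to a few hypothetical readings. It is meant only to show what each cell of the table measures, not to reproduce Spark's distributed implementation.

```python
import math

def pearson(x, y):
    """Pearson correlation: covariance divided by the product of standard deviations."""
    n = len(x)
    mean_x, mean_y = sum(x) / n, sum(y) / n
    cov = sum((a - mean_x) * (b - mean_y) for a, b in zip(x, y))
    var_x = sum((a - mean_x) ** 2 for a in x)
    var_y = sum((b - mean_y) ** 2 for b in y)
    return cov / math.sqrt(var_x * var_y)

# Hypothetical values: humidity falls as temperature rises, so r is negative
temperature = [10.0, 15.0, 20.0, 25.0]
humidity = [80.0, 70.0, 65.0, 50.0]
print(round(pearson(temperature, humidity), 4))
```

Like the negative `corr(Temperature, Humidity)` value in the table, a negative coefficient here means one variable tends to decrease as the other increases.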


Created a one-way contingency table of the Month variable by using `groupBy()` and `.agg()`

In [4]:
from pyspark.sql.functions import count, col

# Create a one-way contingency table of the Month variable
month_oneway = df_spark.groupBy('Month').agg(count('*').alias('Count')).orderBy('Month')
month_oneway.show()

+-----+-----+
|Month|Count|
+-----+-----+
|    1| 4014|
|    2| 3588|
|    3| 4057|
|    4| 3893|
|    5| 3997|
|    6| 3913|
|    7| 4029|
|    8| 3999|
|    9| 3913|
|   10| 4026|
|   11| 3877|
|   12| 3868|
+-----+-----+
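A one-way contingency table is simply a count of rows per level, so its counts should add up to the total number of rows. The sketch below first checks the table above against the count of 47174 reported by `.summary()`, using the twelve counts copied from the output. It then builds the same kind of table from a few hypothetical raw values with `collections.Counter`, which plays the role of `groupBy('Month').agg(count('*'))`.

```python
from collections import Counter

# Month counts copied from the table above
month_table = {1: 4014, 2: 3588, 3: 4057, 4: 3893, 5: 3997, 6: 3913,
               7: 4029, 8: 3999, 9: 3913, 10: 4026, 11: 3877, 12: 3868}
print(sum(month_table.values()))  # 47174, the count reported by .summary()

# The same kind of table built from raw values (hypothetical rows)
months = [1, 1, 2, 3, 3, 3, 12]
month_counts = Counter(months)
for month in sorted(month_counts):  # sorted like .orderBy('Month')
    print(month, month_counts[month])
```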



Created a one-way contingency table of the Hour variable by using `groupBy()` and `.agg()`


In [50]:
# Create a one-way contingency table of the Hour variable
hour_oneway = df_spark.groupBy('Hour').agg(count('*').alias('Count')).orderBy('Hour')
hour_oneway.show(25)

+----+-----+
|Hour|Count|
+----+-----+
| 0.0| 1950|
| 1.0| 1973|
| 2.0| 1973|
| 3.0| 1966|
| 4.0| 1986|
| 5.0| 1968|
| 6.0| 1992|
| 7.0| 1964|
| 8.0| 1957|
| 9.0| 1976|
|10.0| 1955|
|11.0| 1972|
|12.0| 1979|
|13.0| 1956|
|14.0| 1971|
|15.0| 1947|
|16.0| 1950|
|17.0| 1979|
|18.0| 1955|
|19.0| 1950|
|20.0| 1945|
|21.0| 1976|
|22.0| 1966|
|23.0| 1968|
+----+-----+



Created a two-way contingency table for the Month and Hour variables

In [6]:
from pyspark.sql.functions import count, col

# Create a two-way contingency table of the Month and Hour variables
two_way = df_spark.groupBy('Month', 'Hour').agg(count('*').alias('Count')).orderBy('Month', 'Hour')
two_way.show()

+-----+----+-----+
|Month|Hour|Count|
+-----+----+-----+
|    1|   0|  161|
|    1|   1|  165|
|    1|   2|  168|
|    1|   3|  162|
|    1|   4|  167|
|    1|   5|  168|
|    1|   6|  168|
|    1|   7|  168|
|    1|   8|  172|
|    1|   9|  174|
|    1|  10|  164|
|    1|  11|  174|
|    1|  12|  170|
|    1|  13|  165|
|    1|  14|  167|
|    1|  15|  163|
|    1|  16|  169|
|    1|  17|  170|
|    1|  18|  168|
|    1|  19|  162|
+-----+----+-----+
only showing top 20 rows
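The output above is in long format, with one row per Month and Hour pair. PySpark's `DataFrame.crosstab('Month', 'Hour')` can return the same counts as a grid instead. The plain-Python sketch below shows that pivot on a few hypothetical (Month, Hour) pairs. Pairs that never occur are filled with 0.

```python
from collections import Counter

# Hypothetical (Month, Hour) pairs, one per observation
rows = [(1, 0), (1, 0), (1, 1), (2, 0), (2, 1), (2, 1), (2, 1)]

# Count each (Month, Hour) combination, like groupBy('Month', 'Hour').agg(count('*'))
pair_counts = Counter(rows)

# Pivot into a grid: one row per Month, one column per Hour (missing pairs count as 0)
months = sorted({m for m, _ in rows})
hours = sorted({h for _, h in rows})
grid = [[pair_counts.get((m, h), 0) for h in hours] for m in months]

print("Month", *hours)
for m, counts in zip(months, grid):
    print(m, *counts)
```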



Grouped the data by Month and found the means of the numeric variables

In [7]:
from pyspark.sql.functions import mean

# Group the data by Month and find the means of the numeric variables
df_month_means = df_spark.groupBy('Month').agg(mean('Temperature').alias('Temperature_mean'),
                                               mean('Humidity').alias('Humidity_mean'),
                                               mean('Wind_Speed').alias('Wind_Speed_mean'),
                                               mean('General_Diffuse_Flows').alias('General_Diffuse_Flows_mean'),
                                               mean('Diffuse_Flows').alias('Diffuse_Flows_mean'),
                                               mean('Power_Zone_1').alias('Power_Zone_1_mean'),
                                               mean('Power_Zone_2').alias('Power_Zone_2_mean'),
                                               mean('Power_Zone_3').alias('Power_Zone_3_mean'),
                                               mean('Hour').alias('Hour_mean'))

df_month_means.show()


+-----+------------------+-----------------+-------------------+--------------------------+------------------+------------------+------------------+------------------+------------------+
|Month|  Temperature_mean|    Humidity_mean|    Wind_Speed_mean|General_Diffuse_Flows_mean|Diffuse_Flows_mean| Power_Zone_1_mean| Power_Zone_2_mean| Power_Zone_3_mean|         Hour_mean|
+-----+------------------+-----------------+-------------------+--------------------------+------------------+------------------+------------------+------------------+------------------+
|    1|12.734699053313424|68.25854758345791| 0.7022234678624802|        103.95965819631209| 69.79882635774754| 31052.98442787007|19407.916365649475| 17736.35168477324| 11.51270553064275|
|    2|12.656535117056842|66.49092530657751| 1.1139765886287718|        125.47113545150515|  92.3306151059088|30973.863159715755|18774.586005972385|17309.707870419465|11.497491638795987|
|    3|14.584054720236646|71.11588365787539| 1.0060172541286598| 
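Each cell of the table above is an ordinary mean taken within one `Month` group. The sketch below reproduces `groupBy('Month').agg(mean(...))` for a single variable in plain Python. It collects the values of each group in a dictionary of lists and then averages each list. The readings are hypothetical.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical (Month, Temperature) readings
readings = [(1, 12.0), (1, 14.0), (2, 11.0), (2, 13.0), (2, 18.0)]

# Collect the values of each group, then average within the group
by_month = defaultdict(list)
for month, temperature in readings:
    by_month[month].append(temperature)

temperature_mean = {month: mean(values) for month, values in sorted(by_month.items())}
print(temperature_mean)
```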

Grouped the data by Month and found the standard deviations of the numeric variables

In [8]:
from pyspark.sql.functions import stddev

# Group the data by Month and find the standard deviations of the numeric variables
month_SD = df_spark.groupBy('Month').agg(stddev('Temperature').alias('Temperature_SD'),
                                         stddev('Humidity').alias('Humidity_SD'),
                                         stddev('Wind_Speed').alias('Wind_Speed_SD'),
                                         stddev('General_Diffuse_Flows').alias('General_Diffuse_Flows_SD'),
                                         stddev('Diffuse_Flows').alias('Diffuse_Flows_SD'),
                                         stddev('Power_Zone_1').alias('Power_Zone_1_SD'),
                                         stddev('Power_Zone_2').alias('Power_Zone_2_SD'),
                                         stddev('Power_Zone_3').alias('Power_Zone_3_SD'),
                                         stddev('Hour').alias('Hour_SD'))

month_SD.show()

+-----+------------------+------------------+------------------+------------------------+------------------+------------------+------------------+------------------+------------------+
|Month|    Temperature_SD|       Humidity_SD|     Wind_Speed_SD|General_Diffuse_Flows_SD|  Diffuse_Flows_SD|   Power_Zone_1_SD|   Power_Zone_2_SD|   Power_Zone_3_SD|           Hour_SD|
+-----+------------------+------------------+------------------+------------------------+------------------+------------------+------------------+------------------+------------------+
|    1|3.2406352202253177| 12.15616997395505|1.6117952014385526|      166.16470976772894|  131.459171585262| 7402.323410683091| 4515.295696366688| 4436.997404934671|6.8933864667013784|
|    2| 2.619715133289512|12.411941927246787|1.9811568615798605|      206.73018017500877|169.15551710161583| 6874.584790780019| 4390.391100519847|4353.9759463094815| 6.900280948950495|
|    3|3.7588517619095954|13.918146099336553|1.9009817904056068|      260.1

Cast the `Hour` variable to `DoubleType` with the DataFrame method `.withColumn()`.

In [9]:
from pyspark.sql.types import DoubleType

df_spark = df_spark.withColumn("Hour", df_spark["Hour"].cast(DoubleType()))
print(df_spark)

DataFrame[Temperature: double, Humidity: double, Wind_Speed: double, General_Diffuse_Flows: double, Diffuse_Flows: double, Power_Zone_1: double, Power_Zone_2: double, Power_Zone_3: double, Month: bigint, Hour: double]


The next cell applies the following sequence of transformations:

+ `Binarizer()` converts `Hour` (inputCol = "Hour") into the indicator `day_or_night` (outputCol = "day_or_night"). Hours above the threshold of 6.5 become 1.0 and the remaining hours become 0.0.

+ `OneHotEncoder()` is an estimator that encodes `Month` (inputCol = "Month") as a vector (outputCol = "MonthVec").

+ `SQLTransformer()` selects the variables used in the model and renames `Power_Zone_3` to `label`.

+ `VectorAssembler()` combines Temperature, Humidity, Wind_Speed, General_Diffuse_Flows, and Diffuse_Flows into one vector (outputCol = "features").

+ `PCA()` projects "features" onto its principal components (inputCol = "features", outputCol = "pca_features"). Because `k=5` equals the number of input variables, this step rotates the five variables rather than reducing their number.

+ `VectorAssembler()` is used again to combine "pca_features", "day_or_night", "Power_Zone_1", "Power_Zone_2", and "MonthVec" into "features_new", which holds the predictors.

The estimators (`OneHotEncoder` and `PCA`) were first fitted with `.fit()`, and each fitted model or transformer was then applied with `.transform()`.
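The first two stages can be imitated in plain Python to make their outputs concrete. `Binarizer` outputs 1.0 when a value is strictly greater than the threshold and 0.0 otherwise. With `threshold=6.5`, hours 7 to 23 therefore become 1.0 (day) and hours 0 to 6 become 0.0 (night).

For `OneHotEncoder`, the sketch assumes Spark's behavior without category metadata: the number of categories is taken as the largest value seen during fitting plus one, and the default `dropLast=True` removes the last category. Under that assumption, months 1 to 12 produce 12-element vectors, slot 0 (month 0) is never used, and month 12 maps to all zeros. Spark stores these vectors as sparse vectors; the dense lists here are only for readability.

```python
def binarize(value, threshold=6.5):
    """Mimic Binarizer: 1.0 if value > threshold, else 0.0."""
    return 1.0 if value > threshold else 0.0

def one_hot(value, num_categories, drop_last=True):
    """Mimic OneHotEncoder for one category index (dense list instead of a sparse vector)."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if value < size:          # the dropped last category stays all zeros
        vec[int(value)] = 1.0
    return vec

# Assumption: categories counted as max(Month) + 1 = 13 (indices 0..12)
num_month_categories = 12 + 1

print([binarize(h) for h in (0, 6, 7, 23)])  # night, night, day, day
print(one_hot(1, num_month_categories))
print(one_hot(12, num_month_categories))
```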

In [10]:
from pyspark.ml.feature import Binarizer, OneHotEncoder, SQLTransformer, VectorAssembler, PCA

# Binarizer: Hour > 6.5 -> 1.0 (day), otherwise 0.0 (night)
binarizer = Binarizer(threshold=6.5, inputCol="Hour", outputCol="day_or_night")
binarized = binarizer.transform(df_spark)

# OneHotEncoder is an estimator: fit it, then transform the data
encoder = OneHotEncoder(inputCols=["Month"], outputCols=["MonthVec"])
encoded = encoder.fit(binarized).transform(binarized)

# SQLTransformer: keep the predictors and rename the response Power_Zone_3 to label
sqltrans = SQLTransformer(
    statement="""
        SELECT Temperature, Humidity, Wind_Speed, General_Diffuse_Flows, Diffuse_Flows,
               Power_Zone_1, Power_Zone_2, day_or_night, MonthVec,
               Power_Zone_3 AS label
        FROM __THIS__
    """
)
selected = sqltrans.transform(encoded)

# VectorAssembler: combine the five weather variables into one vector
assembler = VectorAssembler(inputCols=["Temperature", "Humidity", "Wind_Speed",
                                       "General_Diffuse_Flows", "Diffuse_Flows"],
                            outputCol="features")
assembled_data = assembler.transform(selected)

# PCA is an estimator: fit it on the assembled features
pca = PCA(k=5, inputCol="features", outputCol="pca_features")
pca_fit = pca.fit(assembled_data)

# Second VectorAssembler: build the final predictor vector
assembler2 = VectorAssembler(inputCols=["pca_features", "day_or_night", "Power_Zone_1",
                                        "Power_Zone_2", "MonthVec"],
                             outputCol="features_new")
assembler2.transform(pca_fit.transform(assembled_data))

DataFrame[Temperature: double, Humidity: double, Wind_Speed: double, General_Diffuse_Flows: double, Diffuse_Flows: double, Power_Zone_1: double, Power_Zone_2: double, day_or_night: double, MonthVec: vector, label: double, features: vector, pca_features: vector, features_new: vector]
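`VectorAssembler` concatenates its input columns into one vector and flattens vector-valued columns along the way. The plain-Python sketch below mimics that concatenation for one hypothetical row to count the predictors in `features_new`. It assumes `MonthVec` has 12 slots, an assumption about the encoder's output size. Under that assumption the vector holds 5 PCA components, `day_or_night`, `Power_Zone_1`, `Power_Zone_2`, and 12 month entries, for 20 values in total.

```python
def assemble(*columns):
    """Mimic VectorAssembler: concatenate scalars and vectors (lists) into one flat list."""
    out = []
    for column in columns:
        if isinstance(column, (list, tuple)):
            out.extend(float(x) for x in column)  # vector column: append every element
        else:
            out.append(float(column))             # scalar column: append the value
    return out

# Hypothetical values for one row
pca_features = [0.1, -0.4, 2.0, 0.3, -1.2]  # k=5 principal components
day_or_night = 1.0
power_zone_1 = 32000.0
power_zone_2 = 21000.0
month_vec = [0.0] * 12                       # assumed 12-slot MonthVec
month_vec[3] = 1.0

features_new = assemble(pca_features, day_or_night, power_zone_1, power_zone_2, month_vec)
print(len(features_new))
```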

Imported necessary libraries to fit an elastic net model

In [45]:
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In this section an elastic net model was fit. The steps were:

+ Defined the linear regression estimator with `LinearRegression()`
+ Defined the pipeline, whose stages are the transformations above followed by the regression
+ Defined the parameter grid for cross-validation with `ParamGridBuilder()` and `.addGrid()` to specify the tuning parameter values for `regParam` and `elasticNetParam`, then built the grid with `.build()`
+ Defined the evaluator with `RegressionEvaluator()` using the RMSE metric
+ Defined the 5-fold cross-validator with `CrossValidator()`
+ Trained the model by fitting the cross-validator to the training data with `.fit()`, which selects the parameter combination with the lowest average RMSE across folds
+ Made predictions on `df_spark` with `.transform()`
+ Calculated the RMSE on `df_spark` with `.evaluate()`
+ Printed the results
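Spark's MLlib documentation describes the elastic net regularizer as a mix of L1 and L2 penalties. With `regParam` λ and `elasticNetParam` α, the penalty on the coefficients w is α·λ·‖w‖₁ + (1 - α)·(λ/2)·‖w‖₂². This makes α = 0 a ridge penalty and α = 1 a lasso penalty. When λ = 0 the penalty vanishes entirely, so the 11 grid points with `regParam` = 0 all describe the same unpenalized fit.

The sketch below evaluates this penalty and counts the work the grid implies: 11 × 11 = 121 combinations, each fitted on 5 folds, for 605 pipeline fits. After those fits, `CrossValidator` refits the best combination on the full data. The coefficient vector is hypothetical.

```python
from itertools import product

def elastic_net_penalty(w, reg_param, elastic_net_param):
    """alpha*lambda*||w||_1 + (1 - alpha)*(lambda/2)*||w||_2^2, as described in Spark's docs."""
    l1 = sum(abs(x) for x in w)
    l2_sq = sum(x * x for x in w)
    return (elastic_net_param * reg_param * l1
            + (1 - elastic_net_param) * (reg_param / 2) * l2_sq)

# Same value lists as the grid in the next cell
values = [0, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.98, 0.99, 1]
grid = list(product(values, values))    # (regParam, elasticNetParam) pairs
num_folds = 5

print(len(grid), len(grid) * num_folds)  # combinations, fits during cross-validation

w = [2.0, -1.0]                          # hypothetical coefficients
print(elastic_net_penalty(w, 1.0, 1.0))  # pure L1 (lasso): 3.0
print(elastic_net_penalty(w, 1.0, 0.0))  # pure L2 (ridge): 2.5
```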

In [12]:
lr = LinearRegression(featuresCol="features_new", labelCol="label", elasticNetParam=0.5)

# Build the pipeline
pipeline = Pipeline(stages=[binarizer, encoder, sqltrans, assembler, pca_fit, assembler2, lr])

# Define the hyperparameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.98, 0.99, 1]) \
    .addGrid(lr.elasticNetParam, [0, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.98, 0.99, 1]) \
    .build()

# Define the evaluator
evaluator = RegressionEvaluator(metricName="rmse")

# Define the cross-validator
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)
# Fit the cross-validator
cvModel = crossval.fit(df_spark)

# Make predictions on the training set
predictions1 = cvModel.transform(df_spark)

In [51]:
# Evaluate RMSE on the training set
rmse = evaluator.evaluate(predictions1)
print("RMSE: {:.2f}".format(rmse))

RMSE: 2124.15
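`RegressionEvaluator(metricName="rmse")` reports the root mean squared error, which is the square root of the average squared difference between `label` and `prediction`. This RMSE is computed on the same data used to fit and tune the model, so it is likely optimistic about performance on new data. The following minimal sketch computes the metric on hypothetical values.

```python
import math

def rmse(labels, predictions):
    """Root mean squared error between two equal-length sequences."""
    squared = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared) / len(squared))

# Hypothetical labels and predictions
labels = [3.0, 5.0, 7.0, 9.0]
predictions = [2.0, 5.0, 8.0, 11.0]
print(rmse(labels, predictions))  # sqrt((1 + 0 + 1 + 4) / 4) = sqrt(1.5)
```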


Computed the residuals (label minus prediction) with `.withColumn()` and printed the first five rows.

In [16]:
residuals = predictions1.withColumn("residuals", predictions1.label - predictions1.prediction)
residuals.select("residuals", "label", "prediction").show(5)

+------------------+-----------+------------------+
|         residuals|      label|        prediction|
+------------------+-----------+------------------+
|37.593120697845734|20240.96386|20203.370739302154|
|2118.8633006890086|20131.08434|18012.221039310993|
|2109.3808865785104|19668.43373| 17559.05284342149|
|1959.3565317569119|18899.27711|16939.920578243087|
|2103.9381192680667|18442.40964|16338.471520731935|
+------------------+-----------+------------------+
only showing top 5 rows
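Because each residual is `label - prediction`, a positive residual means the model under-predicted that row. The sketch below recomputes the first three residuals from the `label` and `prediction` values printed above and compares them with the displayed residuals. A small tolerance absorbs floating-point differences.

```python
# (label, prediction, residual) triples copied from the first three rows shown above
rows = [
    (20240.96386, 20203.370739302154, 37.593120697845734),
    (20131.08434, 18012.221039310993, 2118.8633006890086),
    (19668.43373, 17559.05284342149, 2109.3808865785104),
]

for label, prediction, shown in rows:
    residual = label - prediction  # same formula as the .withColumn() call
    print(f"{residual:.6f} (shown: {shown:.6f})")
```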



# Streaming Part

A second file, https://www4.stat.ncsu.edu/~online/datasets/power_streaming_data.csv, was downloaded and stored in `stream_folder1`.

A schema was set up for the stream, with every variable declared as a double.

In [38]:
from pyspark.sql.types import StructType, StructField, DoubleType, StringType

schema = StructType([
    StructField("Temperature", DoubleType(), True),
    StructField("Humidity", DoubleType(), True),
    StructField("Wind_Speed", DoubleType(), True),
    StructField("General_Diffuse_Flows", DoubleType(), True),
    StructField("Diffuse_Flows", DoubleType(), True),
    StructField("Power_Zone_1", DoubleType(), True),
    StructField("Power_Zone_2", DoubleType(), True),
    StructField("Power_Zone_3", DoubleType(), True),
    StructField("Month", DoubleType(), True),
    StructField("Hour", DoubleType(), True)])
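This schema lists ten columns, but the producer loop in the Produce Data section appends an eleventh `timestamp` column to each file. Spark's CSV reader documentation describes how the default `PERMISSIVE` mode handles such mismatches. A record with more tokens than the schema has its extra tokens dropped, and a record with fewer tokens gets nulls for the missing fields. The plain-Python sketch below illustrates that rule. It is not Spark's parser, and the sample line is hypothetical.

```python
import csv
import io

SCHEMA = ["Temperature", "Humidity", "Wind_Speed", "General_Diffuse_Flows", "Diffuse_Flows",
          "Power_Zone_1", "Power_Zone_2", "Power_Zone_3", "Month", "Hour"]

def parse_permissive(line, schema=SCHEMA):
    """Map CSV tokens onto the schema: drop extra tokens, fill missing fields with None."""
    tokens = next(csv.reader(io.StringIO(line)))
    tokens = tokens[:len(schema)]                   # extra tokens are dropped
    values = [float(t) for t in tokens]
    values += [None] * (len(schema) - len(values))  # missing fields become null
    return dict(zip(schema, values))

# Hypothetical line with 10 data values plus a trailing timestamp
line = "15.2,70.1,0.08,0.05,0.1,30000.5,20000.2,18000.9,3,14,10:15:00"
row = parse_permissive(line)
print(row["Hour"], len(row))
```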


## Reading a Stream

A `readStream` was set up with the schema above and the `header` option set to `"true"`. It reads the .csv files written to the folder `stream_folder2`, which was created for this purpose.

In [39]:
stream_data = spark \
    .readStream \
    .schema(schema) \
    .option("header", "true") \
    .csv("stream_folder2")
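When `header` is `"true"`, Spark treats the first line of every file it picks up as column names rather than data. Each file dropped into `stream_folder2` therefore needs a header line, or its first record is skipped. The plain-Python sketch below illustrates the effect on two hypothetical three-row files, one written without a header and one with a header.

```python
import csv
import io

def read_with_header_option(text, header):
    """Return the data rows of one file, skipping its first line when header is True."""
    rows = list(csv.reader(io.StringIO(text)))
    return rows[1:] if header else rows

# Hypothetical file written WITHOUT a header line (three data rows)
no_header_file = "1.0,2.0\n3.0,4.0\n5.0,6.0\n"
# Hypothetical file written WITH a header line
with_header_file = "Temperature,Humidity\n1.0,2.0\n3.0,4.0\n5.0,6.0\n"

print(len(read_with_header_option(no_header_file, header=True)))    # 2: first record lost
print(len(read_with_header_option(with_header_file, header=True)))  # 3: all records kept
```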

## Transform/Aggregation Step

Applied the fitted cross-validation model (`cvModel`) to the incoming data to obtain predictions. A `residuals` column (label minus prediction) was then added to the predictions with `.withColumn()`.

In [49]:
predictions2 = cvModel.transform(stream_data).withColumn("residuals", col("label") - col("prediction"))
predictions2

DataFrame[Temperature: double, Humidity: double, Wind_Speed: double, General_Diffuse_Flows: double, Diffuse_Flows: double, Power_Zone_1: double, Power_Zone_2: double, day_or_night: double, MonthVec: vector, label: double, features: vector, pca_features: vector, features_new: vector, prediction: double, residuals: double]

The next cell builds a second pipeline from the following sequence of transformations:

+ `Binarizer()` converts `Hour` (inputCol = "Hour") into `day_or_night` (outputCol = "day_or_night") using the same threshold of 6.5

+ `OneHotEncoder()` is an estimator that encodes `Month` (inputCol = "Month") as a vector (outputCol = "MonthVec")

+ `VectorAssembler()` combines Temperature, Humidity, Wind_Speed, General_Diffuse_Flows, and Diffuse_Flows into one vector (outputCol = "features")

+ `PCA()` projects "features" onto its principal components (inputCol = "features", outputCol = "pca_features")

+ `VectorAssembler()` is used again to combine "pca_features", "day_or_night", "Power_Zone_1", "Power_Zone_2", and "MonthVec" into "features_new", which holds the predictors

+ `SQLTransformer()`, as the last stage, selects the variables of interest, keeps `Power_Zone_3`, and also copies it into `label`

The estimators (`OneHotEncoder` and `PCA`) were fitted with `.fit()`, and the fitted models and transformers were then applied with `.transform()`.
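A `Pipeline` applies its stages in list order, so the position of the `SQLTransformer` matters. Because it runs last in the second pipeline, its `SELECT` keeps only the listed columns and drops `features`, `pca_features`, and `features_new`. This matches the schema printed for `transformed_data` below. The plain-Python sketch treats each stage as a function on a dict-shaped row. The stage functions are hypothetical, simplified stand-ins, not Spark's implementations.

```python
from functools import reduce

# Simplified stand-ins for the stages (hypothetical, for illustration only)
def binarize(row):
    return {**row, "day_or_night": 1.0 if row["Hour"] > 6.5 else 0.0}

def assemble(row):
    return {**row, "features": [row["Temperature"], row["Humidity"]]}

def select_with_label(row):
    # Like the SQLTransformer: keep listed columns and copy Power_Zone_3 into label
    keep = ["Temperature", "Humidity", "day_or_night", "Power_Zone_3"]
    out = {k: row[k] for k in keep}
    out["label"] = row["Power_Zone_3"]
    return out

def run_pipeline(stages, row):
    """Apply stages in list order, as a fitted PipelineModel does."""
    return reduce(lambda acc, stage: stage(acc), stages, row)

row = {"Temperature": 15.0, "Humidity": 70.0, "Hour": 14.0, "Power_Zone_3": 18000.0}

# Select last: the assembled vector is dropped
result = run_pipeline([binarize, assemble, select_with_label], row)
# Select earlier: the assembled vector survives
result_early = run_pipeline([binarize, select_with_label, assemble], row)
print(sorted(result))
print(sorted(result_early))
```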

In [42]:
# Pipeline 2: same feature engineering, but the SQLTransformer runs last and keeps Power_Zone_3
from pyspark.ml import Pipeline
from pyspark.ml.feature import Binarizer, OneHotEncoder, SQLTransformer, VectorAssembler, PCA

binarizer2 = Binarizer(threshold=6.5, inputCol="Hour", outputCol="day_or_night")
binarized2 = binarizer2.transform(df_spark)

# create and fit the OneHotEncoder estimator, then transform the data
encoder2 = OneHotEncoder(inputCols=["Month"], outputCols=["MonthVec"])
encoded2 = encoder2.fit(binarized2).transform(binarized2)

sqltrans2 = SQLTransformer(
    statement="""
        SELECT Temperature, Humidity, Wind_Speed, General_Diffuse_Flows, Diffuse_Flows,
               Power_Zone_1, Power_Zone_2, day_or_night, MonthVec,
               Power_Zone_3, Power_Zone_3 AS label
        FROM __THIS__
    """
)
selected2 = sqltrans2.transform(encoded2)

assembler4 = VectorAssembler(inputCols=["Temperature", "Humidity", "Wind_Speed",
                                        "General_Diffuse_Flows", "Diffuse_Flows"],
                             outputCol="features")
assembled_data2 = assembler4.transform(selected2)

pca2 = PCA(k=5, inputCol="features", outputCol="pca_features")
pca_fit2 = pca2.fit(assembled_data2)

assembler5 = VectorAssembler(inputCols=["pca_features", "day_or_night", "Power_Zone_1",
                                        "Power_Zone_2", "MonthVec"],
                             outputCol="features_new")
assembler5.transform(pca_fit2.transform(assembled_data2))

# Define second pipeline (stages run in this order; the SQLTransformer is last)
pipeline2 = Pipeline(stages=[binarizer2, encoder2, assembler4, pca2, assembler5, sqltrans2])

Fitted the second pipeline on the data frame `df_spark`

In [48]:
streamdf = pipeline2.fit(df_spark)

Applied the fitted second pipeline to the streaming data source `stream_data`

In [47]:
transformed_data = streamdf.transform(stream_data)
transformed_data

DataFrame[Temperature: double, Humidity: double, Wind_Speed: double, General_Diffuse_Flows: double, Diffuse_Flows: double, Power_Zone_1: double, Power_Zone_2: double, day_or_night: double, MonthVec: vector, Power_Zone_3: double, label: double]

## Writing Step

Joined `transformed_data` and `predictions2` with an inner `.join()` on `label`, wrote the result as a stream to the console with `.writeStream` in the append output mode, and started the query with `.start()`.

In [35]:
# Start the streaming query
stream_query = transformed_data \
    .join(predictions2,"label","inner")\
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .start()
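An inner join on `label` pairs every row of `transformed_data` with every row of `predictions2` that has the same `label` value. Rows without a match are discarded, and a label value that repeats on either side yields every combination. The plain-Python hash-join sketch below shows these semantics on hypothetical rows.

Two simplifications apply. Spark keeps both copies of same-named columns, while the dict merge here lets the right side overwrite them. For streaming joins, Spark also buffers past rows as state, and the Structured Streaming guide notes that without a watermark this state can grow without bound.

```python
from collections import defaultdict

def inner_join(left, right, key):
    """Hash join: emit every left/right pair whose key values are equal."""
    index = defaultdict(list)
    for right_row in right:
        index[right_row[key]].append(right_row)
    return [{**left_row, **right_row}
            for left_row in left
            for right_row in index.get(left_row[key], [])]

# Hypothetical rows from each side
transformed = [{"label": 100.0, "day_or_night": 1.0}, {"label": 250.0, "day_or_night": 0.0}]
predicted = [{"label": 100.0, "prediction": 98.5}, {"label": 300.0, "prediction": 310.0}]

joined = inner_join(transformed, predicted, "label")
print(joined)
```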

## Produce Data

The streaming data file was downloaded and placed in `stream_folder1`.

Wrote a loop of 50 iterations that:

+ Randomly samples three rows and writes them to a .csv file in `stream_folder2`
+ Pauses for 10 seconds between outputting data sets

The loop was submitted in a Python console. While it ran, the log (command prompt or terminal window) was checked to see the output being produced.

In [None]:
# Read in some data to sample from
import time

import numpy as np
import pandas as pd

power_stream = pd.read_csv("stream_folder1/power_streaming_data.csv")

# Loop: sample a few rows, write them to a new .csv file, then pause for 10 seconds
for i in range(50):
    # randomly sample three rows (with replacement) and work on an explicit copy
    temp = power_stream.loc[np.random.randint(power_stream.shape[0], size=3)].copy()
    temp["timestamp"] = [time.strftime("%H:%M:%S", time.localtime())] * 3
    # write a header line, because the stream reader uses option("header", "true")
    temp.to_csv("stream_folder2/power_stream" + str(i) + ".csv", index=False, header=True)
    time.sleep(10)

Stopped the query with `.stop()`.

In [36]:
stream_query.stop()