# Final Project

## 1. Fitting Model

In [1]:
pip install seaborn

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [24]:
import pandas as pd
import pyspark.pandas as ps

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, DoubleType

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import Binarizer, StringIndexer, OneHotEncoder, VectorAssembler, PCA
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

In [3]:
# Location of the power-consumption data set
DATA_URL = "https://www4.stat.ncsu.edu/~online/datasets/power_ml_data.csv"

# Read it with plain pandas first ...
df_pd = pd.read_csv(DATA_URL)

# ... then hand it to pandas-on-Spark, so the exploration below keeps the pandas
# API while Spark does the computation
df_sp = ps.from_pandas(df_pd)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/28 14:01:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/04/28 14:01:53 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### 1-1. Summarize the data set

In [4]:
# Display basic information: row count, column names, non-null counts and dtypes
df_sp.info()

                                                                                

<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 47174 entries, 0 to 47173
Data columns (total 10 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   Temperature            47174 non-null  float64
 1   Humidity               47174 non-null  float64
 2   Wind_Speed             47174 non-null  float64
 3   General_Diffuse_Flows  47174 non-null  float64
 4   Diffuse_Flows          47174 non-null  float64
 5   Power_Zone_1           47174 non-null  float64
 6   Power_Zone_2           47174 non-null  float64
 7   Power_Zone_3           47174 non-null  float64
 8   Month                  47174 non-null  int64  
 9   Hour                   47174 non-null  int64  
dtypes: float64(8), int64(2)

In [5]:
# Check the count, mean, standard deviation, min, 25%/50%/75% percentiles and max
# Use pandas-on-Spark `describe()` to generate these summary statistics for all variables
df_sp.describe()

25/04/28 14:02:01 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Unnamed: 0,Temperature,Humidity,Wind_Speed,General_Diffuse_Flows,Diffuse_Flows,Power_Zone_1,Power_Zone_2,Power_Zone_3,Month,Hour
count,47174.0,47174.0,47174.0,47174.0,47174.0,47174.0,47174.0,47174.0,47174.0,47174.0
mean,18.81322,68.288398,1.961621,182.53118,74.987211,32335.16869,21027.204976,17831.197608,6.510599,11.488383
std,5.813341,15.56033,2.349351,264.431856,124.256146,7130.013305,5199.787153,6622.59047,3.437367,6.92192
min,3.247,11.34,0.05,0.004,0.011,13895.6962,8560.081466,5935.17407,1.0,0.0
25%,14.42,58.32,0.078,0.062,0.122,26290.63291,16957.31707,13121.92771,4.0,5.0
50%,18.78,69.89,0.086,4.78,4.284,32261.59696,20804.86322,16405.28211,7.0,11.0
75%,22.91,81.5,4.915,319.0,101.0,37317.44681,24698.73418,21628.91566,9.0,17.0
max,40.01,94.8,6.483,1163.0,936.0,52146.85905,37408.86076,47598.32636,12.0,23.0


In [6]:
# Check the median of each column
# `median()` works as in pandas and returns one value per column.
# NOTE: per the pandas-on-Spark docs this median is approximate (it is computed from
# approximate percentiles; see the `accuracy` argument), unlike pandas' exact median.
df_sp.median()

                                                                                

Temperature                 18.78000
Humidity                    69.89000
Wind_Speed                   0.08600
General_Diffuse_Flows        4.78000
Diffuse_Flows                4.28400
Power_Zone_1             32261.59696
Power_Zone_2             20804.86322
Power_Zone_3             16405.28211
Month                        7.00000
Hour                        11.00000
dtype: float64

* The means of General_Diffuse_Flows (≈182.5) and Diffuse_Flows (≈75.0) are far above their medians (≈4.78 and ≈4.28). Wind_Speed shows the same pattern (mean ≈1.96 vs. median ≈0.086).
* This indicates strongly right-skewed distributions: most observations are small, and a minority of much larger values pulls the mean upward.

In [7]:
# Find the correlations between all pairs of variables (`.corr()` defaults to Pearson)
corr_matrix = df_sp.corr()

# End the cell with the frame itself so Jupyter renders it as a table;
# print() would produce a wrapped plain-text dump of the 10 wide columns.
corr_matrix



                       Temperature  Humidity  Wind_Speed  General_Diffuse_Flows  Diffuse_Flows  Power_Zone_1  Power_Zone_2  Power_Zone_3     Month      Hour
Temperature               1.000000 -0.460143    0.476421               0.459602       0.195625      0.441446      0.384301      0.490752  0.284814  0.199134
Humidity                 -0.460143  1.000000   -0.136121              -0.467282      -0.258042     -0.289090     -0.297019     -0.234228 -0.016762 -0.244204
Wind_Speed                0.476421 -0.136121    1.000000               0.132304      -0.000727      0.166322      0.146338      0.279112  0.168491  0.004289
General_Diffuse_Flows     0.459602 -0.467282    0.132304               1.000000       0.564530      0.189994      0.158798      0.064942 -0.020793  0.131171
Diffuse_Flows             0.195625 -0.258042   -0.000727               0.564530       1.000000      0.082885      0.047379     -0.036761 -0.130249  0.132257
Power_Zone_1              0.441446 -0.289090    0.166322  

                                                                                

In [8]:
# Create a one-way contingency table of the Month variable
# `.value_counts()` counts the rows for each Month value, and
# `.sort_index()` orders the result by Month (1-12) instead of by frequency
df_sp["Month"].value_counts().sort_index()



1     4014
2     3588
3     4057
4     3893
5     3997
6     3913
7     4029
8     3999
9     3913
10    4026
11    3877
12    3868
Name: Month, dtype: int64

In [9]:
# Create a one-way contingency table of the Hour variable
# `.value_counts()` counts the rows for each Hour value, and
# `.sort_index()` orders the result by Hour (0-23) instead of by frequency
df_sp["Hour"].value_counts().sort_index()



0     1950
1     1973
2     1973
3     1966
4     1986
5     1968
6     1992
7     1964
8     1957
9     1976
10    1955
11    1972
12    1979
13    1956
14    1971
15    1947
16    1950
17    1979
18    1955
19    1950
20    1945
21    1976
22    1966
23    1968
Name: Hour, dtype: int64

In [10]:
# Create a two-way contingency table for the Month and Hour variables
# Group the data by each (Month, Hour) combination,
# use '.size()' to count the number of rows in each group,
# and '.sort_index()' to order the groups by Month, then Hour.
# The result is a long Series indexed by (Month, Hour), not a wide Month x Hour grid.
df_sp.groupby(["Month", "Hour"]).size().sort_index()

Month  Hour
1      0       161
       1       165
       2       168
       3       162
       4       167
       5       168
       6       168
       7       168
       8       172
       9       174
       10      164
       11      174
       12      170
       13      165
       14      167
       15      163
       16      169
       17      170
       18      168
       19      162
       20      172
       21      166
       22      166
       23      165
2      0       142
       1       151
       2       147
       3       148
       4       156
       5       147
       6       157
       7       153
       8       147
       9       153
       10      145
       11      141
       12      155
       13      154
       14      153
       15      146
       16      153
       17      152
       18      144
       19      148
       20      149
       21      151
       22      151
       23      145
3      0       174
       1       171
       2       167
       3       168


In [11]:
# Group the data by Month and find the means of the numeric variables
# '.mean()' is applied to every non-grouping column, so Hour gets a
# per-month mean as well
df_sp.groupby("Month").mean()



Unnamed: 0_level_0,Temperature,Humidity,Wind_Speed,General_Diffuse_Flows,Diffuse_Flows,Power_Zone_1,Power_Zone_2,Power_Zone_3,Hour
Month,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
1,12.734699,68.258548,0.702223,103.959658,69.798826,31052.984428,19407.916366,17736.351685,11.512706
2,12.656535,66.490925,1.113977,125.471135,92.330615,30973.86316,18774.586006,17309.70787,11.497492
3,14.584055,71.115884,1.006017,181.401719,93.155905,31162.869031,18459.612113,16945.4628,11.479172
4,16.414755,75.408176,0.22299,157.722243,83.494537,31143.206766,17600.306571,18574.918338,11.482147
5,20.301401,68.609322,2.307473,274.500026,122.765576,32379.460464,19973.085387,17604.282564,11.4666
6,22.132706,68.76126,1.561346,277.434533,103.227789,34573.227026,20649.03459,20416.130091,11.456683
7,27.200593,57.599484,4.641782,294.112037,75.410538,35805.530436,24130.028182,28175.034099,11.495656
8,25.740415,66.022621,4.533251,227.178635,67.105847,36436.261651,24657.024552,24684.368961,11.506627
9,22.640565,66.868306,2.947096,202.201634,49.070622,33415.103456,20189.459837,14928.41553,11.532328
10,20.476249,71.524016,2.784221,115.814556,46.628719,32806.992796,21457.890001,13266.437337,11.486587


In [12]:
# Group the data by Month and find the standard deviations of the numeric variables
# '.std()' is applied to every non-grouping column, so Hour gets a
# per-month standard deviation as well
df_sp.groupby("Month").std()

                                                                                

Unnamed: 0_level_0,Temperature,Humidity,Wind_Speed,General_Diffuse_Flows,Diffuse_Flows,Power_Zone_1,Power_Zone_2,Power_Zone_3,Hour
Month,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
1,3.240635,12.15617,1.611795,166.16471,131.459172,7402.323411,4515.295696,4436.997405,6.893386
2,2.619715,12.411942,1.981157,206.73018,169.155517,6874.584791,4390.391101,4353.975946,6.900281
3,3.758852,13.918146,1.900982,260.148889,151.167923,6782.136539,4185.117595,4256.765546,6.934686
4,2.806221,14.312655,0.820343,246.173501,123.912352,6496.700167,3835.629384,4556.263192,6.913948
5,3.299952,16.436023,2.408328,331.998897,171.585943,6809.332811,4182.543672,4353.394234,6.904736
6,2.689724,14.972905,2.235411,328.27721,143.497896,7317.808097,4465.664309,5596.702926,6.933494
7,3.856707,18.849986,1.110539,331.733608,95.044517,6966.074191,4968.511101,6913.958361,6.927293
8,2.949887,18.482551,1.3009,289.906192,90.66254,7054.722275,5163.442765,6520.955955,6.950723
9,2.877585,15.992826,2.293412,270.172564,67.523539,6471.3669,4205.504219,3425.85632,6.92219
10,2.987293,13.980791,2.398703,185.043298,69.421308,6479.133482,4612.671175,3087.805818,6.944604


### 1-2. Switch to Spark SQL

In [13]:
# Convert the DataFrame from pandas-on-Spark to a Spark SQL DataFrame
# (no index column is carried over; see the schema printed below)
df_sql = df_sp.to_spark()



In [14]:
# Check the schema of the DataFrame; Month and Hour are still `long` at this point
df_sql.printSchema()

root
 |-- Temperature: double (nullable = false)
 |-- Humidity: double (nullable = false)
 |-- Wind_Speed: double (nullable = false)
 |-- General_Diffuse_Flows: double (nullable = false)
 |-- Diffuse_Flows: double (nullable = false)
 |-- Power_Zone_1: double (nullable = false)
 |-- Power_Zone_2: double (nullable = false)
 |-- Power_Zone_3: double (nullable = false)
 |-- Month: long (nullable = false)
 |-- Hour: long (nullable = false)



In [15]:
# Month and Hour arrive as long integers; cast both to double.
# withColumn() with an existing name replaces that column and keeps its position.
df_sql = (
    df_sql
    .withColumn("Month", col("Month").cast("double"))
    .withColumn("Hour", col("Hour").cast("double"))
)

In [16]:
# Recheck the schema to confirm Month and Hour are now `double`
df_sql.printSchema()

root
 |-- Temperature: double (nullable = false)
 |-- Humidity: double (nullable = false)
 |-- Wind_Speed: double (nullable = false)
 |-- General_Diffuse_Flows: double (nullable = false)
 |-- Diffuse_Flows: double (nullable = false)
 |-- Power_Zone_1: double (nullable = false)
 |-- Power_Zone_2: double (nullable = false)
 |-- Power_Zone_3: double (nullable = false)
 |-- Month: double (nullable = false)
 |-- Hour: double (nullable = false)



### 1-3. Set up a pipeline

In [17]:
# Hour -> Hour_binary. Binarizer maps values above the threshold to 1.0 and all
# other values to 0.0, so hours 0-6 become 0.0 and hours 7-23 become 1.0.
binarizer = Binarizer(inputCol="Hour", outputCol="Hour_binary", threshold=6.5)

# Month -> one-hot vector, in two steps:
#   * StringIndexer maps each distinct Month value to a category index
#     (handleInvalid="keep" reserves an extra index for values unseen during fit)
#   * OneHotEncoder turns that index into an indicator vector
indexer = StringIndexer(inputCol="Month", outputCol="Month_index", handleInvalid="keep")
encoder = OneHotEncoder(inputCol="Month_index", outputCol="Month_OHE")

# Five weather / irradiance variables -> three principal components.
# NOTE(review): these inputs are not standardized and their scales differ a lot
# (see the describe() output), so the components will be dominated by the
# large-variance flow columns; consider a StandardScaler stage before PCA.
pca_input = [
    "Temperature",
    "Humidity",
    "Wind_Speed",
    "General_Diffuse_Flows",
    "Diffuse_Flows",
]
pca_assembler = VectorAssembler(inputCols=pca_input, outputCol="pca_input")
pca = PCA(inputCol="pca_input", outputCol="pca_output", k=3)

# Final model features: principal components, binarized hour, the other two
# zones' power consumption, and the one-hot month vector
features = ["pca_output", "Hour_binary", "Power_Zone_1", "Power_Zone_2", "Month_OHE"]
assembler = VectorAssembler(inputCols=features, outputCol="features")

In [18]:
# Create a Transformer class to be used in the pipeline
# This class renames a column in a Spark SQL DataFrame
class RenameColumn(Transformer, HasInputCol, HasOutputCol,
                   DefaultParamsReadable, DefaultParamsWritable):
    """Pipeline stage that renames column ``inputCol`` to ``outputCol``.

    Both names are stored as Spark ML Params (via ``HasInputCol`` /
    ``HasOutputCol``) rather than plain Python attributes, for two reasons:
    ``DefaultParamsWritable`` only persists Params, and ``DefaultParamsReadable``
    re-creates a stage by calling its constructor with no arguments. Both
    constructor arguments are therefore optional, so a saved pipeline containing
    this stage can be loaded back.

    Args:
        inputCol: name of the existing column to rename.
        outputCol: new name for that column.
    """

    def __init__(self, inputCol=None, outputCol=None):
        super().__init__()
        # Only set what was given, so the no-argument construction used when
        # loading leaves both Params unset until the saved values are applied.
        if inputCol is not None:
            self._set(inputCol=inputCol)
        if outputCol is not None:
            self._set(outputCol=outputCol)

    def _transform(self, dataset: DataFrame) -> DataFrame:
        """Return ``dataset`` with ``inputCol`` renamed to ``outputCol``.

        As with ``DataFrame.withColumnRenamed``, nothing changes when
        ``inputCol`` is not a column of ``dataset``.
        """
        return dataset.withColumnRenamed(self.getInputCol(), self.getOutputCol())


# Instantiate the Transformer to rename "Power_Zone_3" to "response"
renamer = RenameColumn(inputCol="Power_Zone_3", outputCol="response")

In [19]:
# Linear regression on the assembled features; the regParam / elasticNetParam
# grid below turns it into an elastic-net search (0 = ridge-like, 1 = lasso-like mix)
lr = LinearRegression(featuresCol="features", labelCol="response")

# The same candidate values are tried for the penalty strength and the L1/L2 mix;
# ParamGridBuilder expands them into every (regParam, elasticNetParam) pair
grid_values = [0, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.98, 0.99, 1]
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, grid_values)
    .addGrid(lr.elasticNetParam, grid_values)
    .build()
)

# Models are compared by root mean squared error of prediction vs. response
evaluator = RegressionEvaluator(
    labelCol="response",
    predictionCol="prediction",
    metricName="rmse",
)

In [20]:
# Create a pipeline combining all preprocessing and modeling stages.
# Because the whole pipeline is the estimator, every CV fold refits the
# feature stages (StringIndexer, PCA, ...) on that fold's training split too.
pipeline = Pipeline(stages=[binarizer, indexer, encoder, pca_assembler, pca, assembler, renamer, lr])

# Fixed seed for the random k-fold split, so the selected parameters and the
# reported RMSE are reproducible across kernel restarts
CV_SEED = 42

# 5-fold cross-validation over every parameter combination in paramGrid,
# scored with `evaluator` (RMSE)
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=5,
                    seed=CV_SEED)

In [21]:
# Run the cross-validated grid search over the full pipeline on the static data
cvModel = cv.fit(df_sql)

# Score the same rows the model was fitted on and attach the residual
# (observed response minus fitted value)
predictions = (
    cvModel.transform(df_sql)
    .withColumn("residual", col("response") - col("prediction"))
)

# In-sample error: these rows were used for fitting, so this is a training RMSE,
# not an estimate of performance on new data
rmse = evaluator.evaluate(predictions)
print(f"Training RMSE: {rmse:.4f}")

# Preview observed vs. predicted values for the first five rows
predictions.select("response", "prediction", "residual").show(5)

25/04/28 14:02:23 WARN Instrumentation: [b49b0bd4] regParam is zero, which might cause numerical instability and overfitting.
25/04/28 14:02:24 WARN Instrumentation: [b49b0bd4] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
25/04/28 14:02:26 WARN Instrumentation: [cfa8833e] regParam is zero, which might cause numerical instability and overfitting.
25/04/28 14:02:27 WARN Instrumentation: [cfa8833e] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
25/04/28 14:02:28 WARN Instrumentation: [2fa03562] regParam is zero, which might cause numerical instability and overfitting.
25/04/28 14:02:28 WARN Instrumentation: [2fa03562] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
25/04/28 14:02:29 WARN Instrumentation: [6910c1ba] regParam is zero, which might cause numerical instability and overfitting.
25/04/28 14:02:31 WARN Instrumentation: [7a147494] regParam is z

Training RMSE: 2137.4072
+-----------+------------------+------------------+
|   response|        prediction|          residual|
+-----------+------------------+------------------+
|20240.96386|20799.716959339567|-558.7530993395667|
|20131.08434|18579.516457248305|1551.5678827516967|
|19668.43373| 18129.45214722877|  1538.98158277123|
|18899.27711|17515.275781662618| 1384.001328337381|
|18442.40964| 16918.01521781402| 1524.394422185982|
+-----------+------------------+------------------+
only showing top 5 rows



## 2. Streaming Part

### 2-1.  Reading a Stream

In [22]:
# Get the SparkSession for the streaming part.
# A session already exists at this point: pandas-on-Spark started one when the
# data was loaded. Builder options such as .appName(...) would therefore be
# ignored with a "Using an existing Spark session" warning, so reuse the session as-is.
spark = SparkSession.builder.getOrCreate()

25/04/28 14:13:37 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [25]:
# Schema for the incoming streaming CSV files: every column is a nullable double,
# in the same order as the training data (Month and Hour are doubles here too,
# matching the cast applied to df_sql)
schema = StructType([
    StructField(field_name, DoubleType(), True)
    for field_name in (
        "Temperature",
        "Humidity",
        "Wind_Speed",
        "General_Diffuse_Flows",
        "Diffuse_Flows",
        "Power_Zone_1",
        "Power_Zone_2",
        "Power_Zone_3",
        "Month",
        "Hour",
    )
])

In [26]:
# Streaming DataFrame over the CSV files that land in "csv_file/".
# The schema is supplied explicitly (defined above); the files have a header row.
streaming_df = (
    spark.readStream
    .schema(schema)
    .option("header", True)
    .csv("csv_file/")
)

### 2-2.  Prediction

In [27]:
# Score the stream with the fitted cross-validated pipeline, add the residual
# (observed minus predicted), and keep only the three result columns
predicted_stream = (
    cvModel.transform(streaming_df)
    .withColumn("residual", col("response") - col("prediction"))
    .select("response", "prediction", "residual")
)

### 2-3.  Preprocessing pipeline

In [28]:
# Preprocessing-only pipeline: the same feature stages as the full model,
# without the regression stage (kept defined for reference / later cells)
pipeline_stream = Pipeline(stages=[binarizer, indexer, encoder, pca_assembler, pca, assembler, renamer])

# Reuse the preprocessing stages already fitted inside the best cross-validated
# model (every stage except the final regression) instead of fitting
# StringIndexer and PCA a second time on df_sql. This guarantees the streamed
# features are built exactly like the ones the model was trained on.
preprocessing_model = PipelineModel(stages=cvModel.bestModel.stages[:-1])

# Apply the fitted preprocessing stages to the streaming data
transformed_stream = preprocessing_model.transform(streaming_df)

### 2-4.  Join

In [29]:
# Join the preprocessed streaming data with the predicted results on the "response" column
final_df = transformed_stream.join(predicted_stream, on="response")

### 2-5.  Writing Step

In [32]:
# Write the final streaming DataFrame to the console in append mode
query = final_df.writeStream.outputMode("append").format("console").start()

25/04/28 14:21:22 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-3c10979f-c9e4-46ab-866b-929379a5e9bb. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/04/28 14:21:22 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-----------+-----------+--------+----------+---------------------+-------------+------------+------------+-----+----+-----------+-----------+--------------+--------------------+--------------------+--------------------+------------------+-------------------+
|   response|Temperature|Humidity|Wind_Speed|General_Diffuse_Flows|Diffuse_Flows|Power_Zone_1|Power_Zone_2|Month|Hour|Hour_binary|Month_index|     Month_OHE|           pca_input|          pca_output|            features|        prediction|           residual|
+-----------+-----------+--------+----------+---------------------+-------------+------------+------------+-----+----+-----------+-----------+--------------+--------------------+--------------------+--------------------+------------------+-------------------+
|14232.28916|      3.541|    80.8|     0.085|                0.055|        0.115|  22584.3038| 13798.17629|  1.0| 5.0|     

                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------+-----------+--------+----------+---------------------+-------------+------------+------------+-----+----+-----------+-----------+---------------+--------------------+--------------------+--------------------+------------------+-------------------+
|   response|Temperature|Humidity|Wind_Speed|General_Diffuse_Flows|Diffuse_Flows|Power_Zone_1|Power_Zone_2|Month|Hour|Hour_binary|Month_index|      Month_OHE|           pca_input|          pca_output|            features|        prediction|           residual|
+-----------+-----------+--------+----------+---------------------+-------------+------------+------------+-----+----+-----------+-----------+---------------+--------------------+--------------------+--------------------+------------------+-------------------+
|19732.21757|      25.85|   47.35|     4.905|                233.2|        221.3| 29884.38538|  20244.3038|  7.0| 7.0|  

[Stage 9798:====>                                              (17 + 128) / 200]

In [33]:
# Stop the streaming query. The "aborting"/"aborted" errors logged below appear
# to come from the in-flight micro-batch being interrupted by the stop.
query.stop()

25/04/28 14:21:57 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 2, writer: ConsoleWriter[numRows=20, truncate=true]] is aborting.
25/04/28 14:21:57 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 2, writer: ConsoleWriter[numRows=20, truncate=true]] aborted.
25/04/28 14:21:57 WARN Shell: Interrupted while joining on: Thread[Thread-71400,5,]
java.lang.InterruptedException
	at java.base/java.lang.Object.wait(Native Method)
	at java.base/java.lang.Thread.join(Thread.java:1300)
	at java.base/java.lang.Thread.join(Thread.java:1375)
	at org.apache.hadoop.util.Shell.joinThread(Shell.java:1042)
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:1002)
	at org.apache.hadoop.util.Shell.run(Shell.java:900)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1212)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1306)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1288)
	at org.apache.hadoop.fs