Data Source: https://archive.ics.uci.edu/ml/datasets/Combined+Cycle+Power+Plant

Features consist of hourly average ambient variables:

Temperature (T) in the range 1.81°C to 37.11°C
Ambient Pressure (AP) in the range 992.89-1033.30 millibar
Relative Humidity (RH) in the range 25.56% to 100.16%
Exhaust Vacuum (V) in the range 25.36-81.56 cm Hg
Net hourly electrical energy output (EP) in the range 420.26-495.76 MW
The averages are taken from various sensors located around the plant that record the ambient variables every second. The variables are given without normalization. In the CSV loaded below, temperature appears as column AT and the energy output as column PE.

Dataset Information:

The dataset contains 9568 data points collected from a Combined Cycle Power Plant over 6 years (2006-2011), when the power plant was set to work with full load. Features consist of hourly average ambient variables Temperature (T), Ambient Pressure (AP), Relative Humidity (RH) and Exhaust Vacuum (V) to predict the net hourly electrical energy output (EP) of the plant.
A combined cycle power plant (CCPP) is composed of gas turbines (GT), steam turbines (ST) and heat recovery steam generators. In a CCPP, the electricity is generated by gas and steam turbines, which are combined in one cycle, and is transferred from one turbine to another. While the Vacuum is collected from and has an effect on the Steam Turbine, the other three ambient variables affect the GT performance.

In [None]:
!ls -ltr data/Folds5x2_pp.csv

-rwxrwxrwx 1 root root 318263 Oct 31 01:08 data/Folds5x2_pp.csv


In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MySparkApp") \
    .getOrCreate()

# Load Data

In [None]:
df = spark.read.format("csv").option("header","true")\
.option("inferSchema","true").load("data/Power Plant Data.csv")

In [None]:
df.show()

+-----+-----+-------+-----+------+
|   AT|    V|     AP|   RH|    PE|
+-----+-----+-------+-----+------+
|14.96|41.76|1024.07|73.17|463.26|
|25.18|62.96|1020.04|59.08|444.37|
| 5.11| 39.4|1012.16|92.14|488.56|
|20.86|57.32|1010.24|76.64|446.48|
|10.82| 37.5|1009.23|96.62| 473.9|
|26.27|59.44|1012.23|58.77|443.67|
|15.89|43.96|1014.02|75.24|467.35|
| 9.48|44.71|1019.12|66.43|478.42|
|14.64| 45.0|1021.78|41.25|475.98|
|11.74|43.56|1015.14|70.72| 477.5|
|17.99|43.72|1008.64|75.04|453.02|
|20.14|46.93|1014.66|64.22|453.99|
|24.34| 73.5|1011.31|84.15|440.29|
|25.71|58.59|1012.77|61.83|451.28|
|26.19|69.34|1009.48|87.59|433.99|
|21.42|43.79|1015.76|43.08|462.19|
|18.21| 45.0|1022.86|48.84|467.54|
|11.04|41.74| 1022.6|77.51| 477.2|
|14.45|52.75|1023.97|63.59|459.85|
|13.97|38.47|1015.15|55.28| 464.3|
+-----+-----+-------+-----+------+
only showing top 20 rows



In [None]:
df.cache()

DataFrame[AT: double, V: double, AP: double, RH: double, PE: double]

# Convert Spark Dataframe to Pandas Dataframe

In [None]:
df.limit(10).toPandas().head()

      AT      V       AP     RH      PE
0  14.96  41.76  1024.07  73.17  463.26
1  25.18  62.96  1020.04  59.08  444.37
2   5.11   39.4  1012.16  92.14  488.56
3  20.86  57.32  1010.24  76.64  446.48
4  10.82   37.5  1009.23  96.62   473.9


## Vectorize the features

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
vectorizer = VectorAssembler()
vectorizer.setInputCols(["AT", "V", "AP", "RH"])
vectorizer.setOutputCol("features")

df_vect = vectorizer.transform(df)
df_vect.show(10, False)

+-----+-----+-------+-----+------+---------------------------+
|AT   |V    |AP     |RH   |PE    |features                   |
+-----+-----+-------+-----+------+---------------------------+
|14.96|41.76|1024.07|73.17|463.26|[14.96,41.76,1024.07,73.17]|
|25.18|62.96|1020.04|59.08|444.37|[25.18,62.96,1020.04,59.08]|
|5.11 |39.4 |1012.16|92.14|488.56|[5.11,39.4,1012.16,92.14]  |
|20.86|57.32|1010.24|76.64|446.48|[20.86,57.32,1010.24,76.64]|
|10.82|37.5 |1009.23|96.62|473.9 |[10.82,37.5,1009.23,96.62] |
|26.27|59.44|1012.23|58.77|443.67|[26.27,59.44,1012.23,58.77]|
|15.89|43.96|1014.02|75.24|467.35|[15.89,43.96,1014.02,75.24]|
|9.48 |44.71|1019.12|66.43|478.42|[9.48,44.71,1019.12,66.43] |
|14.64|45.0 |1021.78|41.25|475.98|[14.64,45.0,1021.78,41.25] |
|11.74|43.56|1015.14|70.72|477.5 |[11.74,43.56,1015.14,70.72]|
+-----+-----+-------+-----+------+---------------------------+
only showing top 10 rows
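
As a rough illustration of what the assembler does to each row, the plain-Python sketch below packs the four input columns into a single list, in the order given to `setInputCols`. The `assemble` helper and the dict-based rows are hypothetical stand-ins used only for illustration; the real `VectorAssembler` operates on Spark DataFrames and produces Spark ML vectors, not Python lists.

```python
# Hypothetical stand-in for VectorAssembler, for illustration only.
INPUT_COLS = ["AT", "V", "AP", "RH"]

def assemble(row, input_cols=INPUT_COLS):
    """Return a copy of `row` with a 'features' list built from input_cols, in order."""
    out = dict(row)
    out["features"] = [float(row[c]) for c in input_cols]
    return out

# First two rows of the dataset shown above.
rows = [
    {"AT": 14.96, "V": 41.76, "AP": 1024.07, "RH": 73.17, "PE": 463.26},
    {"AT": 25.18, "V": 62.96, "AP": 1020.04, "RH": 59.08, "PE": 444.37},
]

assembled = [assemble(r) for r in rows]
for r in assembled:
    print(r["features"])
```

The label column PE is left untouched; only the predictors go into `features`.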



In [None]:
print(vectorizer.explainParams())

handleInvalid: How to handle invalid data (NULL and NaN values). Options are 'skip' (filter out rows with invalid data), 'error' (throw an error), or 'keep' (return relevant number of NaN in the output). Column lengths are taken from the size of ML Attribute Group, which can be set using `VectorSizeHint` in a pipeline before `VectorAssembler`. Column lengths can also be inferred from first rows of the data since it is safe to do so but only in case of 'error' or 'skip'). (default: error)
inputCols: input column names. (current: ['AT', 'V', 'AP', 'RH'])
outputCol: output column name. (default: VectorAssembler_b1257d13f464__output, current: features)


## Fit Linear Regression Model

In [None]:
from pyspark.ml.regression import LinearRegression

In [None]:
lr = LinearRegression()
print(lr.explainParams())

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
epsilon: The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber (default: 1.35)
featuresCol: features column name. (default: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label)
loss: The loss function to be optimized. Supported options: squaredError, huber. (default: squaredError)
maxBlockSizeInMB: maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. Default 0.0 represents choosing optimal value, depends on specific algorithm. Must be >= 0. (default: 0.0)
maxIter: max number of iterations (>= 0). (default: 100)
predic

In [None]:
lr.setLabelCol("PE")
lr.setFeaturesCol("features")
model = lr.fit(df_vect)

In [None]:
type(model)

pyspark.ml.regression.LinearRegressionModel

### View model summary

In [None]:
print("R2:", model.summary.r2)
print("Intercept: ", model.intercept, "Coefficients", model.coefficients)

R2: 0.9286960898122534
Intercept:  454.6092744523414 Coefficients [-1.9775131067284113,-0.23391642256928327,0.06208294364801217,-0.1580541029343498]
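
To make the fitted model concrete, the sketch below recomputes a prediction by hand as intercept plus the dot product of the coefficients and the feature values. The numbers are copied from the output above; the `predict` helper is a hypothetical name used only for this illustration. The coefficients follow the order of the assembled columns (AT, V, AP, RH).

```python
# Intercept and coefficients copied from the model summary output.
intercept = 454.6092744523414
coefficients = [
    -1.9775131067284113,   # AT
    -0.23391642256928327,  # V
    0.06208294364801217,   # AP
    -0.1580541029343498,   # RH
]

def predict(features):
    """Linear prediction: intercept + sum(coefficient * feature)."""
    return intercept + sum(w * x for w, x in zip(coefficients, features))

first_row = [14.96, 41.76, 1024.07, 73.17]  # AT, V, AP, RH
prediction = predict(first_row)
print(prediction)  # approximately 467.27
```

Up to floating-point rounding, this should agree with the value `model.transform` reports for the first row. Read this way, the AT coefficient says that each additional degree Celsius is associated with roughly 1.98 MW less output, with the other variables held fixed.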


### Predict

In [None]:
df_pred = model.transform(df_vect)
df_pred.show()

+-----+-----+-------+-----+------+--------------------+------------------+
|   AT|    V|     AP|   RH|    PE|            features|        prediction|
+-----+-----+-------+-----+------+--------------------+------------------+
|14.96|41.76|1024.07|73.17|463.26|[14.96,41.76,1024...|467.26978995910457|
|25.18|62.96|1020.04|59.08|444.37|[25.18,62.96,1020...| 444.0773658973149|
| 5.11| 39.4|1012.16|92.14|488.56|[5.11,39.4,1012.1...|483.56264262613047|
|20.86|57.32|1010.24|76.64|446.48|[20.86,57.32,1010...| 450.5556682463947|
|10.82| 37.5|1009.23|96.62| 473.9|[10.82,37.5,1009....| 471.8254985835583|
|26.27|59.44|1012.23|58.77|443.67|[26.27,59.44,1012...| 442.3093914004435|
|15.89|43.96|1014.02|75.24|467.35|[15.89,43.96,1014...| 463.9649810634581|
| 9.48|44.71|1019.12|66.43|478.42|[9.48,44.71,1019....|478.17448242011676|
|14.64| 45.0|1021.78|41.25|475.98|[14.64,45.0,1021....| 472.0476219688437|
|11.74|43.56|1015.14|70.72| 477.5|[11.74,43.56,1015...| 473.0491644675578|
|17.99|43.72|1008.64|75.0

### Evaluate

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
evaluator = RegressionEvaluator()
print(evaluator.explainParams())

labelCol: label column name. (default: label)
metricName: metric name in evaluation - one of:
                       rmse - root mean squared error (default)
                       mse - mean squared error
                       r2 - r^2 metric
                       mae - mean absolute error
                       var - explained variance. (default: rmse)
predictionCol: prediction column name. (default: prediction)
throughOrigin: whether the regression is through the origin. (default: False)
weightCol: weight column name. If this is not set or empty, we treat all instance weights as 1.0. (undefined)


In [None]:
evaluator = RegressionEvaluator(labelCol = "PE",
                                predictionCol = "prediction",
                                metricName = "rmse")
evaluator.evaluate(df_pred)

4.557126016749488
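
The `rmse` metric is the square root of the mean squared difference between label and prediction. A minimal plain-Python sketch of that formula follows; the `rmse` helper is a hypothetical name, and the three sample rows are copied from the `df_pred` output above.

```python
import math

def rmse(labels, predictions):
    """Root mean squared error: sqrt(mean((label - prediction)^2))."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("labels and predictions must be non-empty and equal length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# PE and prediction values for the first three rows of df_pred.
labels = [463.26, 444.37, 488.56]
predictions = [467.26978995910457, 444.0773658973149, 483.56264262613047]
sample_rmse = rmse(labels, predictions)
print(sample_rmse)
```

The value printed here covers only three rows, so it will not match the full-dataset figure returned by the evaluator. Because RMSE is in the units of the label, the evaluator's result can be read as a typical error of about 4.56 MW.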

## Build a pipeline

In [None]:
from pyspark.ml.pipeline import Pipeline, PipelineModel

In [None]:
pipeline = Pipeline()
print(pipeline.explainParams())
pipeline.setStages([vectorizer, lr])
pipelineModel = pipeline.fit(df)

stages: a list of pipeline stages (undefined)


In [None]:
pipeline.getStages()

[VectorAssembler_b1257d13f464, LinearRegression_9b7f8e1ec74a]

In [None]:
lr_model = pipelineModel.stages[1]
lr_model.coefficients

DenseVector([-1.9775, -0.2339, 0.0621, -0.1581])

In [None]:
pipelineModel.transform(df).show()

+-----+-----+-------+-----+------+--------------------+------------------+
|   AT|    V|     AP|   RH|    PE|            features|        prediction|
+-----+-----+-------+-----+------+--------------------+------------------+
|14.96|41.76|1024.07|73.17|463.26|[14.96,41.76,1024...|467.26978995910457|
|25.18|62.96|1020.04|59.08|444.37|[25.18,62.96,1020...| 444.0773658973149|
| 5.11| 39.4|1012.16|92.14|488.56|[5.11,39.4,1012.1...|483.56264262613047|
|20.86|57.32|1010.24|76.64|446.48|[20.86,57.32,1010...| 450.5556682463947|
|10.82| 37.5|1009.23|96.62| 473.9|[10.82,37.5,1009....| 471.8254985835583|
|26.27|59.44|1012.23|58.77|443.67|[26.27,59.44,1012...| 442.3093914004435|
|15.89|43.96|1014.02|75.24|467.35|[15.89,43.96,1014...| 463.9649810634581|
| 9.48|44.71|1019.12|66.43|478.42|[9.48,44.71,1019....|478.17448242011676|
|14.64| 45.0|1021.78|41.25|475.98|[14.64,45.0,1021....| 472.0476219688437|
|11.74|43.56|1015.14|70.72| 477.5|[11.74,43.56,1015...| 473.0491644675578|
|17.99|43.72|1008.64|75.0

In [None]:
evaluator.evaluate(pipelineModel.transform(df))

4.557126016749488

## Save the pipeline to disk to persist the model

In [None]:
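# overwrite() lets this cell be re-run; a plain save() fails if the path already exists
pipelineModel.write().overwrite().save("/tmp/lr-pipeline")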

In [None]:
!tree /tmp/lr-pipeline

/bin/bash: line 1: tree: command not found
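
Since `tree` is not installed in this environment, the saved directory can be inspected from Python instead (`!ls -R /tmp/lr-pipeline` is another option). The sketch below walks the directory with `os.walk`; the `tree_lines` helper is a hypothetical name, and the exact files it lists depend on the Spark version that wrote the model.

```python
import os

def tree_lines(root):
    """Return one indented line per directory and file under `root`."""
    lines = []
    root = os.path.normpath(root)
    base_depth = root.count(os.sep)
    for dirpath, dirnames, filenames in os.walk(root):
        dirnames.sort()  # make the traversal order deterministic
        depth = dirpath.count(os.sep) - base_depth
        lines.append("    " * depth + os.path.basename(dirpath) + "/")
        for name in sorted(filenames):
            lines.append("    " * (depth + 1) + name)
    return lines

model_dir = "/tmp/lr-pipeline"  # path used in the save cell above
if os.path.isdir(model_dir):
    print("\n".join(tree_lines(model_dir)))
else:
    print(model_dir, "does not exist")
```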


### Load the persisted model from the disk

In [None]:
saved_model = PipelineModel.load("/tmp/lr-pipeline")
saved_model.stages[1].coefficients

DenseVector([-1.9775, -0.2339, 0.0621, -0.1581])

In [None]:
saved_model.transform(df).show()

+-----+-----+-------+-----+------+--------------------+------------------+
|   AT|    V|     AP|   RH|    PE|            features|        prediction|
+-----+-----+-------+-----+------+--------------------+------------------+
|14.96|41.76|1024.07|73.17|463.26|[14.96,41.76,1024...|467.26978995910457|
|25.18|62.96|1020.04|59.08|444.37|[25.18,62.96,1020...| 444.0773658973149|
| 5.11| 39.4|1012.16|92.14|488.56|[5.11,39.4,1012.1...|483.56264262613047|
|20.86|57.32|1010.24|76.64|446.48|[20.86,57.32,1010...| 450.5556682463947|
|10.82| 37.5|1009.23|96.62| 473.9|[10.82,37.5,1009....| 471.8254985835583|
|26.27|59.44|1012.23|58.77|443.67|[26.27,59.44,1012...| 442.3093914004435|
|15.89|43.96|1014.02|75.24|467.35|[15.89,43.96,1014...| 463.9649810634581|
| 9.48|44.71|1019.12|66.43|478.42|[9.48,44.71,1019....|478.17448242011676|
|14.64| 45.0|1021.78|41.25|475.98|[14.64,45.0,1021....| 472.0476219688437|
|11.74|43.56|1015.14|70.72| 477.5|[11.74,43.56,1015...| 473.0491644675578|
|17.99|43.72|1008.64|75.0

## Split the data into training and test sets

In [None]:
df_train, df_test = df.randomSplit(weights=[0.7, 0.3], seed = 200)

In [None]:
pipelineModel = pipeline.fit(df_train)
evaluator.evaluate(pipelineModel.transform(df_test))

4.510492498357932

# Tune the model

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

In [None]:
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.fitIntercept, [False, True])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .build()

# In this case the estimator is simply the linear regression.
# A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=evaluator,
                           trainRatio=0.8)

tuned_model = tvs.fit(vectorizer.transform(df_train))
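
To see how `regParam` and `elasticNetParam` interact, the sketch below evaluates the elastic-net penalty term as described in the Spark ML documentation: `regParam * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)`, with `alpha = elasticNetParam`. The `elastic_net_penalty` helper is a hypothetical name, and the weights are the rounded coefficients of the unregularized fit, used purely as sample input. Spark also standardizes features by default before fitting, so this is an illustration of the formula rather than a reproduction of the internal objective.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """reg_param * (alpha * L1 + (1 - alpha) / 2 * L2^2), with alpha = elastic_net_param."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    alpha = elastic_net_param
    return reg_param * (alpha * l1 + (1.0 - alpha) / 2.0 * l2_squared)

# Coefficients rounded from the unregularized fit above, for illustration only.
weights = [-1.9775, -0.2339, 0.0621, -0.1581]

for reg_param in [0.1, 0.01]:
    for alpha in [0.0, 0.5, 1.0]:
        penalty = elastic_net_penalty(weights, reg_param, alpha)
        print(f"regParam={reg_param}, elasticNetParam={alpha}: penalty={penalty:.4f}")
```

With `elasticNetParam=0.0` the penalty is pure L2 (ridge), with `1.0` it is pure L1 (lasso), and `0.5` mixes the two equally.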

In [None]:
tuned_model.bestModel, tuned_model.validationMetrics

(LinearRegressionModel: uid=LinearRegression_9b7f8e1ec74a, numFeatures=4,
 [5.30103355902185,
  5.635734128433439,
  5.353120042628339,
  4.735839375810344,
  4.735224914363956,
  4.736195578633287,
  5.2944493497484215,
  5.645263698389502,
  5.67343475927052,
  4.725452377771817,
  4.725402764469728,
  4.725370358220224])
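
The twelve metrics correspond to the 2 x 2 x 3 parameter combinations in the grid. The sketch below pairs them up in plain Python to find the best setting. It assumes the grid is enumerated as a Cartesian product with the first-added parameter varying slowest; that ordering is an assumption here, although it is consistent with the pattern in the metrics (the runs without an intercept score noticeably worse).

```python
from itertools import product

# Grid values in the order they were added to ParamGridBuilder.
reg_params = [0.1, 0.01]
fit_intercepts = [False, True]
elastic_net_params = [0.0, 0.5, 1.0]

# Assumption: combinations are enumerated as a Cartesian product with the
# first-added parameter varying slowest.
grid = list(product(reg_params, fit_intercepts, elastic_net_params))

# Validation RMSE values copied from the output above.
validation_metrics = [
    5.30103355902185, 5.635734128433439, 5.353120042628339,
    4.735839375810344, 4.735224914363956, 4.736195578633287,
    5.2944493497484215, 5.645263698389502, 5.67343475927052,
    4.725452377771817, 4.725402764469728, 4.725370358220224,
]

assert len(grid) == len(validation_metrics)  # 2 * 2 * 3 = 12 combinations

# Lower RMSE is better, so pick the combination with the smallest metric.
best_metric, best_params = min(zip(validation_metrics, grid))
print("best RMSE:", best_metric)
print("regParam, fitIntercept, elasticNetParam:", best_params)
```

In the notebook itself, `zip(paramGrid, tuned_model.validationMetrics)` pairs the actual parameter maps with their metrics and does not depend on the ordering assumption.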

In [None]:
df_test_pred = tuned_model.transform(vectorizer.transform(df_test))
df_test_pred.show()

+----+-----+-------+-----+------+--------------------+------------------+
|  AT|    V|     AP|   RH|    PE|            features|        prediction|
+----+-----+-------+-----+------+--------------------+------------------+
|2.34|39.42|1028.47|69.68|490.34|[2.34,39.42,1028....|493.24035735510785|
|2.71|39.42|1026.66|81.11| 489.3|[2.71,39.42,1026....|490.66936733478036|
|3.21|38.44|1017.11|84.86|492.93|[3.21,38.44,1017....| 488.7825633558698|
| 3.4|39.64| 1011.1|83.43|459.86|[3.4,39.64,1011.1...| 487.9941660622418|
|3.73|39.42| 1024.4|82.42|488.58|[3.73,39.42,1024....|488.33497257554006|
|3.91|35.47|1016.92|86.03|488.67|[3.91,35.47,1016....| 487.9093976824462|
|3.95|35.47|1017.36|84.88|488.64|[3.95,35.47,1017....| 488.0316938574985|
|3.98|35.47|1017.22|86.53|489.64|[3.98,35.47,1017....| 487.7136533545184|
| 4.0| 39.9|1009.64|97.16|490.79|[4.0,39.9,1009.64...|484.58222979309903|
|4.08|35.19|1018.87|97.07|489.44|[4.08,35.19,1018....|486.07689236094103|
|4.11|38.44| 1015.9|81.79|488.05|[4.11

In [None]:
evaluator.evaluate(df_test_pred)

4.511942298636096