### Read libraries

In [1]:
import numpy as np
import pandas as pd
import pyspark
import os
import urllib
import sys

from pyspark.sql.functions import *
from pyspark.ml.regression import *
from pyspark.ml.evaluation import *
from pyspark.ml.feature import *


In [2]:
spark = pyspark.sql.SparkSession.builder.appName('ccpp').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/15 07:02:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Read Data

In [3]:
pp_df = spark.read.csv("./ccpp.csv", header=True, inferSchema=True)
pp_df.show(5)


+-----+-----+-------+-----+------+
|   AT|    V|     AP|   RH|    PE|
+-----+-----+-------+-----+------+
|14.96|41.76|1024.07|73.17|463.26|
|25.18|62.96|1020.04|59.08|444.37|
| 5.11| 39.4|1012.16|92.14|488.56|
|20.86|57.32|1010.24|76.64|446.48|
|10.82| 37.5|1009.23|96.62| 473.9|
+-----+-----+-------+-----+------+
only showing top 5 rows



### Data preprocessing

In [4]:
data = pp_df

In [5]:
# vectorize all numerical columns into a single feature column
feature_cols = data.columns[:-1]
assembler = VectorAssembler(inputCols=feature_cols, 
                            outputCol='features')
data = assembler.transform(data)

# only select the features and label column
data = data.select(['features', 'PE'])
print("Reading for machine learning")
data.show(10)

Reading for machine learning
+--------------------+------+
|            features|    PE|
+--------------------+------+
|[14.96,41.76,1024...|463.26|
|[25.18,62.96,1020...|444.37|
|[5.11,39.4,1012.1...|488.56|
|[20.86,57.32,1010...|446.48|
|[10.82,37.5,1009....| 473.9|
|[26.27,59.44,1012...|443.67|
|[15.89,43.96,1014...|467.35|
|[9.48,44.71,1019....|478.42|
|[14.64,45.0,1021....|475.98|
|[11.74,43.56,1015...| 477.5|
+--------------------+------+
only showing top 10 rows



In [6]:
data.take(1)

[Row(features=DenseVector([14.96, 41.76, 1024.07, 73.17]), PE=463.26)]

### Data Splitting

In [7]:
# use Linear Regression to train on the training set
train, test = data.randomSplit([0.70, 0.30])

train.show(5)
test.show(5)

train.count(), test.count()

+--------------------+------+
|            features|    PE|
+--------------------+------+
|[1.81,39.42,1026....|490.55|
|[2.34,39.42,1028....|490.34|
|[2.58,39.42,1028....|488.69|
|[2.64,39.64,1011....|481.29|
|[2.71,39.42,1026....| 489.3|
+--------------------+------+
only showing top 5 rows

+--------------------+------+
|            features|    PE|
+--------------------+------+
|[3.0,39.64,1011.0...| 485.2|
|[3.2,41.31,997.67...|489.86|
|[3.38,39.64,1011....|488.92|
|[3.68,39.64,1011....|490.02|
|[3.85,35.47,1016....|489.78|
+--------------------+------+
only showing top 5 rows



(6650, 2918)

## Linear Regression

### Model Building

In [9]:
lr = LinearRegression(featuresCol= 'features',
                      labelCol= 'PE',)
model = lr.fit(train)


# predict on the test set
prediction = model.transform(test)
print("Prediction")
prediction.show(10)

22/08/15 07:02:33 WARN Instrumentation: [20cf86bb] regParam is zero, which might cause numerical instability and overfitting.
22/08/15 07:02:33 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/08/15 07:02:33 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/08/15 07:02:33 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
Prediction
+--------------------+------+------------------+
|            features|    PE|        prediction|
+--------------------+------+------------------+
|[3.0,39.64,1011.0...| 485.2|489.35044589894005|
|[3.2,41.31,997.67...|489.86|484.71951680052683|
|[3.38,39.64,1011....|488.92|488.43619711472627|
|[3.68,39.64,1011....|490.02| 487.4299222915509|
|[3.85,35.47,1016....|489.78| 488.2727138654674|
|[3.94,39.9,1008.0...|488.81|  484.543952744429|
|[3.95,35.47,1017....|488.64| 488.1839494207284|
|[3

In [10]:
model.coefficients, model.intercept

(DenseVector([-1.9654, -0.236, 0.0709, -0.155]), 445.3338439190661)

### Model Evaluation

In [12]:
model.summary.rootMeanSquaredError

4.580422551341466

In [13]:
model.summary.meanAbsoluteError

3.6241914393767862

### Model pickle

In [14]:
model.save('lin_reg.sav')

                                                                                

## Decision Tree Regression

In [17]:
DT = DecisionTreeRegressor(featuresCol= 'features',
                      labelCol= 'PE',)
DTmodel = DT.fit(train)


# predict on the test set
prediction = DTmodel.transform(test)
print("Prediction")
prediction.show(10)



Prediction
+--------------------+------+------------------+
|            features|    PE|        prediction|
+--------------------+------+------------------+
|[3.0,39.64,1011.0...| 485.2|         485.69716|
|[3.2,41.31,997.67...|489.86|482.68819277108435|
|[3.38,39.64,1011....|488.92|         485.69716|
|[3.68,39.64,1011....|490.02|         485.69716|
|[3.85,35.47,1016....|489.78|         485.69716|
|[3.94,39.9,1008.0...|488.81|         485.69716|
|[3.95,35.47,1017....|488.64|         485.69716|
|[3.95,38.44,1016....|492.46|         485.69716|
|[3.98,35.47,1017....|489.64|         485.69716|
|[4.0,39.9,1009.64...|490.79|         485.69716|
+--------------------+------+------------------+
only showing top 10 rows



In [19]:
dt_evaluator = RegressionEvaluator(labelCol="PE",
                                  predictionCol="prediction",
                                  metricName="rmse")

rmse = dt_evaluator.evaluate(prediction)
rmse

4.4277655784439665

In [20]:

DTmodel.save('DecTree_reg.sav')

## Gradient Boosted Tree Regression

In [21]:
GBT = GBTRegressor(featuresCol= 'features',
                      labelCol= 'PE',)
GBTmodel = GBT.fit(train)


# predict on the test set
prediction = GBTmodel.transform(test)
print("Prediction")
prediction.show(10)


Prediction
+--------------------+------+------------------+
|            features|    PE|        prediction|
+--------------------+------+------------------+
|[3.0,39.64,1011.0...| 485.2| 487.1838344408074|
|[3.2,41.31,997.67...|489.86| 486.3234691636766|
|[3.38,39.64,1011....|488.92| 487.1838344408074|
|[3.68,39.64,1011....|490.02| 483.3398164302246|
|[3.85,35.47,1016....|489.78|488.15027450787096|
|[3.94,39.9,1008.0...|488.81| 489.2199963474435|
|[3.95,35.47,1017....|488.64| 487.6059601289539|
|[3.95,38.44,1016....|492.46| 486.4251068055292|
|[3.98,35.47,1017....|489.64| 488.1241399642476|
|[4.0,39.9,1009.64...|490.79| 487.0919093846457|
+--------------------+------+------------------+
only showing top 10 rows



In [22]:
gbt_evaluator = RegressionEvaluator(labelCol="PE",
                                  predictionCol="prediction",
                                  metricName="rmse")

gbtrmse = gbt_evaluator.evaluate(prediction)
gbtrmse

3.99004897177047

In [24]:

GBTmodel.save('GBTree_reg.sav')