In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext

In [3]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [4]:
sc = SparkContext()

In [5]:
spark = SparkSession(sc)

### Read the dataset

In [6]:
import pandas as pd

In [7]:
path = 'Du_lieu_cung_cap/CCPP/Folds5x2_pp.xlsx'

In [8]:
df1 = pd.read_excel(path, 'Sheet1')
df2 = pd.read_excel(path, 'Sheet2')
df3 = pd.read_excel(path, 'Sheet3')
df4 = pd.read_excel(path, 'Sheet4')
df5 = pd.read_excel(path, 'Sheet5')

In [9]:
# concat 5 dataframes into 1
df = pd.concat([df1, df2, df3, df4, df5])
df.head()

Unnamed: 0,AT,V,AP,RH,PE
0,14.96,41.76,1024.07,73.17,463.26
1,25.18,62.96,1020.04,59.08,444.37
2,5.11,39.4,1012.16,92.14,488.56
3,20.86,57.32,1010.24,76.64,446.48
4,10.82,37.5,1009.23,96.62,473.9


In [10]:
# Convert pandas dataframe into pyspark dataframe
data = spark.createDataFrame(df)

In [11]:
data.show(10)

+-----+-----+-------+-----+------+
|   AT|    V|     AP|   RH|    PE|
+-----+-----+-------+-----+------+
|14.96|41.76|1024.07|73.17|463.26|
|25.18|62.96|1020.04|59.08|444.37|
| 5.11| 39.4|1012.16|92.14|488.56|
|20.86|57.32|1010.24|76.64|446.48|
|10.82| 37.5|1009.23|96.62| 473.9|
|26.27|59.44|1012.23|58.77|443.67|
|15.89|43.96|1014.02|75.24|467.35|
| 9.48|44.71|1019.12|66.43|478.42|
|14.64| 45.0|1021.78|41.25|475.98|
|11.74|43.56|1015.14|70.72| 477.5|
+-----+-----+-------+-----+------+
only showing top 10 rows



In [12]:
data.printSchema()

root
 |-- AT: double (nullable = true)
 |-- V: double (nullable = true)
 |-- AP: double (nullable = true)
 |-- RH: double (nullable = true)
 |-- PE: double (nullable = true)



In [13]:
data.count()

47840

### Data preprocessing

In [14]:
# Check null values
data.select([count(when(col(c).isNull(), c)).alias(c) for c in data.columns]).toPandas()

Unnamed: 0,AT,V,AP,RH,PE
0,0,0,0,0,0


In [15]:
data.select([count(when(isnan(c), c)).alias(c) for c in data.columns]).toPandas()

Unnamed: 0,AT,V,AP,RH,PE
0,0,0,0,0,0


- Không có dữ liệu null, na

In [16]:
# Check duplicates
dup_rows = data.count() - data.distinct().count()
dup_rows

38313

- Có dữ liệu bị duplicated

In [17]:
# Drop duplicates
data = data.drop_duplicates()
data.count()

9527

In [18]:
data.describe().show()

+-------+------------------+------------------+------------------+------------------+------------------+
|summary|                AT|                 V|                AP|                RH|                PE|
+-------+------------------+------------------+------------------+------------------+------------------+
|  count|              9527|              9527|              9527|              9527|              9527|
|   mean|19.658225044610113| 54.29342080403093|1013.2370840768365| 73.33495119135094| 454.3359095203101|
| stddev| 7.444397078136751|12.686309009374407| 5.940526064757306|14.607513252912092|17.039080412855842|
|    min|              1.81|             25.36|            992.89|             25.56|            420.26|
|    max|             37.11|             81.56|            1033.3|            100.16|            495.76|
+-------+------------------+------------------+------------------+------------------+------------------+



### Chuyển đổi dữ liệu

In [19]:
data.columns

['AT', 'V', 'AP', 'RH', 'PE']

In [20]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [21]:
assembler = VectorAssembler(inputCols = ['AT', 'V', 'AP', 'RH'], 
                            outputCol = 'features') #input

In [22]:
data_pre = assembler.transform(data)

In [23]:
data_pre.show(2)

+-----+-----+-------+-----+------+--------------------+
|   AT|    V|     AP|   RH|    PE|            features|
+-----+-----+-------+-----+------+--------------------+
|24.54|60.29|1017.42|58.94|447.67|[24.54,60.29,1017...|
|10.59|42.49|1009.59|77.36|477.49|[10.59,42.49,1009...|
+-----+-----+-------+-----+------+--------------------+
only showing top 2 rows



### Scale the data

In [24]:
from pyspark.ml.feature import MinMaxScaler

In [25]:
scaler = MinMaxScaler(inputCol='features',
                        outputCol='scaledFeatures')

In [26]:
# Compute summary statistics by fitting the MinMaxScaler
scalerModel = scaler.fit(data_pre)

In [27]:
# Normalize each feature
final_data = scalerModel.transform(data_pre)

In [28]:
final_data = final_data.select('scaledFeatures', 'PE') # features: input, PE: output
final_data.count()

9527

In [29]:
final_data.show(5, False)

+-------------------------------------------------------------------------------+------+
|scaledFeatures                                                                 |PE    |
+-------------------------------------------------------------------------------+------+
|[0.6439093484419265,0.6215302491103202,0.6070279633754019,0.4474530831099195]  |447.67|
|[0.24872521246458926,0.3048042704626335,0.41326404355357726,0.6943699731903485]|477.49|
|[0.7050991501416431,0.7330960854092526,0.3073496659242754,0.6221179624664879]  |430.21|
|[0.5504249291784703,0.2902135231316726,0.4877505567928743,0.31782841823056307] |459.81|
|[0.7345609065155808,0.8793594306049821,0.42464736451373375,0.4726541554959786] |436.87|
+-------------------------------------------------------------------------------+------+
only showing top 5 rows



### Chia dữ liệu train-test

In [30]:
# split the dataset into training and test
train_data, test_data = final_data.randomSplit([0.7, 0.3])

In [31]:
train_data.describe().show()

+-------+------------------+
|summary|                PE|
+-------+------------------+
|  count|              6747|
|   mean| 454.2666236846001|
| stddev|17.057736601186757|
|    min|            421.57|
|    max|            495.24|
+-------+------------------+



In [32]:
test_data.describe().show()

+-------+------------------+
|summary|                PE|
+-------+------------------+
|  count|              2780|
|   mean|  454.504064748202|
| stddev|16.995606047612053|
|    min|            420.26|
|    max|            495.76|
+-------+------------------+



### Xây dựng model với Linear Regression

In [33]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import LinearRegressionModel

In [34]:
# Create a Linear Regression Model Object
lr = LinearRegression(featuresCol = 'scaledFeatures',
                      labelCol = 'PE',
                      predictionCol = 'prediction')

In [35]:
# Fit the model to the data and call this model lrModel
lrModel = lr.fit(train_data)

In [36]:
# Print the coefficients and intercept for linear regression
print('Coefficient: {} Intercept: {}'.format(lrModel.coefficients, lrModel.intercept))

Coefficient: [-69.8838695200798,-13.386757270656567,2.7094084542080004,-11.953899997472401] Intercept: 502.8295785137063


### Đánh giá model với test dataset

In [37]:
test_results = lrModel.evaluate(test_data)

In [38]:
test_results.residuals.show(5)

+-------------------+
|          residuals|
+-------------------+
| -2.974213976811029|
|-1.7616464110126344|
|-6.8253239372667736|
| -4.340587502754943|
|   3.86598344949374|
+-------------------+
only showing top 5 rows



In [39]:
# Check test dataset
test_model = lrModel.transform(test_data)

In [40]:
# Inspect results
test_model.select('prediction', 'PE').show(5)

+------------------+------+
|        prediction|    PE|
+------------------+------+
|493.52421397681104|490.55|
|491.06164641101265| 489.3|
| 489.4853239372668|482.66|
|489.54058750275493| 485.2|
|489.06401655050627|492.93|
+------------------+------+
only showing top 5 rows



In [41]:
print('RMSE: {}'.format(test_results.rootMeanSquaredError))
print('MSE: {}'.format(test_results.meanSquaredError))
print('r2: {}'.format(test_results.r2))

RMSE: 4.576219140079239
MSE: 20.941781618027573
r2: 0.9274735259822106


### Xây dựng model với Random Forest 

In [42]:
from pyspark.ml.regression import RandomForestRegressor

In [43]:
rfc = RandomForestRegressor(featuresCol='scaledFeatures',
                              labelCol='PE',
                              predictionCol='prediction')

In [44]:
rfc_model = rfc.fit(train_data)

### Đánh giá model với test dataset

In [45]:
# Check test dataset
rfc_test_model = rfc_model.transform(test_data)

In [46]:
# Inspect results
rfc_test_model.select('PE','prediction','scaledFeatures').show(3, False)

+------+-----------------+---------------------------------------------------------------------------------+
|PE    |prediction       |scaledFeatures                                                                   |
+------+-----------------+---------------------------------------------------------------------------------+
|490.55|483.4281860213131|[0.0,0.25017793594306054,0.8421182875525888,0.689142091152815]                   |
|489.3 |483.4281860213131|[0.0254957507082153,0.25017793594306054,0.8356842365751082,0.7446380697050938]   |
|482.66|482.984824594973 |[0.028045325779036824,0.25409252669039145,0.44840386043058694,0.7694369973190347]|
+------+-----------------+---------------------------------------------------------------------------------+
only showing top 3 rows



In [47]:
from pyspark.ml.evaluation import RegressionEvaluator

In [48]:
evaluator = RegressionEvaluator(labelCol='PE', 
                                predictionCol="prediction")

In [49]:
rmse = evaluator.evaluate(rfc_test_model, {evaluator.metricName:'rmse'})
mse = evaluator.evaluate(rfc_test_model, {evaluator.metricName:'mse'})
r2 = evaluator.evaluate(rfc_test_model, {evaluator.metricName:'r2'})

In [50]:
print('RMSE: ', rmse)
print('MSE: ', mse)
print('R2: ', r2)

RMSE:  4.372828993073071
MSE:  19.121633402660446
R2:  0.9337771411501153


- Random Forest model has higher R2 ~ 0.93 than Linear Regression model ~0.92 => Random Forest might be a better model