# Machine Learning with Spark 
---

## 1. Data preparation

In [1]:
import numpy as np
import pandas as pd

import seaborn as sns
import matplotlib.pyplot as plt

pd.options.display.max_columns = 50

import warnings
warnings.filterwarnings("ignore")

In [2]:
from pyspark.sql import SparkSession

from pyspark.sql.functions import *
#from pyspark.sql.functions import lit, col
from pyspark.sql.types import DoubleType

Let's see what our dataset looks like

In [3]:
# creation of a spark session
spk_sess = SparkSession \
    .builder \
    .appName("_Project_Spark_App") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# load csv file in a DF and show first lines
df = spk_sess.read.csv("./solar_generation_by_station.csv", header=True, sep=",");

Creation of a dataframe with time step corresponding to measures in our DF

In [4]:
def generate_series(start, stop, interval):
    """
    :param start  - lower bound, inclusive
    :param stop   - upper bound, exclusive
    :interval int - increment interval in seconds
    """

    # Determine start and stops in epoch seconds
    start, stop = spk_sess.createDataFrame([(start, stop)], ("start", "stop")) \
                        .select([col(c).cast("timestamp") \
                        .cast("long") for c in ("start", "stop")]) \
                        .first()
    # Create range with increments and cast to timestamp
    return spk_sess.range(start, stop, interval) \
                .select(col("id").cast("timestamp").alias("value"))


# credits : https://stackoverflow.com/questions/43141671/sparksql-on-pyspark-how-to-generate-time-series
dt_gen = generate_series("1986-01-01", "2016-01-01", 60 * 60) # By hour, by day use 60 * 60 * 24

# from pyspark.sql.functions import monotonically_increasing_id
# The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive :
# dt_gen = dt_gen.withColumn("index", monotonically_increasing_id())
# an other solution consist in dt_gen = dt_gen.withColumn('index', row_number()) or with zipWithIndex()
pandas_df = dt_gen.toPandas()
pandas_df['idx'] = pandas_df.index +1
dt_gen = spk_sess.createDataFrame(pandas_df)

del pandas_df

Let's verify the 2 DF have the same lenght, then join them

In [5]:
df.count(), dt_gen.count()

(262968, 262968)

In [6]:
df = df.join(dt_gen, df.time_step == dt_gen.idx)
df.select('time_step', 'idx', 'value', 'AT11', 'AT12', 'FR10').show(10)

+---------+---+-------------------+-------+-------------------+--------------------+
|time_step|idx|              value|   AT11|               AT12|                FR10|
+---------+---+-------------------+-------+-------------------+--------------------+
|        1|  1|1986-01-01 00:00:00|    0.0|                0.0|                 0.0|
|        2|  2|1986-01-01 01:00:00|    0.0|                0.0|                 0.0|
|        3|  3|1986-01-01 02:00:00|    0.0|                0.0|                 0.0|
|        4|  4|1986-01-01 03:00:00|    0.0|                0.0|                 0.0|
|        5|  5|1986-01-01 04:00:00|    0.0|                0.0|                 0.0|
|        6|  6|1986-01-01 05:00:00|    0.0|                0.0|                 0.0|
|        7|  7|1986-01-01 06:00:00|    0.0|                0.0|                 0.0|
|        8|  8|1986-01-01 07:00:00|    0.0|                0.0|                 0.0|
|        9|  9|1986-01-01 08:00:00|0.13127|0.08148999999999999|  

In [7]:
# drop useless cols
df = df.drop('time_step', 'index')

# keep only columns relatives to france
col_fr = [c for c in df.columns if 'FR' in c]
col_fr.append('value')
df = df.select(col_fr)

In [8]:
# rename the time_step col
df = df.withColumnRenamed("value", "date_time")

# change cols types
for c in df.columns:
    if c != 'date_time':
        df = df.withColumn(c, df[c].cast(DoubleType()))

There isn't any missing values :

In [9]:
#from pyspark.sql.functions import isnan, when, count, col

df.select([count(when(isnan(c), c)).alias(c) for c in df.columns if c != 'date_time']).show()

+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|FR42|FR61|FR72|FR25|FR26|FR52|FR24|FR21|FR83|FR43|FR23|FR10|FR81|FR63|FR41|FR62|FR30|FR51|FR22|FR53|FR82|FR71|
+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|
+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+



In [10]:
# drop na values if needed / not the case here
df = df.na.drop()

In the case of NaNs, here is a usefull link to [deal with missing values](https://fr.coursera.org/lecture/big-data-machine-learning/handling-missing-values-in-spark-Goh1z). Now, we've to add few columns with the date time infos.

In [11]:
df = df.withColumn("year", year(df.date_time).alias('year')) \
    .withColumn("month", month(df.date_time).alias('month')) \
    .withColumn("day_of_year", dayofyear(df.date_time).alias('day_of_year')) \
    .withColumn("hour", hour(df.date_time).alias('hour'))

df.select('FR42', 'date_time', 'month', 'day_of_year', 'hour').orderBy('date_time').show(23)

+-------------------+-------------------+-----+-----------+----+
|               FR42|          date_time|month|day_of_year|hour|
+-------------------+-------------------+-----+-----------+----+
|                0.0|1986-01-01 00:00:00|    1|          1|   0|
|                0.0|1986-01-01 01:00:00|    1|          1|   1|
|                0.0|1986-01-01 02:00:00|    1|          1|   2|
|                0.0|1986-01-01 03:00:00|    1|          1|   3|
|                0.0|1986-01-01 04:00:00|    1|          1|   4|
|                0.0|1986-01-01 05:00:00|    1|          1|   5|
|                0.0|1986-01-01 06:00:00|    1|          1|   6|
|                0.0|1986-01-01 07:00:00|    1|          1|   7|
|            0.05205|1986-01-01 08:00:00|    1|          1|   8|
|0.18700999999999998|1986-01-01 09:00:00|    1|          1|   9|
|            0.30285|1986-01-01 10:00:00|    1|          1|  10|
|            0.21996|1986-01-01 11:00:00|    1|          1|  11|
|            0.13234|1986

---

# 2. Predictions with various ML models

## 2.1 Metric

RMSE measures the differences between predicted values by the model and the actual values. However, RMSE alone is meaningless until we compare with the actual “MV” value, such as mean, min and max. After such comparison, our RMSE looks pretty good.

Basic stats

In [12]:
df.select('FR10', 'FR22', 'FR23', 'FR24', 'FR25').describe().show()

+-------+-------------------+-------------------+-------------------+-------------------+-------------------+
|summary|               FR10|               FR22|               FR23|               FR24|               FR25|
+-------+-------------------+-------------------+-------------------+-------------------+-------------------+
|  count|             262968|             262968|             262968|             262968|             262968|
|   mean|0.13080652623132855| 0.1259932192129841|0.12627776257947732| 0.1360888294012959|0.12797964170545473|
| stddev| 0.2084071774531926|0.20127526900909207|0.20220008711163703|0.20982899870933316|0.20146804261326595|
|    min|                0.0|                0.0|                0.0|                0.0|                0.0|
|    max|            0.91125| 0.9161299999999999|            0.92315| 0.9179700000000001|            0.92156|
+-------+-------------------+-------------------+-------------------+-------------------+-------------------+



In [13]:
df.groupBy("hour").agg(mean('FR10').alias('avg solar efficiency'), count('FR10')).sort('hour', ascending=True).show(24)

+----+--------------------+-----------+
|hour|avg solar efficiency|count(FR10)|
+----+--------------------+-----------+
|   0|                 0.0|      10957|
|   1|                 0.0|      10957|
|   2|                 0.0|      10957|
|   3|                 0.0|      10957|
|   4|3.606552888564386...|      10957|
|   5|0.004788690334945695|      10957|
|   6|0.029255778954093294|      10957|
|   7| 0.10150853974628092|      10957|
|   8| 0.20717653281007584|      10957|
|   9| 0.31310120105868405|      10957|
|  10| 0.37807000730126866|      10957|
|  11|  0.4175870010039244|      10957|
|  12|  0.4211149110157888|      10957|
|  13|  0.3987946728119013|      10957|
|  14| 0.34350144656384035|      10957|
|  15| 0.27063141735876617|      10957|
|  16|  0.1663934708405586|      10957|
|  17| 0.06960151775120929|      10957|
|  18|0.014985195765264213|      10957|
|  19|0.002810180706397738|      10957|
|  20|                 0.0|      10957|
|  21|                 0.0|      10957|


During the 4th hour there is a weird value of 3.60, because efficiency can't be above 1...

## 2.2 Data preparation

Prepare data for Machine Learning. We need two columns only — features (date time infos) and target (“FR10”):

In [33]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, GBTRegressor, RandomForestRegressor, IsotonicRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [15]:
vectorAssembler = VectorAssembler(inputCols = ['hour', 'month', 'day_of_year', 'hour'], outputCol = 'features')
vect_df = vectorAssembler.transform(df)
vect_df = vect_df.select(['features', 'FR10'])

Split data into two disctinct sets for training and testing purposes. Here we don't split randomly because this method doesn't make sense for time series : 

In [16]:
splits = vect_df.randomSplit([0.8, 0.2])
train_df = splits[0]
test_df = splits[1]
(train_df.count(), len(train_df.columns)), (test_df.count(), len(test_df.columns))

((209883, 2), (53085, 2))

Instead we'll keep the last month to test our model, and the rest of the data is used to train it

In [17]:
df = df.select('date_time', 'year', 'month', 'day_of_year', 'hour', 'FR10')
#df.write.csv('data_clean.csv')

#df.createOrReplaceTempView("test")
#df3 = spk_sess.sql("select * from test")
#df3.show()

df = df.toPandas()
df = df[df.year > 2006]
train_df, test_df = df[~((df.year == 2015) & (df.month == 12))], df[(df.year == 2015) & (df.month == 12)]
train_df, test_df = spk_sess.createDataFrame(train_df), spk_sess.createDataFrame(test_df)
(len(train_df.columns), train_df.count()), (len(test_df.columns), test_df.count())

((6, 78144), (6, 744))

When i try to use df.filter(df.year > 2006) the following error occurs :

Py4JJavaError: An error occurred while calling o649.collectToPython.  : java.lang.OutOfMemoryError: GC overhead limit exceeded

In [18]:
def vectorize_df(_df):
    vectorAssembler = VectorAssembler(inputCols = ['hour', 'month', 'day_of_year'], outputCol = 'features')
    return vectorAssembler.transform(_df).select(['features', 'FR10'])

train_df, test_df = vectorize_df(train_df), vectorize_df(test_df)
(len(train_df.columns), train_df.count()), (len(test_df.columns), test_df.count())

((2, 78144), (2, 744))

In [19]:
train_df.show(5)

+---------------+-------+
|       features|   FR10|
+---------------+-------+
| [11.0,1.0,6.0]|0.04745|
|[15.0,1.0,14.0]|0.19483|
| [4.0,1.0,24.0]|    0.0|
| [2.0,1.0,25.0]|    0.0|
|[20.0,2.0,39.0]|    0.0|
+---------------+-------+
only showing top 5 rows



## 2.3 Linear Regression

In [20]:
lr = LinearRegression(featuresCol = 'features', labelCol='FR10', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [0.0,0.0,0.0]
Intercept: 0.13186472448300593


Summarize the model over the training set and print out some metrics:

In [21]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 0.209429
r2: 0.000000


In [22]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","FR10","features").show(5)
lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="FR10",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+-------------------+--------------------+-----------------+
|         prediction|                FR10|         features|
+-------------------+--------------------+-----------------+
|0.13186472448300593|0.029639999999999996| [8.0,12.0,336.0]|
|0.13186472448300593|             0.39233|[10.0,12.0,339.0]|
|0.13186472448300593|                 0.0| [2.0,12.0,347.0]|
|0.13186472448300593|                 0.0|[16.0,12.0,350.0]|
|0.13186472448300593|                 0.0| [2.0,12.0,360.0]|
+-------------------+--------------------+-----------------+
only showing top 5 rows

R Squared (R2) on test data = -0.22257


In [23]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 0.149467


In [24]:
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()

numIterations: 1
objectiveHistory: [0.5]
+--------------------+
|           residuals|
+--------------------+
|-0.08441472448300594|
| 0.06296527551699407|
|-0.13186472448300593|
|-0.13186472448300593|
|-0.13186472448300593|
|-0.11619472448300593|
|-0.13186472448300593|
|-0.13186472448300593|
|  0.3843552755169941|
|-0.13186472448300593|
|-0.13186472448300593|
|  0.6594952755169942|
|-0.13009472448300594|
|-0.13186472448300593|
|-0.13186472448300593|
|   0.539675275516994|
| 0.16819527551699406|
|0.027705275516994088|
|-0.13186472448300593|
| 0.09031527551699409|
+--------------------+
only showing top 20 rows



Using our Linear Regression model to make some predictions:

In [25]:
predictions = lr_model.transform(test_df)
predictions.select("prediction","FR10","features").show()

+-------------------+--------------------+-----------------+
|         prediction|                FR10|         features|
+-------------------+--------------------+-----------------+
|0.13186472448300593|0.029639999999999996| [8.0,12.0,336.0]|
|0.13186472448300593|             0.39233|[10.0,12.0,339.0]|
|0.13186472448300593|                 0.0| [2.0,12.0,347.0]|
|0.13186472448300593|                 0.0|[16.0,12.0,350.0]|
|0.13186472448300593|                 0.0| [2.0,12.0,360.0]|
|0.13186472448300593|                 0.0|[23.0,12.0,336.0]|
|0.13186472448300593|                 0.0|[20.0,12.0,345.0]|
|0.13186472448300593|             0.01671|[15.0,12.0,347.0]|
|0.13186472448300593|                 0.0| [1.0,12.0,353.0]|
|0.13186472448300593| 0.12824000000000002|[12.0,12.0,335.0]|
|0.13186472448300593|                 0.0| [7.0,12.0,359.0]|
|0.13186472448300593| 0.05192000000000001|[15.0,12.0,343.0]|
|0.13186472448300593|                 0.0|[20.0,12.0,343.0]|
|0.13186472448300593|   

## 2.4 Decision tree regression

In [26]:
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'FR10')
dt_model = dt.fit(train_df)

dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(labelCol="FR10", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 0.0839109


Feature Importance

In [27]:
dt_model.featureImportances

SparseVector(3, {0: 0.8302, 1: 0.1066, 2: 0.0632})

## 2.5 Random Forrest regression

In [34]:
rf = RandomForestRegressor(featuresCol = 'features', labelCol = 'FR10')
rf_model = rf.fit(train_df)
rf_predictions = rf_model.transform(test_df)
rf_predictions.select('prediction', 'FR10', 'features').show(5)

+--------------------+--------------------+-----------------+
|          prediction|                FR10|         features|
+--------------------+--------------------+-----------------+
|   0.047394335587558|0.029639999999999996| [8.0,12.0,336.0]|
| 0.13750131879665933|             0.39233|[10.0,12.0,339.0]|
|0.028248235424898925|                 0.0| [2.0,12.0,347.0]|
|0.019594284593185803|                 0.0|[16.0,12.0,350.0]|
|0.022353252692110394|                 0.0| [2.0,12.0,360.0]|
+--------------------+--------------------+-----------------+
only showing top 5 rows



In [35]:
rf_evaluator = RegressionEvaluator(labelCol="FR10", predictionCol="prediction", metricName="rmse")
rmse = rf_evaluator.evaluate(rf_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 0.102336


## 2.6 Gradient-boosted tree regression

In [28]:
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'FR10', maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select('prediction', 'FR10', 'features').show(5)

+--------------------+--------------------+-----------------+
|          prediction|                FR10|         features|
+--------------------+--------------------+-----------------+
| 0.07350410135473669|0.029639999999999996| [8.0,12.0,336.0]|
|  0.2154570770400125|             0.39233|[10.0,12.0,339.0]|
|-0.00126914225566...|                 0.0| [2.0,12.0,347.0]|
|-0.00462768737890...|                 0.0|[16.0,12.0,350.0]|
|-0.00126914225566...|                 0.0| [2.0,12.0,360.0]|
+--------------------+--------------------+-----------------+
only showing top 5 rows



In [29]:
gbt_evaluator = RegressionEvaluator(labelCol="FR10", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 0.0810609


## 2.7 Survival regression

In [30]:
# Trains an isotonic regression model.
iso = IsotonicRegression(featuresCol = 'features', labelCol = 'FR10')

iso_model = iso.fit(train_df)
print("Boundaries in increasing order: %s\n" % str(iso_model.boundaries))
print("Predictions associated with the boundaries: %s\n" % str(iso_model.predictions))

Boundaries in increasing order: [0.0,4.0,4.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0

In [31]:
# Makes predictions.
iso_predictions = iso_model.transform(test_df)
iso_predictions.select('prediction', 'FR10', 'features').show(5)

+-------------------+--------------------+-----------------+
|         prediction|                FR10|         features|
+-------------------+--------------------+-----------------+
|            0.15222|0.029639999999999996| [8.0,12.0,336.0]|
|0.19101931436142883|             0.39233|[10.0,12.0,339.0]|
|                0.0|                 0.0| [2.0,12.0,347.0]|
|0.19101931436142883|                 0.0|[16.0,12.0,350.0]|
|                0.0|                 0.0| [2.0,12.0,360.0]|
+-------------------+--------------------+-----------------+
only showing top 5 rows



In [32]:
iso_evaluator = RegressionEvaluator(labelCol="FR10", predictionCol="prediction", metricName="rmse")
rmse = iso_evaluator.evaluate(iso_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 0.144906


---

# Conclusions

Root Mean Squared Errors (RMSE) on test data 
- linear regression : 0.149
- decision tree regression : 0.084
- random forrest regression : 0.102
- survival regression : 0.145