In [1]:
import sys
import os
import os.path

from pyspark import SparkContext

##########################################################################################################
# SPARK CONTEXT INITIALIZATION
# Something like this is required if you want to use SPARK in Windows. In linux/ubuntu would be similar
#
# An already-configured SPARK_HOME environment variable takes precedence, so the notebook also runs on
# machines where Spark is set up properly. Otherwise the path below is used (CHANGE THIS PATH TO YOURS!).
import glob

SPARK_HOME = os.environ.get("SPARK_HOME") or """C:/Users/Karolina/Downloads/spark-2.4.0-bin-hadoop2.7"""
spark_lib_dir = os.path.join(SPARK_HOME, "python", "lib")

# The py4j zip name carries a version number that changes between Spark releases, so look it up
# instead of hard-coding it; fall back to the Spark 2.4.0 name if nothing is found.
py4j_zips = sorted(glob.glob(os.path.join(spark_lib_dir, "py4j-*-src.zip")))
sys.path.append(py4j_zips[-1] if py4j_zips else os.path.join(spark_lib_dir, "py4j-0.10.7-src.zip"))
sys.path.append(os.path.join(spark_lib_dir, "pyspark.zip"))
os.environ["SPARK_HOME"] = SPARK_HOME

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
sc = spark.sparkContext
##########################################################################################################

# Show the session and the underlying Spark context
print(spark)
print(sc)

In [2]:
# Read the wind data set; the first row holds the column names and column types are inferred
wind_sd = (
    spark.read
    .format("csv")
    .options(header=True, inferSchema=True)
    .load("/FileStore/tables/wind.csv")
)

In [3]:
# Discard the attributes that cannot be used for prediction.
# 'year' stays for now because the train / validation / test split below is based on it.
wind_sd = wind_sd.drop('steps', 'month', 'day', 'hour')

In [4]:
# Then only select attributes containing 'u100', 'v100', 'u10', 'v10', 'ienss' and 'iews'
# 'energy' is kept as it is the response, 'year' is kept as explained above.
# A column is kept when its name contains at least one of these substrings.
keep_tokens = ('energy', 'year', 'u100', 'v100', 'u10', 'v10', 'ienss', 'iews')
selected = [name for name in wind_sd.columns
            if any(token in name for token in keep_tokens)]

wind_sd = wind_sd.select(*selected)

In [5]:
# Split by year (bounds inclusive): 2005-2006 for training, 2007-2008 for validation, 2009-2010 for testing
trainingData_sd = wind_sd.where("2005 <= year and year <= 2006")
validationData_sd = wind_sd.where("2007 <= year and year <= 2008")
testData_sd = wind_sd.where("2009 <= year and year <= 2010")

In [6]:
# 'year' was only needed for the split, so it is dropped from every subset
trainingData_sd, validationData_sd, testData_sd = (
    subset_sd.drop('year')
    for subset_sd in (trainingData_sd, validationData_sd, testData_sd)
)

In [7]:
# Prepare the dataframes for ML use
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler

# Columns that must not end up inside the feature vector
ignore = ['label']


def _assemble_features(subset_sd):
    """Turn one data subset into the two-column (label, features) layout.

    'energy' is renamed to 'label' in order to be compatible with the spark ML
    libraries, and every remaining column is packed into a single 'features'
    vector column.

    Returns the VectorAssembler that was used together with the resulting data frame.
    """
    labelled_sd = subset_sd.withColumnRenamed("energy", "label")
    assembler = VectorAssembler(
        inputCols=[name for name in labelled_sd.columns if name not in ignore],
        outputCol='features')
    return assembler, assembler.transform(labelled_sd).select(['label', 'features'])


# Apply the same preparation to the training, validation and test data frames
training_assembler, trainingData_sd = _assemble_features(trainingData_sd)
validation_assembler, validationData_sd = _assemble_features(validationData_sd)
test_assembler, testData_sd = _assemble_features(testData_sd)

In [8]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# MAE is the metric used to compare the predictions of the different models
evaluator = RegressionEvaluator(metricName="mae", labelCol="label", predictionCol="prediction")

# Fit one decision tree per candidate maximum depth (0 to 10) on the training data
# and score it on the validation data. The value printed as "k" is the tree's maxDepth.
for i in range(11):
    dt = DecisionTreeRegressor(maxDepth=i).fit(trainingData_sd)
    predictors_dt = dt.transform(validationData_sd)
    mae_dt = evaluator.evaluate(predictors_dt)
    print("MAE for k={} is {}".format(i, mae_dt))
# In this case the optimal maximum depth is 6 as it returns the lowest MAE value

In [9]:
# Pool the training and validation rows into a single fitting set
combined_sd = trainingData_sd.union(validationData_sd)

# Refit on the pooled data with the depth chosen above (6)
dt = DecisionTreeRegressor(maxDepth=6).fit(combined_sd)

# Score the refitted tree on the held-out test data
predictors_sd = dt.transform(testData_sd)
mae = evaluator.evaluate(predictors_sd)
print("MAE with optimal depth = 6 is: {}".format(mae))

In [10]:
# First we try the original way of finding the optimal number of principal components: a loop that
# evaluates the candidates one by one. This is not optimal, because the PCA is recomputed from
# scratch in every iteration.

# Imported here so that this cell also works after "Restart & Run All"
from pyspark.ml import Pipeline
from pyspark.ml.feature import PCA

for i in range(2, 11):
    pca0 = PCA(k=i, inputCol="features")
    # The tree has to read the PCA output column; with the default featuresCol ("features") it would
    # train on the original attributes and the PCA stage would have no effect on the score.
    # The depth is fixed to the optimum found earlier (6) so that only k varies between iterations.
    dt0 = DecisionTreeRegressor(maxDepth=6,
                                featuresCol=pca0.getOutputCol(),
                                labelCol="label")
    ppl0 = Pipeline(stages=[pca0, dt0])
    model0 = ppl0.fit(trainingData_sd)
    predictions0 = model0.transform(validationData_sd)
    mae0 = evaluator.evaluate(predictions0)
    print("MAE for k={} is: {}".format(i, mae0))
# NOTE(review): the previously recorded result "optimal k=6" came from a version of this loop in which
# the tree did not consume the PCA output and maxDepth was tied to k - re-run and re-check the optimum.

In [11]:
import numpy as np
from numpy.linalg import eigh

# The method for finding the optimal number of PCAs without recalculating them every time, is by calculating the variance explained by the first k components

# This has been done using the main ideas found here: https://stackoverflow.com/questions/33428589/pyspark-and-pca-how-can-i-extract-the-eigenvectors-of-this-pca-how-can-i-calcu/33481471

def estimateCovariance(df):
    """Compute the covariance matrix of the 'features' column of a data frame.

    The feature vectors are centred on their mean, the outer product of every
    centred vector with itself is summed up, and the sum is divided by the
    number of rows in ``df``.
    """
    vectors = df.select(df['features']).rdd.map(lambda row: row[0])
    mean_vector = vectors.mean()
    centred = vectors.map(lambda vec: vec - mean_vector)
    scatter = centred.map(lambda vec: np.outer(vec, vec)).sum()
    return scatter / df.count()

def pca(df, k=2):
    """Compute the top k principal components and the corresponding eigenvalues.

    The covariance matrix of ``df`` is eigen-decomposed and the result is
    ordered by decreasing eigenvalue.

    Returns a tuple ``(components, eigenvalues)``: ``components`` holds the
    first k eigenvectors as columns, ``eigenvalues`` holds all eigenvalues
    (not only the first k) in decreasing order.
    """
    cov = estimateCovariance(df)
    eigVals, eigVecs = eigh(cov)
    # Indices that sort the eigenvalues from largest to smallest
    descending = np.argsort(eigVals)[::-1]
    # eigh returns the eigenvectors as columns; transpose so each row is one eigenvector
    sortedVecs = eigVecs.T[descending]
    return sortedVecs[:k].T, eigVals[descending]

def varianceExplained(eigenvalues, k=1):
    """Print the fraction of variance explained by the top 1, 2, ..., k eigenvectors.

    ``eigenvalues`` is expected to be ordered from largest to smallest. One line
    is printed per component count; nothing is returned.
    """
    total = np.sum(eigenvalues)
    for n_components in range(1, k + 1):
        fraction = np.sum(eigenvalues[:n_components]) / total
        print("Variance explained for k={} is {}".format(n_components, fraction))

In [12]:
# As before, look at up to 10 principal components
k = 10
components, eigenvalues = pca(trainingData_sd, k=k)
varianceExplained(eigenvalues, k=k)
# 3 principal components are enough to explain more than 99% of the variance

In [13]:
from pyspark.ml.feature import PCA
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml import Pipeline

# k=3 is the number of components suggested by the explained-variance method
final_pca = PCA(inputCol="features", k=3)
# The tree is trained on the PCA output column, with the depth of 6 chosen earlier
final_dt = DecisionTreeRegressor(labelCol="label",
                                 featuresCol=final_pca.getOutputCol(),
                                 maxDepth=6)
ppl = Pipeline(stages=[final_pca, final_dt])

# Fit on training + validation data, then score on the test data
final_model = ppl.fit(combined_sd)
final_predictions = final_model.transform(testData_sd)
final_mae = evaluator.evaluate(final_predictions)
print("MAE using only 3 PCAs: {}".format(final_mae))

In [14]:
# Same pipeline again, this time with k=6 as obtained from the loop
final_pca = PCA(inputCol="features", k=6)
final_dt = DecisionTreeRegressor(labelCol="label",
                                 featuresCol=final_pca.getOutputCol(),
                                 maxDepth=6)
ppl = Pipeline(stages=[final_pca, final_dt])

# Fit on training + validation data, then score on the test data
final_model = ppl.fit(combined_sd)
final_predictions = final_model.transform(testData_sd)
final_mae = evaluator.evaluate(final_predictions)
print("MAE using only 6 PCAs: {}".format(final_mae))

In [15]:
# In the end, it appears that using 3 principal components gives better results than using 6.
# The MAE is also not much higher than when using all principal components (339 with only 3,
# compared with 322 with all of them), while the complexity of the model is reduced from
# 175 principal components to only 3.