## PCA with PySpark

In [2]:
# Load dataframe
wind_sd = spark.read.csv(path="/FileStore/tables/wind.csv", header=True, inferSchema=True)

We are going to select only the columns whose names contain 'u100', 'v100', 'u10', 'v10', 'inss', 'iews', the output and 'year' (to perform the train, validation and test split). Note that if a column name contains the substring 'u100' it automatically contains 'u10' so the condition is simpler. 

After that we will store the final dataframe in cache because, by omission, Spark does not persist dataframes.

In [4]:
# Keep columns which contain 'u100', 'v100', 'u10', 'v10', 'inss', 'iews', the output and 'year'
my_list = wind_sd.columns
selected_columns = [col for col in my_list if ('u10' in col)| ('v10' in col) | ('inss' in col) | ('iews' in col) | ('year'== col) | ('energy' == col)]
wind_sd = wind_sd.select(*selected_columns).cache()

Once we have the smaller dataset with 200 columns, we divide it into train, validation and test. In addition, we are going to create a dataframe with train and validation all together. If storage space was an issue, we would create only the trainAndVal dataframe and split it right before PCA.

Since 'year' is no longer an usefull attribute, we get rid of it.

In [6]:
# Create train, validation and test datafames
train = wind_sd.filter(wind_sd.year <= 2006).drop('year')
val = wind_sd.filter((wind_sd.year == 2007) | (wind_sd.year == 2008)).drop('year')
test = wind_sd.filter((wind_sd.year == 2009) | (wind_sd.year == 2010)).drop('year')
trainAndVal = wind_sd.filter(wind_sd.year <= 2008).drop('year')

Before using our dataframes we need to convert them into 2 columns dataframes: one with the input attributes and the other with the output class.

In [8]:
# Cast to vectors so that PCA can work with our data
from pyspark.ml.feature import VectorAssembler

ignore = ['energy']

assembler = VectorAssembler(
    inputCols=[x for x in train.columns if x not in ignore],
    outputCol='features')

train_vec = assembler.transform(train).select(['energy', 'features'])
val_vec = assembler.transform(val).select(['energy', 'features'])
test_vec = assembler.transform(test).select(['energy', 'features'])
trainAndVal_vec = assembler.transform(trainAndVal).select(['energy', 'features'])

The goal now is to compare the performance of different maxDepths in a regression tree. We could compute the PCA in every iteration of the loop, but this is not efficient. Then, the data is transformed to a PC space before the *for* loop.

Note that the trainAndVal_pca and test_pca are computed with another pca model, since in this scenario we include information from train and validation in the training!

In [10]:
# Perform PCA
from pyspark.ml.feature import PCA

pca = PCA(k=200, inputCol="features", outputCol = "pca_features")
mdl_pca = pca.fit(train_vec)
train_pca = mdl_pca.transform(train_vec)
val_pca = mdl_pca.transform(val_vec)

mdl_pca_trainAndVal = pca.fit(trainAndVal_vec)
trainAndVal_pca = mdl_pca_trainAndVal.transform(trainAndVal_vec)
test_pca = mdl_pca_trainAndVal.transform(test_vec)

mdl_pca.explainedVariance

Now, we will test different values of maxDepth and check the Mean Absolute Error with the validation partition. The best maxDepth value will be stored and used in a final model.

In [12]:
# Find best maxDepth
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

best_mae = 1000
final_maxDepth = 0
evaluator = RegressionEvaluator(labelCol="energy", predictionCol="prediction", metricName="mae")
  
for maxDepth in range(1, 12):
  print("maxDepth: ", maxDepth)
  dt = DecisionTreeRegressor(featuresCol='pca_features',
                           labelCol="energy",
                           maxDepth = maxDepth)
  mdl = dt.fit(train_pca)
  pred_sd = mdl.transform(val_pca)
  mae = evaluator.evaluate(pred_sd)
  print("MAE: ", mae)
  
  if mae < best_mae:
    final_maxDepth = maxDepth
    best_mae = mae
    
print("The selected maxDepth is ", final_maxDepth)

We observe how 6 is the optimum maxDepth value. With greater values the model seems to overfit and with smaller ones it is too simple to capture the data relationships. 

Using this hyper-parameter value, we will compute a final model with the train and validation data and evaluate it with the test partition.

In [14]:
# Re-train the model and evaluate it with the test set
final_dt = DecisionTreeRegressor(featuresCol='pca_features',
                           labelCol="energy",
                           maxDepth = final_maxDepth)
final_mdl = final_dt.fit(trainAndVal_pca)
predictions = final_mdl.transform(test_pca)
evaluator = RegressionEvaluator(labelCol="energy", predictionCol="prediction", metricName="mae")
mae = evaluator.evaluate(predictions)
print("Final MAE for {} of maxDepth: {}".format(final_maxDepth, mae))

MAE value has increased a little bit. It seems like, despite having more data for training the model, predicting the energy produced in 2009 and 2010 is more complicated with our historical data.

We will now tune the hyper-parameter "number of Principal Components". We will check how the performance varies on the validation set changing the number of Principal Components. Since Databricks is paying for this, we will perform the loop in increments of 1.

In [16]:
# Find optimum number of Principal Components
from pyspark.ml import Pipeline

best_mae = 1000
final_k = 0
evaluator = RegressionEvaluator(labelCol="energy", predictionCol="prediction", metricName="mae")

for k in range(1, 200):
  print("Number of PCs: ", k)
  
  pca2 = PCA(k=k, inputCol="features", outputCol = "pca_features")
  dt2 = DecisionTreeRegressor(featuresCol=pca2.getOutputCol(),
                           labelCol="energy",
                           maxDepth = final_maxDepth)
  pipe = Pipeline(stages=[pca2, dt2])
  
  mdl_pipe = pipe.fit(train_vec)
  pred_pipe = mdl_pipe.transform(val_vec)
  mae = evaluator.evaluate(pred_pipe)
  print("MAE: ", mae)
  
  if mae < best_mae:
    final_k = k
    best_mae = mae
    
print("The selected number of PCs is ", final_k)

Originally, there were 200 Principal Components but we knew that just the first one explained 79% of the variance. From component 11 to 200, the percentage of explained variance was almost negligible. Then, we are in a strong position to ensure that, despite having eliminated most of the input attributes, there is still a lot of redundancy that can be eliminated with PCA.

In [18]:
# Re-train the model and evaluate it with the test set
final_pca = PCA(k=final_k, inputCol="features", outputCol = "pca_features")
final_dt2 = DecisionTreeRegressor(featuresCol=final_pca.getOutputCol(),
                         labelCol="energy",
                         maxDepth = final_maxDepth)
final_pipe = Pipeline(stages=[final_pca, final_dt2])

mdl_final_pipe = final_pipe.fit(trainAndVal_vec)
final_pred_pipe = mdl_final_pipe.transform(test_vec)

evaluator = RegressionEvaluator(labelCol="energy", predictionCol="prediction", metricName="mae")
mae = evaluator.evaluate(final_pred_pipe)
print("Final MAE for {} of maxDepth and {} PCs: {}".format(final_maxDepth, final_k,mae))

The final performance with only 10 PCs is better than with all of them.

However, this was not very efficient. As it has been pointed out, if you compute PCA with k=5 and then PCA with k=10, you are computing the first 5 components twice (once for k=5 and again for k=10). We will try to find a way to compute all PCs once and select the components from there. 

To do so, we will use the feature VectorSlicer, which allows to take n elements from each row in a column of a dataframe. The results should be (and they are) exactly the same as in the previous for loop.

Recall that we had all the Princpal Components pre-calculated previously in the dataframes train_pca, val_pca, test_pca and trainAndVal_pca.

In [20]:
from pyspark.ml.feature import VectorSlicer

best_mae = 1000
final_k = 0
evaluator = RegressionEvaluator(labelCol="energy", predictionCol="prediction", metricName="mae")

for k in range(1, 200):
  print("Number of PCs: ", k)
  
  slicer = VectorSlicer(inputCol="pca_features", outputCol="subset_pca_features", indices=range(0,k))
  
  slice_train_pca = slicer.transform(train_pca)
  slice_val_pca = slicer.transform(val_pca)
  
  opt_dt = DecisionTreeRegressor(featuresCol='subset_pca_features',
                           labelCol="energy",
                           maxDepth = final_maxDepth)
  
  opt_mdl = opt_dt.fit(slice_train_pca)
  opt_predictions = opt_mdl.transform(slice_val_pca)
  mae = evaluator.evaluate(opt_predictions)
  print("MAE: ", mae)
  
  if mae < best_mae:
    final_k = k
    best_mae = mae
    
print("The selected number of Principal Components is ", final_k)

As expected, the results are identical as the ones obtained with the least-efficient solution.