## Predict electricity tariff based on demographic and electric usage data

In [0]:
energy = spark.read.csv('/FileStore/tables/energy_data-20.csv',inferSchema=True,header=True)

In [0]:
#energy = energy.sample(False, 0.1)

In [0]:
energy.printSchema()

In [0]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
  inputCols=['Age', 'MaritalStatus', 'AnnualConsumption', 'DayNightConsumption', 'IncomeLevel', 'DwellingArea', 'HasChildren', 'SolarRoof', 'ShiftableLoad', 'RiskAttitude', 'AttitudeSustainability'],
  outputCol="features")

In [0]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="Tariff", outputCol="tariff_index")

### Logistic Regression Pipeline

In [0]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol="tariff_index", featuresCol="features")

In [0]:
from pyspark.ml import Pipeline

lr_pipeline = Pipeline(stages=[assembler, indexer, lr])

### Random Forest Classifier Pipeline

In [0]:
from pyspark.ml.classification import RandomForestClassifier

rfc = RandomForestClassifier(labelCol="tariff_index", featuresCol="features")

In [0]:
rfc_pipeline = Pipeline(stages=[assembler, indexer, rfc])

### Train/Test Split

In [0]:
train,test = energy.randomSplit([0.7,0.3])

### Logistic Regression Grid Search

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

lr_evaluator =  MulticlassClassificationEvaluator().setLabelCol("tariff_index").setMetricName("accuracy")

In [0]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

lr_paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.3, 0.7, 1])
             .addGrid(lr.elasticNetParam, [0.4, 0.8, 1])
             .build())

lr_crossval = CrossValidator(estimator=lr_pipeline,
                          estimatorParamMaps=lr_paramGrid,
                          evaluator=lr_evaluator,
                          numFolds=5,
                          seed=100) 

In [0]:
lr_cvModel = lr_crossval.fit(train)

In [0]:
print (lr_cvModel.bestModel.stages[-1].getRegParam())
print (lr_cvModel.bestModel.stages[-1].getElasticNetParam())

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

lr_predictions = lr_cvModel.transform(test)
acc = lr_evaluator.evaluate(lr_predictions, {lr_evaluator.metricName: "accuracy"})
print("accuracy = {}".format(acc))

### Random Forest Grid Search

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

rfc_evaluator =  MulticlassClassificationEvaluator().setLabelCol("tariff_index").setMetricName("accuracy")

In [0]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

rfc_paramGrid = (ParamGridBuilder()
             .addGrid(rfc.maxDepth, [2, 7, 12])
             .addGrid(rfc.numTrees, [2, 5, 10])
             .build())

rfc_crossval = CrossValidator(estimator=rfc_pipeline,
                          estimatorParamMaps=rfc_paramGrid,
                          evaluator=rfc_evaluator,
                          numFolds=5,
                          seed=100)

In [0]:
rfc_cvModel = rfc_crossval.fit(train)

In [0]:
print (rfc_cvModel.bestModel.stages[-1].getMaxDepth())
print (rfc_cvModel.bestModel.stages[-1].getNumTrees)

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

rfc_predictions = rfc_cvModel.transform(test)
rfc_acc = rfc_evaluator.evaluate(rfc_predictions, {rfc_evaluator.metricName: "accuracy"})
print("accuracy = {}".format(rfc_acc))

### Logistic Regression w/Hyperopt

In [0]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import mlflow

def lr_objective_function(params):    
  # set the hyperparameters that we want to tune
  regParam = params["regParam"]
  elasticNetParam = params["elasticNetParam"]

  # create a grid with our hyperparameters
  lr_grid = (ParamGridBuilder()
    .addGrid(lr.regParam, [regParam])
    .addGrid(lr.elasticNetParam, [elasticNetParam])
    .build())

  # cross validate the set of hyperparameters
  lr_cv = CrossValidator(estimator=lr_pipeline, estimatorParamMaps=lr_grid, evaluator=lr_evaluator, numFolds=3)
  lr_cvModel = lr_cv.fit(train)

  # get our average RMSE across all three folds
  acc = -lr_cvModel.avgMetrics[0]

  return {"loss": acc, "status": STATUS_OK}

In [0]:
from hyperopt import hp

search_space = {
  "regParam": hp.uniform("regParam", 0.3, 1),
  "elasticNetParam": hp.uniform("elasticNetParam", 0.4, 1)
}

In [0]:
from hyperopt import fmin, tpe, STATUS_OK, Trials
import numpy as np

mlflow.autolog(exclusive=False)

# Creating a parent run
with mlflow.start_run(run_name = "lr_tpe"):
    num_evals = 9
    trials = Trials()
    best_hyperparam = fmin(fn=lr_objective_function, 
                             space=search_space,
                             algo=tpe.suggest, 
                             max_evals=num_evals,
                             trials=trials,
                             rstate=np.random.default_rng(42)
                            )
    # get optimal hyperparameter values
    best_regParam = best_hyperparam["regParam"]
    best_elasticNetParam = best_hyperparam["elasticNetParam"]

    # change RF to use optimal hyperparameter values (this is a stateful method)
    lr.setRegParam(best_regParam)
    lr.setElasticNetParam(best_elasticNetParam)

      # train pipeline on entire training data - this will use the updated RF values
    pipelineModel = lr_pipeline.fit(train)

      # evaluate final model on test data
    predDF = pipelineModel.transform(test)
    acc = lr_evaluator.evaluate(predDF)

      # Log param and metric for the final model
    mlflow.log_param("regParam", best_regParam)
    mlflow.log_param("elasticNetParam", best_elasticNetParam)
    mlflow.log_metric("acc", acc)

In [0]:
print(acc)

### Random Forest w/Hyperopt

In [0]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import mlflow

def rfc_objective_function(params):    
  # set the hyperparameters that we want to tune
  maxDepth = params["maxDepth"]
  numTrees = params["numTrees"]

  # create a grid with our hyperparameters
  rfc_grid = (ParamGridBuilder()
    .addGrid(rfc.maxDepth, [maxDepth])
    .addGrid(rfc.numTrees, [numTrees])
    .build())

  # cross validate the set of hyperparameters
  rfc_cv = CrossValidator(estimator=rfc_pipeline, estimatorParamMaps=rfc_grid, evaluator=rfc_evaluator, numFolds=3)
  rfc_cvModel = rfc_cv.fit(train)

  # get our average RMSE across all three folds
  acc = -rfc_cvModel.avgMetrics[0]

  return {"loss": acc, "status": STATUS_OK}

In [0]:
from hyperopt import hp

rfc_search_space = {
  "maxDepth": hp.randint("maxDepth", 2, 12),
  "numTrees": hp.randint("numTrees", 2, 10)
}

In [0]:
from hyperopt import fmin, tpe, STATUS_OK, Trials
import numpy as np

mlflow.autolog(exclusive=False)

# Creating a parent run
with mlflow.start_run(run_name = "rfc_tpe"):
    num_evals = 9
    trials = Trials()
    best_hyperparam = fmin(fn=rfc_objective_function, 
                             space=rfc_search_space,
                             algo=tpe.suggest, 
                             max_evals=num_evals,
                             trials=trials,
                             rstate=np.random.default_rng(42)
                            )
    # get optimal hyperparameter values
    best_maxDepth = best_hyperparam["maxDepth"]
    best_numTrees = best_hyperparam["numTrees"]

    # change RF to use optimal hyperparameter values (this is a stateful method)
    rfc.setMaxDepth(best_maxDepth)
    rfc.setNumTrees(best_numTrees)

      # train pipeline on entire training data - this will use the updated RF values
    rfc_pipelineModel = rfc_pipeline.fit(train)

      # evaluate final model on test data
    predDF = rfc_pipelineModel.transform(test)
    acc = rfc_evaluator.evaluate(predDF)

      # Log param and metric for the final model
    mlflow.log_param("max_depth", best_maxDepth)
    mlflow.log_param("num_trees", best_numTrees)
    mlflow.log_metric("acc", acc)

In [0]:
print(acc)

### CV & Mlflow - Log Reg

In [0]:
import mlflow

mlflow.autolog(exclusive=False)

with mlflow.start_run(run_name = "lr_cv"):

    from pyspark.ml.evaluation import MulticlassClassificationEvaluator

    lr_evaluator =  MulticlassClassificationEvaluator().setLabelCol("tariff_index").setMetricName("accuracy")

    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

    lr_paramGrid = (ParamGridBuilder()
                 .addGrid(lr.regParam, [0.3, 0.7, 1])
                 .addGrid(lr.elasticNetParam, [0.4, 0.8, 1])
                 .build())

    lr_crossval = CrossValidator(estimator=lr_pipeline,
                              estimatorParamMaps=lr_paramGrid,
                              evaluator=lr_evaluator,
                              numFolds=5,
                              seed=100) 

    lr_cvModel = lr_crossval.fit(train)

    best_RegParam = lr_cvModel.bestModel.stages[-1].getRegParam()
    best_NetParam = lr_cvModel.bestModel.stages[-1].getElasticNetParam()

    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.mllib.evaluation import MulticlassMetrics

    lr_predictions = lr_cvModel.transform(test)
    acc = lr_evaluator.evaluate(lr_predictions, {lr_evaluator.metricName: "accuracy"})
    print("accuracy = {}".format(acc))

    mlflow.log_param("regparam", best_RegParam)
    mlflow.log_param("netparam", best_NetParam)
    mlflow.log_metric("acc", acc)

### CV & Mlflow- Random Forest

In [0]:
import mlflow

mlflow.autolog(exclusive=False)

with mlflow.start_run(run_name = "rfc_cv"):
    
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator

    rfc_evaluator =  MulticlassClassificationEvaluator().setLabelCol("tariff_index").setMetricName("accuracy")

    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

    rfc_paramGrid = (ParamGridBuilder()
                 .addGrid(rfc.maxDepth, [2, 7, 12])
                 .addGrid(rfc.numTrees, [2, 5, 10])
                 .build())

    rfc_crossval = CrossValidator(estimator=rfc_pipeline,
                              estimatorParamMaps=rfc_paramGrid,
                              evaluator=rfc_evaluator,
                              numFolds=5,
                              seed=100)

    rfc_cvModel = rfc_crossval.fit(train)

    best_maxdepth = rfc_cvModel.bestModel.stages[-1].getMaxDepth()
    best_numtrees = rfc_cvModel.bestModel.stages[-1].getNumTrees

    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.mllib.evaluation import MulticlassMetrics

    rfc_predictions = rfc_cvModel.transform(test)
    rfc_acc = rfc_evaluator.evaluate(rfc_predictions, {rfc_evaluator.metricName: "accuracy"})
    print("accuracy = {}".format(rfc_acc))
    
    mlflow.log_param("maxdepth", best_maxdepth)
    mlflow.log_param("numtrees", best_numtrees)
    mlflow.log_metric("acc", rfc_acc)

Out of the 4 models tracked, the random forest model with tpe implemented performed the best as it had the highest accuracy of 0.79. When comparing grid search and tpe methods for the random forest model, it was interesting that the grid search method suggested hyperparameters of 7 max depth and 10 number of trees, while the tpe method suggested 5 max depth and 8 number of trees. In terms of the logistic regression models, the tpe and grid search methods for hyperparameter tuning had almost identical accuracy score, however their configured parameters were different with the grid search log reg suggesting elastic net parameter of 0.4 while the tpe suggested a value of 0.77. In terms of regParam, the grid search suggested a value of 0.3 while the tpe suggested a value of 0.87. 

In terms of speed, the tpe log reg took 1.01 minutes to run while the grid search took 1.12 minutes to run. This is not surprising due to the Bayesian approach of tpe. However, the tpe of the random forest took 57 seconds to run while the grid search took 55.79 seconds to run. This is likely due to the fact that tree splitting is always dependent on the trees that come before it making a Bayesian approach tohyperparameter tuning nearly inconsequential in terms of run time. 

The model that should be implemented to predict tariff should be the random forest tpe. It has the highest accuracy out of all models and the difference in running time for the tpe and grid search models is very small. The hyperparameters for this model are a max depth of 5 and number of trees of 8.