## Intro

The main purpose of this notebook is to show how to use Spark to distribute the training of ML models across a cluster. Here we train several Prophet models and use Hyperopt to tune their hyperparameters. Hyperopt is a general-purpose library that can optimize any function with parameters; here it minimizes the prediction loss (RMSE). Additionally, the search for the best hyperparameters is tracked using MLflow.

This notebook uses daily aggregated energy consumption data. 

Key steps in this notebook:  
- Define a train function
- Define a search space and select a strategy for the search
- Run the optimization and record the search in MLflow

Please be aware that this notebook needs to be run on a Databricks cluster with **ML Runtime**. It has been tested with the 13.1 ML Runtime.
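
The three key steps can be tried out without a cluster. The following is only a minimal sketch under stated assumptions: it swaps Hyperopt's TPE algorithm for plain random search from the Python standard library, uses a toy loss in place of a real model's RMSE, and the names `objective`, `sample_params`, and `random_search` are hypothetical and do not appear elsewhere in this notebook.

```python
import math
import random


def objective(params):
    """Toy loss standing in for a model's validation RMSE (hypothetical)."""
    return (math.log10(params["scale"]) + 1.0) ** 2 + (params["range"] - 0.9) ** 2


def sample_params(rng):
    """Draw one candidate from the search space."""
    return {
        # log-uniform on [0.001, 0.5]: uniform in log space, then exponentiate
        "scale": math.exp(rng.uniform(math.log(0.001), math.log(0.5))),
        # plain uniform on [0.8, 0.95]
        "range": rng.uniform(0.8, 0.95),
    }


def random_search(n_evals, seed=0):
    """Evaluate n_evals random candidates and return (best_loss, best_params)."""
    rng = random.Random(seed)
    best_loss, best_params = float("inf"), None
    for _ in range(n_evals):
        params = sample_params(rng)
        loss = objective(params)
        if loss < best_loss:
            best_loss, best_params = loss, params
    return best_loss, best_params


best_loss, best_params = random_search(n_evals=200)
print(best_loss, best_params)
```

In the notebook itself, the objective is the `train` function, the sampling is described by the `hp.*` expressions, and the loop is replaced by `fmin`, which also distributes the evaluations via `SparkTrials`.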

## Imports

In [0]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error
from sklearn.model_selection import ParameterGrid

# Import Hyperopt from the ML Runtime. This notebook needs to be run on the Databricks ML Runtime.
# The hyperopt package installed on the ML Runtime differs from the open-source version.
from hyperopt import fmin, hp, tpe
from hyperopt import SparkTrials, STATUS_OK
from prophet import Prophet

import mlflow

## Setup MLflow

In [0]:
# You must create the experiment by hand in the Databricks Machine Learning Experiments GUI
# Then copy the name of the experiment and paste it here

experimentPath = "/Users/simon.buse@ewz.ch/MittelfristPrognoseTest"

if mlflow.get_experiment_by_name(experimentPath) is not None:
    print(f"Experiment {experimentPath} exists, setting it as the active experiment")
    mlflow.set_experiment(experimentPath)
else:
    raise Exception("You must first create the experiment in the Databricks Machine Learning GUI")

## Define Functions

In [0]:
def resampleFixEnds(pdf, frequency):
    """
    Resample/aggregate the data according to the sampling frequency. The first and last
    data points often deviate after resampling (for example when those periods are only
    partially covered by the raw data). As a simple fix, an end point is dropped if it is
    more than 20% below its neighboring data point.
    """

    pdf = pdf.resample(frequency).sum(min_count=1)  # frequency: "D", "W" or "M"

    for column in pdf.columns:
        if pdf[column].iloc[0] < 0.8 * pdf[column].iloc[1]:
            pdf = pdf.drop(pdf.index[0])

        if pdf[column].iloc[-1] < 0.8 * pdf[column].iloc[-2]:
            pdf = pdf.drop(pdf.index[-1])

    return pdf
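
To see why the end points can deviate, here is a minimal standard-library sketch of the same trimming rule applied to plain Python data instead of a pandas DataFrame. The helper names `daily_sums` and `trim_partial_ends` and the hourly readings are hypothetical and only serve as an illustration.

```python
from datetime import datetime, timedelta


def daily_sums(readings):
    """Sum (timestamp, value) readings per calendar day, in chronological order."""
    totals = {}
    for ts, value in sorted(readings):
        totals[ts.date()] = totals.get(ts.date(), 0.0) + value
    return list(totals.items())


def trim_partial_ends(buckets, threshold=0.8):
    """Drop the first/last bucket if it is below `threshold` times its neighbor."""
    if len(buckets) >= 2 and buckets[0][1] < threshold * buckets[1][1]:
        buckets = buckets[1:]
    if len(buckets) >= 2 and buckets[-1][1] < threshold * buckets[-2][1]:
        buckets = buckets[:-1]
    return buckets


# Hypothetical data: one reading of 1.0 per hour, starting at 18:00 on day 1
# and ending at 05:00 on day 4, so the first and last days are incomplete.
start = datetime(2023, 1, 1, 18)
readings = [(start + timedelta(hours=h), 1.0) for h in range(60)]

buckets = daily_sums(readings)
trimmed = trim_partial_ends(buckets)
print(buckets)
print(trimmed)
```

The two incomplete days sum to far less than a full day and are removed, while the complete days in between are kept.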

## Prepare the Data

In [0]:
url = "https://data.stadt-zuerich.ch/dataset/ewz_stromabgabe_netzebenen_stadt_zuerich/download/ewz_stromabgabe_netzebenen_stadt_zuerich.csv"
dataPdf = pd.read_csv(url, index_col=None)

dataPdf["Timestamp"] = pd.to_datetime(dataPdf["Timestamp"], utc=True)

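# Set the timestamp as index for the daily aggregation. Passing the column name
# moves the column into the index, so no datetime column is left to be summed.
dataPdf = dataPdf.set_index("Timestamp")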
dataPdf = resampleFixEnds(dataPdf, "D")

#Drop the timezone to avoid warnings
dataPdf.index = dataPdf.index.tz_localize(None)  

# Add the columns ds and y, which Prophet requires
dataPdf["ds"] = dataPdf.index
# Rescale the data to GWh; it is good practice not to work with huge numbers
dataPdf["y"] = (dataPdf["Value_NE5"].values + dataPdf["Value_NE7"].values)/1e6
dataPdf = dataPdf.drop(columns=["Value_NE5", "Value_NE7"])

# put aside some data for evaluation
split = len(dataPdf)-365
trainPdf, testPdf = dataPdf.iloc[:split], dataPdf.iloc[split:]
trainPdf

## Define the Model and the Search Space

In [0]:

def train(params):
  """
  This is our main training function which we pass to Hyperopt.
  It takes in hyperparameter settings, fits a model based on those settings,
  evaluates the model, and returns the loss.
  """

  with mlflow.start_run(run_name='inner_run', nested=True) as run: 
    
    forecaster = Prophet(
        seasonality_mode=        params["seasonality_mode"],
        changepoint_prior_scale= params["changepoint_prior_scale"],
        seasonality_prior_scale= params["seasonality_prior_scale"],
        holidays_prior_scale=    params["holidays_prior_scale"],
        changepoint_range=       params["changepoint_range"],
    )

    if params["holidays"] is not None:
        forecaster.add_country_holidays(country_name=params["holidays"])

    forecaster.fit(trainPdf)
    predictedValues = forecaster.predict(testPdf)

    rmse = mean_squared_error(y_true=testPdf.y.values, y_pred=predictedValues.yhat.values, squared=False)
    
    mlflow.log_metric('rmse', rmse)
    mlflow.set_tag("model","Prophet")
    # mlflow.log_params(params) breaks the code here; the params are logged automatically.

  return {"loss": rmse, "status": STATUS_OK, "Trained_Model": forecaster}

# Define the search space for Hyperopt. Prophet's main parameters were taken from
# https://facebook.github.io/prophet/docs/diagnostics.html#hyperparameter-tuning
# Note: hp.loguniform expects its bounds in log space, e.g. -6.9 ~ ln(0.001).

search_space = {
  "seasonality_mode":        hp.choice("seasonality_mode",["multiplicative", "additive"]),
  "holidays":                hp.choice("holidays",[None,"Switzerland"]),
  "changepoint_prior_scale": hp.loguniform("changepoint_prior_scale", -6.9, -0.69),  # recommended range [0.001, 0.5]
  "seasonality_prior_scale": hp.loguniform("seasonality_prior_scale", -6.9, 2.3),    # recommended range [0.001, 10]
  "holidays_prior_scale":    hp.loguniform("holidays_prior_scale", -6.9, 2.3),       # recommended range [0.001, 10]
  "changepoint_range":       hp.uniform("changepoint_range", 0.8, 0.95)              # optional according to the docs, default = 0.8
}

# Give the run a name; this name will be used to group the search results.
with mlflow.start_run(run_name='outer_run_prophet'):
  
  # Select a search algorithm for Hyperopt to use.
  algorithm = tpe.suggest  # Tree of Parzen Estimators, a Bayesian method

  # Distribute tuning across our Spark cluster
  sparkTrials = SparkTrials(parallelism=4)

  hyperparameters = fmin(
      fn=train,
      space=search_space,
      algo=algorithm,
      trials=sparkTrials,
      max_evals=30,
      timeout=5*60     #seconds
      ) 

  bestModel = sparkTrials.results[np.argmin([r["loss"] for r in sparkTrials.results])]["Trained_Model"]

  print(hyperparameters)
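
The `hp.loguniform` bounds in the search space are given in log space, because Hyperopt draws `exp(uniform(low, high))`. As a quick sanity check, the following standard-library sketch converts the recommended ranges from the comments into natural logarithms; the dictionary names `bounds` and `log_bounds` are hypothetical.

```python
import math

# (lower, upper) bounds on the natural scale, as given in the search space comments
bounds = {
    "changepoint_prior_scale": (0.001, 0.5),
    "seasonality_prior_scale": (0.001, 10.0),
    "holidays_prior_scale": (0.001, 10.0),
}

# Natural logarithm of each bound, rounded to two decimals
log_bounds = {
    name: (round(math.log(low), 2), round(math.log(high), 2))
    for name, (low, high) in bounds.items()
}

for name, (low, high) in log_bounds.items():
    print(f"{name}: hp.loguniform bounds ({low}, {high})")
```

The rounded values agree with the bounds used in `search_space` up to rounding of the lower bound.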


## Display the results

In [0]:
# plot the outcome of the model
f, axes = plt.subplots(2, 1, figsize=(18, 8))

predictedValues = bestModel.predict(testPdf)

axes[0].plot(predictedValues.ds.values, predictedValues.yhat.values, color="tab:red", label="forecast")
#axes[0].plot(trainPdf.ds.values, trainPdf.y.values, color="tab:blue", label="train")
axes[0].plot(testPdf.ds.values, testPdf.y.values, color="tab:orange", label="truth", alpha=0.5)
axes[0].legend()
axes[0].set_title("NE5 + NE7")
axes[0].set_ylabel("Load [GWh]")

xmin, xmax = axes[0].get_xlim()
axes[1].plot(testPdf.ds,(predictedValues.yhat.values-testPdf.y.values)/(testPdf.y.values)*100)
axes[1].hlines(0, xmin, xmax, color="tab:grey", linestyle="--")
axes[1].set_xlim(xmin,xmax)
axes[1].set_ylabel("(Pred-True)/True [%]")

plt.show()
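
The two quantities used to judge the forecast, the RMSE that Hyperopt minimizes and the relative error shown in the lower panel, can be written out in a few lines. This is only a sketch in plain Python: the helper names `rmse` and `percent_error` are hypothetical, and the numbers are made up rather than taken from the data set.

```python
import math


def rmse(y_true, y_pred):
    """Root mean squared error: sqrt(mean((pred - true)^2))."""
    return math.sqrt(sum((p - t) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))


def percent_error(y_true, y_pred):
    """Signed relative error (pred - true) / true in percent, per data point."""
    return [(p - t) / t * 100 for t, p in zip(y_true, y_pred)]


# Hypothetical daily loads in GWh, not taken from the data set
y_true = [8.0, 10.0, 5.0]
y_pred = [9.0, 8.0, 5.0]

print(rmse(y_true, y_pred))
print(percent_error(y_true, y_pred))
```

RMSE is in the same unit as the data and penalizes large misses more strongly, while the percentage error keeps the sign and shows where the model over- or under-predicts.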

In [0]:
# plot the individual components of the best model
forecast = bestModel.predict(dataPdf)
bestModel.plot(forecast)
fig = bestModel.plot_components(forecast)