Daniel Rocha Ruiz, MSc in Data Science and Business Analytics

Sources:
- http://archive.ics.uci.edu/ml/machine-learning-databases/00374/
- http://archive.ics.uci.edu/ml/datasets/Appliances+energy+prediction
- https://databricks.com/blog/2019/09/10/doing-multivariate-time-series-forecasting-with-recurrent-neural-networks.html
- https://pages.databricks.com/rs/094-YMS-629/images/Blog_%20A%20Multivariate%20Time%20Series%20Forecasting%20Appliance%20Energy%20Usage.html
- https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf

# Set-up
## Import packages

In [None]:
# general
import matplotlib.pyplot as plt
import numpy as np 
from time import time

# scikit-learn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler

# mlflow
import mlflow
import mlflow.keras

# pyspark
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import round

# keras
from keras.callbacks import TensorBoard, EarlyStopping
from keras.layers import Dense, CuDNNLSTM, Dropout
from keras.layers.advanced_activations import LeakyReLU
from keras.models import Sequential
from keras.preprocessing.sequence import TimeseriesGenerator
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import dbutils

## Get data with Spark

In [None]:
# Create the Spark session for this notebook (getOrCreate reuses an existing one).
# NOTE(review): the app name and "spark.some.config.option" look like the
# placeholder values from the Spark SQL getting-started example; presumably they
# can be replaced or removed -- confirm.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [None]:
# Load the raw CSV: the first line holds the column names and Spark infers the
# column types from the data.
csv_path = '../data/energydata_complete.csv'
df = spark.read.options(header=True, inferSchema=True).csv(csv_path)

# Modelling
## Process the data

In [None]:
# Drop rows containing nulls, then drop the 'date' column (it is not a model input).
df = df.dropna()
dataset = df.drop('date')

# Round every column to one decimal place. `round` is pyspark.sql.functions.round
# (imported at the top, shadowing the builtin); it uses HALF_UP rounding, i.e. it
# rounds to nearest -- it does not round *up*.
# One select builds a single projection instead of one withColumn per column;
# aliasing keeps every column's original name and position.
columns = dataset.columns
dataset = dataset.select([round(column, 1).alias(column) for column in columns])

# Collect exactly once: every collect() re-executes the whole Spark job.
# Result is a list of pyspark Row objects (tuples), one per record.
values = dataset.collect()

# Forecast target: column 0 (presumably 'Appliances' -- confirm column order),
# shifted HORIZON rows into the future. The notebook treats 144 rows as 24 hours.
HORIZON = 144
TEST_FRACTION = 0.20

raw = np.asarray(values, dtype=float)
n_samples = len(raw) - HORIZON

# Chronological split (shuffle=False keeps time order). Only the split sizes are
# needed here, so split an index range using train_test_split's own rounding rule.
train_idx, test_idx = train_test_split(np.arange(n_samples), test_size=TEST_FRACTION, shuffle=False)
n_train = len(train_idx)

# Standardize using statistics from the training rows only. Fitting on the full
# series would leak the test period's mean/variance into the training data.
scaler = StandardScaler()
scaler.fit(raw[:n_train])
scaled = scaler.transform(raw)

# Features at time t, target (scaled column 0) at time t + HORIZON.
X = scaled[:-HORIZON]
y = scaled[HORIZON:, 0]

# Contiguous train/test slices -- identical to train_test_split(..., shuffle=False).
trainX, testX = X[:n_train], X[n_train:]
trainY, testY = y[:n_train], y[n_train:]

# Create overlapping windows of lagged values for training and testing datasets.
# Window length in rows. The notebook treats a 144-row shift as 24 hours, so 864
# rows correspond to 6 days of history per sample.
timesteps = 864
# The window length is also reused as the batch size.
batch_size = timesteps
train_generator = TimeseriesGenerator(trainX, trainY, length=timesteps, sampling_rate=1, batch_size=batch_size)
test_generator = TimeseriesGenerator(testX, testY, length=timesteps, sampling_rate=1, batch_size=batch_size)

# First batch of each generator: used to inspect shapes (train_X also provides the
# LSTM input_shape when the model is built).
train_X, train_y = train_generator[0]
test_X, test_y = test_generator[0]

# With stride 1 and sampling_rate 1, every row after the first `timesteps` rows is
# the target of exactly one window, so a generator yields len(data) - timesteps
# samples. (first-batch size * number of batches over-counts when the last batch
# is partial.)
train_samples = len(trainX) - timesteps
test_samples = len(testX) - timesteps

print("Total Records (n): {}".format(df.count()))
print("Total Records after adjusting for 24 hours: {}".format(len(X)))
print("Number of samples in training set (.8 * n): trainX = {}".format(trainX.shape[0]))
print("Number of samples in testing set (.2 * n): testX = {}".format(testX.shape[0]))
print("Size of individual batches: {}".format(batch_size))
print("Window length (timesteps): {}".format(test_X.shape[1]))
print("Number of total samples in training feature set: {}".format(train_samples))
print("Number of samples in testing feature set: {}".format(test_samples))

## Train the model

In [None]:
# TensorBoard: a fresh log directory per execution, named by the current Unix time.
tb_dir = '/tmp/tensorflow_log_dir/' + str(time())
tensorboard = TensorBoard(log_dir=tb_dir)
# NOTE(review): `dbutils` is normally a Databricks notebook global; the
# `from tensorflow.keras import dbutils` line at the top of the file looks wrong -- confirm.
dbutils.tensorboard.start(tb_dir)

In [None]:
# The LSTM expects 3-D input: (samples, time steps, input features). The number of
# time steps is the `timesteps` window length used to build the generators -- the
# past observations the network learns from (backpropagation through time).
# For details on what individual hyperparameters mean, see here: https://github.com/keras-team/keras/blob/master/keras/layers/recurrent.py#L2051

units = 128              # LSTM hidden units
num_epoch = 5000         # upper bound; EarlyStopping below may end training earlier
learning_rate = 0.00144

with mlflow.start_run(experiment_id=3133492, nested=True):

    # Log the configuration before the (long) fit so it is recorded even if
    # training is interrupted or fails.
    mlflow.log_param("Units", units)
    mlflow.log_param("Epochs", num_epoch)
    mlflow.log_param("Learning Rate", learning_rate)
    # Key spelling kept unchanged so it stays comparable with earlier runs.
    mlflow.log_param("Lags cosidered", timesteps)

    model = Sequential()
    # input_shape = (window length, number of features), read from the first training batch.
    model.add(CuDNNLSTM(units, input_shape=(train_X.shape[1], train_X.shape[2])))
    model.add(LeakyReLU(alpha=0.5))
    model.add(Dropout(0.1))
    model.add(Dense(1))  # single regression output: the (scaled) target

    # NOTE(review): the layers come from standalone `keras` while Adam comes from
    # `tensorflow.keras`, and newer TF versions name the argument `learning_rate`
    # rather than `lr` -- confirm this combination works in the target runtime.
    adam = Adam(lr=learning_rate)
    # Stop when the *training* loss has not improved by min_delta for 50 epochs and
    # restore the best-loss weights. NOTE(review): validation loss is not monitored
    # (validation_data is the test set).
    callback = [EarlyStopping(monitor="loss", min_delta = 0.00001, patience = 50, mode = 'auto', restore_best_weights=True), tensorboard]

    # Regression loss: mean squared error; mean absolute error is tracked as a metric.
    model.compile(loss='mse', optimizer=adam, metrics=['mae'])

    # fit network (shuffle=False keeps the batches in time order)
    history = model.fit_generator(train_generator,
                                  epochs=num_epoch,
                                  validation_data=test_generator,
                                  callbacks=callback,
                                  verbose=2,
                                  shuffle=False,
                                  initial_epoch=0)

    # evaluate_generator returns [loss, mae] on the test windows.
    score = model.evaluate_generator(test_generator, verbose=0)
    mlflow.log_metric("Test Loss", score[0])
    mlflow.log_metric("MAE", score[1])
    mlflow.log_metric("Actual Epochs", len(history.history['loss']))
    mlflow.keras.log_model(model, "LSTM Model")

    # The model can be saved for future use and move to production
    #   mlflow.keras.save_model(model1, "/dbfs/ved-demo/timeseries/best-appliance-model/")

# Viz

## Plot Loss

In [None]:
# series: Loss
# Summarise training at the epoch with the lowest training loss -- the quantity
# EarlyStopping monitors and whose weights restore_best_weights=True restores when
# it triggers -- instead of averaging over every epoch, which mixes in the
# untrained early epochs and is not comparable with the final test score.
train_losses = history.history['loss']
best_epoch = int(np.argmin(train_losses))
train_loss = train_losses[best_epoch]
# The MAE history key depends on the Keras version.
mae_key = 'mean_absolute_error' if 'mean_absolute_error' in history.history else 'mae'
train_mae = history.history[mae_key][best_epoch]
# score = [loss, mae] evaluated on the test generator.
title = 'Train Loss: {0:.3f} Test Loss: {1:.3f}\n  Train MAE: {2:.3f}, Test MAE: {3:.3f}'.format(train_loss, score[0], train_mae, score[1])

# Apply the style before creating the figure so it affects the whole figure.
# The 'seaborn' style was renamed 'seaborn-v0_8' in newer matplotlib releases.
style_name = 'seaborn-v0_8' if 'seaborn-v0_8' in plt.style.available else 'seaborn'
plt.style.use(style_name)
fig = plt.figure(figsize=(10,10))
# plot series
plt.plot(history.history['loss'], 'c-', label='train')
plt.plot(history.history['val_loss'], 'm:', label='test')
# others
plt.title(title)
plt.legend()
plt.grid(True)
plt.close()
display(fig)

## Plot Predictions

In [None]:
# Predictions: yhat_train_plot / yhat_test_plot are not computed anywhere else in
# this notebook, so derive them here from the trained model.
yhat_train = model.predict_generator(train_generator).ravel()
yhat_test = model.predict_generator(test_generator).ravel()

# Place each prediction at the index of the target it forecasts within `y`:
# the train generator's first target is trainY[timesteps] == y[timesteps], and the
# test generator's first target is testY[timesteps] == y[len(trainY) + timesteps].
# Positions without a prediction stay NaN, so they are not drawn.
yhat_train_plot = np.full(len(y), np.nan)
yhat_train_plot[timesteps:timesteps + len(yhat_train)] = yhat_train
test_start = len(trainY) + timesteps
yhat_test_plot = np.full(len(y), np.nan)
yhat_test_plot[test_start:test_start + len(yhat_test)] = yhat_test

# Undo the standardization of the target (column 0) so the plotted values are in
# the data's original units (labelled Wh below; assumes column 0 is the
# appliances energy column -- confirm).
target_mean, target_scale = scaler.mean_[0], scaler.scale_[0]
actual_wh = y * target_scale + target_mean
yhat_train_plot = yhat_train_plot * target_scale + target_mean
yhat_test_plot = yhat_test_plot * target_scale + target_mean

# create plot (style first so it applies to the whole figure; 'seaborn' was
# renamed 'seaborn-v0_8' in newer matplotlib releases)
style_name = 'seaborn-v0_8' if 'seaborn-v0_8' in plt.style.available else 'seaborn'
plt.style.use(style_name)
fig = plt.figure(figsize=(10,10))
palette = plt.get_cmap('Set1')
# plot series
plt.plot(actual_wh, marker='', color=palette(4), linewidth=1, alpha=0.9, label='actual')
plt.plot(yhat_train_plot, marker='', color=palette(2), linewidth=1, alpha=0.9, label='training predictions')
plt.plot(yhat_test_plot, marker='', color=palette(3), linewidth=1, alpha=0.9, label='testing predictions')
# others
plt.title('Appliances Energy Prediction', loc='center', fontsize=20, fontweight=5, color='orange')
plt.ylabel('Energy used (Wh)')
plt.legend()
fig.set_size_inches(w=15,h=5)
plt.close()
display(fig)