# Motivation

Being able to accurately forecast the volatile demand for power, especially across low-voltage distribution grids, is increasingly important to providing sufficent energy supply. Paradoxically, it's at the low-voltage networks that are the most volatile, and yet the most critical to account for as more distributed energy resources (e.g., solar, storage) make their way onto the grid. From the grid operator's point of view, not only are lower voltage service points (e.g., customers) the revenue life blood of the business, having increased visibility into demand at such lower levels of aggregation helps to provide 


In this notebook, I explore looking at a [Kaggle data set](https://www.kaggle.com/jeanmidev/smart-meters-in-london) containing about 5,000 utility service points with time series data from advanced metering infrastructure (AMI). The context of this experiment is to simulate the needs of a utility or grid operator and generate accurate short-term (up to 7 days ahead) forecast. I explore possible approaches that involve

- *Feature-based modeling approaches*
- *Customer Segmentation + Clustering*
- *Statistical methods, ML methods, and hybrid cross-learning approaches*

Performance is evaluated using time-series cross-validation using a rolling 7-day forecast horizon.

Finally, I show how this solution might be scaled and put into production.

Keywords: *time series classification*, *energy forecasting*, *cross-learning*, *MLOps*


In [2]:
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

import os
from tqdm import tqdm

import keras
import tensorflow

from tsfeatures import tsfeatures
from tsfeatures import crossing_points, acf_features, stl_features, entropy, nonlinearity, stability
import tsfeatures

Using TensorFlow backend.


In [3]:
# Set Project Working Directory
project_path = '/workspace/portfolio'

os.chdir(project_path)

# Set Data Path
data_path = f"{project_path}/data"
data_path

'/workspace/portfolio/data'

# Data

The data set we will use contains high-frequency measures of electricity demand usage across a panel of approximately 5,000 homes. It also contains customer metadata from the [ACORN](https://acorn.caci.co.uk/downloads/Acorn-User-guide.pdf) network, as well as atmospheric weather data from the DarkSky API (now owned by Apple and will be sunset by 2022).

In [3]:
# Process AMI Data
# The smart meter data is stored at the half-hour level of granularity in series of CSV files. 
# Here, we iterate over the files and concatenate into a single data frame in 'wide' format

ami_files = os.listdir(data_path + "/hhblock_dataset/hhblock_dataset/")

# Iterate over files using list comprehension
df_from_each_file = (pd.read_csv(data_path + "/hhblock_dataset/hhblock_dataset/" + f, sep=',') for f in ami_files)

df_merged   = pd.concat(df_from_each_file, ignore_index=True)
df_merged.head()


Unnamed: 0,LCLid,day,hh_0,hh_1,hh_2,hh_3,hh_4,hh_5,hh_6,hh_7,...,hh_38,hh_39,hh_40,hh_41,hh_42,hh_43,hh_44,hh_45,hh_46,hh_47
0,MAC000013,2012-06-22,0.127,0.113,0.076,0.109,0.12,0.096,0.113,0.148,...,0.086,0.154,0.121,0.102,0.131,0.121,0.082,0.096,0.127,0.126
1,MAC000013,2012-06-23,0.106,0.135,0.228,0.079,0.109,0.117,0.079,0.122,...,0.107,0.113,0.106,0.077,0.119,0.126,0.151,0.135,0.108,0.093
2,MAC000013,2012-06-25,0.099,0.119,0.169,0.136,0.109,0.09,0.132,0.131,...,0.14,0.135,0.134,0.106,0.131,0.169,0.17,0.141,0.163,0.137
3,MAC000013,2012-06-26,0.142,0.103,0.117,0.136,0.127,0.129,0.15,0.137,...,0.136,0.136,0.304,0.159,0.126,0.145,0.196,0.158,0.173,0.149
4,MAC000013,2012-06-27,0.126,0.128,0.175,0.163,0.119,0.103,0.128,0.141,...,0.17,0.144,0.098,0.183,0.316,0.19,0.121,0.176,0.158,0.146


In [4]:
# Process Hourly Weather Data
wthr = pd.read_csv(data_path + "/weather_hourly_darksky.csv")

wthr = wthr.rename(columns = {"time":"timestamp"})

wthr['date'] = pd.to_datetime(wthr['timestamp']).dt.date.astype(str)
wthr['time'] = pd.to_datetime(wthr['timestamp']).dt.time.astype(str)
wthr['hour'] = pd.to_datetime(wthr['timestamp']).dt.hour.astype(int)

wthr = wthr.drop(["icon", "summary", "precipType", "apparentTemperature"], axis = 1)
wthr.head()

Unnamed: 0,visibility,windBearing,temperature,timestamp,dewPoint,pressure,windSpeed,humidity,date,time,hour
0,5.97,104,10.24,2011-11-11 00:00:00,8.86,1016.76,2.77,0.91,2011-11-11,00:00:00,0
1,4.88,99,9.76,2011-11-11 01:00:00,8.83,1016.63,2.95,0.94,2011-11-11,01:00:00,1
2,3.7,98,9.46,2011-11-11 02:00:00,8.79,1016.36,3.17,0.96,2011-11-11,02:00:00,2
3,3.12,99,9.23,2011-11-11 03:00:00,8.63,1016.28,3.25,0.96,2011-11-11,03:00:00,3
4,1.85,111,9.26,2011-11-11 04:00:00,9.21,1015.98,3.7,1.0,2011-11-11,04:00:00,4


In [5]:
# Time Lookup
lookup = pd.read_csv(data_path + "/time-lookup.csv", sep = ",")
lookup['timestamp'] = pd.to_datetime(lookup['time'])
lookup['time'] = lookup["timestamp"].dt.time
lookup['hour'] = lookup["timestamp"].dt.hour

In [6]:
# Unique Number of Sensor meters
ids = pd.DataFrame(df_merged['LCLid'].unique())
ids.columns = ['id']
ids.shape

# Simple Random Sample
sample_ids = ids.sample(2000)
sample_ids.shape

(2000, 1)

# Data Pivoting


In [7]:
batch_size = 500  #chunk row size

# Use List Comprehension to create a list of data frame batches
list_df = [sample_ids[i:i + batch_size] for i in range(0,sample_ids.shape[0],batch_size)]
#list_df = list_df[1]

rs = []

for i in tqdm(list_df):
    try:
        local_df = df_merged.loc[df_merged['LCLid'].isin(list_df[1]["id"])]

        df_long = (pd.wide_to_long(local_df, stubnames = "hh_", i = ['day', 'LCLid'], j = "hh")
            .sort_values(["day", "hh"])
            .reset_index())

        df = (df_long.join(lookup.set_index('hh'), how = "inner", on = "hh")
            .rename(columns = {"day":"date"})
            .sort_values(["date", "time"])
            .rename(columns={'hh_':'kw'}))

        df = df.drop(["timestamp", "time", "hh"], axis = 1).groupby(["LCLid", "date", "hour"]).agg(kw = ("kw", sum)).reset_index()

        df = pd.merge(df, wthr.drop(["time"], axis = 1), on = ["date", "hour"])
        rs.append(df)
        del df_long
        del df
        del local_df
        
    except:
        print("Failed to generate features")

del list_df
        
import gc
gc.collect()
        
rs = pd.concat(rs)
        

100%|██████████| 4/4 [05:58<00:00, 89.74s/it]


In [4]:
#rs.head()

#rs.describe()
#rs.to_csv(data_path + "/ami-long.csv", index = False)

df = pd.read_csv(data_path + "/ami-long.csv", sep = ",").sample(1000000)

In [5]:
import gc as gc
gc.collect()

59473

In [6]:
from tensorflow.keras import layers

y = df["kw"]
X = df.drop(["LCLid", "kw", "date", "timestamp"], axis = 1)
#del df

X.dtypes

hour             int64
visibility     float64
windBearing      int64
temperature    float64
dewPoint       float64
pressure       float64
windSpeed      float64
humidity       float64
dtype: object

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder

#df.head()
#ohe = OneHotEncoder()
#ohe.fit(X)
X_enc = ohe.transform(X)

In [None]:
# first neural network with keras tutorial
#

#from numpy import loadtxt
#from keras.models import Sequential
#from keras.layers import Dense


import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import InputLayer
# load the dataset
#dataset = loadtxt('pima-indians-diabetes.csv', delimiter=',')
# split into input (X) and output (y) variables
#X = dataset[:,0:8]
#y = dataset[:,8]
# define the keras model
model = Sequential()
model.add(tf.keras.layers.Dense(12, input_dim=8, activation='relu'))
model.add(tf.keras.layers.Dense(32, activation='relu'))
model.add(tf.keras.layers.Dense(32, activation='relu'))
model.add(tf.keras.layers.Dense(32, activation='relu'))
model.add(tf.keras.layers.Dense(32, activation='relu'))
model.add(tf.keras.layers.Dense(32, activation='relu'))
model.add(tf.keras.layers.Dense(1, activation = "linear"))
# compile the keras model
model.compile(loss='mean_absolute_error', optimizer='adam', metrics=['mean_absolute_error'])
# fit the keras model on the dataset
model.fit(X, y, epochs=10, batch_size=2)
# evaluate the keras model
_, accuracy = model.evaluate(X, y)
print('Accuracy: %.2f' % (accuracy*100))



# Define Sequential model with 3 layers
# model = keras.Sequential(
#     [
#         layers.Dense(units = 32, activation= "relu", name= "layer1"),
#         layers.Dense(units = 32, activation= "relu", name= "layer2"),
#         layers.Dense(units = 32, activation= "relu", name= "layer3"),
#         layers.Dense(units = 32, activation= "relu", name= "layer4"),
#         layers.Dense(units = 32, activation= "relu", name= "layer5"),
#         layers.Dense(units = 1, activation = "linear")
#     ]
# )
# Call model on a test input
#x = tf.ones((3, 3))
#y = model(x)



# deep_model = kera.
#     layer_dense(units = 32, activation = 'relu', input_shape = c(ncol(X_train))) %>%
#     layer_dense(units = 32, activation = 'relu', input_shape = c(ncol(X_train))) %>%
#     layer_dense(units = 32, activation = 'relu', input_shape = c(ncol(X_train))) %>%
#     layer_dense(units = 32, activation = 'relu', input_shape = c(ncol(X_train))) %>%
#     layer_dense(units = 32, activation = 'relu', input_shape = c(ncol(X_train))) %>%
#     # layer_dense(units = 32, activation = 'relu', input_shape = c(ncol(W))) %>%
#     # layer_dense(units = 32, activation = 'relu', input_shape = c(ncol(W))) %>%
#     # layer_dense(units = 32, activation = 'relu', input_shape = c(ncol(W))) %>%
#     # layer_dropout(0.15) %>%
#     #    layer_batch_normalization() %>%
#     layer_dense(units = 1, activation = 'linear') %>%
#     compile(
#       loss =  'mean_absolute_error', #abs_var_penal_loss, #
#       optimizer =  optimizer_adam(decay = decay_rate), #optimizer_adam(lr = 0.001, decay = decay_rate), #optimizer_sgd(lr = 0.00001*5, momentum = 0.0, nesterov = FALSE, decay = 0.05/nrow(X_train)),
#       metrics = c('mean_absolute_error')
#     )

Epoch 1/10
Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
Cause: 'arguments' object has no attribute 'posonlyargs'
Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
Cause: 'arguments' object has no attribute 'posonlyargs'
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10

In [16]:
# Define Sequential model with 3 layers
model = keras.Sequential(
    [
        layers.Dense(2, activation="relu", name="layer1"),
        layers.Dense(3, activation="relu", name="layer2"),
        layers.Dense(4, name="layer3"),
    ]
)
# Call model on a test input
x = tf.ones((3, 3))
y = model(x)


TypeError: The added layer must be an instance of class Layer. Found: <tensorflow.python.keras.layers.core.Dense object at 0x7fb9139c9d10>

In [None]:
### Process Household Metadata ###

#
hh_info = pd.read_csv(data_path + "/acorn_details.csv", encoding = 'unicode_escape')
print(hh_info.head())

#
hh_info = pd.read_csv(data_path + "/informations_households.csv")
hh_info["Acorn"].value_counts()

In [None]:
df.shape

In [None]:
#batch_size = 100  #chunk row size

# Use List Comprehension to create a list of data frame batches
#list_df = [ids[i:i + batch_size] for i in range(0,ids.shape[0],batch_size)]
#list_df = list_df[1]

df_long = (pd.wide_to_long(local_df, stubnames = "hh_", i = ['day', 'LCLid'], j = "hh")
    .sort_values(["day", "hh"])
    .reset_index())

df = (df_long.join(lookup.set_index('hh'), how = "inner", on = "hh")
    .rename(columns = {"day":"date"})
    .sort_values(["date", "time"])
    .rename(columns={'hh_':'kw'}))

df = df.drop(["timestamp", "time", "hh"], axis = 1).groupby(["LCLid", "date", "hour"]).agg(kw = ("kw", sum)).reset_index()

df = pd.merge(df, wthr.drop(["time"], axis = 1), on = ["date", "hour"])

print(df.head())

In [None]:
df.head()

In [None]:
def calc_lr_stats(df):
    '''function to subset ami data frame, create load research features based on set of ids'''
    
    # pivot wide to long
    df = (pd.wide_to_long(df, stubnames = "hh_", i = ['day', 'LCLid'], j = "hh")
        #.sort_values(["day", "hh"])
        .reset_index())
    
    # reassign columns
    df.columns = ['ds', 'unique_id', 'hh', 'y']

    # drop NAs
    df = df[["ds", "unique_id", "y"]].dropna()
    
    # calculate features
    features = df.groupby(["unique_id"]).agg(["max", "mean"])
    features['unique_id'] = features.index
    
    #features = df.head()
    return pd.DataFrame(features)


In [None]:
batch_size = 1000  #chunk row size

# Use List Comprehension to create a list of data frame batches
list_df = [ids[i:i + batch_size] for i in range(0,ids.shape[0],batch_size)]

rs = []

for i in tqdm(list_df):
    try:
        local_df = df_merged.loc[df_merged['LCLid'].isin(i["id"])]
        rs.append(calc_lr_stats(local_df))
    except:
        print("Failed to generate features")


    

In [None]:
# Using Spark
# - reference https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html
# - reference https://docs.faculty.ai/how_to/spark/local_spark.html
# - referemce https://spark.apache.org/docs/2.4.4/sql-pyspark-pandas-with-arrow.html

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import SparkSession, DataFrame
from pyspark.conf import SparkConf
from functools import reduce
import pyspark

spark = SparkSession.builder \
         .master("local[*]") \
         .appName("energy-forecasting") \
         .config("spark.executor.memory", "10g") \
         .config("spark.executor.cores", 8) \
         .config("spark.deploy.defaultCores", 8) \
         .config("spark.driver.memory", "3g") \
         .config("spark.sql.execution.arrow.pyspark.enabled", True) \
         .getOrCreate()

# explicit functions
def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)

@pandas_udf("max double, mean double, unique_id string", PandasUDFType.GROUPED_MAP)
def calc_lr_stats_py(df):
    '''function to subset ami data frame, create load research features based on set of ids'''

    # pivot wide to long
    df = (pd.wide_to_long(df, stubnames = "hh_", i = ['day', 'LCLid'], j = "hh")
        #.sort_values(["day", "hh"])
        .reset_index())
    
    # reassign columns
    df.columns = ['ds', 'unique_id', 'hh', 'y']

    # drop NAs
    df = df[["ds", "unique_id", "y"]].dropna()
    
    # calculate features
    features = df.groupby(["unique_id"]).agg(["max", "mean"])
    features['unique_id'] = features.index
    
    #features = df.head()
    return pd.DataFrame(features)


In [None]:
rs.head()

In [None]:
### Use Batch Processing and Pandas UDF in Spark to create Load Research Statistics ###
%time

batch_size = ids.shape[0]  #chunk row size

# Use List Comprehension to create a list of data frame batches
list_df = [ids[i:i + batch_size] for i in range(0,ids.shape[0],batch_size)]

#gc.collect()
rs = []

for i in tqdm(list_df):
    try:
        local_df = df_merged.loc[df_merged['LCLid'].isin(i["id"])]
        spark_df = spark.createDataFrame(local_df)
        print("Copied Data Into Spark...")
        rs_df = spark_df.groupby('LCLid').apply(calc_lr_stats_py).collect()
        #rs_df = spark_df.groupby('LCLid').apply(calc_lr_stats_py)
        rs.append(rs_df.select("*").toPandas())

    except:
        print("Failed to generate features")
    
rs = pd.concat(rs)

#unionAll(*[df1, df2, df3]).show()

# reassign columns
rs.columns = ['peak_demand', 'avg_demand', 'unique_id']
rs.head()

In [None]:
#from sklearn.cluster import KMeans

#num_df = rs[['peak_demand', 'avg_demand']]
#num_df["load_factor"] = num_df['avg_demand'].div(num_df['peak_demand']).replace(np.inf, 0)
#num_df = num_df.dropna()


# K-means clustering
#num_df["cluster"] = KMeans(n_clusters = 3, random_state = 0).fit_predict(num_df)
#num_df["cluster"] = num_df["cluster"].astype(str)
#num_df["cluster"].value_counts()


In [None]:
sns.set(rc = {'figure.figsize':(15,8)})
sns.scatterplot(data=num_df, x="peak_demand", y="avg_demand", hue = num_df.cluster.tolist())

#sns.lmplot(data=num_df), x="peak_demand", y="avg_demand", hue = "cluster", markers =['v', 'o', 'x'])

In [None]:
#sns.scatterplot(data=num_df, x="avg_demand", y="peak_demand", hue = num_df.cluster.tolist())
#num_df["load_factor"] = num_df['avg_demand'].div(num_df['peak_demand']).replace(np.inf, 0)

fig = plt.figure(figsize = (16, 8))
ax = fig.add_subplot(projection='3d')

x = num_df["peak_demand"]
y = num_df["avg_demand"]
z = num_df["load_factor"]

ax.scatter3D(x, y, z)

# Add x, y gridlines
ax.grid(b = True, color ='grey',
        linestyle ='-.', linewidth = 0.3,
        alpha = 0.2)

my_cmap = plt.get_cmap('Set1')
 

    # Creating plot
sctt = ax.scatter3D(x, y, z,
                    alpha = 0.7,
                    c = num_df["cluster"].astype(int),
                    cmap = my_cmap,
                    marker ='o')

 
plt.title("Load Factor")
ax.set_xlabel('Peak Demand (kW)', fontweight ='bold')
ax.set_ylabel('Average Demand (kW)', fontweight ='bold')
ax.set_zlabel('Load Factor ', fontweight ='bold')
fig.colorbar(sctt, ax = ax, shrink = 0.5, aspect = 5)
 
ax.view_init(20, 320)
    
# show plot
plt.show()


## Time Series Features

In addition to classic load research statistics, we can also borrow from other domains that analyze time series data for forecasting purposes. There has been significant research recently into time-series features, and an excellent R package for generating said features is [tsfeatures](https://cran.r-project.org/web/packages/tsfeatures/vignettes/tsfeatures.html). Here we use a [Python implementation / API](https://github.com/Nixtla/tsfeatures) to the R package 

In [None]:
def calc_features_py(df):

    # pivot wide to long
    df = (pd.wide_to_long(df, stubnames = "hh_", i = ['day', 'LCLid'], j = "hh")
        .sort_values(["day", "hh"])
        .reset_index())
    
    df.columns = ['ds', 'unique_id', 'hh', 'y']

    df = df[["ds", "unique_id", "y"]].dropna()
    
    # calculate ts_features
    features = tsfeatures.tsfeatures(df, freq = 1, features = [stl_features])
    #features = tsfeatures.tsfeatures(df, freq = 1, features = [stl_features, acf_features, entropy, crossing_points, stability, nonlinearity])
    #features = tsfeatures.tsfeatures(df, freq = 1)

    return pd.DataFrame(features).drop(["nperiods", "seasonal_period"], axis = 1)

check = calc_features_py(df = df_merged)
check.head()

In [None]:

@pandas_udf("unique_id string, trend double, spike double, linearity double, curvature double, e_acf1 double, e_acf10 double", PandasUDFType.GROUPED_MAP)
def calc_entropy_features_py(df):
    '''function to subset ami data frame, create load research features based on set of ids'''

    # pivot wide to long
    df = (pd.wide_to_long(df, stubnames = "hh_", i = ['day', 'LCLid'], j = "hh")
        .sort_values(["day", "hh"])
        .reset_index())
    
    df.columns = ['ds', 'unique_id', 'hh', 'y']

    df = df[["ds", "unique_id", "y"]].dropna()
    
    # calculate ts_features
    features = tsfeatures.tsfeatures(df, freq = 1, features = [entropy, crossing_points, stability, nonlinearity])

    return pd.DataFrame(features).drop(["nperiods", "seasonal_period"], axis = 1)

@pandas_udf("unique_id string, trend double, spike double, linearity double, curvature double, e_acf1 double, e_acf10 double", PandasUDFType.GROUPED_MAP)
def calc_stl_features_py(df):
    '''function to subset ami data frame, create load research features based on set of ids'''

    # pivot wide to long
    df = (pd.wide_to_long(df, stubnames = "hh_", i = ['day', 'LCLid'], j = "hh")
        .sort_values(["day", "hh"])
        .reset_index())
    
    df.columns = ['ds', 'unique_id', 'hh', 'y']

    df = df[["ds", "unique_id", "y"]].dropna()
    
    # calculate ts_features
    features = tsfeatures.tsfeatures(df, freq = 1, features = [stl_features])

    return pd.DataFrame(features).drop(["nperiods", "seasonal_period"], axis = 1)

In [None]:
### Use Batch Processing and Pandas UDF in Spark to create Time Series Statistics ###

batch_size = 50  #chunk row size

# Use List Comprehension to create a list of data frame batches
list_df = [ids[i:i + batch_size] for i in range(0,ids.shape[0],batch_size)]

gc.collect()
rs = []

for i in tqdm(list_df):
    try:
        local_df = df_merged.loc[df_merged['LCLid'].isin(i["id"])]
        spark_df = spark.createDataFrame(local_df)
        rs_df = spark_df.groupby('LCLid').apply(calc_stl_features_py).collect()
        rs.append(pd.DataFrame(rs_df))

    except:
        print("Failed to generate features")
    
rs = pd.concat(rs)
    
# reassign columns
#rs.columns = ['peak_demand', 'avg_demand', 'unique_id']
rs.head()

In [None]:
from tqdm import tqdm

batch_size = 10  #chunk row size

# Use List Comprehension to create a list of data frame batches
list_df = [ids[i:i + batch_size] for i in range(0,ids.shape[0],batch_size)]

rs = []

for i in tqdm(list_df):
    print(i)
    try:
        rs.append(calc_features(i, df = df_merged))
    except:
        print("Failed to generate features")
    
    

# Machine Learning

In [None]:
# - local models
# - global models
# - local models with clusters "ensembles of exprts"


# Local Methods


# Global Models
def train_gbm

def train_mlp

def train_


