In [2]:
from pathlib import Path
import pandas as pd
import numpy as np
import re
from window_ops.rolling import (
    seasonal_rolling_max,
    seasonal_rolling_mean,
    seasonal_rolling_min,
    seasonal_rolling_std,
)

## 04 - Feature Engineering

A standard regression model has no explicit understanding of time and so we need to engineer good features to embed the temporal aspect of the problem. We will do time delay embedding to add recent and seasonal information to the dataset.

We read in the versions of the train, val and test portions of the data where we imputed the missing values:

In [7]:
preprocessed = Path('../data/preprocessed')

In [8]:
# read in the train, val and test data sets where missing values are imputed
train_df = pd.read_parquet(preprocessed/'selected_blocks_train_missing_imputed.parquet')
val_df = pd.read_parquet(preprocessed/'selected_blocks_val_missing_imputed.parquet')
test_df = pd.read_parquet(preprocessed/'selected_blocks_test_missing_imputed.parquet')

Some of the features we will be creating will require a single joined-up dataset with continuous time. We join them up while tagging each portion so that we don't lose track:

In [9]:
train_df['type'] = 'train'
val_df['type'] = 'val'
test_df['type'] = 'test'
full_df = pd.concat([train_df, val_df, test_df]).sort_values(['LCLid', 'timestamp'])

In [11]:
# Get the appropriate 32 bit data type for a column
def get_32_bit_dtype(col):
    dtype = col.dtype
    if dtype.name.startswith('float'):
        ret_dtype = 'float32'
    elif dtype.name.startswith('int'):
        ret_dtype = 'int32'
    else:
        ret_dtype = None
    return ret_dtype

# Add lags, setting the data type of the lag columns to 32 bit if we want to save space
# performs the lags for each LCLId separately
def add_lags(df, lags, column, ts_id, use_32_bit=False):
    _32_bit_dtype = get_32_bit_dtype(df[column])
    if use_32_bit and _32_bit_dtype is not None:
        col_dict = {f'{column}_lag_{l}': df.groupby([ts_id])[column].shift(l).astype(_32_bit_dtype) for l in lags}
    else:
        col_dict = {f'{column}_lag_{l}': df.groupby([ts_id])[column].shift(l) for l in lags}
    df = df.assign(**col_dict)
    added_features = list(col_dict.keys())
    return df, added_features

We create the first 5 lags, the same 5 lags from the previous day, and the same 5 lags from the previous week in order to capture daily and weekly seasonality:

In [12]:
lags = (
    (np.arange(5) + 1).tolist()
    + (np.arange(5) + 46).tolist()
    + (np.arange(5) + (48 * 7) - 2).tolist()
)

In [13]:
full_df, added_features = add_lags(full_df, lags, 'energy_consumption', 'LCLid', True)

We can also add a rolling window aggregation (using the mean average and standard deviation) of with window sizes of 3, 6, 12 and 18 to connect the present data point to the recent past while smoothing out fluctuations and short-term irregularities.

In [14]:
def add_rolling_features(df, rolls, column, agg_funcs=['mean', 'std'], ts_id=None, n_shift=1, use_32_bit=False):
    _32_bit_dtype = get_32_bit_dtype(df[column])
    rolling_df = pd.concat([df.groupby(ts_id)[column]
                            .shift(n_shift).rolling(l)
                            .agg({f'{column}_rolling_{l}_{agg}': agg for agg in agg_funcs}) 
                            for l in rolls], axis=1)
    df = df.assign(**rolling_df.to_dict('list'))
    added_features = rolling_df.columns.tolist()
    if use_32_bit and _32_bit_dtype is not None:
        df[added_features] = df[added_features].astype(_32_bit_dtype)
    return df, added_features

In [15]:
full_df, added_features = add_rolling_features(
    full_df,
    rolls=[3, 6, 12, 48],
    column='energy_consumption',
    agg_funcs=['mean', 'std'],
    ts_id='LCLid',
    use_32_bit=True,
)

A seasonal rolling window aggregation takes a seasonal window, skipping a constant number of timesteps between each item in the window. Since this is not easy to do efficiently with Pandas, we use an implementation from [github.com/jmoralez/window_ops/](https://github.com/jmoralez/window_ops/) that uses NumPy and Numba to make doing this fast and efficient.

We add periods of 48 (one day) and 28*7 (one week), each with a window of size 3, aggregating on mean and standard deviation.

In [16]:
SEASONAL_ROLLING_MAP = {
    'mean': seasonal_rolling_mean,
    'min': seasonal_rolling_min,
    'max': seasonal_rolling_max,
    'std': seasonal_rolling_std,
}

def add_seasonal_rolling_features(df, seasonal_periods, rolls, column, agg_funcs, ts_id=None, n_shift=1, use_32_bit=False):
    _32_bit_dtype = get_32_bit_dtype(df[column])
    agg_funcs = {agg: SEASONAL_ROLLING_MAP[agg] for agg in agg_funcs}
    added_features = []
    for sp in seasonal_periods:
        if use_32_bit and _32_bit_dtype is not None:
            col_dict = {
                f'{column}_{sp}_seasonal_rolling_{l}_{name}': df.groupby(ts_id)[
                    column
                ]
                .transform(
                    lambda x: agg(
                        x.shift(n_shift * sp).values,
                        season_length=sp,
                        window_size=l,
                    )
                )
                .astype(_32_bit_dtype)
                for (name, agg) in agg_funcs.items()
                for l in rolls
            }
        else:
            col_dict = {
                f'{column}_{sp}_seasonal_rolling_{l}_{name}': df.groupby(ts_id)[
                    column
                ].transform(
                    lambda x: agg(
                        x.shift(n_shift * sp).values,
                        season_length=sp,
                        window_size=l,
                    )
                )
                for (name, agg) in agg_funcs.items()
                for l in rolls
            }
        df = df.assign(**col_dict)
        added_features += list(col_dict.keys())
    return df, added_features

In [17]:
full_df, added_features = add_seasonal_rolling_features(
    full_df,
    rolls=[3],
    seasonal_periods=[48, 48 * 7],
    column='energy_consumption',
    agg_funcs=['mean', 'std'],
    ts_id='LCLid',
    use_32_bit=True,
)

Exponentially Weighted Moving Averages can be thought of as an average of the entire history of the time series, but where exponentially decaying weights cause more recent points in time to have more weight in the average. These can be defined in terms of the decay parameter alpha, or the span (the number of periods that it takes for the decayed weights to approach zero), where alpha = 2 / (1 + span).

Here, we add EWMA features with spans of 60 days, 1 week, and 1 day.

In [19]:
def add_ewma(df, column, alphas=[0.5], spans=None, ts_id=None, n_shift=1, use_32_bit=False):
    # determine whether to use spans or alphas in ewm calculations
    if spans is not None:
        use_spans = True
    _32_bit_dtype = get_32_bit_dtype(df[column])
    if use_32_bit and _32_bit_dtype is not None:
        col_dict = {
            f"{column}_ewma_{'span' if use_spans else 'alpha'}_{param}": df.groupby(
                [ts_id]
            )[column]
            .shift(n_shift)
            .ewm(
                alpha=None if use_spans else param,
                span=param if use_spans else None,
                adjust=False,
            )
            .mean()
            .astype(_32_bit_dtype)
            for param in (spans if use_spans else alphas)
        }
    else:
        col_dict = {
            f"{column}_ewma_{'span' if use_spans else 'alpha'}_{param}": df.groupby(
                [ts_id]
            )[column]
            .shift(n_shift)
            .ewm(
                alpha=None if use_spans else param,
                span=param if use_spans else None,
                adjust=False,
            )
            .mean()
            for param in (spans if use_spans else alphas)
        }
    df = df.assign(**col_dict)
    return df, list(col_dict.keys())

In [20]:
full_df, added_features = add_ewma(
    full_df,
    spans=[48 * 60, 48 * 7, 48],
    column='energy_consumption',
    ts_id='LCLid',
    use_32_bit=True,
)

Through temporal embedding, we can embed time features which the ML model can leverage.

We can add calendar features such as the month, quarter, day of year, hour, minute and so on as categorical variables. It only makes sense to add calendar features which are temporally higher than the frequency of the time series.

We can also add elapsed-time features which capture the passage of time in an ML model. These columns increate monotonically as time increases. Here, we add a timestamp column to indicate the elapsed time.

In [21]:
def add_temporal_features(df, date_column_name, add_elapsed, prefix=None, drop=True, use_32_bit=False):
    date_column = df[date_column_name]
    prefix = (re.sub('[Dd]ate$', '', date_column_name) if prefix is None else prefix) + '_'
    # because we have 30min-frequency data, we can add the following features:
    attr = ['Month',
    'Quarter',
    'Is_quarter_end',
    'Is_quarter_start',
    'Is_year_end',
    'Is_year_start',
    'Is_month_start',
    'WeekDay',
    'Dayofweek',
    'Dayofyear',
    'Hour',
    'Minute']
    _32_bit_dtype = 'int32'
    added_features = []
    for n in attr:
        if n == 'Week':
            continue
        df[prefix + n] = (
            getattr(date_column.dt, n.lower()).astype(_32_bit_dtype)
            if use_32_bit
            else getattr(date_column.dt, n.lower())
        )
        added_features.append(prefix + n)
    # Pandas removed `dt.week` in v1.1.10
    if 'Week' in attr:
        week = (
            date_column.dt.isocalendar().week
            if hasattr(date_column.dt, 'isocalendar')
            else date_column.dt.week
        )
        df.insert(
            3, prefix + 'Week', week.astype(_32_bit_dtype) 
            if use_32_bit 
            else week
        )
        added_features.append(prefix + 'Week')
    if add_elapsed:
        mask = ~date_column.isna()
        df[prefix + 'Elapsed'] = np.where(
            mask, date_column.values.astype(np.int64) // 10**9, None
        )
        if use_32_bit:
            if df[prefix + 'Elapsed'].isnull().sum() == 0:
                df[prefix + 'Elapsed'] = df[prefix + 'Elapsed'].astype('int32')
            else:
                df[prefix + 'Elapsed'] = df[prefix + 'Elapsed'].astype('float32')
        added_features.append(prefix + 'Elapsed')
    if drop:
        df.drop(date_column_name, axis=1, inplace=True)
    return df, added_features

In [22]:
full_df, added_features = add_temporal_features(
    full_df,
    date_column_name='timestamp',
    add_elapsed=True,
    drop=False,
    use_32_bit=True,
)

Representing categorical calendar features as their continuous sine and cosine Fourier components can be advantageous depending on the kind of ML model being used.

Here, we encode the `timestamp_Month`, `timestamp_Hour` and `timestamp_minute` columns as their Fourier representations.

In [23]:
# calculate n Fourier terms, given the seasonal cycle and max cycle:
def calculate_fourier_terms(seasonal_cycle, max_cycle, n_fourier_terms):
    sin_X = np.empty((len(seasonal_cycle), n_fourier_terms), dtype='float64')
    cos_X = np.empty((len(seasonal_cycle), n_fourier_terms), dtype='float64')
    for i in range(1, n_fourier_terms + 1):
        sin_X[:, i - 1] = np.sin((2 * np.pi * seasonal_cycle * i) / max_cycle)
        cos_X[:, i - 1] = np.cos((2 * np.pi * seasonal_cycle * i) / max_cycle)
    return np.hstack([sin_X, cos_X])

In [24]:
# add the Fourier features for an individual column
def add_fourier_features(df, column_to_encode, max_value=None, Optional=None, n_fourier_terms=1, use_32_bit=False):
    if max_value is None:
        max_value = df[column_to_encode].max()
    fourier_features = calculate_fourier_terms(
        df[column_to_encode].astype(int).values,
        max_cycle=max_value,
        n_fourier_terms=n_fourier_terms,
    )
    feature_names = [
        f'{column_to_encode}_sin_{i}' for i in range(1, n_fourier_terms + 1)
    ] + [f'{column_to_encode}_cos_{i}' for i in range(1, n_fourier_terms + 1)]
    df[feature_names] = fourier_features
    if use_32_bit:
        df[feature_names] = df[feature_names].astype('float32')
    return df, feature_names

In [25]:
# add Fourier terms for all of the specified seasonal cycle columns (month, week, hour, etc.)
# The max_values are the maximum values that season cycles can attain, e.g. for month, max_value is 12.
def bulk_add_fourier_features(df, columns_to_encode, max_values, n_fourier_terms=1, use_32_bit=False):
    added_features = []
    for column_to_encode, max_value in zip(columns_to_encode, max_values):
        df, features = add_fourier_features(
            df,
            column_to_encode,
            max_value,
            n_fourier_terms=n_fourier_terms,
            use_32_bit=use_32_bit,
        )
        added_features += features
    return df, added_features

In [26]:
full_df, added_features = bulk_add_fourier_features(
    full_df,
    ['timestamp_Month', 'timestamp_Hour', 'timestamp_Minute'],
    max_values=[12, 24, 60],
    n_fourier_terms=5,
    use_32_bit=True,
)

We save our feature-engineered version of the dataset for later use:

In [27]:
full_df[full_df['type'] == 'train'].drop(columns='type').to_parquet(
    preprocessed / 'selected_blocks_train_missing_imputed_feature_engg.parquet'
)
full_df[full_df['type'] == 'val'].drop(columns='type').to_parquet(
    preprocessed / 'selected_blocks_val_missing_imputed_feature_engg.parquet'
)
full_df[full_df['type'] == 'test'].drop(columns='type').to_parquet(
    preprocessed / 'selected_blocks_test_missing_imputed_feature_engg.parquet'
)