In [18]:
import os
import warnings

warnings.filterwarnings("ignore")  # avoid printing out absolute paths

In [19]:
# increase default window size for notebook
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:95% !important; }</style>"))

In [20]:
# use pytorch-forecasting
import pytorch_lightning as pl
from pytorch_lightning.callbacks import EarlyStopping, LearningRateMonitor

from pytorch_forecasting import TimeSeriesDataSet, TemporalFusionTransformer


In [21]:
import copy
from pathlib import Path
import warnings

import numpy as np
import pandas as pd
import pytorch_lightning as pl
from pytorch_lightning.callbacks import EarlyStopping, LearningRateMonitor
from pytorch_lightning.loggers import TensorBoardLogger
import torch

from pytorch_forecasting import Baseline, TemporalFusionTransformer, TimeSeriesDataSet
from pytorch_forecasting.data import GroupNormalizer
from pytorch_forecasting.metrics import SMAPE, PoissonLoss, QuantileLoss
from pytorch_forecasting.models.temporal_fusion_transformer.tuning import optimize_hyperparameters


In [22]:
import sys, os
cwd = sys.path[0]
sys.path.append(os.path.join(cwd, 'my_modules'))                # sys.path[0] is dir of the ipynb file
import custom_plot
import data_clean
import data_preprocess
import constants

In [23]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import re
import timeit
from datetime import timedelta
from dataclasses import dataclass       # C like structure
import glob                             # finds all the pathnames matching specified pattern
import datetime as dt
import random
!python --version
print('pandas version: ' + pd.__version__)
print('numpy version: ' + np.__version__)

Python 3.9.7
pandas version: 1.4.2
numpy version: 1.21.5


In [24]:
%matplotlib notebook
# Plot related packages,%matplotlib notebook makes plots in jupyter interactive
# constants for plotting
x_label_elapsedtime = 0
x_label_datetime = 1

In [25]:
# Load processed data (phase 1)

dir_path = os.path.join(cwd, 'csv')
filename = 'cell_cycles.pkl'

# load last saved df from phase1 data and convert time stamp and sort
li_ts_cycles_ph1 = data_preprocess.load_object(dir_path, filename)

In [26]:
# Load processed data (phase 2)
dir_path = os.path.join(cwd, 'csv', 'phase_2_pkl')
filename = 'mod1_cell_cycles.pkl'
li_ts_cycles_ph2 = data_preprocess.load_object(dir_path, filename)

In [27]:
# Combine phase1 and phase 2 data
import math

# for x, y in li_ts_cycles_ph1:
#     print('phase1 cells', x, y)
    
# for x, y in li_ts_cycles_ph2:
#     print('phase2 cells', len(y))

combined_phase_cycles = []
if len(li_ts_cycles_ph1) <= len(li_ts_cycles_ph2):
    for i in range(len(li_ts_cycles_ph1)):              # each cell contain multiple cycles
        c_1, cycles_1 = li_ts_cycles_ph1[i]                 # for example, (v1, [cycles])
        c_2, cycles_2 = li_ts_cycles_ph2[i]
        if (c_2 == c_1):
            cycles_1 += cycles_2                        # this adds cycle to li_ts_cycles_ph1
    combined_phase_cycles = li_ts_cycles_ph1
else:
    for i in range(len(li_ts_cycles_ph2)):              # each cell contain multiple cycles
        c_2, cycles_2 = li_ts_cycles_ph2[i]
        c_1, cycles_1 = li_ts_cycles_ph1[i]
        if (c_2 == c_1):
            cycles_2 += cycles_1                        # this adds cycle to li_ts_cycles
    combined_phase_cycles = li_ts_cycles_ph2


for x, y in combined_phase_cycles:
    print('combined cells', x)

combined cells V1
combined cells V2
combined cells V3
combined cells V4
combined cells V5
combined cells V6
combined cells V7
combined cells V8
combined cells V9
combined cells V10
combined cells V11
combined cells V12


In [28]:
import math

# Separate training, validation and test cycles, keep cycles without cell ID
TRAIN_SAMPLES = 0.9
li_train_cycles = []      # each of this list is a separate static_dynamic_static cycle
# li_validation_cycles = []
li_test_cycles= []
counter = 0

total_cycles = 0
for (c, li_cycles) in combined_phase_cycles:              # each cell contain multiple cycles
    counter += 1
    num_cell_cycles = len(li_cycles)
    num_train_cycles = math.floor(num_cell_cycles * TRAIN_SAMPLES)                 # training cycle samples 70%
    # num_validation_cycles = math.ceil((num_cell_cycles - num_train_cycles) * 0.5)
    # num_test_cycles = num_cell_cycles - num_validation_cycles - num_train_cycles
    num_test_cycles = num_cell_cycles - num_train_cycles
    total_cycles += len(li_cycles)
    # print(num_cell_cycles, num_train_cycles, num_validation_cycles, num_test_cycles)

    li_train_cycles += li_cycles[0:num_train_cycles]
    # li_validation_cycles += li_cycles[num_train_cycles:num_train_cycles+num_validation_cycles]
    # li_test_cycles += li_cycles[num_train_cycles+num_validation_cycles:]
    li_test_cycles += li_cycles[num_train_cycles:]
#     if (counter == 7):
#         break
    break

print(total_cycles, len(li_train_cycles), len(li_test_cycles))     # test

17 15 2


### univariate: current to voltage mapping

In [29]:
# Make dataset ready for LSTM
# [temporal input sequence] [output]
# example:
# [[[1], [2], [3], [4], [5]]] [6]
# [[[2], [3], [4], [5], [6]]] [7]
# [[[3], [4], [5], [6], [7]]] [8]

def df_to_supervised_univariate(df_feature, df_label, window_size=1):
    df_feature_as_np = df_feature.to_numpy()
    df_label_as_np = df_label.to_numpy()
    X = []
    y = []
    total_rows = len(df_feature_as_np) - window_size + 1
    for i in range(len(df_feature_as_np) - window_size):
        row = [[a] for a in df_feature_as_np[i:i+window_size]]
        X.append(row)
        label = df_label_as_np[i+window_size]
        y.append(label)
    return np.array(X), np.array(y)

In [30]:
from sklearn.preprocessing import MinMaxScaler, StandardScaler

TIMESTEPS = 10
# One sample only
df = li_train_cycles[2].copy()
df = df[['V', 'current', 'Temp', 'elapsed_sec']]

# statndard
df['V_standard'] = StandardScaler().fit_transform(df[['V']])
df['cur_standard'] = StandardScaler().fit_transform(df[['current']])

# normnal
df['V_normal'] = MinMaxScaler().fit_transform(df[['V']])
df['cur_normal'] = MinMaxScaler().fit_transform(df[['current']])

df['elapsed_sec'] = df['elapsed_sec'].astype(int)

# print(df.describe())
print(df)

# reverse data before training, for test
# df = df.iloc[::-1]
# df1 = df.loc[df['contactor_state'] == 2]               # take only dynamic voltage

# X, y = df_to_supervised_univariate(df['cur_standard'], df['V_standard'], TIMESTEPS)     # standard
# X, y = df_to_supervised_univariate(df['cur_normal'], df['V_normal'], TIMESTEPS)     # normal
# X, y = df_to_supervised_univariate(df['current'], df['V'], TIMESTEPS)     # without normalization
# print(X.shape, y.shape)


             V  current   Temp  elapsed_sec  V_standard  cur_standard  \
695111  4.1432      0.0  14.25            0    2.186559      0.776016   
695112  4.1432      0.0  14.25            1    2.186559      0.776016   
695113  4.1432      0.0  14.25            2    2.186559      0.776016   
695114  4.1432      0.0  14.25            3    2.186559      0.776016   
695115  4.1432      0.0  14.25            4    2.186559      0.776016   
...        ...      ...    ...          ...         ...           ...   
696599  4.0188      0.0  15.00         1488   -0.420828      0.776016   
696600  4.0188      0.0  15.00         1489   -0.420828      0.776016   
696601  4.0189      0.0  15.00         1490   -0.418732      0.776016   
696602  4.0189      0.0  15.00         1491   -0.418732      0.776016   
696603  4.0189      0.0  15.00         1492   -0.418732      0.776016   

        V_normal  cur_normal  
695111  1.000000    0.805672  
695112  1.000000    0.805672  
695113  1.000000    0.805672  

In [32]:
###########
# Import libraries
###########

# Basic Python
import sys # Python sys functions
import os  # Python os functions
import time # Python time functions
import typing #Python types
import logging # Python logging functions
import warnings # Python warnings
warnings.filterwarnings("ignore")  # don't show warnings
# import fastparquet  # Engine for parquet support
# import GPUtil #GPU status from NVIDA GPUs

# Open-source libraries:
import numpy as np # Numerical processing
import pandas as pd  # Dataframe (tabular data) processing
# import matplotlib # Graph plotting
from matplotlib import pyplot as plt # Graph plotting
# stop warnings from pyotorch_forecasting too many open plots
plt.rcParams.update({'figure.max_open_warning': 0})
# import ray                # Run distributed code
# from ray_lightning import RayPlugin #Ray plugin to parallelize Pytorch Lightning
# from ray.train import Trainer # Ray library for other AI libraries

# PyTorch, PyTorch Lightning, and PyTorch Forecasting
import torch  #Pytorch
import pytorch_lightning as pl  #PyTorch Lightning convenience APIs for PyTorch
import pytorch_forecasting as ptf #PyTorch Forecasting convenience APIs for PyTorch Lightning
pl.seed_everything(415)  # Set global random seed

# PyTorch visualization uses Tensorboard
import tensorflow as tf #Tensorflow
import tensorboard as tb  #Tensorboard
tf.io.gfile = tb.compat.tensorflow_stub.io.gfile  #compatibility for PyTorch

# TODO remove this
# Override pytorch_forecasting with my copy
sys.path.insert( 0, os.path.abspath("../githubPublicPytorchForecasting/my-copy-pytorch-forecasting") )
import pytorch_forecasting as ptf 


!python --version
print(f"pytorch: {torch.__version__}")
print(f"pytorch_lightning: {pl.__version__}")
print(f"pytorch_forecasting: {ptf.__version__}")
# print(f"ray: {ray.__version__}")
# print(f"gputil: {GPUtil.__version__}")


Global seed set to 415


Python 3.9.7
pytorch: 1.11.0
pytorch_lightning: 1.6.5
pytorch_forecasting: 0.10.2


In [33]:
# Define functions
# Todo: Move functions inside util.py

# Convert data from pandas to PyTorch tensors.
def convert_pandas_pytorch_timeseriesdata(
    input_data_pandas_df:pd.DataFrame, 
    config:dict
) -> typing.Union['pytorch_forecasting.data.timeseries.TimeSeriesDataSet',
                  'torch.utils.data.dataloader.DataLoader']:

    """Converts pandas dataframe into TimeSeries folded tensors following 
       the backtesting technique.  A generator for doing the folding is 
       per batch also created.  One for the training data.  
       Another for the validation data.  

    Inputs:
        pd.DataFrame: All the input data
        dict: config is a configuration file containing hard-coded settings.

    Returns:
        'pytorch_forecasting.data.timeseries.TimeSeriesDataSet': training data
        'torch.utils.data.dataloader.DataLoader': training data loader
        'torch.utils.data.dataloader.DataLoader': validation data loader
    """
    
    # specify data parameters
    FORECAST_HORIZON = config.get("forecast_horizon", 168)
    CONTEXT_LENGTH = config.get("context_length", 63)
    BATCH_SIZE = config.get("batch_size", 32)
    NUM_TRAINING_WORKERS = config.get("num_training_workers", 4)
    id_col_name = "pulocationid"
    target_value = "trip_quantity"
    
    the_df = input_data_pandas_df.copy()
    
    # define forecast horizon and training cutoff
    max_prediction_length = FORECAST_HORIZON  #decoder length = 1 week forecast horizon
    max_encoder_length = CONTEXT_LENGTH  # window or context length
    training_cutoff = the_df["time_idx"].max() - max_prediction_length 

    # convert pandas to PyTorch tensor
    training_data = ptf.data.TimeSeriesDataSet(
        the_df[lambda x: x.time_idx <= training_cutoff],
        allow_missing_timesteps=True,
        time_idx="time_idx",
        target=target_value,
        group_ids=[id_col_name],
        min_encoder_length=5,  # min 5 historical values must exist
        max_encoder_length=max_encoder_length,
        min_prediction_length=1,
        max_prediction_length=max_prediction_length,
        static_categoricals=[id_col_name],
        # static_reals=["avg_population_2017", "avg_yearly_household_income_2017"],
        static_reals=[],
        time_varying_known_categoricals=["day_hour"],
        # group of categorical variables can be treated as one variable
        # variable_groups={"special_days": special_days},  
        time_varying_known_reals=["time_idx", ],
                            # "mean_item_loc_weekday",
                            # "binned_max_item"],
        time_varying_unknown_categoricals=[],
        time_varying_unknown_reals=[target_value,],

        # https://pytorch-forecasting.readthedocs.io/en/v0.2.4/_modules/pytorch_forecasting/data.html
        target_normalizer=ptf.data.GroupNormalizer(
            groups=["pulocationid"], 
            transformation="softplus"  #forces positive values
        ), 
        add_relative_time_idx=True, # add as feature
        add_target_scales=True, # add avg target_value as feature
        add_encoder_length=True, # add as feature
    )
    
    # create PyTorch dataloader for training
    train_loader = training_data\
                        .to_dataloader(
                            train=True, 
                            batch_size=BATCH_SIZE, 
                            num_workers=NUM_TRAINING_WORKERS)
    
    # create validation PyTorch data 
    # (predict=True) means make do inference using the validation data
    val_dataset = ptf.data.TimeSeriesDataSet\
                    .from_dataset(
                        training_data, 
                        data=the_df, 
                        predict=True, 
                        stop_randomization=True)

    # create PyTorch dataloaders for inference on validation data
    validation_loader = val_dataset\
                    .to_dataloader(
                        train=False, 
                        batch_size=BATCH_SIZE * 10, 
                        num_workers=NUM_TRAINING_WORKERS)
    
    # return original df converted to PyTorch tensors, and pytorch loaders
    return training_data, train_loader, validation_loader

# Define a PyTorch Lightning TemporalFusionTransformer model
def define_pytorch_model(
    train_dataset: 'pytorch_forecasting.data.timeseries.TimeSeriesDataSet', 
    config: dict, 
    ray_plugin: 'ray_lightning.ray_ddp.RayPlugin'
) -> typing.Union['pytorch_forecasting.models.temporal_fusion_transformer.TemporalFusionTransformer',
                  'pytorch_lightning.trainer.trainer.Trainer']:

    """Define a PyTorch Lightning TemporalFusionTransformer model and a 
       PyTorch Lightning trainer.  Initial values for the model are hard-
       coded in the config dictionary.

    Returns:
        'pytorch_forecasting.models.temporal_fusion_transformer.TemporalFusionTransformer': model
        'pytorch_lightning.trainer.trainer.Trainer': trainer for fitting the model
    """  
    # get the parameters from config
    NUM_GPU = config.get("num_gpus", 0)
    EPOCHS = config.get("epochs", 30)
    LR = config.get("lr", 0.01)
    HIDDEN_SIZE = config.get("hidden_size", 40)
    ATTENTION_HEAD_SIZE = config.get("attention_head_size", 4)
    HIDDEN_CONTINUOUS_SIZE = config.get("hidden_continuous_size", 1)
    DROPOUT = config.get("droupout", 0.1)
    LIMIT_TRAIN_BATCHES = config.get("limit_train_batches", 30)
    FAST_MODE = config.get("fast_mode", False)
    TUNING_RUN = config.get("tuning_run", False)
    
    print(f"learning_rate = {LR}")
    print(f"hidden_size = {HIDDEN_SIZE}")
    print(f"attention_head_size = {ATTENTION_HEAD_SIZE}")
    print(f"hidden_continuous_size = {HIDDEN_CONTINUOUS_SIZE}")
    print(f"limit_train_batches = {LIMIT_TRAIN_BATCHES}")
    
    if ray_plugin is None:
        PLUGINS = []
    else:
        PLUGINS=[ray_plugin]

    # configure early stopping when validation loss does not improve 
    early_stop_callback = \
        pl.callbacks.EarlyStopping(
            monitor="val_loss", 
            min_delta=1e-4, 
            patience=10,   #1
            verbose=False, 
            mode="min")
    
    # Create the Tune Reporting Callback
    metrics = {"loss": "ptf.metrics.QuantileLoss()"}
    tune_callback = \
        ray.tune.integration.pytorch_lightning.TuneReportCallback(
            metrics, 
            on="validation_end")
    
    # configure logging
    lr_logger = pl.callbacks.LearningRateMonitor(logging_interval='epoch')
    logger = pl.loggers.TensorBoardLogger("lightning_logs")  # log results to a tensorboard
        
    # Define callbacks based on passed parameter to run tuning or not
    if TUNING_RUN:
        CALLBACKS = [lr_logger, early_stop_callback, tune_callback]
    else:
        CALLBACKS = [lr_logger, early_stop_callback]
        
    # configure PyTorch trainer with Ray Lightning plugin
    torch_trainer = pl.Trainer(
        max_epochs=EPOCHS,
        gpus=NUM_GPU,
        # weights_summary="top",
        
        # The value at which to clip gradients. 
        # Passing gradient_clip_val=None disables gradient clipping. 
        gradient_clip_val=0.1, 
        
        # Number of batches or percent size of training data each epoch
        # limit_train_batches=30,  #use 30 batches of training data each epoch 
        limit_train_batches=LIMIT_TRAIN_BATCHES,  
        
        # how often to log, default=50
        logger=logger,
        # log_every_n_steps=500,  #default 50
        
        # sanity check runs n batches of val before starting the training, default=2
        # num_sanity_val_steps=1,
        
        # Callbacks run sequentially in the order defined here.  Except ModelCheckpoint callback
        # runs after all others to ensure all states are saved to the checkpoints.
        callbacks=CALLBACKS,
        
        # Run "fast" mode for quick sanity check
        # Note: No trainer checkpoints will be saved in fast mode
        fast_dev_run=FAST_MODE,
        
        # This is the Ray parallelizing distributed part
        # regular python - just comment out below line
        plugins = PLUGINS
    )
    if not FAST_MODE:
        print(f"checkpoints location: {torch_trainer.logger.log_dir}")
    

    # initialize the model
    tft = ptf.models.TemporalFusionTransformer.from_dataset(
        train_dataset,
        learning_rate=LR,
        hidden_size=HIDDEN_SIZE, # num neurons in each layer, bigger runs more slowly
        # lstm_layers=HIDDEN_LAYERS, #LSTM layers=1 #default=1 for tft architecture
        attention_head_size=ATTENTION_HEAD_SIZE,  #default 4 cells in LSTM layer
        dropout=DROPOUT,
        hidden_continuous_size=HIDDEN_CONTINUOUS_SIZE,  #similar to categorical embedding size
        # 7 quantiles by default: [0.02, 0.1, 0.25, 0.5, 0.75, 0.9, 0.98]
        # output_size=7,  
        # optimizer loss metric
        loss=ptf.metrics.QuantileLoss(),
        # uncomment for learning rate finder and otherwise, e.g. to 10 for logging every 10 batches
        log_interval=50,  #50
        reduce_on_plateau_patience=4, # reduce learning automatically
    )
    print(f"Number of parameters in network: {tft.size()/1e3:.1f}k")
    
    # return the model and trainer
    return tft, torch_trainer

# Define a calling function to read data, define model, train it
def train_func(config: dict, 
               ray_plugin: 'ray_lightning.ray_ddp.RayPlugin'
) -> typing.Union['torch.utils.data.dataloader.DataLoader',
        'pytorch_lightning.trainer.trainer.Trainer',
        'pytorch_forecasting.models.temporal_fusion_transformer.TemporalFusionTransformer',
         str]:
    """Define a calling function to read data, define a model and train it.

    Inputs:
        dict: configuration dictionary with hard-coded runtime values
        'ray_lightning.ray_ddp.RayPlugin': plugin for PyTorch Lightning trainer

    Returns:
        'torch.utils.data.dataloader.DataLoader': validation data loader
        'pytorch_lightning.trainer.trainer.Trainer': trainer for fitting the model
        'pytorch_forecasting.models.temporal_fusion_transformer.TemporalFusionTransformer': trained model
        str: path where Pytorch Forecasting model is stored
    """
    # # stop warnings from pyotorch_forecasting too many open plots
    # matplotlib.rcParams.update({'figure.max_open_warning': 0})
    
    # read data into pandas dataframe
    filename = "data/clean_taxi_hourly.parquet"
    df = pd.read_parquet(filename)
    df = df[["time_idx", "pulocationid", "day_hour",
                 "trip_quantity", "mean_item_loc_weekday",
                 "binned_max_item"]].copy()

    # convert data from pandas to PyTorch tensors
    train_dataset, train_loader, validation_loader = \
        convert_pandas_pytorch_timeseriesdata(df, config)

    # define a PyTorch deep learning forecasting model
    model, trainer = define_pytorch_model(
                                           train_dataset, 
                                           config,
                                           ray_plugin)


In [26]:
# Create dataset and dataloaders

max_prediction_length = 300                # last 300 seconds as a validation set
max_encoder_length = 24
training_cutoff = df["elapsed_sec"].max() - max_prediction_length

training = TimeSeriesDataSet(
    df[lambda x: x["elapsed_sec"] <= training_cutoff],
    time_idx="elapsed_sec",
    target="V",    # voltage
    group_ids=["current", "V"],
    min_encoder_length=max_encoder_length // 2,  # keep encoder length long (as it is in the validation set)
    max_encoder_length=max_encoder_length,
    min_prediction_length=1,
    max_prediction_length=max_prediction_length,
    static_categoricals=[],       # no static in my data, static = does not vary with time
    static_reals=[],
    time_varying_known_categoricals=[],
    variable_groups={},  # group of categorical variables can be treated as one variable
    time_varying_known_reals=["V", "current"],
    time_varying_unknown_categoricals=[],
    time_varying_unknown_reals=[
        "V",
        "current",
    ],
#     target_normalizer=GroupNormalizer(
#         groups=["agency", "sku"], transformation="softplus"
#     ),  # use softplus and normalize by group
    add_relative_time_idx=True,
    add_target_scales=True,
    add_encoder_length=True,
)

# create validation set (predict=True) which means to predict the last max_prediction_length points in time
# for each series
validation = TimeSeriesDataSet.from_dataset(training, df, predict=True, stop_randomization=True)

# create dataloaders for model
batch_size = 32  # set this between 32 to 128
train_dataloader = training.to_dataloader(train=True, batch_size=batch_size, num_workers=0)
val_dataloader = validation.to_dataloader(train=False, batch_size=batch_size * 10, num_workers=0)


AssertionError: target V should be an unknown continuous variable in the future