In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset, Dataset

import lightning as L
from lightning.pytorch.tuner import Tuner
from lightning.pytorch.callbacks.early_stopping import EarlyStopping
from lightning.pytorch.callbacks import DeviceStatsMonitor
from lightning.pytorch.callbacks import BasePredictionWriter
from lightning.pytorch.profilers import PyTorchProfiler

In [None]:
class TimeSeriesDataset(Dataset):
  """Sliding-window dataset over a feature array and an aligned target array.

  Item ``i`` is the pair ``(X[i:i + seq_len], y[i + seq_len - 1])``: a window of
  ``seq_len`` consecutive rows of ``X`` together with the target stored at the
  position of the window's last row.

  Args:
    X: feature array; converted to a float32 tensor.
    y: target array indexed along the same first axis as ``X``; converted to
      a float32 tensor.
    seq_len: number of consecutive rows per window (must be >= 1).

  Raises:
    ValueError: if ``seq_len`` is smaller than 1.
  """

  def __init__(self, X: np.ndarray, y: np.ndarray, seq_len: int = 1):
    if seq_len < 1:
      raise ValueError(f"seq_len must be >= 1, got {seq_len}")
    self.X = torch.tensor(X).float()
    self.y = torch.tensor(y).float()
    self.seq_len = seq_len

  def __len__(self):
    """Return the number of complete windows (0 when there are too few rows)."""
    # Clamp at zero: a negative value would make len() raise ValueError when
    # the data holds fewer rows than one window needs.
    return max(0, len(self.X) - (self.seq_len - 1))

  def __getitem__(self, index):
    """Return ``(window, target)`` for the window starting at row ``index``."""
    end = index + self.seq_len
    return (self.X[index:end], self.y[end - 1])

In [None]:
class ColoradoDataModule(L.LightningDataModule):
  """Lightning data module for an energy-consumption time series stored in a CSV file.

  The CSV must contain the columns ``Start_DateTime`` and ``Energy_Consumption``.
  The only feature is ``Energy_Consumption``; the target is the same column
  shifted one step ahead. Rows are split in order (no shuffling) into 60 %
  train, 20 % validation and 20 % test. Features are scaled with ``scaler``,
  fitted on the training split only; targets are left in their original units.

  Args:
    data_dir: path of the CSV file to read (a file path, despite the name).
    scaler: transformer exposing ``fit``/``transform`` (e.g. ``MinMaxScaler``).
    seq_len: window length handed to ``TimeSeriesDataset``.
    batch_size: batch size used by every dataloader.
    num_workers: forwarded to ``DataLoader(num_workers=...)``.
    is_persistent: forwarded to ``DataLoader(persistent_workers=...)``.
  """

  def __init__(self, data_dir: str, scaler: object, seq_len: int, batch_size: int, num_workers: int, is_persistent: bool):
    super().__init__()
    self.data_dir = data_dir
    self.scaler = scaler
    self.seq_len = seq_len
    self.batch_size = batch_size
    self.num_workers = num_workers
    self.is_persistent = is_persistent
    # Splits are populated by setup().
    self.X_train = None
    self.y_train = None
    self.X_val = None
    self.y_val = None
    self.X_test = None
    self.y_test = None

  def setup(self, stage: str = None):
    """Load the CSV, rebuild the splits and scale the ones needed for ``stage``.

    Args:
      stage: ``"fit"``, ``"validate"``, ``"test"``, ``"predict"`` or ``None``;
        ``None`` prepares every split.
    """
    df = pd.read_csv(self.data_dir)
    df.index = df['Start_DateTime']
    df = df[['Start_DateTime', 'Energy_Consumption']].sort_index()
    df.dropna(inplace=True)
    df['Start_DateTime'] = pd.to_datetime(df['Start_DateTime'], format='%Y-%m-%d %H:%M:%S')
    df.set_index('Start_DateTime', inplace=True)
    X = df.copy()
    # Target = next observation. The last row has no successor, so ffill
    # repeats the previous target there.
    y = X['Energy_Consumption'].shift(-1).ffill()
    X_tv, self.X_test, y_tv, self.y_test = train_test_split(X, y, test_size=0.2, shuffle=False)
    self.X_train, self.X_val, self.y_train, self.y_val = train_test_split(X_tv, y_tv, test_size=0.25, shuffle=False)

    preprocessing = self.scaler
    preprocessing.fit(self.X_train)  # fit on the training split only

    if stage in ("fit", "validate", None):
      self.X_train = preprocessing.transform(self.X_train)
      self.y_train = self.y_train.values.reshape((-1, 1))
      self.X_val = preprocessing.transform(self.X_val)
      self.y_val = self.y_val.values.reshape((-1, 1))

    # Membership test instead of `stage == "test" or "predict" or ...`, which
    # was always true because the bare string "predict" is truthy.
    if stage in ("test", "predict", None):
      self.X_test = preprocessing.transform(self.X_test)
      self.y_test = self.y_test.values.reshape((-1, 1))

  def _make_loader(self, X, y):
    """Wrap one split in a ``TimeSeriesDataset`` and an unshuffled ``DataLoader``."""
    dataset = TimeSeriesDataset(X, y, seq_len=self.seq_len)
    return DataLoader(dataset, batch_size=self.batch_size, shuffle=False, num_workers=self.num_workers, persistent_workers=self.is_persistent)

  def train_dataloader(self):
    """Return the dataloader over the training split."""
    return self._make_loader(self.X_train, self.y_train)

  def val_dataloader(self):
    """Return the dataloader over the validation split."""
    return self._make_loader(self.X_val, self.y_val)

  def test_dataloader(self):
    """Return the dataloader over the test split."""
    return self._make_loader(self.X_test, self.y_test)

  def predict_dataloader(self):
    """Return the dataloader used for prediction (the test split)."""
    return self._make_loader(self.X_test, self.y_test)
      

In [None]:
class LSTM(L.LightningModule):
  """LSTM regressor that maps an input window to a single predicted value.

  Args:
    input_size: number of features per time step.
    hidden_size: size of the LSTM hidden state.
    num_layers: number of stacked LSTM layers.
    criterion: loss callable applied as ``criterion(y_hat, y)``.
    dropout: stored on the module but not applied by any layer here.
    learning_rate: learning rate handed to the Adam optimizer.
  """

  def __init__(self, input_size, hidden_size, num_layers, criterion, dropout, learning_rate):
    super().__init__()
    self.save_hyperparameters(ignore=['criterion'])
    self.dropout = dropout
    self.criterion = criterion
    self.learning_rate = learning_rate
    self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
    self.fc = nn.Linear(hidden_size, 1)

  def forward(self, x):
    """Run the LSTM over batch-first input ``x`` and project the last time step to one value."""
    out, _ = self.lstm(x)
    out = self.fc(out[:, -1, :])  # keep only the last time step
    return out

  def training_step(self, batch, batch_idx):
    """Compute and log the training loss for one batch."""
    x, y = batch
    y_hat = self(x)
    train_loss = self.criterion(y_hat, y)
    self.log("train_loss", train_loss, on_step=True, on_epoch=True, prog_bar=True, logger=True)
    return train_loss

  def validation_step(self, batch, batch_idx):
    """Compute and log the validation loss for one batch."""
    x, y = batch
    y_hat = self(x)
    val_loss = self.criterion(y_hat, y)
    self.log("val_loss", val_loss, on_step=True, on_epoch=True, prog_bar=True, logger=True)
    return val_loss

  def test_step(self, batch, batch_idx):
    """Compute and log the test loss for one batch."""
    x, y = batch
    y_hat = self(x)
    test_loss = self.criterion(y_hat, y)
    self.log("test_loss", test_loss, on_step=True, on_epoch=True, prog_bar=True, logger=True)
    return test_loss

  def predict_step(self, batch, batch_idx):
    """Return the model output for one batch; the target in the batch is ignored."""
    x, _ = batch
    return self(x)

  def configure_optimizers(self):
    """Return an Adam optimizer over all parameters."""
    # Read the stored learning rate instead of a literal 0.001, so that the
    # constructor argument and later updates to self.learning_rate take effect.
    return torch.optim.Adam(self.parameters(), lr=self.learning_rate)

In [None]:
class CustomWriter(BasePredictionWriter):
  """Prediction-writer callback that saves an epoch's predictions to disk.

  Args:
    output_dir: directory that receives the ``predictions_<rank>.pt`` file.
    write_interval: forwarded unchanged to ``BasePredictionWriter``.
  """

  def __init__(self, output_dir, write_interval):
    super().__init__(write_interval)
    self.output_dir = output_dir

  def write_on_epoch_end(self, trainer, pl_module, predictions, batch_indices):
    """Save ``predictions`` to ``<output_dir>/predictions_<global_rank>.pt``."""
    # torch.save does not create missing directories, so make sure it exists.
    os.makedirs(self.output_dir, exist_ok=True)
    torch.save(predictions, os.path.join(self.output_dir, f"predictions_{trainer.global_rank}.pt"))
    # batch_indices is not persisted; it could be saved the same way
    # (e.g. as f"batch_indices_{trainer.global_rank}.pt") if needed.

In [None]:
# Central experiment configuration read by the cells that build the model,
# the data module and the trainers.
params = {
  'seq_len': 12,  # rows per input window
  'batch_size': 8,
  'criterion': nn.MSELoss(),
  'max_epochs': 50,
  # NOTE(review): the model cell passes input_size=1 and no visible cell reads
  # this key — confirm whether it is still needed.
  'n_features': 7,
  'hidden_size': 100,
  'num_layers': 1,
  'dropout': 1,  # can be 0.2 if more output layers are present
  'learning_rate': 0.001,
  'num_workers': 0,  # worker processes only worked in a .py script for the author
  'is_persistent': False,  # same limitation as num_workers
  'scaler': MinMaxScaler(),
}

In [None]:
# Build the network, the data module and the prediction writer from `params`.
model = LSTM(
  input_size=1,
  hidden_size=params['hidden_size'],
  num_layers=params['num_layers'],
  criterion=params['criterion'],
  dropout=params['dropout'],
  learning_rate=params['learning_rate'],
)
colmod = ColoradoDataModule(
  data_dir='ColoradoData_Boulder.csv',
  scaler=params['scaler'],
  seq_len=params['seq_len'],
  batch_size=params['batch_size'],
  num_workers=params['num_workers'],
  is_persistent=params['is_persistent'],
)
pred_writer = CustomWriter(output_dir="Models", write_interval="epoch")

Bottleneck Finding

In [None]:
# Profile a full fit/test/predict cycle with Lightning's "simple" profiler.
# The epoch budget is read from `params` rather than repeated as a literal,
# so it stays in sync with the other trainers.
trainer_bottleneck = L.Trainer(max_epochs=params['max_epochs'], profiler="simple")
trainer_bottleneck.fit(model, colmod)
trainer_bottleneck.test(model, colmod)
trainer_bottleneck.predict(model, colmod, return_predictions=False)

CPU usage

In [None]:
# Profile with the PyTorch profiler. An explicit epoch limit is required here:
# without max_epochs/max_steps the Trainer falls back to its default epoch
# count (1000 per the Lightning docs), which makes a profiled run impractical.
trainer_cpu = L.Trainer(max_epochs=params['max_epochs'], profiler=PyTorchProfiler())
trainer_cpu.fit(model, colmod)
trainer_cpu.test(model, colmod)
trainer_cpu.predict(model, colmod, return_predictions=False)

Performance Evaluation

In [None]:
# Train, test and predict while DeviceStatsMonitor logs device statistics.
# pred_writer is registered here so that predict(return_predictions=False)
# writes the predictions to disk instead of discarding them.
os.makedirs(pred_writer.output_dir, exist_ok=True)  # torch.save needs an existing directory
trainer_eval = L.Trainer(
  max_epochs=params['max_epochs'],
  callbacks=[DeviceStatsMonitor(), pred_writer],
)
trainer_eval.fit(model, colmod)
trainer_eval.test(model, colmod)
trainer_eval.predict(model, colmod, return_predictions=False)

Fast Development run (runs n batches of training, validation and test to check for bugs)

In [None]:
# Smoke test: fast_dev_run=10 restricts fit/test/predict to a few batches so
# wiring errors surface quickly.
trainer_dev = L.Trainer(
  max_epochs=params['max_epochs'],
  default_root_dir='Models',
  callbacks=[
    EarlyStopping(monitor="val_loss", mode="min"),
  ],
  fast_dev_run=10,
)
trainer_dev.fit(model, colmod)
trainer_dev.test(model, colmod)
trainer_dev.predict(model, colmod, return_predictions=False)

Find the largest batch size that fits into memory. Larger batches often yield better gradient estimates but can result in longer runtimes.

In [None]:
# Dedicated trainer for the batch-size search; early stopping watches val_loss.
trainer_tun = L.Trainer(
  max_epochs=params['max_epochs'],
  default_root_dir='Models',
  callbacks=[
    EarlyStopping(monitor="val_loss", mode="min"),
  ],
)
tuner = Tuner(trainer_tun)

# "power" mode is used here; mode="binsearch" is the alternative search strategy.
tuner.scale_batch_size(model, colmod, mode="power")

Learning rate tuning (find a suitable initial learning rate)

In [None]:
# Dedicated trainer for the learning-rate search; early stopping watches val_loss.
trainer_LRtun = L.Trainer(
  max_epochs=params['max_epochs'],
  default_root_dir='Models',
  callbacks=[
    EarlyStopping(monitor="val_loss", mode="min"),
  ],
)
tuner2 = Tuner(trainer_LRtun)

# Run the learning-rate range test; lr_finder.results holds the raw sweep.
lr_finder = tuner2.lr_find(model, colmod)

# Plot the sweep with the suggested learning rate marked.
fig = lr_finder.plot(suggest=True)
fig.show()

# Suggested learning rate from the sweep.
new_lr = lr_finder.suggestion()