In [1]:
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

import torch
import torch.nn.functional as F
from torch import nn
from pytorch_forecasting import TimeSeriesDataSet
from pytorch_forecasting.models import BaseModel
import lightning.pytorch as lpt

In [2]:
N_LAGS = 7
HORIZON = 1
BATCH_SIZE = 16

mvtseries = pd.read_csv(
    "../assets/daily_multivariate_timeseries.csv",
    parse_dates=["datetime"],
    index_col="datetime",
)

n_vars = mvtseries.shape[1] + 1

In [3]:
class MultivariateSeriesDataModule(lpt.LightningDataModule):
    def __init__(
        self,
        data: pd.DataFrame,
        n_lags: int,
        horizon: int,
        test_size: float = 0.2,
        batch_size: int = BATCH_SIZE,
    ):
        super().__init__()
        self.data = data
        self.feature_names = [col for col in data.columns if col != "Incoming Solar"]
        self.batch_size = batch_size
        self.test_size = test_size
        self.n_lags = n_lags
        self.horizon = horizon
        self.target_scaler = StandardScaler()
        self.training = None
        self.validation = None
        self.test = None
        self.predict_set = None

    def preprocess_data(self):
        self.data["target"] = self.data["Incoming Solar"]
        self.data["time_index"] = np.arange(len(self.data))
        self.data["group_id"] = [0] * len(
            self.data
        )  # Assuming a single group for simplicity; adjust if needed

    def split_data(self):
        time_indices = self.data["time_index"].values
        train_indices, test_indices = train_test_split(
            time_indices, test_size=self.test_size, shuffle=False
        )
        train_indices, val_indices = train_test_split(
            train_indices, test_size=0.1, shuffle=False
        )
        return train_indices, val_indices, test_indices

    def scale_target(self, df, indices):
        scaled_values = self.target_scaler.transform(df.loc[indices, ["target"]])
        df.loc[indices, "target"] = scaled_values

    def setup(self, stage=None):
        # This method includes all the main data preprocessing steps
        self.preprocess_data()
        train_indices, val_indices, test_indices = self.split_data()

        train_df = self.data.loc[self.data["time_index"].isin(train_indices)]
        val_df = self.data.loc[self.data["time_index"].isin(val_indices)]
        test_df = self.data.loc[self.data["time_index"].isin(test_indices)]

        # Scale the target variable
        self.target_scaler.fit(train_df[["target"]])
        self.scale_target(df=train_df, indices=train_df.index)
        self.scale_target(df=val_df, indices=val_df.index)
        self.scale_target(df=test_df, indices=test_df.index)

        train_df = train_df.drop("Incoming Solar", axis=1)
        val_df = val_df.drop("Incoming Solar", axis=1)
        test_df = test_df.drop("Incoming Solar", axis=1)

        # Setup datasets
        self.training = TimeSeriesDataSet(
            train_df,
            time_idx="time_index",
            target="target",
            group_ids=["group_id"],
            max_encoder_length=self.n_lags,
            max_prediction_length=self.horizon,
            time_varying_unknown_reals=self.feature_names,
            scalers={name: StandardScaler() for name in self.feature_names},
        )

        self.validation = TimeSeriesDataSet.from_dataset(
            dataset=self.training, data=val_df
        )
        self.test = TimeSeriesDataSet.from_dataset(dataset=self.training, data=test_df)
        self.predict_set = TimeSeriesDataSet.from_dataset(
            dataset=self.training, data=self.data, predict=True
        )

    def train_dataloader(self):
        return self.training.to_dataloader(batch_size=self.batch_size, shuffle=False)

    def val_dataloader(self):
        return self.validation.to_dataloader(batch_size=self.batch_size, shuffle=False)

    def test_dataloader(self):
        return self.test.to_dataloader(batch_size=self.batch_size, shuffle=False)

    def predict_dataloader(self):
        return self.predict_set.to_dataloader(batch_size=1, shuffle=False)

In [4]:
class FeedForwardNet(nn.Module):
    def __init__(self, input_size, output_size):
        super().__init__()

        self.net = nn.Sequential(
            nn.Linear(input_size, 16),
            nn.ReLU(),
            nn.Linear(16, 8),
            nn.ReLU(),
            nn.Linear(8, output_size),
        )

    def forward(self, X):
        # Flatten the input tensor from [batch_size, N_LAGS, n_vars] to [batch_size, N_LAGS*n_vars]
        X = X.view(X.size(0), -1)
        return self.net(X)

In [5]:
class FeedForwardModel(BaseModel):
    def __init__(self, input_dim: int, output_dim: int):
        self.save_hyperparameters()

        super().__init__()
        self.network = FeedForwardNet(
            input_size=input_dim,
            output_size=output_dim,
        )

        self.train_loss_history = []
        self.val_loss_history = []

        self.train_loss_sum = 0.0
        self.val_loss_sum = 0.0
        self.train_batch_count = 0
        self.val_batch_count = 0

    def forward(self, x):
        #  method defines how the elements of the network interact and model the time series
        network_input = x["encoder_cont"].squeeze(-1)

        prediction = self.network(network_input)
        output = self.to_network_output(prediction=prediction)

        return output

    def on_train_epoch_end(self):
        # Compute the average loss and reset counters
        if self.train_batch_count > 0:
            avg_train_loss = self.train_loss_sum / self.train_batch_count
            self.train_loss_history.append(avg_train_loss)
            self.train_loss_sum = 0.0
            self.train_batch_count = 0

    def on_validation_epoch_end(self):
        # Compute the average loss and reset counters
        if self.val_batch_count > 0:
            avg_val_loss = self.val_loss_sum / self.val_batch_count
            self.val_loss_history.append(avg_val_loss)
            self.val_loss_sum = 0.0
            self.val_batch_count = 0

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_pred = self(x).prediction
        y_pred = y_pred.squeeze(1)

        y_actual = y[0].squeeze(1)

        loss = F.mse_loss(y_pred, y_actual)
        self.train_loss_sum += loss.item()
        self.train_batch_count += 1

        self.log("train_loss", loss)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        y_pred = self(x).prediction
        y_pred = y_pred.squeeze(1)

        y_actual = y[0].squeeze(1)

        loss = F.mse_loss(y_pred, y_actual)
        self.val_loss_sum += loss.item()
        self.val_batch_count += 1
        self.log("val_loss", loss)
        return loss

    def test_step(self, batch, batch_idx):
        x, y = batch

        y_pred = self(x).prediction
        y_pred = y_pred.squeeze(1)

        y_actual = y[0].squeeze(1)

        loss = F.mse_loss(y_pred, y_actual)
        self.log("test_loss", loss)

    def predict_step(self, batch, batch_idx):
        x, y = batch

        y_pred = self(x).prediction
        y_pred = y_pred.squeeze(1)

        return y_pred

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=0.01)

In [6]:
datamodule = MultivariateSeriesDataModule(
    data=mvtseries, n_lags=N_LAGS, horizon=HORIZON, batch_size=BATCH_SIZE, test_size=0.3
)

datamodule.setup()

model = FeedForwardModel(input_dim=N_LAGS * n_vars, output_dim=1)

trainer = lpt.Trainer(max_epochs=30)
trainer.fit(model=model, datamodule=datamodule)

trainer.test(model=model, datamodule=datamodule)

forecasts = trainer.predict(model=model, datamodule=datamodule)

/home/volody/code/study-py/ts-pytorch/.venv/lib/python3.13/site-packages/lightning/pytorch/utilities/parsing.py:210: Attribute 'loss' is an instance of `nn.Module` and is already saved during checkpointing. It is recommended to ignore them using `self.save_hyperparameters(ignore=['loss'])`.
/home/volody/code/study-py/ts-pytorch/.venv/lib/python3.13/site-packages/lightning/pytorch/utilities/parsing.py:210: Attribute 'logging_metrics' is an instance of `nn.Module` and is already saved during checkpointing. It is recommended to ignore them using `self.save_hyperparameters(ignore=['logging_metrics'])`.
ðŸ’¡ Tip: For seamless cloud uploads and versioning, try installing [litmodels](https://pypi.org/project/litmodels/) to enable LitModelCheckpoint, which syncs automatically with the Lightning model registry.
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
/home/volody/code/study-py/ts-pytorch/.venv/lib/python3.13/site-packages/lightning/pytorch/trainer/connect

Sanity Checking: |          | 0/? [00:00<?, ?it/s]

/home/volody/code/study-py/ts-pytorch/.venv/lib/python3.13/site-packages/lightning/pytorch/trainer/connectors/data_connector.py:434: The 'val_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=15` in the `DataLoader` to improve performance.


RuntimeError: mat1 and mat2 shapes cannot be multiplied (16x56 and 70x16)