<a href="https://colab.research.google.com/github/NithyaprasathS/time_series_forecasting/blob/main/Time_Series_Forecasting.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [17]:
import numpy as np                      # Numerical operations & data generation
import pandas as pd                     # Tabular time series handling
import torch                            # Deep learning framework
import torch.nn as nn                   # Neural network layers
from torch.utils.data import Dataset, DataLoader  # Efficient batching
from sklearn.preprocessing import StandardScaler  # Feature scaling
from sklearn.metrics import mean_squared_error, mean_absolute_error
from captum.attr import IntegratedGradients       # Explainability

def generate_time_series(n_days=1095):
    """
    Build a synthetic three-feature daily series and return it as a DataFrame.

    Columns:
        feature_1: linear trend (0.01 per day) plus a 365-day sine cycle.
        feature_2: 7-day sine cycle plus Gaussian noise (std 0.3).
        feature_3: linear ramp whose slope jumps at day 700, plus
            Gaussian noise (std 0.2).

    Noise comes from NumPy's global random state; call ``np.random.seed``
    beforehand if repeatable output is needed.
    """
    days = np.arange(n_days)

    # Deterministic component: slow upward drift with an annual cycle.
    trend_and_yearly = 0.01 * days + np.sin(2 * np.pi * days / 365)

    # Weekly cycle; its noise is drawn first from the global RNG.
    weekly_noise = np.random.randn(n_days)
    weekly = np.sin(2 * np.pi * days / 7) + 0.3 * weekly_noise

    # Ramp with a regime change at day 700; its noise is drawn second.
    before_break = 0.5 * days / 700
    after_break = 1.2 * days / 700
    ramp_noise = np.random.randn(n_days)
    ramp = np.where(days < 700, before_break, after_break) + 0.2 * ramp_noise

    return pd.DataFrame(
        {
            "feature_1": trend_and_yearly,
            "feature_2": weekly,
            "feature_3": ramp,
        }
    )

def preprocess_data(df, window_size=60, forecast_horizon=30):
    """
    Standardize the features and slice them into supervised windows.

    Every column of ``df`` is scaled with a ``StandardScaler`` fitted on the
    whole frame. Each sample pairs ``window_size`` consecutive rows of all
    features (the input) with the next ``forecast_horizon`` values of
    column 0 (the target).

    Args:
        df: Numeric feature table, one row per time step. Column 0 is the
            series being forecast.
        window_size: Number of past time steps in each input window.
        forecast_horizon: Number of future steps of column 0 to predict.

    Returns:
        Tuple ``(X, y, scaler)`` where ``X`` has shape
        ``(n_samples, window_size, n_features)``, ``y`` has shape
        ``(n_samples, forecast_horizon)`` and ``scaler`` is the fitted
        ``StandardScaler``. ``n_samples`` is
        ``len(df) - window_size - forecast_horizon + 1``; when that is not
        positive, empty arrays with the shapes above are returned.
    """
    scaler = StandardScaler()
    # NOTE: the scaler sees every row, so any train/validation/test split
    # made on the returned windows shares these statistics.
    scaled = scaler.fit_transform(df)

    # "+ 1" keeps the final window whose target ends exactly on the last row.
    n_samples = max(len(scaled) - window_size - forecast_horizon + 1, 0)

    if n_samples == 0:
        # Keep the rank of the outputs stable instead of returning shape (0,).
        n_features = scaled.shape[1]
        empty_X = np.empty((0, window_size, n_features))
        empty_y = np.empty((0, forecast_horizon))
        return empty_X, empty_y, scaler

    X = np.array([scaled[i:i + window_size] for i in range(n_samples)])
    # Targets: the forecast_horizon values of column 0 right after each window.
    y = np.array([
        scaled[i + window_size:i + window_size + forecast_horizon, 0]
        for i in range(n_samples)
    ])

    return X, y, scaler

class TimeSeriesDataset(Dataset):
    """Map-style dataset of (input, target) pairs held as float32 tensors."""

    @staticmethod
    def _to_float32(values):
        # torch.tensor copies its input, so the caller's arrays are not aliased.
        return torch.tensor(values, dtype=torch.float32)

    def __init__(self, X, y):
        """Convert the inputs ``X`` and targets ``y`` to float32 tensors."""
        self.X = self._to_float32(X)
        self.y = self._to_float32(y)

    def __len__(self):
        """Return the number of samples (length of the leading axis of X)."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(input, target)`` pair stored at position ``idx``."""
        features = self.X[idx]
        target = self.y[idx]
        return features, target

class LSTMForecast(nn.Module):
    """LSTM encoder with a linear head that emits every forecast step at once."""

    def __init__(self, input_size, hidden_size, output_size):
        """
        Args:
            input_size: Number of features per time step.
            hidden_size: Width of the LSTM hidden state.
            output_size: Number of values produced by the linear head.
        """
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, x):
        """Encode the batch-first sequence ``x`` and project its final hidden state."""
        _outputs, final_states = self.lstm(x)
        final_hidden = final_states[0]
        # Index -1 picks the last layer's hidden state after the final time step.
        return self.fc(final_hidden[-1])

    def train_model(self, loader, optimizer, criterion, epochs=20):
        """
        Run ``epochs`` passes over ``loader`` and print the mean batch loss of each.

        Args:
            loader: Sized iterable yielding ``(inputs, targets)`` batches.
            optimizer: Optimizer already bound to this model's parameters.
            criterion: Loss callable taking ``(prediction, target)``.
            epochs: Number of full passes over ``loader``.
        """
        self.train()
        for epoch_number in range(1, epochs + 1):
            running_loss = 0
            for inputs, targets in loader:
                optimizer.zero_grad()
                prediction = self(inputs)
                batch_loss = criterion(prediction, targets)
                batch_loss.backward()
                optimizer.step()
                running_loss += batch_loss.item()
            mean_loss = running_loss / len(loader)
            print(f"Epoch {epoch_number}, Loss: {mean_loss:.4f}")

def evaluate(model, loader):
    """
    Score ``model`` on every batch produced by ``loader``.

    The model is switched to eval mode (and left in it), predictions are
    computed without gradient tracking, and all batches are stacked before
    the metrics are taken. Errors are in the units of the targets that
    ``loader`` yields.

    Returns:
        Tuple ``(rmse, mae)`` computed over all stacked predictions.
    """
    model.eval()

    # Collect (prediction, target) array pairs batch by batch.
    with torch.no_grad():
        batch_results = [
            (model(inputs).numpy(), targets.numpy())
            for inputs, targets in loader
        ]

    predicted = np.vstack([batch_pred for batch_pred, _ in batch_results])
    observed = np.vstack([batch_true for _, batch_true in batch_results])

    mse = mean_squared_error(observed, predicted)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(observed, predicted)
    return rmse, mae
def explain(model, sample, target=0):
    """
    Attribute one model output to each input value via Integrated Gradients.

    Args:
        model: Network to explain; it is wrapped in Captum's
            ``IntegratedGradients``.
        sample: A single unbatched input tensor. A batch axis is added
            before attribution.
        target: Index of the model output to attribute. Defaults to 0,
            the first output.

    Returns:
        NumPy array of attributions with the same shape as ``sample``.
    """
    ig = IntegratedGradients(model)
    batched = sample.unsqueeze(0)
    attribution = ig.attribute(batched, target=target)
    # Remove only the batch axis added above; a bare squeeze() would also
    # collapse a size-1 window or feature axis and change the result's rank.
    return attribution.squeeze(0).detach().numpy()

if __name__ == "__main__":
    # End-to-end demo: synthesize data, train the forecaster, score it on
    # the held-out tail, and attribute one test prediction to its inputs.
    df = generate_time_series()

    # scaler is kept for reference; metrics below are in scaled units.
    X, y, scaler = preprocess_data(df)

    # Chronological 70 / 15 / 15 split (windows are ordered in time).
    split1 = int(0.7 * len(X))
    split2 = int(0.85 * len(X))

    train_ds = TimeSeriesDataset(X[:split1], y[:split1])
    # NOTE: the validation split is built but not consumed by the code below.
    val_ds = TimeSeriesDataset(X[split1:split2], y[split1:split2])
    test_ds = TimeSeriesDataset(X[split2:], y[split2:])

    train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)
    test_loader = DataLoader(test_ds, batch_size=32)

    # Take layer sizes from the data so they cannot drift out of sync with
    # the feature count or the forecast horizon used in preprocessing.
    n_features = X.shape[2]
    horizon = y.shape[1]
    model = LSTMForecast(input_size=n_features, hidden_size=64, output_size=horizon)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.MSELoss()

    model.train_model(train_loader, optimizer, criterion)

    rmse, mae = evaluate(model, test_loader)
    print(f"Test RMSE: {rmse:.4f}, MAE: {mae:.4f}")

    # Explain the first test window.
    sample_X, _ = test_ds[0]
    attributions = explain(model, sample_X)
    print("Explainability shape:", attributions.shape)

Epoch 1, Loss: 0.4874
Epoch 2, Loss: 0.1957
Epoch 3, Loss: 0.0565
Epoch 4, Loss: 0.0294
Epoch 5, Loss: 0.0143
Epoch 6, Loss: 0.0084
Epoch 7, Loss: 0.0074
Epoch 8, Loss: 0.0070
Epoch 9, Loss: 0.0065
Epoch 10, Loss: 0.0063
Epoch 11, Loss: 0.0060
Epoch 12, Loss: 0.0059
Epoch 13, Loss: 0.0056
Epoch 14, Loss: 0.0059
Epoch 15, Loss: 0.0055
Epoch 16, Loss: 0.0058
Epoch 17, Loss: 0.0053
Epoch 18, Loss: 0.0056
Epoch 19, Loss: 0.0047
Epoch 20, Loss: 0.0044
Test RMSE: 0.3466, MAE: 0.3044
Explainability shape: (60, 3)
