In [5]:
!pip install torch torchvision torchaudio


import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt

# N-BEATS Block
class NBeatsBlock(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers):
        super(NBeatsBlock, self).__init__()
        self.hidden_layers = nn.Sequential(
            *[nn.Sequential(nn.Linear(input_size, hidden_size), nn.ReLU()) for _ in range(num_layers)]
        )
        self.backcast = nn.Linear(hidden_size, input_size)  # Ensure correct shape
        self.forecast = nn.Linear(hidden_size, output_size)


    def forward(self, x):
        x = self.hidden_layers(x)  # Pass through hidden layers
        backcast = self.backcast(x)  # Predict backcast
        forecast = self.forecast(x)  # Predict forecast
        return backcast, forecast


# Full N-BEATS Model
class NBeats(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_blocks, num_layers):
        super(NBeats, self).__init__()
        self.blocks = nn.ModuleList([NBeatsBlock(input_size, hidden_size, output_size, num_layers) for _ in range(num_blocks)])

    def forward(self, x):
        residual = x
        forecast_sum = torch.zeros(x.shape[0], self.blocks[0].forecast.out_features, device=x.device)
        for block in self.blocks:
            backcast, forecast = block(residual)
            residual = residual - backcast  # Update residual
            forecast_sum += forecast  # Sum forecasts
        return forecast_sum




In [6]:
# Generate synthetic sine wave data
def generate_sine_wave(num_samples=500, input_size=10, output_size=5):
    x = np.linspace(0, 50, num_samples)
    y = np.sin(x) + np.random.normal(scale=0.1, size=x.shape)  # Sine wave with noise
    X, Y = [], []
    
    for i in range(len(y) - input_size - output_size):
        X.append(y[i:i+input_size])
        Y.append(y[i+input_size:i+input_size+output_size])
    
    return np.array(X), np.array(Y)

# Create dataset
input_size, output_size = 10, 5
X_train, Y_train = generate_sine_wave(num_samples=500, input_size=input_size, output_size=output_size)
X_train, Y_train = torch.tensor(X_train, dtype=torch.float32), torch.tensor(Y_train, dtype=torch.float32)


In [8]:
# Hyperparameters
hidden_size = 128
num_blocks = 3
num_layers = 2
epochs = 100
batch_size = 32
learning_rate = 0.001

# Initialize model
model = NBeats(input_size, hidden_size, output_size, num_blocks, num_layers)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
loss_fn = nn.MSELoss()

X_train = X_train.view(X_train.shape[0], -1)
Y_train = Y_train.view(Y_train.shape[0], -1)


# Training loop
for epoch in range(epochs):
    optimizer.zero_grad()
    predictions = model(X_train)
    loss = loss_fn(predictions, Y_train)
    loss.backward()
    optimizer.step()
    
    if (epoch + 1) % 10 == 0:
        print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")


RuntimeError: mat1 and mat2 shapes cannot be multiplied (485x128 and 10x128)

In [9]:
# Make predictions
model.eval()
with torch.no_grad():
    test_inputs = X_train[:10]  # Use first 10 samples
    predictions = model(test_inputs).numpy()

# Plot predictions vs. actual values
plt.figure(figsize=(10, 5))
for i in range(5):
    plt.plot(range(input_size, input_size + output_size), Y_train[i], label=f"Actual {i}", linestyle="dashed")
    plt.plot(range(input_size, input_size + output_size), predictions[i], label=f"Predicted {i}")

plt.xlabel("Time Step")
plt.ylabel("Value")
plt.legend()
plt.title("N-BEATS Time Series Forecasting")
plt.show()


RuntimeError: mat1 and mat2 shapes cannot be multiplied (10x128 and 10x128)