In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score
import matplotlib.pyplot as plt


In [None]:
SEQ_LEN = 30         
BATCH_SIZE = 64
HIDDEN_DIM = 64
NUM_EPOCHS = 30
LR = 1e-3
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [None]:
df = pd.read_csv("data/SHEL_data.csv")


In [None]:

feature_cols = ["Open", "High", "Low", "Close", "Adj Close", "Volume"]
values = df[feature_cols].astype(np.float32).values
n_samples, n_features = values.shape


In [None]:
split_idx = int(0.9 * n_samples)


In [None]:
scaler = StandardScaler()
values_train = values[:split_idx]
scaler.fit(values_train)
values_scaled = scaler.transform(values).astype(np.float32)


In [None]:
def make_sequences(values_scaled, seq_len, split_idx):
    X_train, y_train = [], []
    X_test, y_test = [], []

    for t in range(seq_len, split_idx):
        X_train.append(values_scaled[t-seq_len:t])
        y_train.append(values_scaled[t])

    n_total = values_scaled.shape[0]
    for t in range(split_idx, n_total):
        if t - seq_len < 0:
            continue
        X_test.append(values_scaled[t-seq_len:t])
        y_test.append(values_scaled[t])

    X_train = np.stack(X_train)  
    y_train = np.stack(y_train) 
    X_test = np.stack(X_test)    
    y_test = np.stack(y_test)    

    return X_train, y_train, X_test, y_test


In [None]:

X_train, y_train, X_test, y_test = make_sequences(values_scaled, SEQ_LEN, split_idx)


In [None]:
print("Train sequences:", X_train.shape, "Test sequences:", X_test.shape)


In [None]:
class TimeSeriesDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.from_numpy(X)  
        self.y = torch.from_numpy(y) 

    def __len__(self):
        return self.X.shape[0]

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]


In [None]:
train_ds = TimeSeriesDataset(X_train, y_train)
test_ds = TimeSeriesDataset(X_test, y_test)

train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False)


In [None]:
class NBeatsBlock(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        flattened_size = SEQ_LEN * input_dim
        self.fc1 = nn.Linear(flattened_size, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, hidden_dim)
        self.fc4 = nn.Linear(hidden_dim, hidden_dim)
        self.backcast = nn.Linear(hidden_dim, flattened_size)
        self.forecast = nn.Linear(hidden_dim, output_dim)
        self.relu = nn.ReLU()

    def forward(self, x_flat):
        x = self.relu(self.fc1(x_flat))
        x = self.relu(self.fc2(x))
        x = self.relu(self.fc3(x))
        x = self.relu(self.fc4(x))
        backcast = self.backcast(x)  # [B, T*D]
        forecast = self.forecast(x)  # [B, D]
        return backcast, forecast


class NBeatsForecaster(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_stacks=2, num_blocks=1):
        super().__init__()
        self.blocks = nn.ModuleList()
        for _ in range(num_stacks):
            for _ in range(num_blocks):
                self.blocks.append(NBeatsBlock(input_dim, hidden_dim, output_dim))

    def forward(self, x):
        B = x.shape[0]
        x_flat = x.view(B, -1) 
        forecast_sum = 0
        for block in self.blocks:
            backcast, forecast = block(x_flat)
            forecast_sum = forecast_sum + forecast
            x_flat = x_flat - backcast  # residual 
        return forecast_sum


In [None]:
def make_model(model_name="N-BEATS"):
    return NBeatsForecaster(n_features, HIDDEN_DIM, n_features)


In [None]:
def train_model(model_name="N-BEATS"):
    model = make_model(model_name).to(DEVICE)
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=LR)

    for epoch in range(1, NUM_EPOCHS + 1):
        model.train()
        train_loss = 0.0
        for xb, yb in train_loader:
            xb = xb.to(DEVICE)
            yb = yb.to(DEVICE)

            optimizer.zero_grad()
            preds = model(xb)
            loss = criterion(preds, yb)
            loss.backward()
            optimizer.step()
            train_loss += loss.item() * xb.size(0)

        train_loss /= len(train_ds)
        if epoch % 5 == 0 or epoch == 1:
            print(f"[{model_name}] Epoch {epoch}/{NUM_EPOCHS}, "
                  f"Train MSE (scaled): {train_loss:.4f}")

    return model


In [None]:
def compute_metrics(y_true_scaled, y_pred_scaled, X_test_scaled_last):
    
    mse_scaled = np.mean((y_pred_scaled - y_true_scaled) ** 2)
    rmse_scaled = np.sqrt(mse_scaled)

    y_true = scaler.inverse_transform(y_true_scaled)
    y_pred = scaler.inverse_transform(y_pred_scaled)
    eps = 1e-6
    mape = np.mean(np.abs((y_true - y_pred) / (np.abs(y_true) + eps))) * 100.0

    last = X_test_scaled_last
    actual_change = np.sign(y_true_scaled - last)
    pred_change = np.sign(y_pred_scaled - last)
    correct_dir = (actual_change == pred_change).astype(np.float32)
    mda = correct_dir.mean()

    r2 = r2_score(
        y_true_scaled.reshape(-1),
        y_pred_scaled.reshape(-1)
    )

    bias_per_feature = np.mean(y_pred - y_true, axis=0)  # [D]
    bias_overall = bias_per_feature.mean()
    # classify over / under / none
    avg_abs_level = np.mean(np.abs(y_true))
    threshold = 0.01 * avg_abs_level  # 1% of average magnitude
    if abs(bias_overall) < threshold:
        bias_flag = "None / minimal"
    elif bias_overall > 0:
        bias_flag = "Over-forecast (too high)"
    else:
        bias_flag = "Under-forecast (too low)"

    metrics = {
        "Scaled_RMSE": float(rmse_scaled),
        "MAPE_percent": float(mape),
        "MDA": float(mda),
        "R2": float(r2),
        "Bias_overall": float(bias_overall),
        "Bias_flag": bias_flag,
        "Bias_per_feature": dict(zip(feature_cols, bias_per_feature))
    }
    return metrics


**N-BEATS**


In [None]:
model_name = "Open Source Model #4"
model = train_model(model_name)


In [None]:
model.eval()
all_preds_scaled = []
all_true_scaled = []
all_last_inputs_scaled = []

with torch.no_grad():
    for xb, yb in test_loader:
        xb = xb.to(DEVICE)
        yb = yb.to(DEVICE)
        preds = model(xb)

        all_preds_scaled.append(preds.cpu().numpy())
        all_true_scaled.append(yb.cpu().numpy())
        all_last_inputs_scaled.append(xb[:, -1, :].cpu().numpy())


In [None]:
y_pred_scaled = np.concatenate(all_preds_scaled, axis=0)   # [N_test, D]
y_true_scaled = np.concatenate(all_true_scaled, axis=0)    # [N_test, D]
X_last_scaled = np.concatenate(all_last_inputs_scaled, axis=0)  # [N_test, D]


In [None]:
metrics = compute_metrics(y_true_scaled, y_pred_scaled, X_last_scaled)
print("\n=== Metrics for", model_name, "===")
for k, v in metrics.items():
    if isinstance(v, dict):
        print(k, ":")
        for kk, vv in v.items():
            print(f"  {kk}: {vv:.4f}")
    else:
        print(k, ":", v)


In [None]:
row_for_sheet = {
    "Model": model_name,
    "Scaled RMSE": metrics["Scaled_RMSE"],
    "MAPE": metrics["MAPE_percent"],
    "Mean Directional Accuracy (MDA)": metrics["MDA"],
    "R^2": metrics["R2"],
    "Forecast Bias?": metrics["Bias_flag"],
}
print("\nRow for spreadsheet:", row_for_sheet)


In [None]:
y_true = scaler.inverse_transform(y_true_scaled)
y_pred = scaler.inverse_transform(y_pred_scaled)

n_test = y_true.shape[0]
t_axis = np.arange(n_test)

fig, axes = plt.subplots(n_features, 1,
                         figsize=(12, 3 * n_features),
                         sharex=True)

if n_features == 1:
    axes = [axes]

for d, col in enumerate(feature_cols):
    ax = axes[d]
    ax.plot(t_axis, y_true[:, d], label=f"True {col}")
    ax.plot(t_axis, y_pred[:, d], label=f"Pred {col}")
    ax.set_ylabel(col)
    ax.grid(True, alpha=0.3)
    if d == 0:
        ax.legend(loc="upper right")

axes[-1].set_xlabel("Test window index (time)")
plt.suptitle(f"{model_name}: True vs Pred on Holdout (last 10%)")
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.show()
