In [None]:
import torch
import torch.nn as nn

class TimeSeriesTransformer(nn.Module):
    """
    Transformer encoder that reduces a CNN feature map to one vector per sample.

    The input feature map of shape [batch_size, d_model, seq_len] is run
    through a stack of ``nn.TransformerEncoderLayer`` blocks and then pooled
    over the sequence axis, yielding a [batch_size, d_model] tensor.

    NOTE: this module does not add any positional encoding itself; it only
    permutes the input and applies the encoder.
    """

    # Pooling strategies understood by ``forward``.
    _POOL_MODES = ('last', 'mean')

    def __init__(
        self,
        d_model: int,
        nhead: int = 4,
        num_layers: int = 1,
        dropout: float = 0.1,
        pool: str = 'last'
    ):
        """
        Transformer module for time-series features output by CNN.

        Args:
            d_model (int): Dimensionality of feature embeddings (num_filters from CNN).
                Must be divisible by ``nhead`` (a requirement of PyTorch's
                multi-head attention).
            nhead (int): Number of attention heads.
            num_layers (int): Number of Transformer encoder layers.
            dropout (float): Dropout probability in Transformer.
            pool (str): Pooling strategy: 'last' or 'mean'.

        Raises:
            ValueError: If ``pool`` is not one of the supported strategies.
        """
        super().__init__()
        # Fail fast: a misspelled pooling mode should surface at construction
        # time, not after a full encoder pass inside forward().
        if pool not in self._POOL_MODES:
            raise ValueError(f"Unknown pool type: {pool}")
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dropout=dropout,
            batch_first=False  # encoder consumes [seq_len, batch_size, d_model]
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        self.pool = pool

    def forward(self, cnn_features: torch.Tensor) -> torch.Tensor:
        """
        Encode the feature map and pool it over the sequence axis.

        Args:
            cnn_features: Tensor of shape [batch_size, d_model, seq_len]
        Returns:
            Tensor of shape [batch_size, d_model]
        Raises:
            ValueError: If ``self.pool`` was changed to an unsupported value
                after construction.
        """
        # ``self.pool`` is a plain attribute and may have been reassigned, so
        # re-check it here, before spending time on the encoder pass.
        if self.pool not in self._POOL_MODES:
            raise ValueError(f"Unknown pool type: {self.pool}")
        # Permute to [seq_len, batch_size, d_model]
        x = cnn_features.permute(2, 0, 1)
        # Apply Transformer encoder
        tr_out = self.transformer(x)  # [seq_len, batch_size, d_model]
        # Pool over the sequence dimension
        if self.pool == 'last':
            return tr_out[-1]       # Last time step [batch_size, d_model]
        return tr_out.mean(dim=0)   # Mean over seq [batch_size, d_model]

    def get_parameters(self):
        """Return the iterator produced by ``self.parameters()``."""
        return self.parameters()


Demo: end-to-end forward pass on synthetic prices (residuals → CNN → Transformer → FNN weights)

In [None]:
def main():
    """
    Smoke-test the residual -> CNN -> Transformer -> FNN pipeline on synthetic data.

    Builds a 10-day, two-asset random price table, pushes it through every
    stage on the CPU and prints the shape produced at each stage. Nothing is
    returned.

    Relies on ``CointegrationResidualGenerator``, ``CNN``, ``FNN``,
    ``soft_normalize`` and ``TimeSeriesTransformer`` already being defined in
    the running session.
    """
    import os
    os.environ["CUDA_VISIBLE_DEVICES"] = ""   # hide every GPU -> CPU only
    import numpy as np
    import pandas as pd
    import torch

    cpu = 'cpu'
    n_days = 10
    window = 5

    # --- Stage 1: seeded synthetic price series for two assets ---
    np.random.seed(42)
    asset_1 = np.cumprod(1 + np.random.normal(0, 0.01, size=n_days))
    asset_2 = np.cumprod(1 + np.random.normal(0, 0.01, size=n_days))
    prices = pd.DataFrame(
        {"Asset_1": asset_1, "Asset_2": asset_2},
        index=pd.date_range("2022-01-01", periods=n_days),
    )
    print("1. Prices shape:", prices.shape, "\n")

    # --- Stage 2: residuals from the generator ---
    residual_gen = CointegrationResidualGenerator(prices)
    residual_gen.compute_all_asset_residuals()
    residuals = residual_gen.get_asset_residuals()
    print("2. Residuals shape:", residuals.shape, "\n")

    # --- Stage 3: windowed array for the CNN ---
    windows_np = residual_gen.prepare_cnn_input_from_residuals(window=window)
    print("3. CNN input shape:", windows_np.shape, "\n")
    # The last two axes are swapped before the array is handed to the CNN.
    cnn_input = torch.tensor(
        windows_np.transpose(0, 2, 1), dtype=torch.float32, device=cpu
    )

    # --- Stage 4: CNN forward pass ---
    cnn_model = CNN(
        input_length=window,
        num_features=windows_np.shape[2],
        num_filters=4,
        num_classes=2,
        filter_size=2,
    ).to(cpu)
    cnn_result = cnn_model(cnn_input)

    if not isinstance(cnn_result, tuple):
        # The model returned logits only: rebuild the conv feature map by hand
        # (without tracking gradients) so the Transformer has something to eat.
        logits = cnn_result
        with torch.no_grad():
            hidden = cnn_model.relu(cnn_model.conv1(cnn_input))
            feature_map = cnn_model.relu(cnn_model.conv2(hidden))
            # Align the input with the (possibly shorter) feature map and add
            # it as a skip connection only when the two shapes match exactly.
            trim = cnn_input.shape[2] - feature_map.shape[2]
            skip = cnn_input[:, :, trim:] if trim > 0 else cnn_input
            if feature_map.shape == skip.shape:
                feature_map = feature_map + skip
    else:
        # The model returned (logits, feature_map) directly.
        logits, feature_map = cnn_result

    print("4. CNN logits shape:", logits.shape)
    print("5. CNN feature map shape:", feature_map.shape, "\n")

    # --- Stage 5: Transformer over the feature map ---
    transformer = TimeSeriesTransformer(d_model=feature_map.size(1), nhead=4).to(cpu)
    embedding = transformer(feature_map)
    print("6. Transformer embedding shape:", embedding.shape, "\n")

    # --- Stage 6: FNN allocator followed by normalization ---
    allocator = FNN(input_dim=embedding.size(1), hidden_dim=32).to(cpu)
    raw_weights = allocator(embedding)
    normalized_weights = soft_normalize(raw_weights)
    print("7. Raw weights shape:", raw_weights.shape)
    print("8. Normalized weights shape:", normalized_weights.shape)

# Run the demo when this cell is executed (or the file is run as a script),
# but not when the code is imported as a module.
if __name__ == "__main__":
    main()


In [None]:
# ──────────────────────────────────────────────────────────────────────────────
# Step-by-step walkthrough of the pipeline on `price_matrix`:
# residuals -> CNN -> Transformer -> FNN allocator, printing shapes and a few
# sample values at every stage. `price_matrix`, `CointegrationResidualGenerator`,
# `CNN`, `FNN` and `soft_normalize` must already exist in the session.
from IPython.display import display
import torch

# The models below are freshly constructed and left in their default
# (training) mode, so both their initial weights and the Transformer's dropout
# draw from torch's global RNG. Seeding it here keeps the sample values printed
# by this cell identical on every Restart & Run All.
torch.manual_seed(42)

# 1) Display the price matrix
print("1) Electricity Price Matrix (France & Germany, 2021-05-10→16):")
display(price_matrix.head())
print("Shape:", price_matrix.shape, "\n")

# 2) Generate cointegration residuals
gen = CointegrationResidualGenerator(price_matrix)
gen.compute_all_asset_residuals()
residuals = gen.get_asset_residuals()

print("2) Cointegration Residuals (head):")
display(residuals.head())
print("Shape:", residuals.shape, "\n")

# 3) Prepare CNN input (3-day window for this tiny example)
window = 3
cnn_np = gen.prepare_cnn_input_from_residuals(window=window)
print(f"3) CNN input shape: {cnn_np.shape}  # (windows, days, assets)")
print("   Sample window [0]:\n", cnn_np[0], "\n")

# 4) Run through CNN block
# Build tensor [batch, features, length] by swapping the last two axes
cnn_tensor = torch.tensor(cnn_np.transpose(0, 2, 1), dtype=torch.float32)

cnn_model = CNN(
    input_length=window,
    num_features=cnn_np.shape[2],
    num_filters=4,
    num_classes=2,
    filter_size=2
).to("cpu")

# Forward pass
out = cnn_model(cnn_tensor)
# Handle return signature: either (logits, feature_map) or logits only
if isinstance(out, tuple):
    logits, feat = out
else:
    logits = out
    # Manually rebuild the conv feature map from the model's own layers
    x1 = cnn_model.relu(cnn_model.conv1(cnn_tensor))
    feat = cnn_model.relu(cnn_model.conv2(x1))
    # Trim the input to the feature map's length; the skip connection is only
    # added when the two tensors end up with exactly the same shape.
    diff = cnn_tensor.size(2) - feat.size(2)
    skip = cnn_tensor[:, :, diff:] if diff > 0 else cnn_tensor
    if feat.shape == skip.shape:
        feat = feat + skip

print(f"4) CNN logits shape: {logits.shape}")
print(f"   Sample logits:\n{logits[:3]}\n")
print(f"5) CNN feature-map shape: {feat.shape}\n")

# 5) Transformer (d_model is taken from the feature map's channel axis)
trans_model = TimeSeriesTransformer(d_model=feat.size(1), nhead=4).to("cpu")
emb = trans_model(feat)
print(f"6) Transformer embeddings shape: {emb.shape}")
print(f"   Sample embeddings:\n{emb[:3]}\n")

# 6) FNN allocator + soft-normalize
fnn_model = FNN(input_dim=emb.size(1), hidden_dim=32).to("cpu")
w_raw = fnn_model(emb)
w_norm = soft_normalize(w_raw)

print(f"7) Raw allocation weights shape: {w_raw.shape}")
print(f"   Sample w_raw:\n{w_raw[:3]}\n")
print(f"8) L₁-normalized weights shape: {w_norm.shape}")
print(f"   Sample w_norm:\n{w_norm[:3]}")
# ──────────────────────────────────────────────────────────────────────────────
