# Deep State Space Models (DSSMs) -- Implementation

**WARNING:** Please do not read this notebook. This notebook is simply a conglomeration of unordered code cells. Particularly, it does not (yet) contain an implementation of DSSMs that works!

In [None]:
# Imports

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from sklearn.metrics import accuracy_score, mean_absolute_error, mean_squared_error
from sklearn.model_selection import GridSearchCV, TimeSeriesSplit
from sklearn.preprocessing import MinMaxScaler

import tensorflow as tf

import torch
import torch.nn as nn
import torch.optim as optim

In [None]:
# Configs

# Target Stock Ticker
TARGET_TICKER = "TSLA"

# Dimensionality of the latent state
# Commonly used values are 10, 20, 50.
STATE_DIM = 10

# Sliding window length
SW_LENGTH = 60

# Learning rate for the Model
LEARNING_RATE = 0.001

In [None]:
# Data Load

df = pd.read_csv("../data/processed_combined_data.csv")
df["date"] = pd.to_datetime(df["date"])

targets = [f"close_{TARGET_TICKER}"]
non_features = [*targets, "date"]

X_cols = [col for col in df.columns if col not in non_features]
y_cols = targets

# X_df = df[X_cols]
X_df = df[["close_NVDA", "oil"]]
y_df = df[y_cols]

# As PyTorch wants the np type to be definitive
# and the np dtype is inferred from the df dtype,
# the following line is needed to ensure the dtype is float64
X_df = X_df.astype(np.float64) 
# X_df = X_df.select_dtypes(include=[np.number]) 

X_np = X_df.to_numpy().astype(np.float64)
y_np = y_df.to_numpy().astype(np.float64)

X_np.shape, y_np.shape

## PyTorch Version

In [None]:
# Data Preprocessing for PyTorch

# We must provide writeable arrays to PyTorch with definite dtypes (here, float32).

# Size of X_seq shall be (n_samples - SW_LENGTH + 1, SW_LENGTH, n_features).
X_seq = np.lib.stride_tricks.sliding_window_view(X_np, window_shape=SW_LENGTH, axis=0)
X_seq = X_seq.transpose(0, 2, 1)
X_seq = np.array(X_seq, dtype=np.float32)

# Size of y_seq shall be (n_samples - SW_LENGTH + 1, 1).
y_seq = y_np[SW_LENGTH - 1 :].reshape(-1, 1)
y_seq = np.array(y_seq, dtype=np.float32)

# Conversion to PyTorch tensors
X_tensor = torch.tensor(X_seq)
y_tensor = torch.tensor(y_seq)

X_tensor.shape, y_tensor.shape

In [None]:
# DSSM Architecture


class DSSM(nn.Module):

    def __init__(
        self,
        state_dim: int,
        input_dim: int,
        output_dim: int,
    ) -> None:
        super(DSSM, self).__init__()
        # Trainable parameters can be defined as nn.Parameter.
        # State transition matrix
        self.A = nn.Parameter(torch.randn(state_dim, state_dim))
        # Control matrix
        self.B = nn.Parameter(torch.randn(state_dim, input_dim))
        # Output matrix
        self.C = nn.Parameter(torch.randn(output_dim, state_dim))
        # Transition matrix
        self.D = nn.Parameter(torch.randn(output_dim, input_dim))

    # The forward function turns the model into a callable object
    # and will be executed if the model is called.
    def forward(
        self,
        s_prev: torch.Tensor,
        x_t: torch.Tensor,
    ) -> tuple[torch.Tensor, torch.Tensor]:
        # State update
        s_t = torch.matmul(self.A, s_prev) + torch.matmul(self.B, x_t)
        # Output calculation
        y_t = torch.matmul(self.C, s_t) + torch.matmul(self.D, x_t)
        return s_t, y_t

In [None]:
X_np.shape, y_np.shape

In [None]:
# # Normalize the prices so our training is more stable
# prices_norm = (prices - np.mean(prices)) / np.std(prices)


X_tensor = torch.tensor(X_np)
y_tensor = torch.tensor(y_np)

# --------------------------------
# 3. Train-Test Split (80/20 split)
# --------------------------------
train_size = int(0.8 * X_tensor.shape[0])
X_np_train = X_np[:train_size]
y_np_train = y_np[:train_size]
X_np_test = X_np[train_size:]
y_np_test = y_np[train_size:]

# Convert to PyTorch tensors.
X_train = torch.tensor(X_np_train, dtype=torch.float32)
y_train = torch.tensor(y_np_train, dtype=torch.float32)
X_test  = torch.tensor(X_np_test, dtype=torch.float32)
y_test  = torch.tensor(y_np_test, dtype=torch.float32)

# Normalize the data

# Compute mean and std using only training data
X_train_mean = X_train.mean(dim=0)
X_train_std = X_train.std(dim=0)

# Normalize training data
X_train_norm = (X_train - X_train_mean) / X_train_std

# Normalize test data using training statistics
X_test_norm = (X_test - X_train_mean) / X_train_std

# y_mean = y_train.mean(dim=0)
# y_std = y_train.std(dim=0)

# y_train_norm = (y_train - y_mean) / y_std
# y_test_norm = (y_test - y_mean) / y_std  # Use training stats



# -----------------------------------------------------------------------------
# 3. Train the Model
# -----------------------------------------------------------------------------

state_dim = STATE_DIM
input_dim = X_tensor.shape[-1]
output_dim = y_tensor.shape[-1]
model = DSSM(state_dim, input_dim, output_dim)


optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()

num_epochs = 50
for epoch in range(num_epochs):
    model.train()
    optimizer.zero_grad()
    loss = 0.0
    # Initialize the state for the training sequence.
    s = torch.zeros(state_dim, 1)
    
    # Iterate sequentially over the training data.
    for i in range(train_size):
        x_t = X_train_norm[i].view(input_dim, 1)   # current feature vector.
        s, y_pred = model(s, x_t)
        # Teacher forcing: use the true stock price as target.
        loss += criterion(y_pred, y_train[i].view(output_dim, 1))
    
    loss.backward()
    optimizer.step()
    
    if (epoch + 1) % 50 == 0:
        print(f"Epoch {epoch+1}/{num_epochs} - Loss: {loss.item():.4f}")


# -----------------------------------------------------------------------------
# 4. Multi-Step Forecast on Test Data
# -----------------------------------------------------------------------------
# We want to see how well the model predicts a stock sample path.
# We'll use the external test features (from X_test) and carry forward the state.

# First, obtain the final state from the training dataset by replaying the training inputs.
model.eval()
with torch.no_grad():
    s = torch.zeros(state_dim, 1)
    for i in range(train_size):
        x_t = X_train[i].view(input_dim, 1)
        s, _ = model(s, x_t)
    
    # Now, generate predictions over the test period.
    predictions = []
    for i in range(X_test.shape[0]):
        x_t = X_test[i].view(input_dim, 1)
        s, y_pred = model(s, x_t)
        predictions.append(y_pred.item())
    
    predictions = np.array(predictions)
    actual = y_test.squeeze().numpy()



# -----------------------------------------------------------------------------
# 5. Comparison: Plot Predicted vs. Actual Stock Sample Paths
# -----------------------------------------------------------------------------
plt.figure(figsize=(10, 5))
plt.plot(predictions, label="Predicted Stock Price")
plt.plot(actual, label="Actual Stock Price", alpha=0.7)
plt.xlabel("Time Step in Test Set")
plt.ylabel("Normalized Stock Price")
plt.title("Multi-Step Forecast vs. Actual Stock Sample Path")
plt.legend()
plt.show()


In [None]:
# Model Initialisation and Training Setup

state_dim = STATE_DIM
input_dim = X_tensor.shape[-1]
output_dim = y_tensor.shape[-1]

model = DSSM(
    state_dim,
    input_dim,
    output_dim,
)

criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

# Initialize the state as zeros
s_prev = torch.zeros(state_dim)

In [None]:
# Prediction

# Number of future steps to predict
FUTURE_STEPS = 10

predictions = []

# Use the last observed return as the starting input.
x_now = X_tensor[-1, -1, :]

for i in range(FUTURE_STEPS):
    # Forward pass using the last state and input.
    s_now, y_now = model(s_prev, x_now)
    predictions.append(y_now.item())
    # Update state and input.
    s_prev = s_now.detach()
    # x_now = y_now.detach()

# print("Future Predictions:", predictions)

## Tensorflow/Keras Version

In [None]:
# Data Preprocessing for TensorFlow

# We must provide writeable arrays to PyTorch with definite dtypes (here, float32).

# Size of X_seq shall be (n_samples - SW_LENGTH + 1, SW_LENGTH, n_features).
X_seq = np.lib.stride_tricks.sliding_window_view(X_np, window_shape=SW_LENGTH, axis=0)
X_seq = X_seq.transpose(0, 2, 1)
X_seq = np.array(X_seq, dtype=np.float32)

# Size of y_seq shall be (n_samples - SW_LENGTH + 1, 1).
y_seq = y_np[SW_LENGTH - 1 :].reshape(-1, 1)
y_seq = np.array(y_seq, dtype=np.float32)

# Conversion to PyTorch tensors
X_tensor = X_seq
y_tensor = y_seq

X_tensor.shape, y_tensor.shape

In [None]:
# DSSM Architecture


class DSSM(tf.keras.Model):
    def __init__(
        self,
        state_dim: int,
        input_dim: int,
        output_dim: int,
    ) -> None:
        super(DSSM, self).__init__()

        # Initialize learnable state-space matrices.
        # Transition matrix
        self.A = tf.Variable(
            tf.random.normal([state_dim, state_dim]), trainable=True, name="A"
        )
        # Control matrix
        self.B = tf.Variable(
            tf.random.normal([state_dim, input_dim]), trainable=True, name="B"
        )
        # Output matrix
        self.C = tf.Variable(
            tf.random.normal([output_dim, state_dim]), trainable=True, name="C"
        )
        # Transition matrix
        self.D = tf.Variable(
            tf.random.normal([output_dim, input_dim]), trainable=True, name="D"
        )

    def call(self, inputs, training=False):
        # Unpack the inputs
        s_prev, x_t = inputs  
        # State update
        s_t = tf.matmul(self.A, s_prev) + tf.matmul(self.B, x_t)
        # Output calculatuon
        y_t = tf.matmul(self.C, s_t) + tf.matmul(self.D, x_t)
        return s_t, y_t

In [None]:
# Example dimension setup (adjust based on your data shapes):
state_dim = 10
input_dim = X_tensor.shape[-1]  # e.g., number of features per time step
output_dim = y_tensor.shape[-1]  # e.g., 1 for a single stock return

# Initialize the DSSM model with a tanh activation.
model = DSSM(state_dim, input_dim, output_dim, activation=tf.keras.activations.tanh)

# Define a loss function and an optimizer.
loss_fn = tf.keras.losses.MeanSquaredError()
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

In [None]:
# Number of future steps to predict.
FUTURE_STEPS = 10  
predictions = []

# In a forecasting scenario, you'll have an initial state and an initial input.
# Here, we assume the state is initialized to zeros.
s_prev = tf.zeros([state_dim, 1], dtype=tf.float32)

# For the starting input, decide whether you want to use the last observed price (from y_tensor)
# or the last available feature vector (from X_tensor). For demonstration, let's use y_tensor.
# If y_tensor is (num_samples, 1), take the last row and reshape as (1, 1).
x_now = tf.reshape(y_tensor[-1], (1, 1))

for _ in range(FUTURE_STEPS):
    # Forward pass: model takes state and last_input.
    s_now, y_now = model([s_prev, x_now])
    
    # Append prediction (convert tensor to scalar float)
    predictions.append(y_now.numpy().item())
    
    # Update state: detach gradients to prevent backprop through time.
    s_prev = tf.stop_gradient(s_now)
    
    # Update last_input with the predicted output.
    # Note: Ensure the shape matches the model's expectation.
    x_now = tf.reshape(tf.stop_gradient(y_now), (1, 1))
    
print("Future Predictions:", predictions)


In [None]:
def multi_step_forecast(
    model: DSSM,
    initial_state: torch.Tensor,
    forecast_horizon: int,
    autoregressive: bool = False,
    start_input: torch.Tensor = None
) -> torch.Tensor:
    """
    Generate a multi-step forecast with the DSSM model.

    Parameters:
      model (DSSM): Instance of the DSSM model.
      initial_state (torch.Tensor): Initial state with shape [state_dim, 1].
      forecast_horizon (int): Number of time steps to forecast.
      autoregressive (bool): If True, uses model prediction as the next input.
      start_input (torch.Tensor, optional): The initial input for autoregressive mode 
            (shape [input_dim, 1]), required if autoregressive is True.

    Returns:
      torch.Tensor: Stacked forecast outputs with shape [forecast_horizon, output_dim, 1].
    """
    predictions = []  # Collect forecast outputs for each step.
    s_t = initial_state  # Starting state (shape: [state_dim, 1])

    if start_input is None:
        raise ValueError("start_input must be provided for autoregressive forecasting.")
    x = start_input.view(-1, 1)
    for _ in range(forecast_horizon):
        s_t, y_t = model(s_t, x)
        predictions.append(y_t)
        # Replace x with the model's output for the next step.
        # This assumes output_dim == input_dim.
        x = y_t
    
    # Stack the predictions into a single tensor of shape [forecast_horizon, output_dim, 1].
    return torch.stack(predictions, dim=0)

In [None]:
state_dim = STATE_DIM
input_dim = X_tensor.shape[-1]
output_dim = y_tensor.shape[-1]

# Instantiate the DSSM model.
model = DSSM(state_dim, input_dim, output_dim)

# Define an initial state (e.g., zeros).
initial_state = torch.zeros(state_dim, 1)

# Set forecast horizon.
forecast_horizon = 5

start_input = torch.randn(input_dim, 1)
forecast_autoregressive = multi_step_forecast(
    model,
    initial_state,
    forecast_horizon=forecast_horizon,
    autoregressive=True,
    start_input=start_input
)
print("\nMulti-step forecast (autoregressive):")
print(forecast_autoregressive)

In [None]:
# # Train-Test Split

# L = y_0.shape[0]

# tts_chunk_size = TTS_CHUNK_SIZE

# # Create overlapping windows
# X = np.lib.stride_tricks.sliding_window_view(
#     X_0,
#     window_shape=tts_chunk_size,
#     axis=0,
# )[:-1]
# y = y_0[tts_chunk_size:]

# X.shape, X_0.shape, y.shape, y_0.shape
# # X, y = create_sequences(scaled_data, seq_length)

In [None]:
# # Scaling

scaler = MinMaxScaler()
X_np_ = scaler.fit_transform(X_np.reshape(-1, 1))
# y_0 = y_0

X_np.shape, y_np.shape
# y_0.size

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

class DSSMForecast(nn.Module):
    """
    A simple DSSM-style model for stock forecasting.

    The model consists of two towers:
      - stock_branch: processes historical stock features.
      - aux_branch: processes auxiliary (e.g., market indicator) features.
      
    Their latent representations are concatenated and fed into a final
    fully connected layer to produce the forecasted stock value.
    """
    def __init__(self, stock_input_dim, aux_input_dim, embed_dim, hidden_dim):
        super(DSSMForecast, self).__init__()
        
        # Branch to process historical stock features.
        self.stock_branch = nn.Sequential(
            nn.Linear(stock_input_dim, embed_dim),
            nn.ReLU(),
            nn.Linear(embed_dim, embed_dim),
            nn.ReLU()
        )
        
        # Branch to process auxiliary market indicators.
        self.aux_branch = nn.Sequential(
            nn.Linear(aux_input_dim, embed_dim),
            nn.ReLU(),
            nn.Linear(embed_dim, embed_dim),
            nn.ReLU()
        )
        
        # The final fully connected network combines the two embeddings.
        self.fc = nn.Sequential(
            nn.Linear(2 * embed_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)  # Output a single forecast value (e.g., next day's closing price)
        )
    
    def forward(self, stock_input, aux_input):
        # Process each input branch separately.
        stock_embed = self.stock_branch(stock_input)  # Shape: [batch, embed_dim]
        aux_embed = self.aux_branch(aux_input)          # Shape: [batch, embed_dim]
        
        # Concatenate the embeddings from both branches.
        combined = torch.cat((stock_embed, aux_embed), dim=1)  # Shape: [batch, 2*embed_dim]
        
        # Generate the final forecast.
        output = self.fc(combined)
        return output

# Example usage of the model:
if __name__ == "__main__":
    # Configuration parameters.
    batch_size = 32
    stock_input_dim = 10    # For instance, features like open, high, low, close, volume, etc.
    aux_input_dim = 5       # For instance, sentiment scores, technical indicators, etc.
    embed_dim = 64          # Dimension of the latent space for each branch.
    hidden_dim = 32         # Intermediate hidden dimension for combining the embeddings.
    
    # Instantiate the model.
    model = DSSMForecast(stock_input_dim, aux_input_dim, embed_dim, hidden_dim)
    
    # Define a mean squared error loss function (typical for regression tasks).
    criterion = nn.MSELoss()
    
    # Use an optimizer such as Adam.
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    
    # Dummy training loop (replace with your own dataset and dataloader in practice).
    num_epochs = 10
    for epoch in range(num_epochs):
        # Generate dummy data for demonstration:
        stock_data = torch.randn(batch_size, stock_input_dim)
        aux_data = torch.randn(batch_size, aux_input_dim)
        target = torch.randn(batch_size, 1)  # Dummy target values (e.g., next-day closing prices)
        
        # Forward pass: compute predicted outputs by passing inputs to the model.
        outputs = model(stock_data, aux_data)
        loss = criterion(outputs, target)
        
        # Backward pass: compute gradient and update weights.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")
