<a href="https://colab.research.google.com/github/thegallier/timeseries/blob/main/controlled_rough_paths.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch

class ControlledRoughPath:
    def __init__(self, driving_path):
        """
        Initialize a controlled rough path with a driving rough path.
        :param driving_path: Tensor representing the driving path (n_samples, d_features).
        """
        self.driving_path = driving_path
        self.gubinelli_derivative = None

    def compute_gubinelli_derivative(self, controlled_path):
        """
        Compute the Gubinelli derivative of the controlled path with respect to the driving path.
        :param controlled_path: Tensor representing the controlled path (n_samples, d_features).
        :return: Gubinelli derivative as a tensor.
        """
        delta_x = self.driving_path[1:] - self.driving_path[:-1]
        delta_y = controlled_path[1:] - controlled_path[:-1]
        self.gubinelli_derivative = delta_y / delta_x  # Approximation
        return self.gubinelli_derivative

    def control_decomposition(self, controlled_path):
        """
        Perform the decomposition Y(t) = Y(s) + G(s)(X(t) - X(s)) + R_{s,t}.
        :param controlled_path: Tensor representing the controlled path.
        :return: Remainder term R_{s,t}.
        """
        g_derivative = self.compute_gubinelli_derivative(controlled_path)
        approx_y = controlled_path[:-1] + g_derivative * (self.driving_path[1:] - self.driving_path[:-1])
        remainder = controlled_path[1:] - approx_y
        return remainder

# Example usage
if __name__ == "__main__":
    # Driving path (e.g., a Brownian motion approximation)
    driving_path = torch.cumsum(torch.randn(100, 1), dim=0)

    # Controlled path (e.g., evolving in response to the driving path)
    controlled_path = torch.sin(driving_path)

    # Controlled rough path analysis
    crp = ControlledRoughPath(driving_path)
    remainder = crp.control_decomposition(controlled_path)

    print("Remainder term (R_{s,t}):", remainder)

Remainder term (R_{s,t}): tensor([[ 0.0000e+00],
        [ 0.0000e+00],
        [ 0.0000e+00],
        [ 0.0000e+00],
        [-5.9605e-08],
        [ 0.0000e+00],
        [ 0.0000e+00],
        [-2.9802e-08],
        [ 0.0000e+00],
        [ 0.0000e+00],
        [ 0.0000e+00],
        [ 0.0000e+00],
        [ 0.0000e+00],
        [ 0.0000e+00],
        [ 0.0000e+00],
        [ 0.0000e+00],
        [ 0.0000e+00],
        [ 1.4901e-08],
        [-5.9605e-08],
        [ 0.0000e+00],
        [ 0.0000e+00],
        [-2.9802e-08],
        [ 0.0000e+00],
        [ 0.0000e+00],
        [ 0.0000e+00],
        [ 0.0000e+00],
        [-2.9802e-08],
        [ 0.0000e+00],
        [ 0.0000e+00],
        [ 0.0000e+00],
        [ 0.0000e+00],
        [ 0.0000e+00],
        [ 0.0000e+00],
        [-9.3132e-09],
        [ 0.0000e+00],
        [ 0.0000e+00],
        [ 0.0000e+00],
        [ 0.0000e+00],
        [-1.4901e-08],
        [ 0.0000e+00],
        [ 0.0000e+00],
        [ 2.9802e-08],
        

In [6]:
import torch
import torch.nn as nn
import torch.optim as optim

class ControlledRoughPath:
    def __init__(self, driving_path):
        """
        Initialize a controlled rough path with a driving rough path.
        :param driving_path: Tensor representing the driving path (seq_len, d_features).
        """
        self.driving_path = driving_path

    def compute_gubinelli_derivative(self, controlled_path):
        """
        Compute the Gubinelli derivative of the controlled path with respect to the driving path.
        :param controlled_path: Tensor representing the controlled path (seq_len, d_features).
        :return: Gubinelli derivative as a tensor.
        """
        # Compute differences along the time axis
        delta_x = self.driving_path[1:, :] - self.driving_path[:-1, :]
        delta_y = controlled_path[1:, :] - controlled_path[:-1, :]

        # Ensure shape compatibility
        if delta_x.shape != delta_y.shape:
            raise ValueError(f"Shape mismatch: delta_x {delta_x.shape}, delta_y {delta_y.shape}")

        # Compute Gubinelli derivative
        gubinelli_derivative = delta_y / (delta_x + 1e-8)  # Avoid division by zero
        return gubinelli_derivative

    def control_decomposition(self, controlled_path):
        """
        Perform the decomposition Y(t) = Y(s) + G(s)(X(t) - X(s)) + R_{s,t}.
        :param controlled_path: Tensor representing the controlled path.
        :return: Tuple of Gubinelli derivative and remainder term.
        """
        g_derivative = self.compute_gubinelli_derivative(controlled_path)
        approx_y = controlled_path[:-1, :] + g_derivative * (self.driving_path[1:, :] - self.driving_path[:-1, :])
        remainder = controlled_path[1:, :] - approx_y
        return g_derivative, remainder

# Neural network for learning
class CRPNet(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(CRPNet, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Generate synthetic data
def generate_synthetic_data(seq_len=50):
    """
    Generate synthetic data for a driving path, controlled path, and target function.
    """
    driving_path = torch.cumsum(torch.randn(seq_len, 1), dim=0)  # Driving path
    controlled_path = torch.sin(driving_path)  # Controlled path
    target_function = torch.mean(controlled_path) + 0.1 * torch.sum(driving_path)  # Target
    return driving_path, controlled_path, target_function

# Main function
if __name__ == "__main__":
    # Generate data
    seq_len = 50
    n_samples = 500
    driving_paths, controlled_paths, targets = [], [], []
    for _ in range(n_samples):
        dp, cp, tgt = generate_synthetic_data(seq_len)
        driving_paths.append(dp)
        controlled_paths.append(cp)
        targets.append(tgt)
    driving_paths = torch.stack(driving_paths)  # Shape: (n_samples, seq_len, 1)
    controlled_paths = torch.stack(controlled_paths)  # Shape: (n_samples, seq_len, 1)
    targets = torch.tensor(targets).float()  # Shape: (n_samples,)

    # Prepare training data
    gubinelli_derivatives, remainders = [], []
    for i in range(n_samples):
        crp = ControlledRoughPath(driving_paths[i])
        gd, r = crp.control_decomposition(controlled_paths[i])
        gubinelli_derivatives.append(gd)
        remainders.append(r)
    gubinelli_derivatives = torch.stack(gubinelli_derivatives)  # Shape: (n_samples, seq_len-1, d_features)
    remainders = torch.stack(remainders)  # Shape: (n_samples, seq_len-1, d_features)

    # Flatten features
    features = torch.cat([gubinelli_derivatives, remainders], dim=2).view(n_samples, -1)  # Flatten per sample

    # Train-test split
    train_size = int(0.8 * n_samples)
    train_features, test_features = features[:train_size], features[train_size:]
    train_targets, test_targets = targets[:train_size], targets[train_size:]

    # Initialize the neural network
    input_dim = features.shape[1]
    hidden_dim = 64
    output_dim = 1
    model = CRPNet(input_dim, hidden_dim, output_dim)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Train the neural network
    n_epochs = 1000
    for epoch in range(n_epochs):
        model.train()
        optimizer.zero_grad()
        outputs = model(train_features)
        loss = criterion(outputs.squeeze(), train_targets)
        loss.backward()
        optimizer.step()

        # Evaluate on test data
        model.eval()
        with torch.no_grad():
            test_outputs = model(test_features)
            test_loss = criterion(test_outputs.squeeze(), test_targets).item()

        if (epoch + 1) % 10 == 0:
            print(f"Epoch [{epoch + 1}/{n_epochs}], Loss: {loss.item():.4f}, Test Loss: {test_loss:.4f}")

    print("Training complete.")

Epoch [10/1000], Loss: 428.6774, Test Loss: 466.4459
Epoch [20/1000], Loss: 426.3701, Test Loss: 465.2843
Epoch [30/1000], Loss: 423.5721, Test Loss: 463.9201
Epoch [40/1000], Loss: 420.1528, Test Loss: 462.2733
Epoch [50/1000], Loss: 416.1228, Test Loss: 460.4987
Epoch [60/1000], Loss: 411.5464, Test Loss: 458.5879
Epoch [70/1000], Loss: 406.5643, Test Loss: 456.6032
Epoch [80/1000], Loss: 401.3019, Test Loss: 454.6432
Epoch [90/1000], Loss: 395.8738, Test Loss: 452.7639
Epoch [100/1000], Loss: 390.3503, Test Loss: 450.8517
Epoch [110/1000], Loss: 384.7849, Test Loss: 448.9601
Epoch [120/1000], Loss: 379.1637, Test Loss: 447.1736
Epoch [130/1000], Loss: 373.4743, Test Loss: 445.4374
Epoch [140/1000], Loss: 367.7364, Test Loss: 444.1407
Epoch [150/1000], Loss: 361.9531, Test Loss: 443.1862
Epoch [160/1000], Loss: 356.1065, Test Loss: 442.4530
Epoch [170/1000], Loss: 350.1543, Test Loss: 441.5591
Epoch [180/1000], Loss: 344.0637, Test Loss: 440.5117
Epoch [190/1000], Loss: 337.8832, Tes

In [7]:
import torch
import numpy as np
from scipy.interpolate import interp1d

def generate_interpolated_test_data(train_driving_paths, seq_len=50, n_samples=100):
    """
    Generate test data by interpolating between existing training samples.
    :param train_driving_paths: Tensor of driving paths used for training (n_train_samples, seq_len, 1).
    :param seq_len: Length of the driving path sequences.
    :param n_samples: Number of interpolated test samples to generate.
    :return: Interpolated driving paths, controlled paths, and corresponding targets.
    """
    # Select two random training samples for interpolation
    idx1, idx2 = np.random.choice(len(train_driving_paths), size=2, replace=False)
    driving_path1 = train_driving_paths[idx1].squeeze().numpy()
    driving_path2 = train_driving_paths[idx2].squeeze().numpy()

    # Interpolate between the two paths
    interpolated_driving_paths = []
    for alpha in np.linspace(0, 1, n_samples):
        interp_path = (1 - alpha) * driving_path1 + alpha * driving_path2
        interpolated_driving_paths.append(interp_path)

    # Convert to tensor
    interpolated_driving_paths = torch.tensor(interpolated_driving_paths).unsqueeze(-1)  # Shape: (n_samples, seq_len, 1)

    # Generate controlled paths and targets
    interpolated_controlled_paths = torch.sin(interpolated_driving_paths)  # Controlled path
    interpolated_targets = torch.mean(interpolated_controlled_paths, dim=1) + 0.1 * torch.sum(interpolated_driving_paths, dim=1)  # Targets
    return interpolated_driving_paths, interpolated_controlled_paths, interpolated_targets

In [9]:
if __name__ == "__main__":
    # Generate data
    seq_len = 50
    n_samples = 500
    driving_paths, controlled_paths, targets = [], [], []
    for _ in range(n_samples):
        dp, cp, tgt = generate_synthetic_data(seq_len)
        driving_paths.append(dp)
        controlled_paths.append(cp)
        targets.append(tgt)
    driving_paths = torch.stack(driving_paths)  # Shape: (n_samples, seq_len, 1)
    controlled_paths = torch.stack(controlled_paths)  # Shape: (n_samples, seq_len, 1)
    targets = torch.tensor(targets).float()  # Shape: (n_samples,)

    # Prepare training data
    gubinelli_derivatives, remainders = [], []
    for i in range(n_samples):
        crp = ControlledRoughPath(driving_paths[i])
        gd, r = crp.control_decomposition(controlled_paths[i])
        gubinelli_derivatives.append(gd)
        remainders.append(r)
    gubinelli_derivatives = torch.stack(gubinelli_derivatives)  # Shape: (n_samples, seq_len-1, d_features)
    remainders = torch.stack(remainders)  # Shape: (n_samples, seq_len-1, d_features)

    # Flatten features
    features = torch.cat([gubinelli_derivatives, remainders], dim=2).view(n_samples, -1)  # Flatten per sample

    # Train-test split
    train_size = int(0.8 * n_samples)
    train_features, test_features = features[:train_size], features[train_size:]
    train_targets, test_targets = targets[:train_size], targets[train_size:]

    # Generate interpolated test data
    interpolated_driving_paths, interpolated_controlled_paths, interpolated_targets = generate_interpolated_test_data(driving_paths[:train_size])
    interpolated_features = []
    for i in range(len(interpolated_driving_paths)):
        crp = ControlledRoughPath(interpolated_driving_paths[i])
        gd, r = crp.control_decomposition(interpolated_controlled_paths[i])
        interpolated_features.append(torch.cat([gd, r], dim=1).view(-1))
    interpolated_features = torch.stack(interpolated_features)  # Shape: (n_interpolated_samples, feature_dim)

    # Initialize the neural network
    input_dim = features.shape[1]
    hidden_dim = 64
    output_dim = 1
    model = CRPNet(input_dim, hidden_dim, output_dim)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Train the neural network
    n_epochs = 1000
    for epoch in range(n_epochs):
        model.train()
        optimizer.zero_grad()
        outputs = model(train_features)
        loss = criterion(outputs.squeeze(), train_targets)
        loss.backward()
        optimizer.step()

        # Evaluate on test data
        model.eval()
        with torch.no_grad():
            test_outputs = model(test_features)
            interpolated_outputs = model(interpolated_features)
            test_loss = criterion(test_outputs.squeeze(), test_targets).item()
            interpolated_loss = criterion(interpolated_outputs.squeeze(), interpolated_targets).item()

        if (epoch + 1) % 10 == 0:
            print(f"Epoch [{epoch + 1}/{n_epochs}], Loss: {loss.item():.4f}, Test Loss: {test_loss:.4f}, Interpolated Loss: {interpolated_loss:.4f}")

    print("Training complete.")

Epoch [10/1000], Loss: 409.3355, Test Loss: 418.7178, Interpolated Loss: 144.3172
Epoch [20/1000], Loss: 407.8700, Test Loss: 418.6573, Interpolated Loss: 147.7181
Epoch [30/1000], Loss: 406.1148, Test Loss: 418.6833, Interpolated Loss: 152.0197
Epoch [40/1000], Loss: 403.9539, Test Loss: 418.8814, Interpolated Loss: 157.2930
Epoch [50/1000], Loss: 401.3334, Test Loss: 419.1865, Interpolated Loss: 163.6399
Epoch [60/1000], Loss: 398.2603, Test Loss: 419.6589, Interpolated Loss: 170.8922
Epoch [70/1000], Loss: 394.8114, Test Loss: 420.2373, Interpolated Loss: 179.2038
Epoch [80/1000], Loss: 391.0190, Test Loss: 421.1148, Interpolated Loss: 188.3498
Epoch [90/1000], Loss: 386.9601, Test Loss: 422.2291, Interpolated Loss: 198.3192
Epoch [100/1000], Loss: 382.6766, Test Loss: 423.5846, Interpolated Loss: 207.8900
Epoch [110/1000], Loss: 378.2228, Test Loss: 425.1456, Interpolated Loss: 217.3708
Epoch [120/1000], Loss: 373.5956, Test Loss: 426.9443, Interpolated Loss: 225.5290
Epoch [130/10