# Deterministic Model

## Initialization

In [None]:
from torch.utils.data import DataLoader, TensorDataset
import torch.autograd as autograd
import matplotlib.pyplot as plt
import torch.optim as optim
import torch.nn as nn
import numpy as np
import shutil
import torch
import os

d_of_x = 1
N_rec = 40
N_mem = 1 # memory length
d_input = N_mem * d_of_x # number of input nodes
d_output = d_of_x # number of output nodes
d_outputRNN = N_rec * d_output # total nubmer of output nodes (including recurrence)

n_hidden = 3 # number of hidden layers
n_nodes = 20 # number of nodes per hidden layer
n_epochs = 100_000 #50_000
learning_rate = 1e-4
N_seeds = 1 # number of seeds for ensemble learning
batch_size = 256 # batch size
verbose = False

system = 'stochastic'
model_upper_dir = f'./models/final{system}_{n_epochs}epochs/'

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

if torch.cuda.is_available():
   device = torch.device("cuda")
elif torch.backends.mps.is_available():
   device = torch.device("mps")
else:
   device = torch.device("cpu")

print(f"Using {device} device.")

In [None]:
# stochastic data
data = np.load("./Data/training_data.npz")

inputs_train = data["inputs_train"]
outputs_train = data["outputs_train"]
full_train = data["full_train"]
S = data["S_full"]
t_grid = data["t_grid_all"]

inputs_train_torch = torch.from_numpy(inputs_train).float()
outputs_train_torch = torch.from_numpy(outputs_train).float()
full_train_torch = torch.from_numpy(full_train).float()

inputs_train_torch = inputs_train_torch.to(device)
outputs_train_torch = outputs_train_torch.to(device)
full_train_torch = full_train_torch.to(device)

## Deterministic Model and Training



In [None]:
class DeterministicSubMap(nn.Module):
    def __init__(self, d_of_x = 1, N_mem = 1, N_rec = 40, n_hidden = 3, n_nodes = 10):
        super().__init__()

        self.recurrent_steps = N_rec
        input_dim = N_mem * d_of_x
        output_dim = d_of_x

        self.ReLU = nn.ReLU()
        self.hidden_layers = nn.ModuleList()
        self.hidden_layers.append(nn.Linear(input_dim, n_nodes))        # first layer (input dim -> n_nodes)
        for _ in range(n_hidden - 1):
            self.hidden_layers.append(nn.Linear(n_nodes, n_nodes))      # remaining layers
        self.last_linear = nn.Linear(n_nodes, output_dim)               # last layer

        self.slice_last_two = lambda x: x[:, -d_of_x:]
        self.slice_layer_Two = lambda x: x[:, d_of_x:]

    # a single forward without skip connection added to residual
    def forward_single(self, inputs):
        residual = inputs
        for layer in self.hidden_layers:
            residual = self.ReLU(layer(residual))
        residual = self.last_linear(residual)
        return residual

    # forward with n_reccurence = 1 for prediction
    def forward(self, inputs):
        x_last = self.slice_last_two(inputs)
        residual_output = self.forward_single(inputs)
        x_add_res = residual_output + x_last
        return x_add_res

    # forward with reccurence for training
    def forward_recurrence(self, inputs):
        # collect outputs at each recurrent step
        outputs = []  # z = []

        x = inputs
        x_minus_first_two = self.slice_layer_Two(x)
        x_last_two = self.slice_last_two(x)

        residual = self.forward_single(x)
        x_add_res = residual + x_last_two

        outputs.append(x_add_res)

        for _ in range(self.recurrent_steps - 1):
            x = torch.cat([x_minus_first_two, x_add_res], dim=1)
            x_minus_first_two = self.slice_layer_Two(x)
            x_last_two = self.slice_last_two(x)

            residual = self.forward_single(x)
            x_add_res = residual + x_last_two

            outputs.append(x_add_res)
        return torch.cat(outputs, dim=1)


In [None]:
for seed in range(N_seeds):
    print(f"training model {seed + 1}")

    model_dir = model_upper_dir + f'model_seed_{seed}/'
    model_weights_path = os.path.join(model_dir, 'best_weights.pth')
    plot_path = os.path.join(model_dir, 'loss_semilog.png')
    # print("model_dir: ", model_dir)

    if os.path.exists(model_dir):
        shutil.rmtree(model_dir)
    os.makedirs(model_dir, exist_ok=True)

    train_dataset = TensorDataset(inputs_train_torch, outputs_train_torch)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    torch.manual_seed(seed)

    model = DeterministicSubMap(d_of_x = d_of_x, N_mem = N_mem, N_rec = N_rec, n_hidden = n_hidden, n_nodes = n_nodes)
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.MSELoss()

    best_loss = float("inf")
    history_loss = []

    print("Starting training")
    for epoch in range(n_epochs):
        model.train()

        running_loss = 0.0

        for i, data in enumerate(train_loader, 0):
            inputs, labels = data

            # reset gradients
            optimizer.zero_grad()

            outputs = model.forward_recurrence(inputs)
            loss = criterion(outputs, labels)

            # compute gradients
            loss.backward()

            # update weights
            optimizer.step()

            running_loss += loss.item()

        avg_epoch_loss = running_loss / len(train_loader)
        history_loss.append(avg_epoch_loss)

        # Keras ModelCheckpoint (monitor="loss", save_best_only=True)
        if avg_epoch_loss < best_loss:
            best_loss = avg_epoch_loss
            # Save ONLY best weights
            torch.save(model.state_dict(), model_weights_path)
            saved_best = True
        else:
            saved_best = False

        if verbose:
            if (epoch + 1) % 10 == 0 or epoch == n_epochs - 1:
                print(f'Epoch {epoch + 1:4d}/{n_epochs}, Loss: {avg_epoch_loss:.6f} {"(Saved Best)" if saved_best else ""}')
        if not verbose and (epoch + 1) % 100 == 0:
            print(f"Epoch {epoch + 1:5d} completed")

    plt.semilogy(history_loss)
    plt.title('model loss')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.legend(['train'], loc='upper left')
    plt.savefig(model_dir + 'loss_semilog.png', bbox_inches='tight')
    plt.show()

    print()

# Stochastic GAN

## Initialization

In [None]:
L = N_rec
# x at t0
x0_all = inputs_train_torch              # (N, 1)
# x_t1 through x_tL
x_all_1_to_L = outputs_train_torch        # (N, L)
# x_0 to x_{L-1}
x_all_0_to_Lminus1 = full_train_torch[:, :-1]      # (N, L)
# Increments y_n = x_n - x_{n-1}, for all n = 1 to L
y_all = x_all_1_to_L - x_all_0_to_Lminus1          # (N, L)
# GAN training data: concatenated [x0, y1..yL]
gan_sequences = torch.cat([x0_all, y_all], dim=1)  # shape (N, 1 + L)

gan_dataset = TensorDataset(gan_sequences.to(device))
gan_loader  = DataLoader(gan_dataset, batch_size=batch_size, shuffle=True)

det_model = DeterministicSubMap(d_of_x=d_of_x, N_mem=N_mem, N_rec=N_rec,
                                n_hidden=n_hidden, n_nodes=n_nodes).to(device)

seed = 0

det_weights_path = model_upper_dir + f'model_seed_{seed}/best_weights.pth'
det_model.load_state_dict(torch.load(det_weights_path, map_location=device))

for p in det_model.parameters():
    p.requires_grad = False

det_model.eval()

## Generator and Discriminator

In [None]:
noise_dim = 4      # Z dimension
gen_hidden = 3
gen_nodes = 20

class StochasticSubMap(nn.Module):
    def __init__(self, d_of_x=1, noise_dim=4, n_hidden=3, n_nodes=20):
        super().__init__()
        in_dim  = d_of_x + noise_dim
        out_dim = d_of_x

        layers = []
        layers.append(nn.Linear(in_dim, n_nodes))
        for _ in range(n_hidden - 1):
            layers.append(nn.Linear(n_nodes, n_nodes))
        self.hidden_layers = nn.ModuleList(layers)
        self.last_linear   = nn.Linear(n_nodes, out_dim)
        self.activation    = nn.ReLU()

    def forward(self, x, z):
        # x: (B, d_of_x), z: (B, noise_dim)
        inp = torch.cat([x, z], dim=1)          # (B, d_of_x + noise_dim)
        residual   = inp
        for layer in self.hidden_layers:
            residual = self.activation(layer(residual))
        residual = self.last_linear(residual)              # (B, d_of_x)
        return residual


In [None]:
critic_hidden = 3
critic_nodes  = 20

class Critic(nn.Module):
    def __init__(self, d_of_x=1, L=40, n_hidden=3, n_nodes=64):
        super().__init__()
        in_dim = d_of_x + L * d_of_x

        layers = []
        layers.append(nn.Linear(in_dim, n_nodes))
        for _ in range(n_hidden - 1):
            layers.append(nn.Linear(n_nodes, n_nodes))
        self.hidden_layers = nn.ModuleList(layers)
        self.last_linear   = nn.Linear(n_nodes, 1)
        self.activation    = nn.ReLU()

    def forward(self, seq):
        # seq: (B, 1 + L)
        out = seq
        for layer in self.hidden_layers:
            out = self.activation(layer(out))
        out = self.last_linear(out)              # (B, 1)
        return out


In [None]:
G_stoch = StochasticSubMap(d_of_x=d_of_x, noise_dim=noise_dim, n_hidden=gen_hidden, n_nodes=gen_nodes).to(device)
C_critic = Critic(d_of_x=d_of_x, L=L, n_hidden=critic_hidden, n_nodes=critic_nodes).to(device)

lambda_gp = 10.0
n_critic  = 5     # n_ct
n_epochs_gan = n_epochs # 100_000

opt_C = optim.Adam(C_critic.parameters(), lr=5e-5, betas=(0.5, 0.999))
opt_G = optim.Adam(G_stoch.parameters(), lr=5e-5, betas=(0.5, 0.999))


In [None]:
def generate_fake_sequence(det_model, G_stoch, x0_batch, L, noise_dim):
    B = x0_batch.size(0)
    x_b = x0_batch  # (B, 1)
    y_list = []

    for j in range(L):
        z = torch.randn(B, noise_dim, device=x0_batch.device)

        with torch.no_grad():           # fix det_model
            det_next = det_model(x_b)   # (B, 1)

        det_inc  = det_next - x_b       # D(x) - x
        sto_inc  = G_stoch(x_b, z)      # S(x, z)
        y_b      = det_inc + sto_inc    # total increment
        x_b      = x_b + y_b            # update state

        y_list.append(y_b)

    y_fake = torch.cat(y_list, dim=1)          # (B, L)
    seq_fake = torch.cat([x0_batch, y_fake], dim=1)  # (B, L+1)
    return seq_fake


In [None]:
n_save_gan_checkpoints = n_epochs_gan/100
step = 0
for epoch in range(n_epochs_gan):
    for (real_batch,) in gan_loader:        # each element is (real_sequences,)
        real_batch = real_batch.to(device)  # (B, 1+L)
        B = real_batch.size(0)

        # Split into x0 and y1 till yL
        x0_batch = real_batch[:, :d_of_x]   # (B,1)

        # Update Critic
        opt_C.zero_grad()

        # Generate fake sequences with current generator
        fake_batch = generate_fake_sequence(det_model, G_stoch, x0_batch, L, noise_dim)

        # Critic scores
        D_real = C_critic(real_batch)      # (B,1)
        D_fake = C_critic(fake_batch)      # (B,1)

        # Gradient penalty: sample epslon in [0,1]
        eps = torch.rand(B, 1, device=device)
        eps = eps.expand_as(real_batch)    # (B, 1+L)

        x_hat = eps * real_batch + (1 - eps) * fake_batch
        x_hat.requires_grad_(True)

        D_hat = C_critic(x_hat)

        grad_outputs = torch.ones_like(D_hat)
        gradients = autograd.grad(
            outputs=D_hat,
            inputs=x_hat,
            grad_outputs=grad_outputs,
            create_graph=True,
            retain_graph=True,
            only_inputs=True
        )[0]                               # (B, 1+L)

        grad_norm = gradients.view(B, -1).norm(2, dim=1)   # gradient
        gp = ((grad_norm - 1.0) ** 2).mean()               # gradient penalty term

        # WGAN critic loss
        loss_C = D_fake.mean() - D_real.mean() + lambda_gp * gp
        loss_C.backward()
        opt_C.step()

        # Update Generator every n_critic steps
        if step % n_critic == 0:
            opt_G.zero_grad()

            fake_batch = generate_fake_sequence(det_model, G_stoch, x0_batch, L, noise_dim)
            D_fake_for_G = C_critic(fake_batch)

            # maximize critic's output = minimize -E[D(fake)]
            loss_G = -D_fake_for_G.mean()
            loss_G.backward()
            opt_G.step()

        step += 1

    # epoch log
    if (epoch + 1) % 20 == 0:
        if step % n_critic == 0:
           print(f"[Epoch {epoch+1}] C_loss: {loss_C.item():.4f}, G_loss: {loss_G.item():.4f}")
        else:
           print(f"[Epoch {epoch+1}] C_loss: {loss_C.item():.4f}")

    if (epoch + 1) % n_save_gan_checkpoints == 0:
        save_folder = os.path.join(model_upper_dir, f"{n_epochs_gan}gan_epochs/checkpoints")
        save_path = os.path.join(save_folder, f"gan_checkpoint_epoch_{epoch+1}.pth")
        os.makedirs(save_folder, exist_ok=True)
        torch.save({
            'generator': G_stoch.state_dict(),
            'critic': C_critic.state_dict(),
            'epoch': epoch
        }, save_path)

In [None]:
save_folder = os.path.join(model_upper_dir, f"{n_epochs_gan}gan_epochs")
save_path = os.path.join(save_folder, "gan_stochastic_submap.pth")
os.makedirs(save_folder, exist_ok=True)
torch.save(G_stoch.state_dict(), save_path)

In [None]:
def sFML_step(det_model, G_stoch, x, noise_dim, device):
    B = x.size(0)
    z = torch.randn(B, noise_dim, device=device)
    with torch.no_grad():
        det_next = det_model(x)
    det_inc = det_next - x
    sto_inc = G_stoch(x, z)
    y = det_inc + sto_inc
    return x + y


In [None]:
import numpy as np
import torch
import matplotlib.pyplot as plt

def sample_stochastic_trajectory(det_model, G_stoch, x0, n_steps, noise_dim, device):
    det_model.eval()
    G_stoch.eval()

    # Make x a batch of size 1, shape (1, d_of_x)
    if isinstance(x0, (float, int)):
        x = torch.tensor([[x0]], dtype=torch.float32, device=device)
    else:
        x0_arr = np.array(x0, dtype=np.float32).reshape(1, -1)
        x = torch.from_numpy(x0_arr).to(device)

    states = [x.detach().cpu().numpy().copy()]  # store x_0

    with torch.no_grad():
        for _ in range(n_steps):
            B = x.size(0)

            # Sample noise
            z = torch.randn(B, noise_dim, device=device)

            det_next = det_model(x)                 # (B, d_of_x)
            det_inc  = det_next - x                 # D_delta(x) - x
            sto_inc  = G_stoch(x, z)                # (B, d_of_x)
            y = det_inc + sto_inc                   # (B, d_of_x)
            x = x + y                               # (B, d_of_x)

            states.append(x.detach().cpu().numpy().copy())

    trajectory = np.concatenate(states, axis=0)          # (n_steps+1, d_of_x)
    t = np.arange(n_steps + 1)

    return t, trajectory


In [None]:
n_steps   = N_rec
noise_dim = noise_dim
x0_value  = 0.5

t, traj = sample_stochastic_trajectory(det_model, G_stoch,
                                       x0=x0_value,
                                       n_steps=n_steps,
                                       noise_dim=noise_dim,
                                       device=device)

plt.figure(figsize=(6, 4))
plt.plot(t, traj[:, 0])
plt.xlabel("Step index n")
plt.ylabel("x_n")
plt.title("Sample stochastic trajectory from sFML")
plt.grid(True)
plt.show()


In [None]:
x0_value = 1
runs = 100
n, t = 100, 1
nt = n*t
fake_generation = np.zeros((nt + 1, runs))
plot_dir = model_upper_dir

plt.figure(figsize=(6, 4))
for i in range(runs):
    t, generation = sample_stochastic_trajectory(det_model, G_stoch,
                                           x0=x0_value,
                                           n_steps=n,
                                           noise_dim=noise_dim,
                                           device=device)
    fake_generation[:, i] = generation[:, 0]

plt.plot(t_grid, fake_generation[:, :15], alpha=0.7)

plt.xlabel("Step index n")
plt.ylabel("x_n")
plt.title("Multiple stochastic trajectories")
# plt.grid(True)
# plt.savefig(f"{plot_dir}/stochastic_trajectories.png", dpi=300, bbox_inches='tight')
# plt.savefig(f"./stochastic_trajectories.png", dpi=300, bbox_inches='tight')
plt.show()


In [None]:
fake_mean = fake_generation.mean(axis=1)
fake_std = fake_generation.std(axis=1)

true_mean = S.mean(axis=1)
true_std  = S.std(axis=1)

plt.plot(t_grid, true_mean, color="blue", label="Ground Truth Mean", linewidth=2)
plt.plot(t_grid, fake_mean, color="red", label="Fake Mean", linewidth=2)
plt.fill_between(t_grid[:, 0], true_mean - true_std, true_mean + true_std, alpha=0.25, color="lightblue", label="Ground Truth Std")
plt.fill_between(t_grid[:, 0], fake_mean - fake_std, fake_mean + fake_std, alpha=0.25, color="green", label="Fake Std")

plt.xlabel("T")
plt.ylabel("X_t")
plt.title("Mean and Standard Deviation of Geometric Brownian Motion")
plt.legend()
plt.grid(False)
# plt.savefig(f"predictionvsactualstats.png", dpi=300, bbox_inches='tight')
plt.show()

## Finding the best model combination

In [None]:
seed = 0
modeldet_weights_path = model_upper_dir + f'model_seed_{seed}/best_weights.pth'
print(modeldet_weights_path)

modelgan_weights_path = model_upper_dir + f'100000gan_epochs/checkpoints/'
print(modelgan_weights_path)
# for j in range(1_000, 101_000, 1_000):
for ep in [79]:
    # if j < 5000:
        j = ep*1000
        currentgan_path = modelgan_weights_path + "gan_checkpoint_epoch_" + str(j) + ".pth"
        print(currentgan_path)
        checkpoint = torch.load(currentgan_path, map_location=device)
        G_stoch.load_state_dict(checkpoint['generator'])

        x0_value = 1
        runs = 100
        n, t = 100, 1
        nt = n*t
        fake_generation = np.zeros((nt + 1, runs))
        plot_dir = model_upper_dir

        plt.figure(figsize=(6, 4))
        for i in range(runs):
            t, generation = sample_stochastic_trajectory(det_model, G_stoch,
                                                   x0=x0_value,
                                                   n_steps=n,
                                                   noise_dim=noise_dim,
                                                   device=device)
            fake_generation[:, i] = generation[:, 0]


        fake_mean = fake_generation.mean(axis=1)
        fake_std = fake_generation.std(axis=1)

        plt.plot(t_grid, true_mean, color="blue", label="Ground Truth Mean", linewidth=2)
        plt.plot(t_grid, fake_mean, color="red", label="Fake Mean", linewidth=2)
        plt.fill_between(t_grid[:, 0], true_mean - true_std, true_mean + true_std, alpha=0.25, color="lightblue", label="Ground Truth Std")
        plt.fill_between(t_grid[:, 0], fake_mean - fake_std, fake_mean + fake_std, alpha=0.25, color="green", label="Fake Std")

        plt.xlabel("T")
        plt.ylabel("X_t")
        plt.title(f"Mean and Standard Deviation of GBM {j}")
        plt.legend()
        plt.grid(False)
        # plt.savefig(f"predictionvsactualstats.png", dpi=300, bbox_inches='tight')
        plt.show()

# Drift Diffusion plot

In [None]:
mu = 2
x_axis = np.linspace(0, 16, 100)
true_a = x_axis * mu
delta = 0.01

a_hat = []
for x_i in x_axis:
    current = torch.tensor(x_i, device=device, dtype=torch.float32)
    x0_batch = current.expand(10000, 1)
    x1_batch = sFML_step(det_model, G_stoch, x0_batch, noise_dim=noise_dim, device=device)

    increase = x1_batch - x0_batch
    drift = (increase.mean()/delta).item()
    # drift = increase.mean().items()
    a_hat.append(drift)

a_hat = np.array(a_hat)

plt.plot(x_axis, true_a, color="blue", label="Reference", linewidth=2)
plt.plot(x_axis, a_hat, color="red", label="Predicted", linewidth=2)
plt.xlabel("x")
plt.ylabel("b(x)")
plt.title("drift term a(x) = mu * x")
plt.legend()
plt.grid(False)
plt.show()

In [None]:
sigma = 1
delta = 0.01

b_hat = []

for x_i in x_axis:
    # batch of 5000 samples of x_i
    current = torch.tensor(x_i, device=device, dtype=torch.float32)
    x0_batch = current.expand(5000, 1)

    # one stochastic step
    x1_batch = sFML_step(det_model, G_stoch, x0_batch,
                         noise_dim=noise_dim, device=device)

    # increments
    increase = x1_batch - x0_batch    # (5000, 1)

    # standard deviation of increments
    std_inc = increase.std().item()   # torch â†’ float

    # diffusion estimator b(x) = std_inc / sqrt(delta)
    b_val = std_inc / np.sqrt(delta)

    b_hat.append(b_val)

b_hat = np.array(b_hat)

# true diffusion
true_b = x_axis * sigma

plt.plot(x_axis, true_b,  color="blue",  label="Reference", linewidth=2)
plt.plot(x_axis, b_hat,   color="red",   label="Predicted", linewidth=2)
plt.xlabel("x")
plt.ylabel("b(x)")
plt.title("diffusion term b(x) = sigma * x")
plt.legend()
plt.grid(alpha=0.3)
plt.show()
