# Do Nets Matter?
#### Author: JP Melo

In this notebook we explore how the shape of the network affects the performance and accuracy of the model. We use the same dataset as before, but we train networks with different architectures and compare their results.

### Imports

In [None]:
from derpinns.nn import *
from derpinns.utils import *
from derpinns.trainer import *
import torch
import kfac

## Parameters

In [None]:
# Seed NumPy and torch so that repeated runs draw the same random numbers.
np.random.seed(0)
torch.manual_seed(0)

# Settings shared by every experiment below.
assets = 2  # number of assets (passed as n_assets to OptionParameters)
nn_shape = "64x3"  # shape string handed to build_nn
sampler = "pseudo"  # sampler name handed to SampledDataset
dtype = torch.float32
device = torch.device("cpu")

# Option valuation parameters: sigma is 0.2 for every asset, and rho has
# ones on the diagonal and 0.25 everywhere else.
params = OptionParameters(
    n_assets=assets,
    tau=1.0,
    sigma=np.full(assets, 0.2),
    rho=0.75 * np.eye(assets) + 0.25 * np.ones((assets, assets)),
    r=0.05,
    strike=100,
    payoff=payoff,
)

# Sample counts per training stage. Interior and initial counts are both
# derived from the boundary count as 2 * assets * boundary.

# Adam stage
adam_batch_size = 500
adam_total_iter = 500
adam_boundary_samples = 20_000
adam_interior_samples = 2 * assets * adam_boundary_samples
adam_initial_samples = 2 * assets * adam_boundary_samples

# L-BFGS stage
lbfgs_boundary_samples = 1_000
lbfgs_interior_samples = 2 * assets * lbfgs_boundary_samples
lbfgs_initial_samples = 2 * assets * lbfgs_boundary_samples

## Training

In this case, we use the full training pipeline, since the idea is to analyze the expressivity of the model and not the training process.

### Vanilla NN

In [None]:
# Vanilla network built from the `nn_shape` specification.
# NOTE(review): input_dim is `assets` here but `assets + 1` for the other
# models in this notebook -- confirm this is what build_nn expects.
model = build_nn(
    nn_shape=nn_shape,
    input_dim=assets,
    dtype=dtype,  # notebook-wide dtype rather than a hard-coded copy of it
).apply(weights_init).to(device)
model.train()

# ---- Stage 1: Adam on shuffled mini-batches ----
dataset = SampledDataset(
    params,
    adam_interior_samples,
    adam_initial_samples,
    adam_boundary_samples,
    sampler,
    dtype,
    device,
    seed=0,
)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-2, amsgrad=True)
preconditioner = kfac.preconditioner.KFACPreconditioner(model)

# Parenthesised chains instead of backslash continuations: a trailing
# backslash silently swallows whatever is written on the following line.
closure = (
    DimlessBS()
    .with_dataset(
        dataset,
        loader_opts={
            'batch_size': adam_batch_size,
            "shuffle": True,
            "pin_memory": True,
        },
    )
    .with_model(model)
    .with_device(device)
    .with_dtype(dtype)
)

trainer = (
    PINNTrainer()
    .with_optimizer(optimizer)
    .with_device(device)
    .with_dtype(dtype)
    .with_training_step(closure)
    .with_preconditioner(preconditioner)
    .with_epochs(adam_total_iter)
)

# first training stage
trainer.train()

# ---- Stage 2: L-BFGS on a freshly sampled dataset, one full batch ----
dataset = SampledDataset(
    params,
    lbfgs_interior_samples,
    lbfgs_initial_samples,
    lbfgs_boundary_samples,
    sampler,
    dtype,
    device,
    seed=0,
)

optimizer = LBFGS(
    model.parameters(),
    max_eval=1_000,
    max_iter=5_000,
    line_search_fn="strong_wolfe",
)
batch_size = len(dataset)  # we use all samples

closure = closure.with_dataset(
    dataset,
    loader_opts={'batch_size': batch_size, "shuffle": False, "pin_memory": True},
)

# NOTE(review): the trainer object is reused, so the preconditioner set for
# stage 1 presumably stays attached here -- confirm that is intended.
trainer = trainer.with_optimizer(optimizer).with_training_step(closure)

# second training stage
trainer.train()

In [None]:
# Keep the state recorded by the training closure and plot its smoothed loss.
with_vanilla = trainer.closure.get_state()
plot_loss(with_vanilla, smooth_window=50, smooth=True)

# Relative L2 error reported by compare_with_mc, printed scaled by 100.
vanilla_l2 = compare_with_mc(
    model,
    params,
    n_prices=200,
    n_simulations=10_000,
    dtype=dtype,
    device=device,
    seed=42,
)["l2_rel_error"]
print("L2 Error: ", vanilla_l2 * 100)

## With SPINN

In [None]:
# Build the net to be used
model = SPINN(n_layers=3, input_dim=assets+1, hidden_dim=10,
output_dim=10, dtype=dtype).apply(weights_init).to(device)

dataset = SampledDataset(
    params, adam_interior_samples, adam_initial_samples, adam_boundary_samples, sampler, dtype, device, seed=0)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, amsgrad=True)
preconditioner = kfac.preconditioner.KFACPreconditioner(model)

closure = DimlessBS()\
    .with_dataset(dataset, loader_opts={'batch_size': adam_batch_size, "shuffle": True, "pin_memory": True})\
    .with_model(model)\
    .with_device(device)\
    .with_dtype(dtype)

trainer = PINNTrainer()\
    .with_optimizer(optimizer)\
    .with_device(device)\
    .with_dtype(dtype)\
    .with_training_step(closure)\
    .with_preconditioner(preconditioner)\
    .with_epochs(adam_total_iter)\

# first training stage
trainer.train()

# we create new samples for the second stage
dataset = SampledDataset(
    params, lbfgs_interior_samples, lbfgs_initial_samples, lbfgs_boundary_samples, sampler, dtype, device, seed=0)

optimizer = LBFGS(
    model.parameters(),
    max_eval=1_000,
    max_iter=5_000,
    line_search_fn="strong_wolfe",
)
batch_size = len(dataset)  # we use all samples

closure = closure.with_dataset(
    dataset, loader_opts={'batch_size': batch_size, "shuffle": False, "pin_memory": True})

trainer = trainer.with_optimizer(optimizer).with_training_step(closure)

# second training stage
trainer.train()

In [None]:
# Keep the state recorded by the training closure and plot its smoothed loss.
with_spinn = trainer.closure.get_state()
plot_loss(with_spinn, smooth_window=50, smooth=True)

# Relative L2 error reported by compare_with_mc, printed scaled by 100.
spinn_l2 = compare_with_mc(
    model,
    params,
    n_prices=200,
    n_simulations=10_000,
    dtype=dtype,
    device=device,
    seed=42,
)["l2_rel_error"]
print("L2 Error: ", spinn_l2 * 100)

## With NN+Ansatz

In [None]:
# Network wrapped in the NNAnzats model class.
model = NNAnzats(
    n_layers=3,
    input_dim=assets + 1,
    hidden_dim=64,
    output_dim=1,
    dtype=dtype,
).apply(weights_init).to(device)

# ---- Adam on shuffled mini-batches (the only stage run in this cell) ----
dataset = SampledDataset(
    params,
    adam_interior_samples,
    adam_initial_samples,
    adam_boundary_samples,
    sampler,
    dtype,
    device,
    seed=0,
)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-2, amsgrad=True)
preconditioner = kfac.preconditioner.KFACPreconditioner(model)

# Parenthesised chains instead of backslash continuations: a trailing
# backslash silently swallows whatever is written on the following line.
closure = (
    DimlessBS()
    .with_dataset(
        dataset,
        loader_opts={
            'batch_size': adam_batch_size,
            "shuffle": True,
            "pin_memory": True,
        },
    )
    .with_model(model)
    .with_device(device)
    .with_dtype(dtype)
)

trainer = (
    PINNTrainer()
    .with_optimizer(optimizer)
    .with_device(device)
    .with_dtype(dtype)
    .with_training_step(closure)
    .with_preconditioner(preconditioner)
    .with_epochs(adam_total_iter)
)

# first training stage
trainer.train()

In [None]:
# Keep the state recorded by the training closure and plot its smoothed loss.
with_anzats = trainer.closure.get_state()
plot_loss(with_anzats, smooth_window=50, smooth=True)

# Relative L2 error reported by compare_with_mc, printed scaled by 100.
anzats_l2 = compare_with_mc(
    model,
    params,
    n_prices=200,
    n_simulations=10_000,
    dtype=dtype,
    device=device,
    seed=42,
)["l2_rel_error"]
print("L2 Error: ", anzats_l2 * 100)

6.2328877/3.9504097

### Compare all runs

In [None]:

def _moving_average(arr, window):
    """Return a trailing moving average of ``arr`` with the original length.

    Parameters
    ----------
    arr : array-like
        Sequence of numbers to smooth.
    window : int
        Number of consecutive samples averaged per output value. Values
        larger than ``len(arr)`` are clamped to ``len(arr)``.

    Returns
    -------
    numpy.ndarray
        ``arr`` itself (as an array) when the effective window is <= 1,
        which includes empty input. Otherwise an array of the same length
        whose first ``window - 1`` entries repeat the first full-window
        average.
    """
    values = np.asarray(arr)
    # A window longer than the data leaves no complete window to average,
    # so shrink it to the data length (this also covers empty input).
    window = min(window, values.size)
    if window <= 1:
        return values
    # Prefix sums with a leading zero: the sum over any window is the
    # difference of two entries.
    cumsum = np.cumsum(np.insert(values, 0, 0))
    smoothed = (cumsum[window:] - cumsum[:-window]) / float(window)
    # pad the left side so lengths match
    left_pad = np.full(window - 1, smoothed[0])
    return np.concatenate([left_pad, smoothed])

def compare_error_histories(
    runs,
    labels=None,
    backend="plotly",
    fig_size=(900, 500),
    smooth=True,
    smooth_window=50,
):
    """
    Compare relative and max error histories from multiple runs.

    Two stacked panels are drawn on a log y-scale: the L2 relative error on
    top and the maximum error below. Each run gets its own line style and
    is plotted against its own sample index, so runs of different lengths
    can be compared.

    Parameters
    ----------
    runs : list of dict
        Each dict must contain the keys 'l2_rel_err' and 'max_err'.
    labels : list of str, optional
        Legend labels, one per run. Defaults to "Run 1", "Run 2", ...
    backend : str
        "plotly" (case-insensitive) draws with Plotly; any other value
        draws with Matplotlib.
    fig_size : tuple
        (width, height) in pixels. The Matplotlib backend divides both by
        100 to obtain the figure size it is given.
    smooth : bool
        When true, both histories are passed through `_moving_average`.
    smooth_window : int
        Window handed to `_moving_average` when `smooth` is true.
    """

    n_runs = len(runs)
    assert n_runs > 0, "runs list cannot be empty"
    if labels is None:
        labels = [f"Run {i+1}" for i in range(n_runs)]
    assert len(labels) == n_runs, "`labels` length must match `runs` length"

    colors = {"rel_err": "#d62728", "max_err": "#2ca02c"}
    # Plotly and Matplotlib use different names for line styles, so each
    # backend gets its own list; entries at the same index are meant to
    # look alike.
    dashes = ["solid", "dash", "dot", "dashdot", "longdash", "longdashdot"]
    mpl_linestyles = [
        "solid", "dashed", "dotted", "dashdot",
        (0, (10, 3)), (0, (10, 3, 1, 3)),
    ]

    def prep(d):
        rel = np.asarray(d["l2_rel_err"])
        mx = np.asarray(d["max_err"])
        if smooth:
            rel, mx = (_moving_average(x, smooth_window) for x in (rel, mx))
        return rel, mx

    processed = [prep(r) for r in runs]

    # --- Plotly backend ---
    if backend.lower() == "plotly":
        fig = make_subplots(
            rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.07,
            subplot_titles=("L2 Relative Error", "Maximum Error")
        )

        def add(row, y_values, color, dash, label, showlegend):
            fig.add_trace(
                go.Scatter(
                    # x is built per trace so runs may differ in length
                    x=np.arange(len(y_values)), y=y_values, mode="lines",
                    name=label,
                    # group both panels of a run under one legend entry
                    legendgroup=label,
                    showlegend=showlegend,
                    line=dict(color=color, dash=dash)
                ),
                row=row, col=1,
            )

        for run_idx, (rel, mx) in enumerate(processed):
            dash = dashes[run_idx % len(dashes)]
            label = labels[run_idx]
            # One legend entry per run: shown for the top panel only, so
            # the bottom panel does not duplicate it.
            add(1, rel, colors["rel_err"], dash, label, True)
            add(2, mx, colors["max_err"], dash, label, False)

        fig.update_yaxes(type="log")
        fig.update_layout(
            height=fig_size[1], width=fig_size[0],
            title_text="Error Comparison",
            legend_title="Run"
        )
        fig.show()

    # --- Matplotlib backend ---
    else:
        _, axes = plt.subplots(2, 1,
                               figsize=(fig_size[0] / 100, fig_size[1] / 100),
                               sharex=True)

        titles = ["L2 Relative Error", "Maximum Error"]

        for run_idx, (rel, mx) in enumerate(processed):
            linestyle = mpl_linestyles[run_idx % len(mpl_linestyles)]
            label = labels[run_idx]
            axes[0].plot(np.arange(len(rel)), rel, label=label,
                         color=colors["rel_err"], linestyle=linestyle)
            axes[1].plot(np.arange(len(mx)), mx, label=label,
                         color=colors["max_err"], linestyle=linestyle)

        for ax, title in zip(axes, titles):
            ax.set_yscale("log")
            ax.set_title(title, fontsize=10)

        axes[-1].set_xlabel("Epoch")
        axes[0].set_ylabel("Relative Error (log)")
        axes[1].set_ylabel("Max Error (log)")
        axes[0].legend(loc="upper right")
        plt.tight_layout()
        plt.show()

In [None]:
# Overlay the error histories of the three trained models, smoothing with a
# window of 10.
compare_error_histories(
    runs=[with_vanilla, with_spinn, with_anzats],
    labels=["Vanilla", "SPINN", "NN+Anzatz"],
    smooth_window=10,
    smooth=True,
)