# Training and validation notebook

This notebook trains a small diffusion denoiser on synthetic degradation traces and saves a checkpoint (`checkpoints/best.pth`) and an optional safetensors copy.`

In [1]:
# Imports and device selection
import os
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt

device = 'mps' if torch.backends.mps.is_available() else ('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)

Using device: mps


In [2]:
# Small synthetic generator (Paris-law) - extracted from code
import numpy as _np
class ParisLawDegradation:
    def __init__(self, length, dim, C=1e-12, m=3, delta_sigma=100, beta=1):
        self.length = int(length)
        self.dim = int(dim)
        self.C = float(C)
        self.m = float(m)
        self.delta_sigma = float(delta_sigma)
        self.beta = float(beta)
    def delta_K(self, a):
        a = _np.atleast_1d(_np.asarray(a))
        return self.delta_sigma * _np.sqrt(_np.pi * a) * self.beta
    def xdot(self, a):
        a = _np.atleast_1d(_np.asarray(a))
        return self.C * (self.delta_K(a) ** self.m)
    def generate_episode(self, x0):
        x0 = _np.atleast_1d(_np.asarray(x0))
        episode = _np.zeros((x0.shape[0], self.length + 1))
        episode[:, 0] = x0
        for i in range(self.length):
            episode[:, i + 1] = episode[:, i] + self.xdot(episode[:, i])
        return episode

In [3]:
# Build a small dataset for training (quick)
N = 1024
length = 64
gen = ParisLawDegradation(length=length-1, dim=1, C=1e-8)
x0 = _np.abs(_np.random.randn(N)) * 1e-3 + 1e-4
episodes = gen.generate_episode(x0)
# clean
episodes = episodes[~_np.isnan(episodes).any(axis=1)]
# to torch and normalize per-sample min/max
data = torch.tensor(episodes, dtype=torch.float32).to(device)
X = data[:, None, ...]  # (N, C, L)
mn = torch.min(X, 2)[0][..., None]
mx = torch.max(X, 2)[0][..., None]
den = (mx - mn)
den[den == 0] = 1.0
X = (X - mn) / den
print('X shape', X.shape)

X shape torch.Size([1024, 1, 64])


In [4]:
# Import models from src package
from degdiff.model_def import TimeSeriesDiffusionModel, DegDiffusion
# instantiate a small model
C = X.shape[1]
L = X.shape[2]
model = TimeSeriesDiffusionModel(channels=C, hidden_dim=32, num_blocks=1, T=50).to(device)
print(model)

ModuleNotFoundError: No module named 'degdiff'

In [None]:
# Minimal training loop with saving of a checkpoint at the end
bs = 16
opt = torch.optim.Adam(model.parameters(), lr=1e-4)
loss_fn = nn.MSELoss()
s0_len = L // 2
epochs = 2
for epoch in range(epochs):
    idx = torch.randint(0, X.shape[0], (bs,))
    x0 = X[idx].to(device)
    t = torch.randint(0, model.T, (bs,)).to(device)
    noise = torch.randn_like(x0).to(device)
    xt = model.q_sample(x0, t, noise)
    s0 = x0[:, :, :s0_len]
    ns1 = xt[:, :, s0_len:]
    noise1 = noise[:, :, s0_len:]
    noise0 = torch.zeros_like(s0).to(device)
    output_noise = torch.cat([noise0, noise1], dim=2)
    pred_noise = model(s0, ns1, t)
    loss = loss_fn(pred_noise, output_noise)
    opt.zero_grad()
    loss.backward()
    opt.step()
    if epoch % 1 == 0:
        print(f'Epoch {epoch}, Loss {loss.item():.6f}')

In [None]:
# Save checkpoint after training
import os
ckpt_dir = 'checkpoints'
os.makedirs(ckpt_dir, exist_ok=True)
ckpt_path = os.path.join(ckpt_dir, 'best.pth')
torch.save({'state_dict': model.state_dict(), 'epoch': epoch, 'metrics': {'loss': loss.item()}}, ckpt_path)
print(f'Saved checkpoint to {ckpt_path}')
# Optional safetensors copy
try:
    from safetensors.torch import save_file as safe_save
    safe_save({k: v.cpu().numpy() for k, v in model.state_dict().items()}, os.path.join(ckpt_dir, 'best.safetensors'))
    print('Saved safetensors copy')
except Exception:
    pass

Next steps:
- Verify `checkpoints/best.pth` is created.
- Locally test publishing: `PYTHONPATH=src MODEL_CHECKPOINT=checkpoints/best.pth HF_TOKEN=... python scripts/publish_to_hf.py`
- Add CI workflow to upload the artifact from training job and run the publisher in a second job.