In [1]:
import argparse
import time
from pathlib import Path
from typing import Tuple, List

import numpy as np
import h5py
import torch
import torch.nn as nn


def set_seed(seed: int = 1234) -> None:
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

In [2]:
# nominal values

adam_epochs = 45000
adam_lr = 1e-3
lbfgs_epochs = 50000
L = 7.25*1e-4
RL = 0.314
C = 1.645 *1e-4
RC = 2.01 * 1e-1
Rdson = 0.221
Rload1 = 3.1
Rload2 = 10.2
Rload3 = 6.1
Vin = 48
VF = 1.0


## Let's Test The Accuracy of dt of Runge-Kutta

In [3]:
def predict_next_state(
    i0: float,
    v0: float,
    D: float,
    dt: float,
    L: float,
    RL: float,
    C: float,
    RC: float,
    Rdson: float,
    Rload: float,
    Vin: float,
    Vf: float,
    n_substeps: int = 100,
) -> Tuple[float, float]:
    """Predict next state using RK4 with optional sub-stepping."""
    h = dt / n_substeps  # smaller time step
    i, v = i0, v0

    def f(i, v):
        di = (-(RL + Rdson * D) * i - v + D * Vin - (1 - D) * Vf) / L
        dv = (Rload * i - v + C * RC * Rload * di) / (C * (RC + Rload))
        return di, dv

    for _ in range(n_substeps):
        k1_i, k1_v = f(i, v)
        k2_i, k2_v = f(i + 0.5 * h * k1_i, v + 0.5 * h * k1_v)
        k3_i, k3_v = f(i + 0.5 * h * k2_i, v + 0.5 * h * k2_v)
        k4_i, k4_v = f(i + h * k3_i, v + h * k3_v)

        i += (h / 6) * (k1_i + 2 * k2_i + 2 * k3_i + k4_i)
        v += (h / 6) * (k1_v + 2 * k2_v + 2 * k3_v + k4_v)

    return i, v


import numpy as np
from pathlib import Path
from h5_auxiliaries.datatransients import TransientData

# Load data
db_dir = Path(r"C:\Users\JC28LS\OneDrive - Aalborg Universitet\Desktop\Work\Databases")
db_name = "buck_converter_Shuai_processed.h5"
tr1 = TransientData.from_h5(db_dir / db_name, "ideal", 1)

# Index to test
idx = 51
i0 = tr1.i[idx]
v0 = tr1.v[idx]
i1 = tr1.i[idx + 1]
v1 = tr1.v[idx + 1]
D = tr1.D[idx]
dt = tr1.dt[idx]

# Nominal parameters
i1_pred_ssteps, v1_pred_ssteps = predict_next_state(
    i0=i0,
    v0=v0,
    D=D,
    dt=dt,
    L=7.25e-4,
    RL=0.314,
    C=1.645e-4,
    RC=0.201,
    Rdson=0.221,
    Rload=3.1,
    Vin=48.0,
    Vf=1.0,
    n_substeps=1000,
)

# Predict without sub-steps
i1_pred, v1_pred = predict_next_state(
    i0=i0,
    v0=v0,
    D=D,
    dt=dt,
    L=L,
    RL=RL,
    C=C,
    RC=RC,
    Rdson=Rdson,
    Rload=Rload1,  # Using Rload1 as an example
    Vin=Vin,
    Vf=VF,
    n_substeps=1,  # No sub-steps
)

print("Without sub-steps:")
print(f"i1_pred = {i1_pred:.6f}, i1 = {i1:.6f}, Delta_i = {i1_pred - i1:.6e}")
print(f"v1_pred = {v1_pred:.6f}, v1 = {v1:.6f}, Delta_v = {v1_pred - v1:.6e}")



print("With 1000 sub-steps:")
print(f"i1_pred = {i1_pred_ssteps:.6f}, i1 = {i1:.6f}, Delta_i = {i1_pred_ssteps - i1:.6e}")
print(f"v1_pred = {v1_pred_ssteps:.6f}, v1 = {v1:.6f}, Delta_v = {v1_pred_ssteps - v1:.6e}")

Without sub-steps:
i1_pred = 8.805077, i1 = 8.805084, Delta_i = -6.498587e-06
v1_pred = 26.041748, v1 = 26.041720, Delta_v = 2.817933e-05
With 1000 sub-steps:
i1_pred = 8.805077, i1 = 8.805084, Delta_i = -6.574731e-06
v1_pred = 26.041749, v1 = 26.041720, Delta_v = 2.886677e-05


**There is virtually no difference in the prediction error!**

In [4]:
## Create the datasets
from h5_auxiliaries.datatransients import TransientData

db_dir = Path(r"C:\Users\JC28LS\OneDrive - Aalborg Universitet\Desktop\Work\Databases")
db_name = "buck_converter_Shuai_processed.h5"

tr1 = TransientData.from_h5(db_dir / db_name, "ideal", 1)

i_n = tr1.i[:-1]
v_n = tr1.v[:-1]
i_np1 = tr1.i[1:]
v_np1 = tr1.v[1:]
D = tr1.D[:-1]
dt = tr1.dt[:-1]


X = np.stack([i_n, v_n, i_np1, v_np1, D, dt], axis=1)

class Normalizer:
    """Normalizer for the input data."""
    def __init__(self, X: np.ndarray):
        i_mean, i_std, v_mean, v_std, dt_mean, dt_std = self._get_means(X)
        # add dummy values for D
        self.mean = np.array([i_mean, v_mean, i_mean, v_mean, 0.0, dt_mean])
        self.std = np.array([i_std, v_std, i_std, v_std, 1, dt_std])
        
    
    def _get_means(self, X: np.ndarray) -> Tuple[float, float, float, float]:
        # i_all is i_n with concatenated the LAST VALUE of i_np1
        i_n_last = X[:, 0]  # i_n
        i_np1_last = X[-1, 2]
        i_all = np.concatenate((i_n_last, [i_np1_last]), axis=0)
        i_mean = i_all.mean()
        i_std = i_all.std()
        
        # v_all is v_n with concatenated the LAST VALUE of v_np1
        v_n_last = X[:, 1]
        v_np1_last = X[-1, 3]
        v_all = np.concatenate((v_n_last, [v_np1_last]), axis=0)
        v_mean = v_all.mean()
        v_std = v_all.std()
        
        dt_mean = X[:, 5].mean()
        dt_std = X[:, 5].std()
        return i_mean, i_std, v_mean, v_std, dt_mean, dt_std


    def normalize(self, X: np.ndarray) -> np.ndarray:
        # Normalize the input data X using the mean and std
        X_norm = (X - self.mean) / self.std
        return X_norm
    
    def denormalize(self, X_norm: np.ndarray) -> np.ndarray:
        # Denormalize the input data X_norm using the mean and std
        X_denorm = X_norm * self.std + self.mean
        return X_denorm
        
normalizer = Normalizer(X)
X_norm = normalizer.normalize(X)    


In [5]:
## create simple feedforward neural network that estimates parameters
class ParamEstimator(nn.Module):
    """Predicts physical parameters [L, RL, C, RC, Rdson, Rload, Vin, Vf] from a single sample."""

    def __init__(self, input_dim: int, hidden_layers: List[int]):
        super().__init__()
        dims = [input_dim] + hidden_layers + [8]
        layers: List[nn.Module] = []
        for in_dim, out_dim in zip(dims[:-1], dims[1:]):
            layers.append(nn.Linear(in_dim, out_dim))
            # last layer linear, others tanh
            if out_dim != 8:
                layers.append(nn.Tanh())
        self.net = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # x: [i_n, v_n, i_np1, v_np1, D, dt]
        return self.net(x)


def denorm_physical_params(
    L: torch.Tensor,
    RL: torch.Tensor,
    C: torch.Tensor,
    RC: torch.Tensor,
    Rdson: torch.Tensor,
    Rload: torch.Tensor,
    Vin: torch.Tensor,
    Vf: torch.Tensor,
) -> Tuple[
    torch.Tensor,
    torch.Tensor,
    torch.Tensor,
    torch.Tensor,
    torch.Tensor,
    torch.Tensor,
    torch.Tensor,
    torch.Tensor,
]:
    """
    Denormalize the physical parameters from their logarithmic form.
    The parameters are expected to be in the logarithmic scale.
    """
    L = torch.exp(L) * 1e-6  # assume the network gets the uH value
    RL = torch.exp(RL)
    C = torch.exp(C) * 1e-6  # assume the network gets the uF value
    RC = torch.exp(RC)
    Rdson = torch.exp(Rdson) * 1e-3  # the Rdson is quite small, so we assume it is in mOhm
    Rload = torch.exp(Rload)*10  # Rload is in Ohm
    Vin = torch.exp(Vin)*10  # Vin is in V
    Vf = torch.exp(Vf)  # Vf is in V
    return L, RL, C, RC, Rdson, Rload, Vin, Vf


# --- Physics Forward RK4 ---
def physics_forward(
    x_n: torch.Tensor, params: torch.Tensor, normalizer: Normalizer
) -> torch.Tensor:
    """
    Given x_n = [i_n, v_n, i_np1, v_np1, D, dt] and predicted params,
    reconstruct x_np1_pred = [i_np1, v_np1].
    """
    # unnormalize the input data
    x_n = normalizer.denormalize(x_n)
    
    # unpack inputs
    i_n = x_n[:, 0:1]
    v_n = x_n[:, 1:2]
    D = x_n[:, 4:5]
    dt = x_n[:, 5:6]
    
    # unpack params
    L, RL, C, RC, Rdson, Rload, Vin, Vf = torch.split(params, 1, dim=1)

    # the model actually predicts the logarithm of the parameters:denormalize parameters

    L, RL, C, RC, Rdson, Rload, Vin, Vf = denorm_physical_params(
        L, RL, C, RC, Rdson, Rload, Vin, Vf
    )

    i_np1_pred, v_np1_pred = predict_next_state(
        i_n, v_n, D, dt, L, RL, C, RC, Rdson, Rload, Vin, Vf
    )

    return torch.cat([i_np1_pred, v_np1_pred], dim=1)


def predict_next_state(
    i_n: torch.Tensor,
    v_n: torch.Tensor,
    D: torch.Tensor,
    dt: torch.Tensor,
    L: torch.Tensor,
    RL: torch.Tensor,
    C: torch.Tensor,
    RC: torch.Tensor,
    Rdson: torch.Tensor,
    Rload: torch.Tensor,
    Vin: torch.Tensor,
    Vf: torch.Tensor,
) -> torch.Tensor:
    """Predict the next state [i_np1, v_np1] using the RK4 method."""

    def f(i: torch.Tensor, v: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        di = -((D * Rdson + RL) * i + v - D * Vin + (1 - D) * Vf) / L
        dv = (C * RC * Rload * di + Rload * i - v) / (C * (RC + Rload))
        return di, dv

    # RK4 steps
    k1_i, k1_v = f(i_n, v_n)
    k2_i, k2_v = f(i_n + 0.5 * dt * k1_i, v_n + 0.5 * dt * k1_v)
    k3_i, k3_v = f(i_n + 0.5 * dt * k2_i, v_n + 0.5 * dt * k2_v)
    k4_i, k4_v = f(i_n + dt * k3_i, v_n + dt * k3_v)

    i_np1_pred = i_n + (dt / 6) * (k1_i + 2 * k2_i + 2 * k3_i + k4_i)
    v_np1_pred = v_n + (dt / 6) * (k1_v + 2 * k2_v + 2 * k3_v + k4_v)
    return i_np1_pred, v_np1_pred


# --- Loss ---
def compute_loss(x_np1_pred: torch.Tensor, x_np1_true: torch.Tensor) -> torch.Tensor:
    return torch.sum((x_np1_pred - x_np1_true) ** 2)

In [48]:
Rload = Rload1 * np.ones_like(i_n)

# train the model for 10 epochs

X_t = torch.tensor(X_norm, dtype=torch.float32)
Rload_t = torch.tensor(Rload, dtype=torch.float32).view(-1, 1)
model = ParamEstimator(input_dim=6, hidden_layers=[32, 32])
device = "cpu"

model.to(device)
X_t = X_t.to(device)
Rload_t = Rload_t.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=adam_lr)

for epoch in range(int(10e4)):
    optimizer.zero_grad()
    
    # forward pass
    params_pred = model(X_t)
    x_n1_pred = physics_forward(X_t, params_pred, normalizer)
    loss = compute_loss(X_t[:, :2], x_n1_pred)
    
    # backward propagation
    loss.backward()
    optimizer.step()
    
    
    
    # get explicit predictions
    L, RL, C, RC, Rdson, Rload, Vin, Vf = torch.split(params_pred, 1, dim=1)
    
    # denormalize parameters and print the predicted values
    L, RL, C, RC, Rdson, Rload, Vin, Vf = denorm_physical_params(
        L, RL, C, RC, Rdson, Rload, Vin, Vf
    )
    
    
    if epoch % 1000 == 0:
        print(f"Epoch {epoch + 1}:")
        print(f"loss: {loss.item():.3e}, L: {L.mean().item():.6f} H, RL: {RL.mean().item():.6f} Ohm, C: {C.mean().item():.6f} F")
        print(f"RC: {RC.mean().item():.6f} Ohm, Rdson: {Rdson.mean().item():.6f} Ohm, Rload: {Rload.mean().item():.6f} Ohm, Vin: {Vin.mean().item():.6f} V, Vf: {Vf.mean().item():.6f} V")

    


  X_denorm = X_norm * self.std + self.mean


Epoch 1:
loss: 2.493e+15, L: 0.000001 H, RL: 0.839566 Ohm, C: 0.000001 F
RC: 1.084265 Ohm, Rdson: 0.001093 Ohm, Rload: 0.922742 Ohm, Vin: 0.897291 V, Vf: 1.017835 V
Epoch 1001:
loss: 2.419e+11, L: 0.000002 H, RL: 0.561840 Ohm, C: 0.000002 F
RC: 0.873726 Ohm, Rdson: 0.000925 Ohm, Rload: 1.038574 Ohm, Vin: 0.958946 V, Vf: 0.675467 V
Epoch 2001:
loss: 5.547e+10, L: 0.000002 H, RL: 0.523290 Ohm, C: 0.000002 F
RC: 0.820186 Ohm, Rdson: 0.000882 Ohm, Rload: 1.094938 Ohm, Vin: 0.971882 V, Vf: 0.627829 V
Epoch 3001:
loss: 2.097e+10, L: 0.000002 H, RL: 0.499905 Ohm, C: 0.000003 F
RC: 0.784087 Ohm, Rdson: 0.000855 Ohm, Rload: 1.135669 Ohm, Vin: 0.982134 V, Vf: 0.598641 V
Epoch 4001:
loss: 9.577e+09, L: 0.000002 H, RL: 0.482155 Ohm, C: 0.000003 F
RC: 0.754917 Ohm, Rdson: 0.000833 Ohm, Rload: 1.169956 Ohm, Vin: 0.991397 V, Vf: 0.576304 V
Epoch 5001:
loss: 4.801e+09, L: 0.000002 H, RL: 0.467246 Ohm, C: 0.000003 F
RC: 0.729265 Ohm, Rdson: 0.000815 Ohm, Rload: 1.200933 Ohm, Vin: 1.000329 V, Vf: 0.5574

In [None]:
pysical_param_bounds = (1e-5, 1e-2)

def denorm_physical_params(
    L: torch.Tensor,
    RL: torch.Tensor,
    C: torch.Tensor,
    RC: torch.Tensor,
    Rdson: torch.Tensor,
    Rload: torch.Tensor,
    Vin: torch.Tensor,
    Vf: torch.Tensor,
) -> Tuple[
    torch.Tensor,
    torch.Tensor,
    torch.Tensor,
    torch.Tensor,
    torch.Tensor,
    torch.Tensor,
    torch.Tensor,
    torch.Tensor,
]:
    """
    Denormalize the physical parameters from their logarithmic form.
    The parameters are expected to be in the logarithmic scale.
    """
    L = torch.clamp(L, *pysical_param_bounds)*1e-6
    RL = torch.clamp(RL, *pysical_param_bounds)
    C = torch.clamp(C, *pysical_param_bounds) * 1e-6  # assume the network gets the uF value
    RC = torch.clamp(RC, *pysical_param_bounds)
    Rdson = torch.clamp(Rdson, *pysical_param_bounds) * 1e-1  # the Rdson is quite small
    Rload = torch.clamp(Rload, *pysical_param_bounds) * 10  # Rload is in Ohm
    Vin = torch.clamp(Vin, *pysical_param_bounds) * 10  # Vin is in V
    Vf = torch.clamp(Vf, *pysical_param_bounds)  # Vf is in V
    
    # since the network favors small values, let's artificially increase the prediction scale
    for param in [L, RL, C, RC, Rdson, Rload, Vin, Vf]:
        param *= 10.0
    return L, RL, C, RC, Rdson, Rload, Vin, Vf

In [7]:
Rload = Rload1 * np.ones_like(i_n)
X_t = torch.tensor(X_norm, dtype=torch.float32)
Rload_t = torch.tensor(Rload, dtype=torch.float32).view(-1, 1)

device = "cpu"
# --- Scheduler setup ---
base_lr = 1e-3
min_lr = 1e-5
cosine_amp = base_lr - min_lr
cosine_floor = min_lr
warmup_epochs = 5000
total_epochs = int(50e4)


# Custom amplitude decay controller
plateau_counter = 0
prev_loss = None
plateau_patience = 6000
cosine_shrink_factor = 0.7


def get_cosine_lr(epoch: int, amp: float, period: int) -> float:
    if epoch < warmup_epochs:
        return cosine_floor + amp * (epoch + 1) / warmup_epochs
    else:
        progress = (epoch - warmup_epochs) / (period - warmup_epochs)
        return cosine_floor + amp * 0.5 * (1 + np.cos(np.pi * progress))


# --- Model setup ---
model = ParamEstimator(input_dim=6, hidden_layers=[32, 32])
model.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=base_lr)
scheduler_plateau = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode="min", factor=0.7, patience=3000
)


# --- Tracking ---
loss_history = []
param_history = []
lr_history = []
best_loss = float("inf")
best_state = None

for epoch in range(total_epochs):

    current_lr = get_cosine_lr(epoch, cosine_amp, total_epochs/10)
    # Update learning rate manually
    for g in optimizer.param_groups:
        g["lr"] = current_lr

    optimizer.zero_grad()

    # forward pass
    params_pred = model(X_t)
    x_n1_pred = physics_forward(X_t, params_pred, normalizer)
    loss = compute_loss(X_t[:, :2], x_n1_pred)

    # backward propagation
    loss.backward()
    optimizer.step()

    # Track loss
    loss_val = loss.item()

    # Detect plateau
    if prev_loss is not None and abs(loss_val - prev_loss) / (prev_loss + 1e-8) < 1e-4:
        plateau_counter += 1
    else:
        plateau_counter = 0
    prev_loss = loss_val

    # Decay cosine amplitude if loss plateaus
    if plateau_counter >= plateau_patience:
        cosine_amp *= cosine_shrink_factor
        plateau_counter = 0
        print(f"[Epoch {epoch}] Plateau detected. Shrinking cosine amplitude to {cosine_amp:.2e}")

    # get explicit predictions
    L, RL, C, RC, Rdson, Rload, Vin, Vf = torch.split(params_pred, 1, dim=1)

    # denormalize parameters and print the predicted values
    L, RL, C, RC, Rdson, Rload, Vin, Vf = denorm_physical_params(
        L, RL, C, RC, Rdson, Rload, Vin, Vf
    )

    # checkpoint
    if loss.item() < best_loss:
        best_loss = loss.item()
        best_state = {
            "epoch": epoch,
            "model_state": model.state_dict(),
            "optimizer_state": optimizer.state_dict(),
            "physical_paramers": {
                "L": L.mean().item(),
                "RL": RL.mean().item(),
                "C": C.mean().item(),
                "RC": RC.mean().item(),
                "Rdson": Rdson.mean().item(),
                "Rload": Rload.mean().item(),
                "Vin": Vin.mean().item(),
                "Vf": Vf.mean().item(),
            },
            "loss": best_loss,
        }
    if epoch % 1000 == 0:
        print(f"Epoch {epoch + 1}:")
        print(
            f"loss: {loss.item():.3e}, lr: {optimizer.param_groups[0]["lr"]:.6f}, L: {L.mean().item():.6f} H, RL: {RL.mean().item():.6f} Ohm, C: {C.mean().item():.6f} F"
        )
        print(
            f"RC: {RC.mean().item():.6f} Ohm, Rdson: {Rdson.mean().item():.6f} Ohm, Rload: {Rload.mean().item():.6f} Ohm, Vin: {Vin.mean().item():.6f} V, Vf: {Vf.mean().item():.6f} V"
        )

    # adaptive schedule
    scheduler_plateau.step(loss)

    loss_history.append(loss_val)
    param_history.append({
        'L': L.mean().item(), 'RL': RL.mean().item(), 'C': C.mean().item(),
        'RC': RC.mean().item(), 'Rdson': Rdson.mean().item(), 'Rload': Rload.mean().item(),
        'Vin': Vin.mean().item(), 'Vf': Vf.mean().item()
    })
    # save the lr parameter
    lr_history.append(optimizer.param_groups[0]["lr"])

  X_denorm = X_norm * self.std + self.mean


Epoch 1:
loss: 4.683e+68, lr: 0.000010, L: 0.000000 H, RL: 0.010516 Ohm, C: 0.000000 F
RC: 0.012520 Ohm, Rdson: 0.009827 Ohm, Rload: 0.689042 Ohm, Vin: 1.000000 V, Vf: 0.040396 V
Epoch 1001:
loss: nan, lr: 0.000208, L: nan H, RL: nan Ohm, C: nan F
RC: nan Ohm, Rdson: nan Ohm, Rload: nan Ohm, Vin: nan V, Vf: nan V
Epoch 2001:
loss: nan, lr: 0.000406, L: nan H, RL: nan Ohm, C: nan F
RC: nan Ohm, Rdson: nan Ohm, Rload: nan Ohm, Vin: nan V, Vf: nan V


KeyboardInterrupt: 

In [2]:
best_state["physical_paramers"] = {
    "L": best_state["physical_paramers"]["L"],
    "RL": best_state["physical_paramers"]["RL"],
    "C": best_state["physical_paramers"]["C"],
    "RC": best_state["physical_paramers"]["RC"],
    "Rdson": best_state["physical_paramers"]["Rdson"],
    "Rload": best_state["physical_paramers"]["Rload"],
    "Vin": best_state["physical_paramers"]["Vin"],
    "Vf": best_state["physical_paramers"]["Vf"],
}


# compare with nominal values
def tensor_to_average(tensor: torch.Tensor) -> float:
    """Convert a tensor to a float and return its average."""
    return tensor.mean().item() if isinstance(tensor, torch.Tensor) else float(tensor)

print("Best state:")
print(f"Epoch: {best_state['epoch']}")
print(f"Loss: {best_state['loss']:.6e}")


NameError: name 'best_state' is not defined