# Parallel Wiener-Hammerstein system

See http://arxiv.org/pdf/1708.06543 for more information on the dataset.


In [1]:
import sys
sys.path.insert(0, '..')

import matplotlib.pyplot as plt
import nonlinear_benchmarks as nlb
import numpy as np
import optimistix as optx

from src import best_linear_approximation as bla
from src import data_manager

np.random.seed(42)  # for reproducibility

# Load data
ParWH_full_train, ParWH_full_test = nlb.ParWH() 

# Initialise variables
N = 16384  # number of samples per period
R = 5  # number of random phase multisine realisations
P = 2  # number of periods
amplitude_level = 4  # must be one of {0, 1, 2, 3, 4}

nu, ny = 1, 1  # SISO system

fs = 78e3 # [Hz]
f_idx = np.arange(1, 4096)  # frequency lines of interest (excludes DC)

# Load data
ParWH_full_train, ParWH_full_test = nlb.ParWH() 
ParWH_train = [
    data for data in ParWH_full_train
    for phase in range(R)
    if data.name == f'Est-phase-{phase}-amp-{amplitude_level}'
]
ParWH_test = [
    data for data in ParWH_full_test
    if data.name == f'Val-amp-{amplitude_level}'
][0]

# Preprocess data
u_train = np.array([data.u for data in ParWH_train]).reshape(R, nu, N, P)
y_train = np.array([data.y for data in ParWH_train]).reshape(R, ny, N, P)
u_train = np.transpose(u_train, (2, 1, 0, 3))
y_train = np.transpose(y_train, (2, 1, 0, 3))

u_test = np.transpose(ParWH_test.u.reshape(1, nu, N, 2), (2, 1, 0, 3))
y_test = np.transpose(ParWH_test.y.reshape(1, ny, N, 2), (2, 1, 0, 3))

# Create input-output training data object
io_data = data_manager.create_data_object(u_train, y_train, f_idx, fs)

Step 1: Best Linear Approximation

In [2]:
##### (i) Nonparametric estimate #####
G_nonpar = bla.compute_nonparametric(io_data)
 
##### (ii) Parametrize using frequency-domain subspace identification method #####
nx = 12  # number of states
q = nx + 1  # subspace dimensioning parameter
bla_fsid = bla.freq_subspace_id(G_nonpar, nx, q)

# Simulate and check time-domain performance
u_bar = np.mean(io_data.time.u, axis=-1)  # we take the mean over the periods
y_bar = np.mean(io_data.time.y, axis=-1)  # we take the mean over the periods
handicap = 1000  # number of samples to start 'ahead of time' for transient effects to die out (only works for periodic data!)

y_sim_bla_fsid = bla_fsid.simulate(u_bar, handicap=handicap)[0]
NRMSE_bla_fsid = 100 * np.sqrt(np.mean((y_bar - y_sim_bla_fsid)**2)) / np.sqrt(np.mean(y_bar**2)) 

print(f'NRMSE of FSID BLA: {NRMSE_bla_fsid:.2f}%\n')

##### (iii) Frequency-domain iterative optimization starting from FSID BLA #####
solver = optx.LevenbergMarquardt(rtol=1e-3, atol=1e-6)
max_iter = 1000

bla_opti = bla.freq_iterative_optimization(G_nonpar, bla_fsid, solver, max_iter)

# Simulate and check time-domain performance
y_sim_bla_opti = bla_opti.simulate(u_bar, handicap=handicap)[0]
NRMSE_bla_opti = 100 * np.sqrt(np.mean((y_bar - y_sim_bla_opti)**2)) / np.sqrt(np.mean(y_bar**2)) 

print(f'NRMSE of opti BLA: {NRMSE_bla_opti:.2f}%\n')

x_sim_bla_opti = bla_opti.simulate(u_bar, handicap=handicap)[1]
x_std = np.std(x_sim_bla_opti, axis=(0,2))
print(f'Standard deviation of states: {x_std}')


NRMSE of FSID BLA: 13.31%

Starting iterative optimization...
   Iteration 0, Loss: 1.0901e+00
   Iteration 1, Loss: 8.7186e-01
   Iteration 2, Loss: 8.7186e-01
   Iteration 3, Loss: 8.7186e-01
   Iteration 4, Loss: 8.6039e-01
   Iteration 5, Loss: 8.6039e-01
   Iteration 6, Loss: 7.9238e-01
   Iteration 7, Loss: 7.7413e-01
   Iteration 8, Loss: 7.5685e-01
   Iteration 9, Loss: 7.4428e-01
   Iteration 10, Loss: 7.3171e-01
   Iteration 11, Loss: 7.2014e-01
   Iteration 12, Loss: 7.1208e-01
   Iteration 13, Loss: 7.0800e-01
   Iteration 14, Loss: 7.0613e-01
   Iteration 15, Loss: 7.0507e-01
   Iteration 16, Loss: 7.0431e-01
   Iteration 17, Loss: 7.0369e-01
   Iteration 18, Loss: 7.0315e-01
   Iteration 19, Loss: 7.0263e-01
   Iteration 20, Loss: 7.0213e-01
   Iteration 21, Loss: 7.0162e-01
   Iteration 22, Loss: 7.0108e-01
   Iteration 23, Loss: 7.0047e-01
   Iteration 24, Loss: 6.9977e-01
   Iteration 25, Loss: 6.9889e-01
   Iteration 26, Loss: 6.9889e-01
   Iteration 27, Loss: 6.9783e

In [3]:
G_parr = G_par.frequency_response(io_data.freq.f[io_data.freq.f_idx])

plt.figure()
plt.plot( 20*np.log10(np.abs(G_parr.squeeze())), label='Parametric Model', color='C0')
plt.show()

NameError: name 'G_par' is not defined

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors

# Generate random values between 0 and 100
np.random.seed(42)
data = np.random.randint(0, 101, (5, 5))  # 5x5 table

# Create a colormap from red to green
cmap = mcolors.LinearSegmentedColormap.from_list("custom", ["red", "green"])

fig, ax = plt.subplots(figsize=(5, 5))

# Remove axis ticks
ax.set_xticks([])
ax.set_yticks([])

# Display table-like heatmap
for i in range(data.shape[0]):
    for j in range(data.shape[1]):
        value = data[i, j]
        color = cmap(value / 100)  # Normalize to [0,1] for colormap
        ax.add_patch(plt.Rectangle((j, -i), 1, 1, color=color))
        ax.text(j + 0.5, -i + 0.5, f"{value}", ha='center', va='center', color="black")

# Adjust limits
ax.set_xlim(0, data.shape[1])
ax.set_ylim(-data.shape[0], 0)
ax.set_aspect("equal")

plt.show()


In [None]:
# Estimate the best linear approximation
nx = 12
q = nx + 3

optim_options = bla_optimisation.OptiOptions(
    solver = optx.LevenbergMarquardt(rtol=1e-3, atol=1e-6),
    max_iter = 100
)

IDM.freq_subspace_id(nx, q, W='std_tot', optim_options=optim_options)

In [None]:
polynomial_degree = 7
nw = 2
nz = 2

phi = basis_functions.Polynomial(nz=nz, cross_terms=False, degree=polynomial_degree)

lambda_w = 1
fixed_point_iters = 10
solver = optx.LevenbergMarquardt(rtol=1e-3, atol=1e-6, verbose=frozenset({'step', 'loss', 'accepted'}))
# solver = optx.BFGS(rtol=1e-3, atol=1e-6)
freeze_bla = True
seed = 3

mod = nonlinear_lfr_init.init(IDM, nw=nw, phi=phi, lambda_w=lambda_w, fixed_point_iters=fixed_point_iters, solver=solver, freeze_bla=freeze_bla, seed=seed, max_iter=50)

In [None]:
# Optimise one-step-ahead state predictions (optional)
ModelNonlinearLFR_opti = nonlinear_lfr_opti.optimise_state_predictions(
    IDM,
    solver=optx.BFGS(rtol=1e-8, atol=1e-8),
)

In [None]:
# # Optimise one-step-ahead state predictions (optional)
# ModelNonlinearLFR_opti = nonlinear_lfr_opti.optimise_state_predictions(
#     IDM,
#     solver=optx.BFGS(rtol=1e-8, atol=1e-8),
# )

In [None]:
# Optimise simulation error# Optimise one-step-ahead state predictions (optional)
# ModelNonlinearLFR_opti = nonlinear_lfr_opti.optimise_state_predictions(
#     IDM,
#     solver=optx.BFGS(rtol=1e-8, atol=1e-8),
# )

from reinbos.utils import misc

custom_init = nonlinear_lfr_opti.DecisionVars(
    B_w=misc.OptiParam(1/10*ModelNonlinearLFR.B_w),
    D_yw=misc.OptiParam(1/10*ModelNonlinearLFR.D_yw),
)


ModelNonlinearLFR_opti_sim = nonlinear_lfr_opti.optimise_simulation_error(
    IDM,
    solver=optx.LevenbergMarquardt(rtol=1e-3, atol=1e-6),
    max_iter=200,
    custom_init=custom_init,
)

In [None]:
# Evaluate the model on test data
ParWH_test = [
    data for data in ParWH_full_test
    if data.name == f'Val-amp-{amp_level}' or data.name == 'ValArr'
]

test_ms = ParWH_test[0]
test_arr = ParWH_test[2]

# Multisine test
u_test_ms = np.transpose(test_ms.u.reshape(1, nu, N, 2), (2, 1, 0, 3))
y_test_ms = test_ms.y[::2]
u_test_ms = (u_test_ms - u_mean) / u_std

y_bla_ms = IDM.bla.opti.model.simulate(u_test_ms)[1]
y_lfr_ms = ModelNonlinearLFR_opti_sim.simulate(u_test_ms)[2]
y_bla_ms = np.squeeze(y_bla_ms[..., 1] * y_std + y_mean)
y_lfr_ms = np.squeeze(y_lfr_ms[..., 1] * y_std + y_mean)

e_bla_ms = y_test_ms - y_bla_ms
e_lfr_ms = y_test_ms - y_lfr_ms

E_bla_ms = 1 / N * np.fft.rfft(e_bla_ms, axis=0)
E_lfr_ms = 1 / N * np.fft.rfft(e_lfr_ms, axis=0)
Y_test_ms = 1 / N * np.fft.rfft(y_test_ms, axis=0)

print(f'Multisine test RMSE BLA: {np.sqrt(np.mean(e_bla_ms**2)):.4e} ({100*np.std(e_bla_ms)/np.std(y_test_ms):.2f}%)')
print(f'Multisine test RMSE nonlinear LFR: {np.sqrt(np.mean(e_lfr_ms**2)):.4e} ({100*np.std(e_lfr_ms)/np.std(y_test_ms):.2f}%)')

fig, axs = plt.subplots(1, 2, figsize=(15, 5))
axs[0].plot(IDM.data.time.t, y_test_ms, label='system output')
axs[0].plot(IDM.data.time.t, e_bla_ms, label='BLA error')
axs[0].plot(IDM.data.time.t, e_lfr_ms, label='nonlinear LFR error')
axs[0].set_title('Multisine - Time Domain')
axs[0].set_xlabel('time [s]')
axs[0].set_ylabel('amplitude [-]')
axs[0].legend()

axs[1].plot(IDM.data.freq.f[f_idx], 20*np.log10(np.abs(Y_test_ms[f_idx])), label='system output')
axs[1].plot(IDM.data.freq.f[f_idx], 20*np.log10(np.abs(E_bla_ms[f_idx])), label='BLA error')
axs[1].plot(IDM.data.freq.f[f_idx], 20*np.log10(np.abs(E_lfr_ms[f_idx])), label='nonlinear LFR error')
axs[1].set_title('Multisine - Frequency Domain')
axs[1].set_xlabel('frequency [Hz]')
axs[1].set_ylabel('magnitude [dB]')
axs[1].legend()
plt.tight_layout()
plt.show()


# Arrow test
u_test_arr = test_arr.u.reshape(-1, 1, 1)
u_test_arr = (u_test_arr - u_mean) / u_std
y_test_arr = test_arr.y

y_bla_ss = IDM.bla.opti.model.simulate(u_test_arr, P_trans=1)[1]
y_lfr_ss = ModelNonlinearLFR_opti_sim.simulate(u_test_arr, P_trans=1)[2]
y_bla_ss = np.squeeze(y_bla_ss * y_std + y_mean)
y_lfr_ss = np.squeeze(y_lfr_ss * y_std + y_mean)

e_bla_ss = y_test_arr - y_bla_ss
e_lfr_ss = y_test_arr - y_lfr_ss

print(f'Arrow test RMSE BLA: {np.sqrt(np.mean(e_bla_ss**2)):.4e} ({100*np.std(e_bla_ss)/np.std(y_test_arr):.2f}%)')
print(f'Arrow test RMSE nonlinear LFR: {np.sqrt(np.mean(e_lfr_ss**2)):.4e} ({100*np.std(e_lfr_ss)/np.std(y_test_arr):.2f}%)')

t_ss = np.linspace(0, len(y_test_arr) / fs, len(y_test_arr))

plt.figure(figsize=(8, 5))
plt.plot(t_ss, y_test_arr, label='system output')
plt.plot(t_ss, e_bla_ss, label='BLA error')
plt.plot(t_ss, e_lfr_ss, label='nonlinear LFR error')
plt.title('Arrow test - Time Domain')
plt.xlabel('time [s]')
plt.ylabel('amplitude [-]')
plt.legend()
plt.tight_layout()
plt.show()
