# Simulation of the plant (reflow oven) and the controller

In this notebook we simulate both the plant along with the controller.

## System identification of the plant

First of all, we need to have a sufficiently accurate model of the plant.

Previous tests showed that it can be modelled as two first oder lags and one delay in series.<br />
This models the (relatively fast) heating process and the slow cooling process, as well as some
delay.

A more accurate physical model shows that we have four first order lags and one delay.

In [391]:
import numpy as np
import matplotlib.pyplot as plt
import control as ctrl
import pandas as pd
from scipy.optimize import minimize

In [392]:
# 100% PWM signal with stop after 100 deg C (October)
path_prefix = './../../measurements/2024-10-27/'
meas_file = 'signals_step_100_percent_until_100_degC_cooling_down_door_closed.log'
signal_100_deg_C_stop = path_prefix + meas_file

# 100% PWM signal with stop after 200 deg C (October)
path_prefix = './../../measurements/2024-10-27/'
meas_file = 'signals_step_100_percent_until_200_degC_cooling_down_door_closed.log'
signal_200_deg_C_stop = path_prefix + meas_file

measurement_files = [signal_100_deg_C_stop, signal_200_deg_C_stop]
N_measurements = len(measurement_files)

In [None]:
# Peek into the data.
df = pd.read_csv(measurement_files[0], skiprows=2)

fig = plt.figure(meas_file, figsize=(10,5))
plt.clf()

ax = fig.add_subplot(1,1,1)
ax.plot(df['Time (ms)']/1000, df['Oven temperature (C)'],
         linestyle='-', label='Oven temperature (C)')
ax.plot(df['Time (ms)']/1000, df['Ambient temperature (C)'],
         linestyle='--', label='Ambient temperature (C)')
ax.plot(df['Time (ms)']/1000, df['PWM controller (percent)'],
         linestyle='--', label='PWM controller (percent)')
ax.plot(df['Time (ms)']/1000, df['Thermocouple open']*25,
         linestyle='--', label='Thermocouple open (x25)')
ax.grid(True)
ax.set(ylim=(0, 300))
ax.set(xlabel='t (ms)')
ax.set(title='Measured signals')
ax.legend()

plt.show()

### Simple 2nd order model (with delay)

$$G_1(s) = \frac{6.7}{30s + 1}$$

$$G_2(s) = \frac{1.0}{480s + 1}$$

$$G_3(s) = \exp(-22 s)$$

In [394]:
# Simple model of the reflow oven fitted to the 100% PWM step that turns off once 100 deg C are reached.
# The parameters of the model have been obtained by manual tuning (not by numerical optimization).
tf_pt1_a = ctrl.tf([6.7], [30.0, 1.0])    # <-- Heating process with 30 seconds time constant
tf_pt1_b = ctrl.tf([1.0], [480.0, 1.0])   # <-- Cooling process with 480 seconds time constants
num, den = ctrl.pade(22, 5)               # <-- 22 seconds delay
tf_Td = ctrl.tf(num, den)
G = ctrl.series(tf_Td, tf_pt1_a, tf_pt1_b)

In [None]:
# Read the first 2 lines of the log file.
# There we find the sample time and the maximum runtime.
with open(measurement_files[0]) as fid:
    first_lines = [l for idx, l in enumerate(fid.readlines()) if idx <= 1]

sample_time_ms = int(first_lines[0].strip('Sample time (ms): '))
max_runtime_seconds = int(first_lines[1].strip('Maximum runtime (s): '))

print(f'Sample time: {sample_time_ms} ms')
print(f'Maximum runtime: {max_runtime_seconds} seconds')


Model as linear system using the linear systems functions.

In [None]:
# Make sure t is equally spaced
sample_time_seconds = sample_time_ms * 1.e-3
sample_frequency = 1.0 / sample_time_seconds
t = np.arange(0,max_runtime_seconds*sample_frequency+1) * sample_time_seconds

theta = df['Oven temperature (C)'].to_numpy()
theta_0 = np.mean(theta[:40])
print(f'theta_0 = {theta_0} deg C')

u = df['PWM controller (percent)'].to_numpy()

time_response = ctrl.forced_response(G, t, u, 0.0)

fig = plt.figure('simulation', figsize=(10,5))
plt.clf()
ax = fig.add_subplot(1,1,1)

ax.plot(t, u, linestyle='--', label='PWM (percent)')
ax.plot(t, theta - theta_0, linestyle='-', label='Meas. temperature (minus offset)')
ax.plot(t, time_response.outputs, linestyle='-', label='Sim. temperature (minus offset)')
ax.legend()
ax.grid(True)
ax.set(title='Simulation with simple model')

plt.show()

Model as (potentially) nonlinear system.

Also, consider the ambient temperature, that is, do not remove the constant offset in the oven temperature like before.

In [None]:
#
# Oven dynamics
#
def oven_update_fun(t, x, u, params):
    theta_o_k = x[0] # <-- We have one state, the oven temperature
    theta_h_k = u[0] # <-- The first input is the actual input: the heating temperature
    theta_a_k = u[1] # <-- The second input is the disturbance: the ambient temperature
    q_zu_k = params['c']*(theta_h_k - theta_o_k)
    q_ab_o_k = params['alpha']*(theta_o_k - theta_a_k)
    dt = params['dt']
    theta_o_kp1 = theta_o_k + dt*(q_zu_k - q_ab_o_k)
    # Emulate the simple system
    # theta_o_kp1 = theta_o_k + dt*(1.0/480.0 * theta_h_k - 1/480.0 * theta_o_k)
    return theta_o_kp1

def oven_output_fun(t, x, u, params):
    theta_o_k = x[0]
    return theta_o_k

oven_params = { 'c': 1.0/480.0,
                'alpha': 0.0,
                'dt': sample_time_seconds }

H_oven = ctrl.NonlinearIOSystem(
    oven_update_fun,
    oven_output_fun,
    inputs=['theta_h', 'theta_a'],
    outputs=['theta_o'],
    states=1,
    dt=oven_params['dt'],
    name='Oven',
    params=oven_params)

print(H_oven)

#
# Heating element dynamics
#
def heating_elem_update_fun(t, x, u, params):
    theta_h_k = x[0] # <-- We have one state, the heating element temperature
    u_k = u[0]       # <-- The first input is the actual input: the control input
    theta_a_k = u[1] # <-- The second input is the disturbance: the ambient temperature
    q_ab_h_k = params['beta']*(theta_h_k - theta_a_k)
    dt = params['dt']
    theta_h_kp1 = theta_h_k + dt*(params['K']*u_k - q_ab_h_k)
    # Emulate the simple system
    # theta_h_kp1 = theta_h_k + dt*(6.7/30.0 * u_k - 1.0/30.0 * theta_h_k)
    return theta_h_kp1

def heating_elem_output_fun(t, x, u, params):
    theta_h_k = x[0]
    return theta_h_k

heating_elem_params = { 'K': 6.7/30.0,
                        'beta': 1.0/30.0,
                        'dt': sample_time_seconds }

H_heating_elem = ctrl.NonlinearIOSystem(
    heating_elem_update_fun,
    heating_elem_output_fun,
    inputs=['u_pwm', 'theta_a'],
    outputs=['theta_h'],
    states=1,
    dt=heating_elem_params['dt'],
    name='HeatingElement',
    params=heating_elem_params)

print(H_heating_elem)

#
# Time delay
#
time_delay_seconds = 12 # 22
H_delay_num = [1.0]
H_delay_den = np.zeros((time_delay_seconds*int(sample_frequency)+1,))
H_delay_den[0] = 1.0
H_delay = ctrl.tf(H_delay_num, H_delay_den, sample_time_seconds, inputs='u', outputs='y')
print(H_delay)
H_delay = ctrl.ss(H_delay, name='TimeDelay')



In [None]:
# Compare individual systems against linear systems.

fig = plt.figure(1, figsize=(12,4))
plt.clf()

resp_heating_ref = ctrl.step_response(tf_pt1_a)
T = np.arange(0.0, 200.0, sample_time_seconds)
U = np.ones_like(T)
resp_heating_nonlinear_impl = ctrl.input_output_response(H_heating_elem,
                                    T=T,
                                    U=np.vstack((U, np.zeros_like(U))),
                                    X0=[0.0])

ax = fig.add_subplot(1, 3, 1)
ax.plot(resp_heating_ref.t, np.squeeze(resp_heating_ref.y))
ax.plot(resp_heating_nonlinear_impl.t, np.squeeze(resp_heating_nonlinear_impl.y), '--')
ax.grid(True)



resp_delay_ref = ctrl.step_response(tf_Td)
T = np.arange(0.0, 200.0, sample_time_seconds)
U = np.ones_like(T)
resp_delay_nonlinear_impl = ctrl.input_output_response(H_delay,
                                    T=T,
                                    U=U,
                                    X0=[0.0])

ax = fig.add_subplot(1, 3, 2)
ax.plot(resp_delay_ref.t, np.squeeze(resp_delay_ref.y))
ax.plot(resp_delay_nonlinear_impl.t, np.squeeze(resp_delay_nonlinear_impl.y), '--')
ax.grid(True)


resp_oven_ref = ctrl.step_response(tf_pt1_b)
T = np.arange(0.0, 3000.0, sample_time_seconds)
U = np.ones_like(T)
resp_oven_nonlinear_impl = ctrl.input_output_response(H_oven,
                                    T=T,
                                    U=np.vstack((U, np.zeros_like(U))),
                                    X0=[0.0])

ax = fig.add_subplot(1, 3, 3)
ax.plot(resp_oven_ref.t, np.squeeze(resp_oven_ref.y))
ax.plot(resp_oven_nonlinear_impl.t, np.squeeze(resp_oven_nonlinear_impl.y), '--')
ax.grid(True)

plt.show()

In [None]:
#
# Connect the invidivual systems in order to create the plant
#
H_plant = ctrl.interconnect(
    [H_oven, H_heating_elem, H_delay],
    connections=[['TimeDelay.u', 'HeatingElement.theta_h'],
                 ['Oven.theta_h', 'TimeDelay.y']
                ],
    inplist=['HeatingElement.u_pwm',
             ['Oven.theta_a',
             'HeatingElement.theta_a']
             ],
    outlist=['Oven.theta_o'],
    inputs=['u_pwm_overall', 'theta_a_overall'],
    outputs=['theta_o_overall'],
    name='Plant',
    dt=sample_time_seconds
)

print(H_plant)

In [400]:
# Load all measurement files.
# Store them in arrays of the shape (no. of measurements, size of the signal, sample count).
# Sample count is the same as long as we are using 2 Hz and 720 seconds.
# The size of the signal is '1' for the time, and '2' for the input 'u' (which consists of the PWM and the disturbant ambient temperature).

sample_count = int(max_runtime_seconds*sample_frequency+1)

# Make sure t is equally spaced
sample_time_seconds = sample_time_ms * 1.e-3
sample_frequency = 1.0 / sample_time_seconds
t_input = np.arange(0,max_runtime_seconds*sample_frequency+1) * sample_time_seconds

t_stacked = np.vstack([t_input for idx in range(N_measurements)])
t_stacked = np.reshape(t_stacked, (N_measurements, 1, sample_count))

theta_o_stacked = np.zeros((N_measurements, 1, sample_count))
u_input_stacked = np.zeros((N_measurements, 2, sample_count))

X0_stacked = np.zeros((N_measurements, H_delay.nstates + 2))

for idx, filename in enumerate(measurement_files):
    df = pd.read_csv(filename, skiprows=2)
    theta_o = df['Oven temperature (C)'].to_numpy()
    u_pwm_input = df['PWM controller (percent)'].to_numpy()
    theta_a_input = df['Ambient temperature (C)'].to_numpy()

    u_input_stacked[idx, ...] = np.vstack((u_pwm_input, theta_a_input))
    theta_o_stacked[idx, ...] = theta_o

    # Initialize with the measured oven temperature.
    X0 = np.hstack(([theta_o[0], theta_o[0]], theta_o[0]*np.ones((H_delay.nstates,))))
    X0_stacked[idx, ...] = X0


In [None]:
# Adapt the parameters for the simulation
params_union = {**oven_params, **heating_elem_params}
# Parameters of optimization for both measurements
params_union['c'] = 0.0002539390333566856
params_union['alpha'] = 0.0020248192667539446
params_union['K'] = 1.9767183297280781
params_union['beta'] = 0.036032596097208075



fig = plt.figure(1, figsize=(12, 4))
plt.clf()

for idx in range(N_measurements):
    results = ctrl.input_output_response(H_plant, np.squeeze(t_stacked[idx, ...]), u_input_stacked[idx], X0=X0_stacked[idx], params=params_union)

    ax = fig.add_subplot(1, N_measurements, idx+1)
    ax.plot(df['Time (ms)'] * 1.0e-3, np.squeeze(theta_o_stacked[idx, ...]), label='theta_o measured')
    ax.plot(results.t, results.y[0,:], '--', label='theta_o simulated')
    ax.grid(True)
    ax.legend()


plt.show()

In [None]:
# Numerically optimize the parameters

def cost_fun(x):
    # Change the parameters
    params_union = {**oven_params, **heating_elem_params}
    # Scaling for better numeric properties (heuristic scaling factors)
    params_union['c'] = x[0]/100
    params_union['K'] = x[1]/10
    params_union['alpha'] = x[2]/1000
    params_union['beta'] = x[3]/10

    total_err = 0.0
    measurement_cost_weights = [1.0, 1.0]
    # Consider measurements from all files
    for idx in range(N_measurements):
        results = ctrl.input_output_response(H_plant, np.squeeze(t_stacked[idx, ...]), u_input_stacked[idx], X0=X0_stacked[idx], params=params_union)

        err = np.squeeze(theta_o_stacked[idx, ...]) - results.y[0,:]
        # rel_err_values = err / np.squeeze(theta_o_stacked[idx, ...])
        # abs_err = np.linalg.norm(err, 2)
        # rel_err = abs_err / np.linalg.norm(theta_o, 2)
        # rel_err = np.linalg.norm(rel_err_values, 2) * measurement_cost_weights[idx]
        # total_err += rel_err
        total_err += np.linalg.norm(err, 1)

    return total_err

opt_x0 = [oven_params['c']*100, heating_elem_params['K']*10, oven_params['alpha']*1000, heating_elem_params['beta']*10] # <-- Use values from individual params
# opt_x0 = [params_union['c']*100, params_union['K']*10, params_union['alpha']*1000, params_union['beta']*10] # <-- Use values from the param union
optimization_result = minimize(cost_fun, x0 = opt_x0)
print(optimization_result)


In [None]:
# Take over the parameters from the optimization
params_union = {**oven_params, **heating_elem_params}
params_union['c'] = optimization_result.x[0]/100
params_union['K'] = optimization_result.x[1]/10
params_union['alpha'] = optimization_result.x[2]/1000
params_union['beta'] = optimization_result.x[3]/10

fig = plt.figure(1, figsize=(12, 4))
plt.clf()

for idx in range(N_measurements):
    results = ctrl.input_output_response(H_plant, np.squeeze(t_stacked[idx, ...]), u_input_stacked[idx], X0=X0_stacked[idx], params=params_union)

    ax = fig.add_subplot(1, N_measurements, idx+1)
    ax.plot(df['Time (ms)'] * 1.0e-3, np.squeeze(theta_o_stacked[idx, ...]), label='theta_o measured')
    ax.plot(results.t, results.y[0,:], '--', label='theta_o simulated')
    ax.grid(True)
    ax.legend()


plt.show()

In [None]:
print(params_union)

#
# Optimization based on single measurement (for time delay of 22 sec)
#

# Optimization with step to 100 deg C (initial optimization conditions taken from simple system):
# {'c': 4.926111189101776e-05, 'alpha': 0.001841564587659022, 'dt': 0.5, 'K': 12.927602344359068, 'beta': 0.04789952343640434}
# Optimization with step to 200 deg C (initial optimization conditions taken from simple system):
# {'c': 0.012049377847259197, 'alpha': -0.009722738429462177, 'dt': 0.5, 'K': 0.12530112801557586, 'beta': 0.10520764501815723}

# Optimization with step to 200 deg C (initial optimization conditions taken from 100 deg C optimization):
# {'c': 9.589400956512674e-05, 'alpha': 0.0022512347817292837, 'dt': 0.5, 'K': 12.927964020300056, 'beta': 0.08758068885986277}



#
# Optimization based on both measurements (initial optimization conditions taken from simple system):
# (for time delay of 22 sec)
#
# {'c': 0.00011250476620284177, 'alpha': 0.001960106965292331, 'dt': 0.5, 'K': 7.066191655178498, 'beta': 0.058449876507310816}

# Optimization with "real" relative norm (that is, normalizing each sample)
# worked quite well.
# I had to weigh the 200 deg C step twice for good results (as relative errors decline for large temperatures).

# Optimization with (absolute) 1-norm
# {'c': 0.0002539390333566856, 'alpha': 0.0020248192667539446, 'dt': 0.5, 'K': 1.9767183297280781, 'beta': 0.036032596097208075}

## Controller design

Take over the values from `read_signals.py` which I think proved already not that bad.

In [None]:
# Start with a regular PI controller (no constraints yet)

# Kp = ctrl.tf([0.5], [1.0])
# Ki = ctrl.tf([0.0010], [1.0, 0.0])
Kp = ctrl.tf([0.8], [1.0])
Ki = ctrl.tf([0.15], [1.0, 0.0])
K = ctrl.parallel(Kp, Ki)

def controller_udpate_fun(t, x, u, params):
    integrated_control_err = x[0]
    control_err = u[0]
    integrated_control_err += params['dt'] * control_err
    return integrated_control_err

def controller_output_fun(t, x, u, params):
    control_err = u[0]
    integrated_control_err = x[0]
    control_signal = params['kP']*control_err + params['kI']*integrated_control_err
    return control_signal

# H_controller = ctrl.tf(K, name='Controller', inputs='e', outputs='u', dt=sample_time_seconds)

controller_params = { 'kP': 0.5, 'kI': 0.0010 }
H_controller = ctrl.NonlinearIOSystem(
    controller_udpate_fun,
    controller_output_fun,
    inputs=['e'],
    outputs=['u'],
    states=1,
    dt=sample_time_seconds,
    name='Controller',
    params=controller_params
)
print(H_controller)

H_sum = ctrl.summing_junction(name='ControlErrorComputation', inputs=['r', '-y'], outputs=['e'])

H_total = ctrl.interconnect(
    [H_plant, H_sum, H_controller],
    connections=[['ControlErrorComputation.y', 'Plant.theta_o_overall'],
                 ['Plant.u_pwm_overall', 'Controller.u'],
                 ['Controller.e', 'ControlErrorComputation.e']],
    inplist=['ControlErrorComputation.r', 'Plant.theta_a_overall'],
    outlist=['Plant.theta_o_overall', 'Controller.u'],
    inputs=['r', 'theta_a'],
    outputs=['theta_o', 'control_signal'],
    name='TotalSystem',
    dt=sample_time_seconds
)

In [None]:
scalar_ambient_temp = 20.0

t_input = np.arange(0,max_runtime_seconds*sample_frequency+1) * sample_time_seconds
r_input = 100*np.ones_like(t_input)
theta_a_input = scalar_ambient_temp*np.ones_like(t_input)
u_input = np.vstack((r_input, theta_a_input))

X0 = np.hstack(([scalar_ambient_temp, scalar_ambient_temp], theta_o[0]*np.ones((H_delay.nstates,)), 0.0))

print(H_total)

params_union = {**oven_params, **heating_elem_params, **controller_params}
params_union['c'] = 0.0002539390333566856
params_union['alpha'] = 0.0020248192667539446
params_union['K'] = 1.9767183297280781
params_union['beta'] = 0.036032596097208075
params_union['kP'] = 0.8 # 0.5
params_union['kI'] = 0.0018 # 0.0010

results = ctrl.input_output_response(H_total, t_input, u_input, X0=X0, params=params_union)

fig = plt.figure(1)
plt.clf()
plt.plot(results.t, results.y[0,:], '-', label='theta_o')
plt.plot(results.t, r_input, '--', label='reference')
plt.plot(results.t, results.y[1,:], '-', label='control signal')
plt.grid(True)
plt.legend()
plt.show()

In [None]:
print(results.y.shape)