# Initializing the problem

In [1]:
import numpy as np
from scipy.signal import butter, filtfilt
import python_anesthesia_simulator as pas
import pickle

# load patient data
with open('patients_data_1100.pkl', 'rb') as f:
    patient_dic = pickle.load(f)

# define output function
def output(x, PD_par):
  c50p, c50r, gamma, _, _, _ = PD_par
  Up, Ur = x[3]/c50p, x[7]/c50r
  U = np.abs(Up + Ur)
  y = 97.4 * (1 - np.divide(U**gamma, 1 + U**gamma))
  return y

# define measurement noise
class measurement_noise:
    def __init__(self, num_samples, std_dev=3, cutoff=0.03, fs=1.0, order=2):
        self.num_samples = num_samples
        self.std_dev = std_dev
        self.cutoff = cutoff
        self.fs = fs
        self.order = order

    def generate_white_noise(self):
        return np.random.normal(0, self.std_dev, self.num_samples)

    def butter_lowpass_filter(self):
        nyquist = 0.5 * self.fs
        normal_cutoff = self.cutoff / nyquist
        b, a = butter(self.order, normal_cutoff, btype='low', analog=False)
        return b, a

    def apply_filter(self, data):
        b, a = self.butter_lowpass_filter()
        return filtfilt(b, a, data)

    def generate(self):
        white_noise = self.generate_white_noise()
        return self.apply_filter(white_noise)

# Defining the controller

In [2]:
class PID_A():
    def __init__(self, Kp, Ti, Td, gamma_p, ts):
        self.Kp = Kp
        self.Ti = Ti
        self.Td = Td
        self.gamma_p = gamma_p
        self.ts = ts
        self.N = 5

        self.last_error = 97.4 - 50 # initial error
        self.integral_part = 0
        self.derivative_part = 0
        self.ratio = 2
        self.Kp_history = []

    def one_step(self, BIS, y_ref):
        self.error = BIS - y_ref
        self.integral_part += self.ts / self.Ti * self.error
        self.derivative_part = (self.derivative_part * self.Td / self.N + self.Td * (self.error - self.last_error)) / (self.ts + self.Td / self.N)
        
        self.control_input = self.Kp * (self.error + self.integral_part + self.derivative_part)
        up = self.control_input # propofol infusion
        ur = self.control_input * self.ratio # remifentanil infusion

        # adapt Kp
        self.Kp += np.tanh(self.error) * self.gamma_p * self.error
        self.Kp = np.clip(self.Kp, 1e-4, 1)
        self.Kp_history.append(self.Kp)

        # apply anti-windup
        if ((up > 6.67 ) or (ur > 16.67)) or (((up > 6.67 ) or (ur > 16.67)) and (self.control_input * self.error >= 0)):
            up, ur = min(6.67, up), min(16.67, ur)
            self.integral_part -= self.ts / self.Ti * self.error # freeze the integral at its past value

        if ((up < 0 ) or (ur < 0)) or (((up < 0 ) or (ur < 0)) and (self.control_input * self.error >= 0)):
            up, ur = max(0, up), max(0, ur)
            self.integral_part -= self.ts / self.Ti * self.error # freeze the integral at its past value
        
        control_signal = [np.clip(up, 0, 6.67), np.clip(ur, 0, 16.67)] # saturation blocks

        self.last_error = self.error

        return control_signal

# Defining the simulation function

In [3]:
def pid_A_sim(Kp_i, Ti_i, Td_i, gamma_p_i, Kp_m, Ti_m, Td_m, t_sim, ts, patient_index, model_type='uncertain', noise=False):
    Ad_nom, Bd_nom, Ad_pert, Bd_pert, _, PD_nom, PD_real, _ = patient_dic[f'patient_{patient_index+1}']
    
    # two PID controllers for induction and maintenance
    pid_ind = PID_A(Kp=Kp_i, Ti=Ti_i, Td=Td_i, gamma_p = gamma_p_i, ts=ts)
    pid_mnt = PID_A(Kp=Kp_m, Ti=Ti_m, Td=Td_m, gamma_p = 0, ts=ts)

    # run the pid
    u_pid = np.zeros((2, t_sim))
    x_real = np.zeros(8)
    x_nom = np.zeros(8)
    y_meas = np.ones(t_sim)*97.4
    y_clean = np.zeros(t_sim)
    # y_clean is the actual BIS of the patient that we don't have access to in reality,
    # not the measured one that is affected by noise. The measured BIS is used as a
    # feedback to the controller. y_clean is used to study the actual effect of the
    # control action on the BIS of the patient in the presence of noisy measurements.
    Kp_hist = np.ones(600)

    if noise == True:
        noise = measurement_noise(t_sim).generate()
    else:
        noise = np.zeros(t_sim)

    for k in range(t_sim):
        if model_type == 'nominal':
            y_clean[k] = output(x_nom, PD_nom) + pas.disturbances.compute_disturbances(k, 'step', 600, 1200)[0]
        elif model_type == 'uncertain':
            y_clean[k] = output(x_real, PD_real) + pas.disturbances.compute_disturbances(k, 'step', 600, 1200)[0]
        y_meas[k] = np.clip(y_clean[k] + noise[k], 0, 100)
        
        if np.mod(k, ts) == 0:
            if k >= 600:
                delta = y_meas[k] - 50
                if delta > 0:
                    y_ref = np.clip(100 - y_meas[k], 40, 50)
                else:
                    y_ref = 50
                u_pid[:, k] = pid_mnt.one_step(y_meas[k], y_ref)
            else:
                u_pid[:, k] = pid_ind.one_step(y_meas[k], 50)
                Kp_hist[k] = pid_ind.Kp_history[-1]
        else:
            u_pid[:, k] = u_pid[:, k-1]
            if k < 600:
                Kp_hist[k] = Kp_hist[k-1]
        x_real = Ad_pert @ x_real + Bd_pert @ u_pid[:, k]
        x_nom = Ad_nom @ x_nom + Bd_nom @ u_pid[:, k]

    return y_clean, u_pid, Kp_hist

# Tuning the controller

In [4]:
import optuna

def cost_iae(y_clean):
    # Integral absolut error
    iae = np.sum(np.abs(y_clean - 50))
    return iae

def cost_constraints(y_clean, u_pid, alpha, eta, beta1, beta2, nu1, nu2, nu3):
    # Compute Undershoot Penalty: Adds penalty when BIS < 50
    undershoot_penalty = np.sum(np.maximum(0, 45 - y_clean[:600]))
    # Compute Arrival Penalty: Adds penalty when BIS != 50 at time t_arrival
    arrival_penalty = np.abs(y_clean[599] - 50)
    # Compute Terminal Penalty: Adds penalty when BIS != 50 at time t_term
    terminal_penalty = np.abs(y_clean[-1] - 50)
    # Compute Interval 4 Penalty: Adds penalty when BIS > 52 or BIS < 48
    Pen4 = np.sum(np.maximum(0, 48 - y_clean[600:])) + np.sum(np.maximum(0, y_clean[600:] - 52))
    # Compute Interval 10 Penalty: Adds penalty when BIS > 55 or BIS < 45
    Pen10 = np.sum(np.maximum(0, 45 - y_clean[600:])) + np.sum(np.maximum(0, y_clean[600:] - 55))
    # Compute Interval 20 Penalty: Adds penalty when BIS > 60 or BIS < 40
    Pen20 = np.sum(np.maximum(0, 40 - y_clean[600:])) + np.sum(np.maximum(0, y_clean[600:] - 60))
    # Compute Control Penalty: Penalizes how much drugs are consumed
    PenC = u_pid.sum()
    
    # Weighted Cost Function
    cost = alpha * PenC + eta*undershoot_penalty + beta1*arrival_penalty + beta2*terminal_penalty + nu1 * Pen20 + nu2 * Pen10 + nu3 * Pen4
    return cost

t_sim, ts = 1800, 5

# tuning the induction phase
def pid_gains_ind(trial):
    Kp_i = trial.suggest_float("Kp_i", 1e-2, 1, step=1e-4)
    Ti_i = trial.suggest_int("Ti_i", 50, 600, step=5)
    Td_i = trial.suggest_int("Td_i", 10, 100, step=2)
    gamma_p_i = trial.suggest_float("gamma_p_i", 1e-7, 1e-3, log=True)

    cost = []
    for patient_index in range(100):
        y_clean, u_pid, _ = pid_A_sim(Kp_i, Ti_i, Td_i, gamma_p_i, 0, 0, 0, 600, ts, patient_index, model_type='uncertain', noise=True)
        # Compute combined cost function
        cost.append(
            cost_iae(y_clean) +
            cost_constraints(y_clean, u_pid, 1, 1e1, 1e2, 0, 0, 0, 0)
        )
    return 0.7 * np.mean(cost) + 0.3 * np.std(cost)
study_ind = optuna.create_study(direction="minimize")
study_ind.optimize(pid_gains_ind, n_trials=1000, n_jobs=-1)
Kp_i = study_ind.best_params["Kp_i"]
Ti_i = study_ind.best_params["Ti_i"]
Td_i = study_ind.best_params["Td_i"]
gamma_p_i = study_ind.best_params["gamma_p_i"]

# tuning the maintenance phase
def pid_gains_mnt(trial):
    Kp_m = trial.suggest_float("Kp_m", 1e-2, 1, step=1e-4)
    Ti_m = trial.suggest_int("Ti_m", 50, 600, step=5)
    Td_m = trial.suggest_int("Td_m", 10, 100, step=2)

    cost = []
    for patient_index in range(100):
        y_clean, u_pid, _ = pid_A_sim(Kp_i, Ti_i, Td_i, gamma_p_i, Kp_m, Ti_m, Td_m, t_sim, ts, patient_index, model_type='uncertain', noise=True)
        # Compute combined cost function
        cost.append(
            cost_iae(y_clean) +
            cost_constraints(y_clean, u_pid, 10, 0, 0, 1e2, 5e1, 2e1, 1e1)
        )
    return 0.7 * np.mean(cost) + 0.3 * np.std(cost)
study_mnt = optuna.create_study(direction="minimize")
study_mnt.optimize(pid_gains_mnt, n_trials=1000, n_jobs=-1)
Kp_m = study_mnt.best_params["Kp_m"]
Ti_m = study_mnt.best_params["Ti_m"]
Td_m = study_mnt.best_params["Td_m"]

  from .autonotebook import tqdm as notebook_tqdm
[I 2025-03-17 20:19:20,268] A new study created in memory with name: no-name-584964a0-a7d2-4b33-ad48-d830e1ac69d6
[I 2025-03-17 20:20:12,434] Trial 8 finished with value: 32458.741462827318 and parameters: {'Kp_i': 0.44020000000000004, 'Ti_i': 205, 'Td_i': 90, 'gamma_p_i': 4.2547553310326964e-07}. Best is trial 8 with value: 32458.741462827318.
[I 2025-03-17 20:20:12,860] Trial 14 finished with value: 35713.93683869397 and parameters: {'Kp_i': 0.43620000000000003, 'Ti_i': 215, 'Td_i': 70, 'gamma_p_i': 0.0005815339960895746}. Best is trial 8 with value: 32458.741462827318.
[I 2025-03-17 20:20:13,009] Trial 1 finished with value: 35175.727764917305 and parameters: {'Kp_i': 0.5384, 'Ti_i': 470, 'Td_i': 84, 'gamma_p_i': 7.544897353854291e-06}. Best is trial 8 with value: 32458.741462827318.
[I 2025-03-17 20:20:13,281] Trial 5 finished with value: 34556.97918024659 and parameters: {'Kp_i': 0.24580000000000002, 'Ti_i': 260, 'Td_i': 92, 'gamma

KeyboardInterrupt: 