# **TCLab Closed-Loop PID with FeedForward**

In [963]:
import time
import threading
import numpy as np
import package_LAB
from importlib import reload
package_LAB = reload(package_LAB)

from package_LAB import LL_RT, PID_RT, IMCTuning, install_and_import
from package_DBR import SelectPath_RT, Delay_RT, FO_RT

In [964]:
# Plotly / ipywidgets setup: make sure both packages are available (one call
# per package is enough), then import the names used by the notebook.
install_and_import('plotly')
install_and_import('ipywidgets')
from plotly.subplots import make_subplots
import plotly.graph_objs as go
from ipywidgets import interactive, VBox, IntRangeSlider, IntSlider, Checkbox, FloatSlider, Label, Layout, HBox
import ipywidgets as widgets


## TCLab parameters

In [965]:
# Disturbance (DV -> PV) SOPDT model: gain Kp, time constants T1/T2 and delay
# theta. Used by RunExp for the disturbance path and the feedforward.
# NOTE(review): presumably identified on the real TCLab; units look like
# seconds (Ts = 1 and the plots use "Time (s)") - confirm.
Kp_ODV_SOPDT = 0.2951290424136788
T1_ODV_SOPDT = 182.2549613489765
T2_ODV_SOPDT = 13.184430234847984
theta_ODV_SOPDT = 28.999891911961512

# Process (MV -> PV) SOPDT model: same four parameters.
# T2 is numerically ~0 here, so the second lag is practically inactive.
Kp_OMV_SOPDT = 0.30788564834253684
T1_OMV_SOPDT = 183.81942938046797
T2_OMV_SOPDT = 3.2920224028341535e-12
theta_OMV_SOPDT = 20.015407110302775

# Operating point: DV0 is subtracted from DV, MV0 initialises the process
# delay, and PV0 - Kp*MV0 is the offset added to the simulated PV in RunExp.
DV0 = 50
MV0 = 50
PV0 = 49.3

# Saturation limits handed to PID_RT
MVmin = 0
MVmax = 100

# Coefficients: gamma is passed to IMCTuning just below; alpha is the default
# for the coefficient passed to PID_RT (RunExp itself uses the slider-driven
# alphaBuffer rather than this variable).
alpha = 0.7
gamma = 0.5

# IMC tuning of the PID from the MV -> PV model; the printed values are the
# defaults, RunExp uses the KcBuffer/TIBuffer/TDBuffer set by Update_gamma.
Kc, TI, TD = IMCTuning(Kp_OMV_SOPDT, T1_OMV_SOPDT, T2_OMV_SOPDT, theta_OMV_SOPDT, gamma, model="SOPDT")
print(f"Kc: {Kc}, TI: {TI}, TD: {TD}")

Kc: 5.33426261068635, TI: 183.81942938047126, TD: 3.2920224028340946e-12


## Arrays, Paths and Variables Initialization

In [966]:
# Default scenario. Each *Path is a {time: value} dict of the kind RunExp
# feeds to SelectPath_RT. RunExp does not read these module-level defaults:
# it takes its paths and flags from the *Buffer globals written by the widget
# callbacks.
SPPath = {0: PV0}
ManPath = {0: False}
MVManPath = {0: MV0}
DVPath = {0: DV0}
FF = True
ManFF = False # Not needed

# Initial values of the flags RunExp actually uses (updated by Update_FF and
# Update_ManFF when the checkboxes change).
# NOTE(review): ManFFBuffer starts True while ManFF above is False - confirm
# which default is intended.
FFBuffer = True
ManFFBuffer = True

# Time parameters: total simulated time, sampling period, number of samples
TSim = 3000
Ts = 1
N = int(TSim/Ts) + 1
# Wall-clock pause per loop iteration in RunExp (0 selects the fast, batched
# plotting branch); overwritten by Update_TimeStep when the slider fires.
TimeStep = Ts # Time step for the simulation



## PID and plot

### Figure

In [967]:
# Live figure: four stacked subplots in one column (manual mode, MV and its
# components, PV/SP/E, disturbance); the two middle rows get most of the height.
fig = go.FigureWidget(
    make_subplots(
        rows=4,
        cols=1,
        specs=[[{}], [{}], [{}], [{}]],
        vertical_spacing=0.15,
        row_heights=[0.1, 0.4, 0.4, 0.1],
        subplot_titles=("Manual Mode", "MV and Components", "PV, SP and E", "Perturbation DV"),
    )
)
def initialize():
    """Add the ten (empty) traces to ``fig`` and set its size and axis titles.

    The traces are added in the order SP, PV, E, MV, MVp, MVi, MVd, Man,
    MVMan, DV. Code that addresses ``fig.data`` by index (see ``RunExp``)
    relies on that order, so keep the table below in sync with it.
    """
    # (trace name, subplot row, drawn with a dashed line?)
    trace_specs = [
        ("SP", 3, False),
        ("PV", 3, False),
        ("E", 3, True),
        ("MV", 2, False),
        ("MVp", 2, True),
        ("MVi", 2, True),
        ("MVd", 2, True),
        ("Man", 1, False),
        ("MVMan", 1, False),
        ("DV", 4, False),
    ]
    for name, row, dashed in trace_specs:
        extra = dict(line=dict(dash='dash')) if dashed else {}
        fig.add_trace(go.Scatter(name=name, **extra), row=row, col=1)

    # Update layout
    fig['layout'].update(height=800, width=800)

    # One y-axis title per subplot row. Row 1 shows the manual-mode flag and
    # the manual MV value (a percentage), not a temperature, so it must not
    # be labelled in degrees Celsius.
    y_titles = ['MVMan (%)', 'MV (%)', '(°C)', 'DV (%)']
    for row, y_title in enumerate(y_titles, start=1):
        fig['layout'][f'xaxis{row}'].update(title='Time (s)')
        fig['layout'][f'yaxis{row}'].update(title=y_title)
 
 
def reinitialize():
    """Rebind every simulation signal (module-level global) to a fresh empty list.

    Each name gets its own list object, so a previous run's data is dropped
    and no two signals share storage.
    """
    global t, SP, MAN, MV_MAN, DV, PV, MVFF, MV, MVp, MVi, MVd, E, PV_p, PV_d, MVFF_Delay, MVFF_LL1, MV_Delay_P, MV_FO_P, MV_Delay_D, MV_FO_D

    # Time base, set point, process value and the path-driven inputs
    t, SP, PV = [], [], []
    MAN, MV_MAN, DV = [], [], []

    # Controller output, its components and the error
    MVFF, MV = [], []
    MVp, MVi, MVd = [], [], []
    E = []

    # Contributions of the process and of the disturbance to PV
    PV_p, PV_d = [], []

    # Intermediate signals of the feedforward, process and disturbance chains
    MVFF_Delay, MVFF_LL1 = [], []
    MV_Delay_P, MV_FO_P = [], []
    MV_Delay_D, MV_FO_D = [], []
 
# Create the traces once at cell execution so the figure already has its
# ten traces and axis titles before the first run.
initialize()

### Widget Functions

In [968]:

# Keys of the per-signal plot buffers, in the same order as the traces of
# ``fig.data`` created by ``initialize`` (SP, PV, E, MV, MVp, MVi, MVd, Man,
# MVMan, DV).
_PLOTTED_SIGNALS = ('SP', 'PV', 'E', 'MV', 'MVp', 'MVi', 'MVd', 'MAN', 'MV_MAN', 'DV')


def _redraw_all_traces():
    """Send the complete recorded signals to the figure, one trace per signal."""
    recorded = (SP, PV, E, MV, MVp, MVi, MVd, MAN, MV_MAN, DV)
    with fig.batch_update():
        for trace, values in zip(fig.data, recorded):
            trace.x, trace.y = t, values


def _append_chunk_to_traces(chunk):
    """Extend every trace with the samples buffered in ``chunk``."""
    new_x = tuple(chunk['t'])
    with fig.batch_update():
        for trace, key in zip(fig.data, _PLOTTED_SIGNALS):
            old_x = trace.x if trace.x is not None else ()
            old_y = trace.y if trace.y is not None else ()
            trace.x = tuple(old_x) + new_x
            trace.y = tuple(old_y) + tuple(chunk[key])


def RunExp(Exp):
    """Run the closed-loop simulation (PID + feedforward) and plot it live.

    The settings are read once, at the start, from the ``*Buffer`` globals
    written by the widget callbacks. ``should_continue`` and ``TimeStep`` are
    re-read on every iteration so that the Stop button and the time-step
    slider act on a running experiment.

    Plotting has two modes, selected by the global ``TimeStep``:

    * ``TimeStep == 0``: run as fast as possible and push the new samples to
      the figure every ``update_frequency`` iterations;
    * ``TimeStep > 0``: wait so that each iteration takes about ``TimeStep``
      seconds of wall-clock time and redraw the full signals every iteration.

    Parameters
    ----------
    Exp : bool
        The experiment is run only when this is truthy; otherwise the call
        does nothing.
    """
    if not Exp:
        return

    # Snapshot the widget-controlled settings for this run
    Kc = KcBuffer
    TI = TIBuffer
    TD = TDBuffer
    alpha = alphaBuffer
    ManPath = ManPathBuffer
    MVManPath = MVmanPathBuffer
    FF = FFBuffer
    SPPath = SPPathBuffer
    DVPath = DVPathBuffer
    ManFF = ManFFBuffer

    # Gain of the second feedforward lead-lag: 0 disables the feedforward
    ff_gain = 1 if FF else 0

    update_frequency = 50  # Update the plot every 50 iterations (fast mode)
    data_chunk = {key: [] for key in ('t',) + _PLOTTED_SIGNALS}

    reinitialize()
    # reinitialize() rebinds the global lists, so this mapping must be built
    # after it.
    recorded = {'t': t, 'SP': SP, 'PV': PV, 'E': E, 'MV': MV, 'MVp': MVp, 'MVi': MVi,
                'MVd': MVd, 'MAN': MAN, 'MV_MAN': MV_MAN, 'DV': DV}

    # Reference instant for the real-time pacing. It has to exist before the
    # first iteration, otherwise the first use below raises UnboundLocalError.
    last_time = time.time()

    for i in range(N):

        if not should_continue:
            break  # Exit the loop if should_continue is False

        t.append(i * Ts)
        SelectPath_RT(SPPath, t, SP)
        SelectPath_RT(ManPath, t, MAN)
        SelectPath_RT(MVManPath, t, MV_MAN)
        SelectPath_RT(DVPath, t, DV)

        # Disturbance expressed as a deviation from its operating point
        DV_dev = np.asarray(DV) - DV0

        # FeedForward
        Delay_RT(DV_dev, max(theta_ODV_SOPDT-theta_OMV_SOPDT, 0), Ts, MVFF_Delay)
        LL_RT(MVFF_Delay, -Kp_ODV_SOPDT/Kp_OMV_SOPDT, T1_OMV_SOPDT, T1_ODV_SOPDT, Ts, MVFF_LL1)
        LL_RT(MVFF_LL1, ff_gain, T2_OMV_SOPDT, T2_ODV_SOPDT, Ts, MVFF)

        # PID
        PID_RT(SP, PV, MAN, MV_MAN, MVFF, Kc, TI, TD, alpha, Ts, MVmin, MVmax, MV, MVp, MVi, MVd, E, ManFF, PV0)

        # Process
        Delay_RT(MV, theta_OMV_SOPDT, Ts, MV_Delay_P, MV0)
        FO_RT(MV_Delay_P, Kp_OMV_SOPDT, T1_OMV_SOPDT, Ts, MV_FO_P)
        FO_RT(MV_FO_P, 1, T2_OMV_SOPDT, Ts, PV_p)

        # Disturbance
        Delay_RT(DV_dev, theta_ODV_SOPDT, Ts, MV_Delay_D)
        FO_RT(MV_Delay_D, Kp_ODV_SOPDT, T1_ODV_SOPDT, Ts, MV_FO_D)
        FO_RT(MV_FO_D, 1, T2_ODV_SOPDT, Ts, PV_d)

        PV.append(PV_p[-1] + PV_d[-1] + PV0 - Kp_OMV_SOPDT*MV0)

        # Buffer the newest sample of every plotted signal
        for key, series in recorded.items():
            data_chunk[key].append(series[-1])

        step = TimeStep  # read once: the slider may change it at any time
        if step > 0:
            # Real-time mode: wait until the next loop, then redraw everything
            elapsed = time.time() - last_time
            time.sleep(max(0, step - elapsed))
            last_time = time.time()

            _redraw_all_traces()
            # Everything is on the figure now; keeping these samples would
            # plot them a second time after a switch to fast mode.
            data_chunk = {key: [] for key in data_chunk}
        elif step == 0 and i % update_frequency == 0:
            if i == 0:
                # First sample: overwrite whatever a previous run left
                _redraw_all_traces()
            else:
                _append_chunk_to_traces(data_chunk)
            # Reset data_chunk for the next batch of updates
            data_chunk = {key: [] for key in data_chunk}

    # Samples still buffered here (early stop, or a last batch shorter than
    # update_frequency) have not been plotted yet.
    if data_chunk['t']:
        _redraw_all_traces()
                
def Update_gamma(gamma):
    """Recompute the PID settings for a new ``gamma`` (Gamma slider callback).

    Calls ``IMCTuning`` on the MV -> PV SOPDT model and stores the result in
    the globals ``KcBuffer``, ``TIBuffer`` and ``TDBuffer``, which ``RunExp``
    reads when a run starts.
    """
    global KcBuffer, TIBuffer, TDBuffer
    KcBuffer, TIBuffer, TDBuffer = IMCTuning(Kp_OMV_SOPDT, T1_OMV_SOPDT, T2_OMV_SOPDT, theta_OMV_SOPDT, gamma, model="SOPDT")
  
def stop_experiment_button_clicked(b):
    """Ask a running experiment to stop by clearing ``should_continue``.

    NOTE(review): an identical function with the same name is defined again
    further down in this cell; that later definition is the one in effect
    once the cell has run, so this copy is redundant.

    ``b`` is the argument passed by the button click and is not used.
    """
    global should_continue
    should_continue = False  # Set the flag to False to stop the loop

def Update_alpha(alphaP):
    """Store the Alpha slider value in ``alphaBuffer`` for the next run.

    ``RunExp`` copies ``alphaBuffer`` at start-up and passes it to ``PID_RT``.
    """
    global alphaBuffer
    alphaBuffer = alphaP
    
def Update_Manual(manual_time, MVManual, Man):
    """Rebuild the manual-mode paths from the three manual-mode sliders.

    Parameters
    ----------
    manual_time : sequence
        Two times; at ``manual_time[k]`` the manual MV becomes ``MVManual[k]``.
    MVManual : sequence
        The two manual MV values.
    Man : int
        Time from which manual mode is on.

    Writes the globals ``ManPathBuffer`` (off at time 0, on at ``Man``) and
    ``MVmanPathBuffer`` (0 at time 0, then the two manual values). An entry
    whose time is 0 replaces the time-0 default.
    """
    global ManPathBuffer, MVmanPathBuffer

    mode_path = {0: False}
    mode_path[Man] = True

    value_path = {0: 0}
    for k in (0, 1):
        value_path[manual_time[k]] = MVManual[k]

    ManPathBuffer = mode_path
    MVmanPathBuffer = value_path


def Update_Perturbation(perturbation, time_perturbation):
    """Rebuild the disturbance path from the two perturbation sliders.

    The path starts at ``DV0`` at time 0 and takes ``perturbation[k]`` at
    ``time_perturbation[k]`` (k = 0, 1); an entry at time 0 replaces the
    ``DV0`` default. The result is stored in the global ``DVPathBuffer``.
    """
    global DVPathBuffer

    dv_path = {0: DV0}
    for k in (0, 1):
        dv_path[time_perturbation[k]] = perturbation[k]
    DVPathBuffer = dv_path
    
def Update_SetPoint(setpoint, time_setpoint):
    """Rebuild the set-point path from the two set-point sliders.

    The path starts at ``PV0`` at time 0 and takes ``setpoint[k]`` at
    ``time_setpoint[k]`` (k = 0, 1); an entry at time 0 replaces the ``PV0``
    default. The result is stored in the global ``SPPathBuffer``.
    """
    global SPPathBuffer

    sp_path = {0: PV0}
    for k in (0, 1):
        sp_path[time_setpoint[k]] = setpoint[k]
    SPPathBuffer = sp_path

def Update_TimeStep(_timeStep):
    """Store the time-step slider value in the global ``TimeStep``.

    ``RunExp`` reads ``TimeStep`` on every iteration: 0 selects the fast,
    batched plotting branch, a positive value the paced one.
    """
    global TimeStep
    TimeStep = _timeStep
    
def run_experiment_threaded():
    """Thread target used by the Run button: calls ``RunExp(Exp=True)``."""
    # Wrapper function to run the experiment in a separate thread
    RunExp(Exp=True)

def run_experiment_button_clicked(b):
    """Run-button handler: start an experiment in a background thread.

    Sets ``should_continue`` back to True, recreates the traces and signal
    lists if the figure has no traces (for instance after "Clear Graph"),
    and then starts ``run_experiment_threaded`` on a new thread so the
    widgets stay responsive. ``b`` (the clicked button) is not used.
    """
    global should_continue
    should_continue = True  # a previous Stop click must not cancel this run

    figure_is_empty = not fig.data
    if figure_is_empty:
        initialize()
        reinitialize()

    worker = threading.Thread(target=run_experiment_threaded)
    worker.start()


def stop_experiment_button_clicked(b):
    """Stop-button handler: ask a running experiment to stop.

    Clears the global ``should_continue``, which ``RunExp`` checks at the top
    of every iteration. ``b`` (the clicked button) is not used.
    """
    global should_continue
    should_continue = False  # Set the flag to False to stop the loop
    
def clear_graph_button_clicked(b):
    """Clear-button handler: remove every trace from ``fig``.

    The Run-button handler recreates the traces when it finds the figure
    empty. ``b`` (the clicked button) is not used.
    """
    # Clear the graph by resetting its data
    fig.data = []
    
def Update_FF(change):
    """Observer of the FeedForward checkbox: copy its new value to ``FFBuffer``.

    ``change`` is the change notification passed by ``observe``; only its
    ``new`` attribute is read.
    """
    global FFBuffer
    FFBuffer = change.new
    
def Update_ManFF(change):
    """Observer of the Manual FeedForward checkbox: copy its new value to ``ManFFBuffer``.

    ``change`` is the change notification passed by ``observe``; only its
    ``new`` attribute is read.
    """
    global ManFFBuffer
    ManFFBuffer = change.new

### Widgets

In [969]:
# The three control buttons (icons are FontAwesome names)
run_exp_button = widgets.Button(description='Run Experiment', button_style='info',
                                tooltip='Click to run the experiment', icon='play')
stop_exp_button = widgets.Button(description='Stop Experiment', button_style='danger',
                                 tooltip='Click to stop the experiment', icon='stop')
clear_graph_button = widgets.Button(description='Clear Graph', button_style='warning',
                                    tooltip='Click to clear the graph', icon='eraser')

# Wire each button to its handler
clear_graph_button.on_click(clear_graph_button_clicked)
stop_exp_button.on_click(stop_experiment_button_clicked)
run_exp_button.on_click(run_experiment_button_clicked)


# Shared style: let the description take the width it needs instead of being
# truncated. It is handed to every slider at construction; each slider still
# gets its own Layout object.
slider_style = {'description_width': 'initial'}

# --- Manual mode ---------------------------------------------------------
manual_time_slider = IntRangeSlider(
    value=[0, 500], min=0, max=TSim, step=1,
    description="Manual Time",
    layout=Layout(width='500px'), style=slider_style,
)
MVManual_Time_slider = IntSlider(
    value=500, min=0, max=TSim, step=1,
    description="MV Manual Time (to activate manual mode)",
    layout=Layout(width='500px'), style=slider_style,
)
MVManual_slider = IntRangeSlider(
    value=[MV0+15, MV0+15], min=0, max=100, step=1,  # Example value
    description="MV Manual Value",
    layout=Layout(width='500px'), style=slider_style,
)

# --- Perturbation (DV) ---------------------------------------------------
PerturbationSlider = IntRangeSlider(
    value=[DV0, DV0+10], min=0, max=100, step=1,  # Example values
    description="Perturbation Value",
    layout=Layout(width='500px'), style=slider_style,
)
time_perturbation = IntRangeSlider(
    value=[0, 1600], min=0, max=TSim, step=10,
    description="Perturbation Time",
    layout=Layout(width='500px'), style=slider_style,
)

# --- Set point -----------------------------------------------------------
SetPointSlider = IntRangeSlider(
    value=[PV0+5, PV0+10], min=0, max=100, step=1,  # Example values
    description="SetPoint Value",
    layout=Layout(width='500px'), style=slider_style,
)
time_setpoint = IntRangeSlider(
    value=[0, 1000], min=0, max=TSim, step=10,
    description="SetPoint Time",
    layout=Layout(width='500px'), style=slider_style,
)

# --- Simulation pacing ---------------------------------------------------
time_slider = FloatSlider(
    value=0, min=0, max=4, step=0.5,
    description="Simulation Time (per loop)",
    layout=Layout(width='500px'), style=slider_style,
)

# --- PID parameters ------------------------------------------------------
_GammaSlider = FloatSlider(
    value=0.5, min=0.2, max=0.9, step=0.02,
    description="Gamma",
    layout=Layout(width='500px'), style=slider_style,
)
_AlphaSlider = FloatSlider(
    value=0.7, min=0.2, max=0.9, step=0.02,
    description="Alpha",
    layout=Layout(width='500px'), style=slider_style,
)


# Start each checkbox from the flag RunExp actually reads, so the UI shows the
# real state. FFBuffer and ManFFBuffer are initialised to True earlier in the
# notebook; with a hard-coded value=False both boxes appeared unticked while
# both options were in fact active until the first toggle.
FFCheckBox = Checkbox(value=FFBuffer, description='FeedForward')
ManFFCheckBox = Checkbox(value=ManFFBuffer, description='Manual FeedForward')
# Keep the flags in sync with later user changes
FFCheckBox.observe(Update_FF, names='value')
ManFFCheckBox.observe(Update_ManFF, names='value')

# Create interactive widgets: each one binds sliders to the Update_* callback
# that writes the matching *Buffer global (or TimeStep) read by RunExp.
# NOTE(review): the *Buffer globals only exist once these callbacks have run
# at least once - confirm that happens before "Run Experiment" can be clicked.
GammaSlider = interactive(Update_gamma, gamma=_GammaSlider)
AlphaSlider = interactive(Update_alpha, alphaP=_AlphaSlider)
Manual = interactive(Update_Manual, manual_time=manual_time_slider, MVManual=MVManual_slider, Man=MVManual_Time_slider)
# NOTE(review): `description=` is not a parameter of the callbacks below;
# presumably `interactive` just ignores it - confirm, and drop it if so.
Perturbation = interactive(Update_Perturbation, perturbation=PerturbationSlider, time_perturbation=time_perturbation, description='Perturbation')
SetPoint = interactive(Update_SetPoint, setpoint=SetPointSlider, time_setpoint=time_setpoint, description='SetPoint')
timeStep = interactive(Update_TimeStep, _timeStep=time_slider, description='Time Step')


# Adding labels for sections
# NOTE(review): Label presumably shows its text verbatim, so the literal
# asterisks would be displayed rather than rendered as bold - confirm.
pid_parameters_title = Label(value='**PID Parameters**', layout={'width': '500px'})
pid_parameters_title.style.font_weight = 'bold'
pid_parameters_title.layout.justify_content = 'center'

simulation_title = Label(value='**Simulation**', layout={'width': '500px'})
simulation_title.style.font_weight = 'bold'
simulation_title.layout.justify_content = 'center'

# Whole dashboard, top to bottom: the live figure, the PID parameter and
# scenario controls, then the simulation controls and buttons.
container = VBox([fig,
    pid_parameters_title,
    GammaSlider,
    AlphaSlider,
    Manual,
    ManFFCheckBox,
    Perturbation,
    FFCheckBox,
    SetPoint,
    simulation_title,
    timeStep,
    run_exp_button,
    stop_exp_button,
    clear_graph_button
], layout=Layout(align_items='center'))

# Last expression of the cell: displays the dashboard
container

VBox(children=(FigureWidget({
    'data': [{'name': 'SP',
              'type': 'scatter',
              'uid'…