# TPI Teor√≠a de Control: Simulaci√≥n de Controlador PD

**Alumno:** Mat√≠as Ezequiel Nu√±ez

**Materia:** Teor√≠a de Control (K4572)

Este notebook implementa la simulaci√≥n din√°mica del sistema de **Rate Limiting** modelado como un lazo de control cerrado.
A diferencia del enfoque cl√°sico de Token Bucket (PI), aqu√≠ implementamos:

1.  **Controlador PD:** $G_c(s) = K_p + K_d \cdot s$.
2.  **Actuador con Memoria:** El mecanismo de asignaci√≥n de recursos (Bucket/Autoscaler) act√∫a como un integrador puro en el lazo directo.
3.  **Realimentaci√≥n Unitaria:** $H(s) = 1$.

El objetivo es validar que el sistema es estable y presenta error estacionario nulo ($e_{ss}=0$) gracias a la naturaleza "Tipo 1" del lazo completo, a pesar de que el controlador es PD.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from ipywidgets import interact, interactive, FloatSlider, IntSlider, Dropdown, VBox, HBox, Text, Output, Layout
from IPython.display import display, clear_output

# Configuraci√≥n de Matplotlib para Google Colab
%matplotlib inline
plt.style.use('default')

In [None]:
class SimuladorRateLimiter:
    def __init__(self):
        self.dt = 0.05
        self.sim_time = 30.0
        self.t = np.arange(0, self.sim_time, self.dt)
        self.n = len(self.t)

        # Controlador
        self.Kp = 0.5
        self.Kd = 0.2

        # Setpoint
        self.setpoint = 100.0

        # Perturbaciones
        self.tiempo_inicio_pert1 = 5.0
        self.duracion_pert1 = 3.0
        self.intensidad_pert1 = 150.0

        self.tiempo_inicio_pert2 = 15.0
        self.duracion_pert2 = 10.0
        self.intensidad_pert2 = 400.0

        # Escenario
        self.escenario = 'Rafagas'

        # Par√°metros del actuador
        self.capacidad_max = 500.0
        self.capacidad_min = 0.0
        self.actuator_gain = 5.0
        
        # Anti-windup
        self.anti_windup = 'freeze'
        
        # Derivative filter
        self.tau_d = 0.1
        
        # Ruido y retardo (por defecto desactivados)
        self.measurement_noise_std = 0.0
        self.measurement_delay = 0.0
        
        # Cuantizaci√≥n
        self.quantization_step = 1.0

    def generar_escenario(self):
        R = np.ones(self.n) * self.setpoint
        D = np.zeros(self.n)
        if self.escenario == 'Rafagas':
            idx_inicio1 = int(self.tiempo_inicio_pert1 / self.dt)
            idx_fin1 = int((self.tiempo_inicio_pert1 + self.duracion_pert1) / self.dt)
            idx_inicio1 = max(0, min(idx_inicio1, self.n - 1))
            idx_fin1 = max(idx_inicio1, min(idx_fin1, self.n))
            D[idx_inicio1:idx_fin1] = self.intensidad_pert1

            idx_inicio2 = int(self.tiempo_inicio_pert2 / self.dt)
            idx_fin2 = int((self.tiempo_inicio_pert2 + self.duracion_pert2) / self.dt)
            idx_inicio2 = max(0, min(idx_inicio2, self.n - 1))
            idx_fin2 = max(idx_inicio2, min(idx_fin2, self.n))
            D[idx_inicio2:idx_fin2] = self.intensidad_pert2

        elif self.escenario == 'DoS':
            idx_inicio = int(self.tiempo_inicio_pert1 / self.dt)
            idx_fin = int((self.tiempo_inicio_pert1 + self.duracion_pert1) / self.dt)
            idx_inicio = max(0, min(idx_inicio, self.n - 1))
            idx_fin = max(idx_inicio, min(idx_fin, self.n))
            D[idx_inicio:idx_fin] = self.intensidad_pert1

        return R, D

    def ejecutar_simulacion(self, return_metrics=True):
        R, D = self.generar_escenario()

        Y = np.zeros(self.n)
        e = np.zeros(self.n)
        u = np.zeros(self.n)
        capacidad = np.zeros(self.n)
        measured_Y = np.zeros(self.n)

        # variables internas
        capacidad_actual = 0.0
        y_prev = 0.0
        e_prev = 0.0

        # derivative filter state
        d_filtered = 0.0
        d_prev = 0.0

        # measurement delay buffer
        delay_samples = int(np.round(self.measurement_delay / self.dt)) if self.measurement_delay > 0 else 0
        meas_buffer = [0.0] * (delay_samples + 1)

        # par√°metros planta
        tau = 0.5

        for i in range(1, self.n):
            # medici√≥n con delay + ruido
            meas_buffer.pop(0)
            meas_buffer.append(y_prev + np.random.randn() * self.measurement_noise_std)
            y_measured = meas_buffer[0]

            # error
            e[i] = R[i] - y_measured

            # derivada filtrada
            derivative_raw = (e[i] - e_prev) / self.dt
            alpha = self.tau_d / (self.tau_d + self.dt) if self.tau_d > 0 else 0.0
            d_filtered = alpha * d_prev + (1 - alpha) * derivative_raw

            u[i] = (self.Kp * e[i]) + (self.Kd * d_filtered)

            # ACTUADOR: integraci√≥n con ganancia y anti-windup + cuantizaci√≥n + saturaci√≥n
            delta_cap = u[i] * self.dt * self.actuator_gain

            # anti-windup policy
            prospective_cap = capacidad_actual + delta_cap
            prospective_cap = max(self.capacidad_min, min(self.capacidad_max, prospective_cap))
            if self.anti_windup == 'freeze':
                if (capacidad_actual <= self.capacidad_min and delta_cap < 0) or \
                   (capacidad_actual >= self.capacidad_max and delta_cap > 0):
                    pass
                else:
                    capacidad_actual = prospective_cap
            elif self.anti_windup == 'backcalc':
                if prospective_cap != capacidad_actual + delta_cap:
                    back_gain = 0.5
                    capacidad_actual += back_gain * delta_cap
                    capacidad_actual = max(self.capacidad_min, min(self.capacidad_max, capacidad_actual))
                else:
                    capacidad_actual = prospective_cap
            else:
                capacidad_actual = prospective_cap

            # cuantizaci√≥n
            if self.quantization_step and self.quantization_step > 0:
                capacidad_actual = np.round(capacidad_actual / self.quantization_step) * self.quantization_step

            capacidad[i] = capacidad_actual

            # entrada efectiva a la planta
            entrada_planta = capacidad_actual - D[i] * 0.2

            # lag primer orden
            Y[i] = (self.dt * entrada_planta + tau * y_prev) / (tau + self.dt)

            # almacenar medici√≥n
            measured_Y[i] = y_measured

            # actualizar previos
            y_prev = Y[i]
            e_prev = e[i]
            d_prev = d_filtered

        # m√©tricas b√°sicas
        metrics = {}
        if return_metrics:
            tail = int(self.n * 0.1)
            ess = np.mean(R[-tail:] - Y[-tail:])
            overshoot = (np.max(Y) - np.max(R)) / np.max(R) * 100.0 if np.max(R) > 0 else 0.0
            max_u = np.max(np.abs(u))
            tol = 0.02 * R[0]
            settling_time = None
            steady_band = np.abs(R - Y) <= tol
            for idx in range(int(self.n*0.1), self.n):
                if np.all(steady_band[idx:]):
                    settling_time = self.t[idx]
                    break
            metrics = {
                'ess': float(ess),
                'overshoot_pct': float(overshoot),
                'max_control_effort': float(max_u),
                'settling_time_s': float(settling_time) if settling_time is not None else None
            }

        return self.t, R, Y, u, e, D, capacidad, measured_Y, metrics

## Laboratorio Interactivo

Utilice los controles a continuaci√≥n para modificar los par√°metros del controlador en tiempo real:

* **Sliders Kp / Kd:** Ajuste las ganancias Proporcional y Derivativa. Observe c√≥mo un $K_d$ mayor amortigua las oscilaciones.
* **Radio Buttons:** Cambie entre escenarios:
    * *R√°fagas:* Eval√∫a la respuesta transitoria.
    * *Ataque DoS:* Eval√∫a la estabilidad y el error en estado estacionario.

In [None]:
# Crear simulador
sim = SimuladorRateLimiter()

# Crear output widget para las gr√°ficas
output = Output()

# Funci√≥n de ploteo
def plot_simulacion():
    """Genera las gr√°ficas con los par√°metros actuales del simulador"""
    with output:
        clear_output(wait=True)
        
        t, R, Y, u, e, D, capacidad, measured_Y, _ = sim.ejecutar_simulacion(return_metrics=False)
        
        fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(14, 9), sharex=True)
        
        # Gr√°fica 1: Respuesta del sistema
        ax1.plot(t, R, 'k--', label=r'$\theta_i$ (Setpoint / R)', linewidth=1.5)
        ax1.plot(t, Y, 'b-', label='Respuesta del Sistema (Y)', linewidth=2)
        ax1.plot(t, D, 'r:', label='Perturbaci√≥n (D)', alpha=0.6)
        ax1.set_title('Respuesta Temporal del Sistema (Rate Limiter Controlado)', fontsize=12, fontweight='bold')
        ax1.set_ylabel('Throughput (req/s)')
        ax1.legend(loc='upper right', fontsize=9)
        ax1.grid(True, linestyle=':', alpha=0.6)
        
        # Gr√°fica 2: Error
        ax2.plot(t, e, 'g-', label='Error (e)', linewidth=1.5)
        ax2.set_ylabel('Error $e(t)$')
        ax2.legend(loc='upper right', fontsize=9)
        ax2.grid(True, linestyle=':', alpha=0.6)
        
        # Gr√°fica 3: Se√±al de control
        ax3.plot(t, u, 'm-', label='Salida Controlador (u)', linewidth=1.5)
        ax3.set_ylabel('Control $u(t)$')
        ax3.set_xlabel('Tiempo (s)')
        ax3.legend(loc='upper right', fontsize=9)
        ax3.grid(True, linestyle=':', alpha=0.6)
        
        plt.tight_layout()
        plt.show()

# Funci√≥n que actualiza par√°metros y redibuja
def actualizar_simulacion(Kp, Kd, Setpoint, Escenario, 
                          T_inicio_1, Duracion_1, Intensidad_1,
                          T_inicio_2, Duracion_2, Intensidad_2):
    """Callback que se ejecuta cuando cambia cualquier par√°metro"""
    sim.Kp = Kp
    sim.Kd = Kd
    sim.setpoint = Setpoint
    sim.escenario = Escenario
    
    # R√°faga 1
    sim.tiempo_inicio_pert1 = T_inicio_1
    sim.duracion_pert1 = Duracion_1
    sim.intensidad_pert1 = Intensidad_1
    
    # R√°faga 2 (solo en modo Rafagas)
    sim.tiempo_inicio_pert2 = T_inicio_2
    sim.duracion_pert2 = Duracion_2
    sim.intensidad_pert2 = Intensidad_2
    
    plot_simulacion()

# Crear widgets de control
style = {'description_width': '140px'}
layout = Layout(width='400px')

w_kp = FloatSlider(value=0.5, min=0.0, max=5.0, step=0.1, 
                   description='Kp (Proporcional):', style=style, layout=layout)
w_kd = FloatSlider(value=0.2, min=0.0, max=5.0, step=0.1, 
                   description='Kd (Derivativo):', style=style, layout=layout)
w_setpoint = FloatSlider(value=100.0, min=50.0, max=300.0, step=10.0,
                         description='Setpoint (req/s):', style=style, layout=layout)
w_escenario = Dropdown(options=['Rafagas', 'DoS'], value='Rafagas',
                       description='Modo:', style=style, layout=Layout(width='400px'))

# R√°faga 1
w_t1 = FloatSlider(value=5.0, min=0.0, max=25.0, step=1.0,
                   description='R√°faga 1 - T.Inicio (s):', style=style, layout=layout)
w_d1 = FloatSlider(value=3.0, min=1.0, max=15.0, step=1.0,
                   description='R√°faga 1 - Duraci√≥n (s):', style=style, layout=layout)
w_i1 = FloatSlider(value=150.0, min=50.0, max=500.0, step=10.0,
                   description='R√°faga 1 - Intensidad:', style=style, layout=layout)

# R√°faga 2
w_t2 = FloatSlider(value=15.0, min=0.0, max=25.0, step=1.0,
                   description='R√°faga 2 - T.Inicio (s):', style=style, layout=layout)
w_d2 = FloatSlider(value=10.0, min=1.0, max=15.0, step=1.0,
                   description='R√°faga 2 - Duraci√≥n (s):', style=style, layout=layout)
w_i2 = FloatSlider(value=400.0, min=50.0, max=500.0, step=10.0,
                   description='R√°faga 2 - Intensidad:', style=style, layout=layout)

# Crear widget interactivo
ui = interactive(actualizar_simulacion,
                 Kp=w_kp, Kd=w_kd, Setpoint=w_setpoint, Escenario=w_escenario,
                 T_inicio_1=w_t1, Duracion_1=w_d1, Intensidad_1=w_i1,
                 T_inicio_2=w_t2, Duracion_2=w_d2, Intensidad_2=w_i2)

# Organizar controles en secciones
from IPython.display import HTML
print("‚ïê" * 80)
print("SIMULADOR INTERACTIVO - CONTROLADOR PD PARA RATE LIMITER")
print("‚ïê" * 80)
print("\nüìä Instrucciones:")
print("  ‚Ä¢ Ajuste los par√°metros del controlador y las perturbaciones usando los sliders")
print("  ‚Ä¢ Modo 'Rafagas': Dos perturbaciones independientes configurables")
print("  ‚Ä¢ Modo 'DoS': Una sola perturbaci√≥n sostenida (usa par√°metros de R√°faga 1)")
print("\n" + "‚îÄ" * 80 + "\n")

# Mostrar controles
display(ui)

### Conclusiones de la Simulaci√≥n

**1. Escenario R√°fagas (Transitorio):**
Se observa c√≥mo el sistema reacciona ante picos de tr√°fico configurables:
- Al aumentar **Kp**, el sistema reacciona m√°s r√°pido pero puede presentar sobrepicos (overshoot).
- Al aumentar **Kd**, se introduce amortiguamiento, suavizando la respuesta y reduciendo oscilaciones, validando la acci√≥n derivativa del controlador PD.
- Las dos r√°fagas son completamente independientes y configurables en tiempo, duraci√≥n e intensidad.

**2. Escenario Ataque DoS (Estado Estacionario):**
Ante una perturbaci√≥n sostenida con duraci√≥n limitada:
- La salida $Y(t)$ regresa al valor de referencia (Setpoint) despu√©s de la perturbaci√≥n.
- El error $e(t)$ converge a **CERO** en estado estacionario.
- **Justificaci√≥n Te√≥rica:** Aunque el controlador es PD (sin t√©rmino integral expl√≠cito), el actuador posee memoria (acumulaci√≥n de recursos/tokens). Esto a√±ade un polo en el origen al lazo abierto, convirtiendo al sistema en **Tipo 1**, lo que garantiza $e_{ss}=0$ ante entradas escal√≥n.