[← Control Systems as Dynamical Systems](../../../getting_started/theory_to_python/control_systems_as_dynamical_systems.rst)

# Example: Car Cruise Control

Suppose we want to build a robot. If we were hobbyists, we might want a weekend project; if we were engineers, we might need an extra arm on the assembly line; if we were academics, we might dream of a quadruped with a cannon-head to threaten the government for grant money.

Of course, we must begin somewhere. In this notebook, we take a classic example of a feedback system -- a car with cruise control -- and use ``pykal`` to take it from a block diagram to a composition of dynamical systems.

## System Overview
 We consider here a gross simplification: our car will only ever drive on a perfectly flat plane with minimal drag.

We are given an acceleromoter (which will give us a measurement of our current speed), a button on the steering wheel (which will enable us to modify the cruise control speed), and the car's CPU (i.e. we can implement any algorithm we want).

## Block Diagram
 Recall the following block diagram for a simple feedback system:

<img src="../../../_static/tutorial/theory_to_python/feedback_system_trad.svg"
    width="800">

where $r$ is the **setpoint** (or **reference point**),
$u$ is the **control input**,
$x$ is the (hidden) **state** of the plant,
$\hat{x}$ is the **state estimate**,
and $e$ is the **error** term.

In the case of our car cruise control system, we can leverage this diagram with a simple relabling:

<img src="../../../_static/tutorial/theory_to_python/cruise_control_as_a_feedback_system.svg"
    width="800">

where the **PID** is a proportional-integrative-differential control algorithm, **KF** is a Kalman filter, and the **car** is, well, the car.
:::{note} If you were to ask five control theorists to design a block diagram, you would get six different diagrams. Don't fret the particulars; the diagram is simply a tool to organize our thinking.

### Discrete-time Dynamical Systems


:::{warning}
This section assumes you have already gone through the [Dynamical System](./dynamical_system.ipynb) notebook. If you have not, please do so now.
:::

We now proceed with casting the diagram blocks defined above as **dynamical system** blocks:

<img src="../../../_static/tutorial/theory_to_python/cruise_control_as_a_composition_of_dynamical_systems.svg"
    width="800">

We discuss each block in detail in the following sections.

:::{note} Although the signals between blocks have remained, for the most part, unchanged, the **summing junction** and **inverter** have dissapeared. Indeed, they've been absorbed into the PID block, as our [implementation of the algorithm](../../algorithm_library/pid_pykal.ipynb) computes the error term internally.
:::

### Block 1: Setpoint Generator

<div style="text-align: center;">
<img src="../../../_static/tutorial/theory_to_python/block_1_setpoint_gen.svg"
    width="400">
</div>

The setpoint generator maintains the cruise control speed reference based on the driver's button inputs.


In [None]:
# ---- SETPOINT BLOCK ---------------------

#### ARROWS IN ####
bk = int  # button input; bk in {-1, 0, 1} (decrement, neutral, increment)

#### PARAMETERS ############
sk = int
bk = int  # Current setpoint state (cruise control setpoint in mph)

#### ARROWS OUT ####
rk = float

# ---- $(f,h)$-representation --------------

from pykal import DynamicalSystem


def setpoint_f(sk: float, bk: int) -> float:
    """Increment or decrement setpoint based on button input."""
    return sk + bk


def setpoint_h(
    sk: float,
) -> float:
    """Return the current setpoint speed."""
    rk = sk
    return rk


setpoint_block = DynamicalSystem(f=setpoint_f, h=setpoint_h)

 To get a feel for our setpoint dynamical system, we simulate the following scenario: we press up on the button once a second for 30 seconds.

In [None]:
import numpy as np

# ---- SETPOINT GENERATOR BLOCK -------------------

#### ARROWS IN #########################
bk = 1

#### PARAMETERS #########################
sk = 20
bk = bk

#### ARROWS OUT #########################
rk = None
# ---- -------------------

### SIMULATION VARIABLES ############
dt = 1
sim_time = np.arange(0, 30, dt)
rk_hist = []

### SIMULATION ###################################################
for tk in sim_time:
    sk_next, rk = setpoint_block.step(params={"sk": sk, "bk": bk})
    sk = sk_next

    rk_hist.append(rk)

In [None]:

# plotting
from matplotlib import pyplot as plt

plt.plot(sim_time, rk_hist)
plt.xlabel("time (seconds)")
plt.ylabel("rk (setpoint speed)")
plt.title("Setpoint Speed Over Time")
plt.grid(True, alpha=0.3)
plt.show()

#### Simulating Different Scenarios


We are limited only by our creativity. Here, we simulate the following scenario:

"Our initial setpoint speed is **20 mph**. After **five seconds**, we feel bold and hold "up" on the cruise control button. Since we are holding the button, the button does not increment the speed setpoint once per second but instead at a rate of **three times a second** (many buttons have a such feature).

Out cruise control maxes out at **80 mph**. Once we reach our maximum setpoint speed, we maintain it for **five seconds** until we wiener out -- at which point, we hold "down" on the cruise control button, which decrements **three times a second** until our setpoint is back at **20 mph.**"

We can model this scenario easily (note that we are still focusing on the setpoint speed, not the actual speed of the car; that will come later).

In [None]:

# ---- SETPOINT GENERATOR BLOCK -------------------

#### ARROWS IN #########################
bk = 1

#### PARAMETERS #########################
sk = 20
bk = bk

#### ARROWS OUT #########################
rk = None
# --------------------------------------

### SIMULATION VARIABLES ##############################
# these are old
dt = 1
sim_time = np.arange(0, 60, dt)
rk_hist = []

# these are new
button_mode = "neutral"  # neutral | ramp_up | hold_max | ramp_down | done
button_rate = 3
max_rk = 80
final_rk = 20
seconds_at_which_we_ramp_up = 5
seconds_to_spend_at_max_rk = 5
seconds_passed_after_hitting_max_rk = 0

### SIMULATION ###################################################
for tk in sim_time:

    # --- Decide bk  ---
    if tk >= seconds_at_which_we_ramp_up and button_mode == "neutral":
        button_mode = "ramp_up"

    if button_mode == "ramp_up":
        bk = 1 * button_rate
        if sk >= max_rk:
            button_mode = "hold_max"

    if button_mode == "hold_max":
        bk = 0.0 * button_rate
        seconds_passed_after_hitting_max_rk += dt

        if seconds_passed_after_hitting_max_rk >= seconds_to_spend_at_max_rk:
            button_mode = "ramp_down"

    if button_mode == "ramp_down":
        bk = -1 * button_rate
        if sk <= final_rk:
            button_mode = "done"
            bk = 0.0 * button_rate

    sk_next, rk = setpoint_block.step(params={"sk": sk, "bk": bk, "max_rk": max_rk})

    rk_hist.append(rk)

    sk = np.clip(sk_next, 0, max_rk)  # set a max setpoint speed, 0 <= sk <= max_rk

In [None]:

from matplotlib import pyplot as plt

plt.plot(sim_time, rk_hist)
plt.xlabel("time (seconds)")
plt.ylabel("rk (setpoint speed)")
plt.title("Setpoint Generator Simulation")
plt.grid(True, alpha=0.3)
plt.show()


Not too shabby. This seems like a fun scenario to play with, so we will keep it around. But for the sake of convenience, we can compose the button logic above into *another* `DynamicalSystem` object.


#### Composing Dynamical Systems

We will call the scenario defined above, somewhate unimaginatively, as "scenario_1". After some thinking, we can cast the button logic in the scenario in an $(f,h)$-representation.

We define a "Scenario 1" block and update our diagram as follows:

<img src="../../../_static/tutorial/theory_to_python/bk_block_to_setpoint_generator_block.svg"
    width="800">


In [None]:
# ---- SCENARIO 1 BLOCK -------------------

#### ARROWS IN ####
# None

#### PARAMETERS ############
lk = {
    "button_mode": str,
    "time_since_hit_max_rk": float,
    "bk": float,
}  # scenario 1 state

tk = float  # current sim time
sk = float  # current setpoint state

scen_1_const_params = {
    "button_rate": float,
    "max_rk": float,
    "final_rk": float,
    "time_at_which_we_ramp_up": float,
    "time_at_max_rk": float,
}
#### ARROWS OUT ####
bk = int

# ---- SETPOINT BLOCK ---------------------
#### ARROWS IN ####
bk = int

#### PARAMETERS ############
sk = int
bk = int

#### ARROWS OUT ####
sk = float
rk = float

# ---- $(f,h)$-representation --------------

from typing import Dict

# This is the first $(f,h)$- representation that came to my mind, but there are plenty of others! Do whatever makes sense for you.


def scenario_1_f(lk: Dict, tk: float, sk: float, **scen_1_const_params) -> Dict:
    """
    Update scenario 1 state based on button logic.

    Implements a multi-stage button press scenario: neutral → ramp_up → hold_max → ramp_down → done.
    """
    # extract elements of state
    button_mode = lk["button_mode"]
    time_since_hit_max_rk = lk["time_since_hit_max_rk"]
    bk = lk["bk"]

    # extract constant params
    button_rate = scen_1_const_params["button_rate"]
    max_rk = scen_1_const_params["max_rk"]
    final_rk = scen_1_const_params["final_rk"]
    time_at_which_we_ramp_up = scen_1_const_params["time_at_which_we_ramp_up"]
    time_at_max_rk = scen_1_const_params["time_at_max_rk"]

    # --- Literally the same code from earlier, just copy-pasted ---
    if tk >= time_at_which_we_ramp_up and button_mode == "neutral":
        button_mode = "ramp_up"

    if button_mode == "ramp_up":
        bk = 1 * button_rate
        if sk >= max_rk:
            button_mode = "hold_max"

    if button_mode == "hold_max":
        bk = 0.0 * button_rate
        time_since_hit_max_rk += dt

        if time_since_hit_max_rk >= time_at_max_rk:
            button_mode = "ramp_down"

    if button_mode == "ramp_down":
        bk = -1 * button_rate
        if sk <= final_rk:
            button_mode = "done"
            bk = 0.0 * button_rate
    # ------

    # update state dictionary
    lk_next = lk
    lk_next["button_mode"] = button_mode
    lk_next["time_since_hit_max_rk"] = time_since_hit_max_rk
    lk_next["bk"] = bk

    return lk_next


def scenario_1_h(lk: Dict) -> float:
    """Extract button input from scenario 1 state."""
    return lk["bk"]


scenario_1_block = DynamicalSystem(f=scenario_1_f, h=scenario_1_h)

# ---- SETPOINT BLOCK ---------------------

#### ARROWS IN ####
bk = int  # button input; bk in {-1, 0, 1} (decrement, neutral, increment)

#### PARAMETERS ############
sk = int
bk = int  # Current setpoint state (cruise control setpoint in mph)

#### ARROWS OUT ####
rk = float

# ---- $(f,h)$-representation --------------

from pykal import DynamicalSystem


def setpoint_f(sk: float, bk: int) -> float:
    """Increment or decrement setpoint based on button input."""
    return sk + bk


def setpoint_h(
    sk: float,
) -> float:
    """Return the current setpoint speed."""
    rk = sk
    return rk


setpoint_block = DynamicalSystem(f=setpoint_f, h=setpoint_h)


Now we have a *much* cleaner simulation of our scenario above.

:::{note}
We immediately become less formal; going forward, we only will declare and initialize variables which **must** be defined prior to the simulation, and we will only define them once. This is to reduce code clutter and improve readability.
:::

In [None]:
# ---- SCENARIO 1 BLOCK -------------------
#### PARAMETERS ############
lk = {
    "button_mode": "neutral",
    "time_since_hit_max_rk": 0,
    "bk": 0,
}

sk = 20  # initial setpoint sk

scen_1_const_params = {
    "button_rate": 3,
    "max_rk": 80,
    "final_rk": 20,
    "time_at_which_we_ramp_up": 5,
    "time_at_max_rk": 5,
}

# ---- SETPOINT GENERATOR BLOCK -------------------

# no initialization needed

### SIMULATION VARIABLES ##############################
dt = 1
sim_time = np.arange(0, 60, dt)
rk_hist = []

### SIMULATION ###################################################

for tk in sim_time:
    lk_next, bk = scenario_1_block.step(
        params={"lk": lk, "sk": sk, "tk": tk, **scen_1_const_params}
    )
    sk_next, rk = setpoint_block.step(params={"sk": sk, "bk": bk})

    rk_hist.append(rk)

    sk = sk_next
    lk = lk_next

In [None]:
from matplotlib import pyplot as plt

plt.plot(sim_time, rk_hist)
plt.xlabel("time (seconds)")
plt.ylabel("rk (setpoint speed)")
plt.title("Setpoint Generator with Callback Policy")
plt.grid(True, alpha=0.3)
plt.show()

To make this even more succinct, we can wrap the composition of dynamical systems above into yet *another* dynamical system, which we will call the "scenario_1_setpoint_generator".


<div style="text-align: center;">
<img src="../../../_static/tutorial/theory_to_python/wrapped_setpoint_system.svg"
    width="800">
</div>

:::{note}
The blue box represents wrapped dynamical system blocks. Internally, it contains two subsystems that communicate via signals, but externally it presents a simpler interface
:::


In [None]:

# ---- SCENARIO 1 SETPOINTBLOCK -------------------

#### ARROWS IN ####
# None

#### PARAMETERS ############
Sk = {
    "lk": Dict,
    "sk": float,
    "rk": float,
}  # scenario 1 state and setpoint generator state w/output

tk = float  # current sim time

scen_1_const_params = {
    "button_rate": float,
    "max_rk": float,
    "final_rk": float,
    "time_at_which_we_ramp_up": float,
    "time_at_max_rk": float,
}

#### ARROWS OUT ####
rk = int

# ---- $(f,h)$-representation --------------
from typing import Dict


def scen_1_setpoint_block_f(Sk: Dict, tk: float, **scen_1_const_params) -> Dict:
    """
    Evolve wrapped scenario 1 setpoint system.

    Internally steps scenario_1_block and setpoint_block in series.
    """
    # extract elements of state
    lk = Sk["lk"]
    sk = Sk["sk"]

    # --- Literally just copy-pasted ---

    lk_next, bk = scenario_1_block.step(
        params={
            "lk": lk,
            "sk": sk,
            "tk": tk,
            **scen_1_const_params,
        }
    )
    sk_next, rk = setpoint_block.step(params={"sk": sk, "bk": bk})

    # ------

    # update state dictionary
    Sk["lk"] = lk_next
    Sk["sk"] = sk_next
    Sk["rk"] = rk

    Sk_next = Sk
    return Sk_next


def scen_1_setpoint_block_h(Sk: Dict) -> float:
    """Extract setpoint reference from wrapped system state."""
    return Sk["rk"]


scen_1_setpoint_block = DynamicalSystem(
    f=scen_1_setpoint_block_f, h=scen_1_setpoint_block_h
)

We now have a wonderfully minimal interface to play with!

In [None]:
# ---- SCENARIO 1 SETPOINT BLOCK -------------------

#### PARAMETERS ####

Sk = {
    "lk": {
        "button_mode": "neutral",
        "time_since_hit_max_rk": 0,
        "bk": 0,
    },
    "sk": 20,
    "rk": None,
}

scen_1_const_params = {
    "button_rate": 3,
    "max_rk": 80,
    "final_rk": 20,
    "time_at_which_we_ramp_up": 5,
    "time_at_max_rk": 5,
}

# ----------------------------------------------

### SIMULATION VARIABLES ############
rk_hist = []
dt = 1
sim_time = np.arange(0, 60, dt)

### SIMULATION ###################################################
for tk in sim_time:
    Sk_next, rk = scen_1_setpoint_block.step(
        params={"Sk": Sk, "tk": tk, **scen_1_const_params}
    )
    Sk = Sk_next

    rk_hist.append(rk)

In [None]:
# Plot
plt.plot(sim_time, rk_hist)
plt.xlabel("time (seconds)")
plt.ylabel("rk (setpoint speed)")
plt.title("Setpoint Generator with Button (Wrapped System)")
plt.grid(True, alpha=0.3)
plt.show()

print("✓ Wrapped system produces the same result with much simpler code!")

### Block 2: PID

<div style="text-align: center;">
<img src="../../../_static/tutorial/theory_to_python/controller_block.svg"
    width="600">
</div>

The [PID controller](../../algorithm_library/pid_pykal.ipynb) compares the setpoint to the estimated state and computes a control command to minimize tracking error. To see the derivation of the evolution and output functions, click on the notebook link in the preceding sentence.

In [None]:
from typing import List

# ---- SCENARIO 1 SETPOINTBLOCK -------------------

#### ARROWS IN ####
# None

#### PARAMETERS ############
Sk = {
    "lk": Dict,
    "sk": float,
    "rk": float,
}  # scenario 1 state and setpoint generator state w/output

tk = float  # current sim time

scen_1_const_params = {
    "button_rate": float,
    "max_rk": float,
    "final_rk": float,
    "time_at_which_we_ramp_up": float,
    "time_at_max_rk": float,
}

#### ARROWS OUT ####
rk = float

# ---- $(f,h)$-representation --------------
scen_1_setpoint_block = DynamicalSystem(
    f=scen_1_setpoint_block_f, h=scen_1_setpoint_block_h
)

# ---- PID BLOCK -------------------

#### ARROWS IN ####
rk = float

#### PARAMETERS ############
ck = List  # controller state ck = [ek, Ik, ek_prev]
rk = float  # reference speed
xhatk = float  # state estimate
pid_const_params = {
    "K_P": float,  # Proportional gain
    "K_I": float,  # Integral gain (eliminates steady-state error)
    "K_D": float,  # Derivative gain (reduces overshoot)
}

#### ARROWS OUT ####
uk = float

# ---- $(f,h)$-representation --------------
from pykal.algorithm_library.controllers import pid

pid_block = DynamicalSystem(f=pid.f, h=pid.h)

To get a feel for the PID controller, we create a simple mock system and simulate scenario_1.

In [None]:
# ---- SCENARIO 1 SETPOINTBLOCK -------------------
#### PARAMETERS #######
Sk = {
    "lk": {
        "button_mode": "neutral",
        "time_since_hit_max_rk": 0,
        "bk": 0,
    },
    "sk": 20,
    "rk": None,
}

scen_1_const_params = {
    "button_rate": 3,
    "max_rk": 80,
    "final_rk": 20,
    "time_at_which_we_ramp_up": 5,
    "time_at_max_rk": 5,
}

# ---- PID BLOCK -------------------

#### PARAMETERS #############

ck = (0.0, 0.0, 0.0)  # (ek, Ik, ek_prev)
pid_const_params = {"KP": 100.0, "KI": 10, "KD": 20}

xhatk = Sk["sk"]  # our initial guess is spot on

# --------------------------------------------------

### SIMULATION VARIABLES ############
dt = 1
sim_time = np.arange(0, 60, dt)
rk_hist = []
uk_hist = []
xhatk_hist = []
error_hist = []

### SIMULATION ###################################################
for tk in sim_time:
    Sk_next, rk = scen_1_setpoint_block.step(
        params={"Sk": Sk, "tk": tk, **scen_1_const_params}
    )

    ck_next, uk = pid_block.step(
        params={"ck": ck, "rk": rk, "xhatk": xhatk, **pid_const_params}
    )

    # the real plant dynamics come later
    xhatk += uk * dt * 0.01  # "some delay" via scaling factor

    rk_hist.append(rk)
    uk_hist.append(uk)
    xhatk_hist.append(xhatk)
    error_hist.append(ck[0])  # ck[0] is the current error

    Sk = Sk_next
    ck = ck_next

In [None]:
fig, axs = plt.subplots(1, 3, figsize=(15, 4))

# Plot 1: Setpoint and State Estimate
axs[0].plot(sim_time, rk_hist, "k--", linewidth=2, label="Setpoint (r)")
axs[0].plot(sim_time, xhatk_hist, "b-", linewidth=1.5, label="State Estimate (x̂)")
axs[0].set_xlabel("Time (s)", fontsize=11)
axs[0].set_ylabel("Speed (mph)", fontsize=11)
axs[0].set_title("Setpoint Tracking", fontsize=12, fontweight="bold")
axs[0].legend(loc="best")
axs[0].grid(True, alpha=0.3)

# Plot 2: Control Input
axs[1].plot(sim_time, uk_hist, "g-", linewidth=1.5)
axs[1].set_xlabel("Time (s)", fontsize=11)
axs[1].set_ylabel("Control Input (u)", fontsize=11)
axs[1].set_title("Controller Output", fontsize=12, fontweight="bold")
axs[1].grid(True, alpha=0.3)

# Plot 3: Tracking Error
axs[2].plot(sim_time, error_hist, "m-", linewidth=1.5)
axs[2].axhline(y=0, color="k", linestyle=":", alpha=0.5)
axs[2].set_xlabel("Time (s)", fontsize=11)
axs[2].set_ylabel("Error (mph)", fontsize=11)
axs[2].set_title("Tracking Error", fontsize=12, fontweight="bold")
axs[2].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

Yikes! Looks like our PID gains are incorrect. Can you fix them? And if you're able to fix them, see what happens when you change some parameters for `scen_1_setpoint_block`. Is your fix robust?

### Block 3: Car (Plant)

<div style="text-align: center;">
<img src="../../../_static/tutorial/theory_to_python/plant_block.svg"
    width="600">
</div>

The car dynamics are modeled with **position and velocity** as the state, and linear drag acting on the velocity.


In [None]:

# ---- SCENARIO 1 SETPOINTBLOCK -------------------

#### ARROWS IN ####
# None

#### PARAMETERS ############
Sk = {
    "lk": Dict,
    "sk": float,
    "rk": float,
}  # scenario 1 state and setpoint generator state w/output

tk = float  # current sim time

scen_1_const_params = {
    "button_rate": float,
    "max_rk": float,
    "final_rk": float,
    "time_at_which_we_ramp_up": float,
    "time_at_max_rk": float,
}

#### ARROWS OUT ####
rk = float

# ---- $(f,h)$-representation --------------
scen_1_setpoint_block = DynamicalSystem(
    f=scen_1_setpoint_block_f, h=scen_1_setpoint_block_h
)

# ---- PID BLOCK -------------------

from typing import List

#### ARROWS IN ####
rk = float

#### PARAMETERS ############
ck = List  # controller state ck = [ek, Ik, ek_prev]
rk = float  # reference speed
xhatk = float  # state estimate
pid_const_params = {
    "K_P": float,  # Proportional gain
    "K_I": float,  # Integral gain (eliminates steady-state error)
    "K_D": float,  # Derivative gain (reduces overshoot)
}

#### ARROWS OUT ####
uk = float

# ---- $(f,h)$-representation --------------
from pykal.algorithm_library.controllers import pid

pid_block = DynamicalSystem(f=pid.f, h=pid.h)

# ---- PLANT BLOCK -------------------
from numpy.typing import NDArray

#### ARROWS IN ####
uk = float

#### PARAMETERS ############
pk = NDArray  #  pk.shape=(2,1), pk[0,0] = pos, pk[1,0] = vel
uk = float
car_const_params = {"m": float, "b": float, "dt": float}  # mass  # drag  # dt (duh)

#### ARROWS OUT ####
yk = float  # vel measurement


# ---- $(f,h)$-representation --------------
def plant_f(
    pk: NDArray,
    uk: float,
    m: float,
    b: float,
    dt: float,
) -> NDArray:
    """
    State evolution.

    Parameters
    ----------
    pk : (2, 1) ndarray
        State column vector [position; velocity].

    Returns
    -------
    (2, 1) ndarray
        Next state column vector.
    """

    if pk.shape != (2, 1):
        raise ValueError(f"pk must have shape (2,1); got {pk.shape}")

    p_k, v_k = pk[0, 0], pk[1, 0]

    p_next = p_k + dt * v_k
    v_next = v_k + dt * (-b / m * v_k + uk / m)

    return np.array([[p_next], [v_next]])


def plant_h(
    pk: NDArray,
) -> NDArray:
    """
    Measurement model.

    Parameters
    ----------
    pk : (2, 1) ndarray
        State column vector [position; velocity].

    Returns
    -------
    (1, 1) ndarray
        Velocity measurement.
    """

    if pk.shape != (2, 1):
        raise ValueError(f"pk must have shape (2,1); got {pk.shape}")

    v_k = pk[1, 0]

    return v_k


plant_block = DynamicalSystem(f=plant_f, h=plant_h)

We can now update our previous PID controller simulation with the actual system we're controlling!

In [None]:

import numpy as np

rng = np.random.default_rng()
# ---- SCENARIO 1 SETPOINTBLOCK -------------------
#### PARAMETERS #######
Sk = {
    "lk": {
        "button_mode": "neutral",
        "time_since_hit_max_rk": 0,
        "bk": 0,
    },
    "sk": 20,
    "rk": None,
}

scen_1_const_params = {
    "button_rate": 3,
    "max_rk": 80,
    "final_rk": 20,
    "time_at_which_we_ramp_up": 5,
    "time_at_max_rk": 5,
}

# ---- PID BLOCK -------------------

#### PARAMETERS #############

ck = (0.0, 0.0, 0.0)  # (ek, Ik, ek_prev)
pid_const_params = {"KP": 100.0, "KI": 10, "KD": 20}

xhatk = Sk["sk"]  # our initial guess is spot on

# ---------PLANT BLOCK---------------
pk = np.array([[0], [20]])
car_const_params = {"m": 1500, "b": 50, "dt": 1}

### SIMULATION VARIABLES ############
dt = 1
sim_time = np.arange(0, 60, dt)

Qk = np.array([[1e-1, 0], [0, 2e-1]])  # add process and measurement noise
Rk = np.array([[3]])

rk_hist = []
uk_hist = []
error_hist = []
xhatk_hist = []
xk_hist = []  # true velocity

### SIMULATION ###################################################
for tk in sim_time:
    Sk_next, rk = scen_1_setpoint_block.step(
        params={"Sk": Sk, "tk": tk, **scen_1_const_params}
    )

    ck_next, uk = pid_block.step(
        params={"ck": ck, "rk": rk, "xhatk": xhatk, **pid_const_params}
    )

    pk_next, yk = plant_block.step(params={"pk": pk, "uk": uk, **car_const_params})

    # Apply process noise and measurement noise
    process_noise = rng.multivariate_normal(mean=np.zeros(Qk.shape[0]), cov=Qk).reshape(
        -1, 1
    )  # (2,1) vector
    measurement_noise = rng.multivariate_normal(mean=np.zeros(Rk.shape[0]), cov=Rk)[
        0
    ]  # scalar

    pk_next = pk_next + process_noise
    yk = yk + measurement_noise

    xhatk = yk  # no smoothing; our measurement is raw

    rk_hist.append(rk)
    uk_hist.append(uk)
    xhatk_hist.append(xhatk)
    xk_hist.append(pk[1, 0])  # true velocity
    error_hist.append(ck[0])

    Sk = Sk_next
    ck = ck_next
    pk = pk_next

In [None]:
fig, axs = plt.subplots(1, 3, figsize=(15, 4))

# Plot 1: Setpoint and State Estimate
axs[0].plot(sim_time, rk_hist, "k--", linewidth=2, label="Setpoint (r)")
axs[0].plot(sim_time, xhatk_hist, "b-", linewidth=1.5, label="State Estimate (x̂)")
axs[0].plot(
    sim_time, xk_hist, "r-", linewidth=1.5, alpha=0.5, label="True Velocity (x)"
)
axs[0].set_xlabel("Time (s)", fontsize=11)
axs[0].set_ylabel("Speed (mph)", fontsize=11)
axs[0].set_title("Setpoint Tracking", fontsize=12, fontweight="bold")
axs[0].legend(loc="best")
axs[0].grid(True, alpha=0.3)

# Plot 2: Control Input
axs[1].plot(sim_time, uk_hist, "g-", linewidth=1.5)
axs[1].set_xlabel("Time (s)", fontsize=11)
axs[1].set_ylabel("Control Input (u)", fontsize=11)
axs[1].set_title("Controller Output", fontsize=12, fontweight="bold")
axs[1].grid(True, alpha=0.3)

# Plot 3: Tracking Error
axs[2].plot(sim_time, error_hist, "m-", linewidth=1.5)
axs[2].axhline(y=0, color="k", linestyle=":", alpha=0.5)
axs[2].set_xlabel("Time (s)", fontsize=11)
axs[2].set_ylabel("Error (mph)", fontsize=11)
axs[2].set_title("Tracking Error", fontsize=12, fontweight="bold")
axs[2].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

Alas! Can you tune the gains here? And how does changing the mass or drag of the car affect the optimal gains?

### Block 4: KF (Observer)

<div style="text-align: center;">
<img src="../../../_static/tutorial/theory_to_python/observer_block.svg"
    width="400">
</div>

The [Kalman filter](../../algorithm_library/kf_pykal.ipynb) estimates the car's velocity by fusing the motion model with noisy measurements.


In [None]:

# ---- SCENARIO 1 SETPOINTBLOCK -------------------

#### ARROWS IN ####
# None

#### PARAMETERS ############
Sk = {
    "lk": Dict,
    "sk": float,
    "rk": float,
}  # scenario 1 state and setpoint generator state w/output

tk = float  # current sim time

scen_1_const_params = {
    "button_rate": float,
    "max_rk": float,
    "final_rk": float,
    "time_at_which_we_ramp_up": float,
    "time_at_max_rk": float,
}

#### ARROWS OUT ####
rk = float

# ---- $(f,h)$-representation --------------
scen_1_setpoint_block = DynamicalSystem(
    f=scen_1_setpoint_block_f, h=scen_1_setpoint_block_h
)

# ---- PID BLOCK -------------------

from typing import List

#### ARROWS IN ####
rk = float

#### PARAMETERS ############
ck = List  # controller state ck = [ek, Ik, ek_prev]
rk = float  # reference speed
xhatk = float  # state estimate
pid_const_params = {
    "K_P": float,  # Proportional gain
    "K_I": float,  # Integral gain (eliminates steady-state error)
    "K_D": float,  # Derivative gain (reduces overshoot)
}

#### ARROWS OUT ####
uk = float

# ---- $(f,h)$-representation --------------
from pykal.algorithm_library.controllers import pid

pid_block = DynamicalSystem(f=pid.f, h=pid.h)

# ---- PLANT BLOCK -------------------
from numpy.typing import NDArray

#### ARROWS IN ####
uk = float

#### PARAMETERS ############
pk = NDArray  #  pk.shape=(2,1), pk[0,0] = pos, pk[1,0] = vel
uk = float
car_const_params = {"m": float, "b": float, "dt": float}  # mass  # drag  # dt (duh)

#### ARROWS OUT ####
yk = float  # vel measurement


# ---- $(f,h)$-representation --------------
def plant_f(
    pk: NDArray,
    uk: float,
    m: float,
    b: float,
    dt: float,
) -> NDArray:
    """
    State evolution.

    Parameters
    ----------
    pk : (2, 1) ndarray
        State column vector [position; velocity].

    Returns
    -------
    (2, 1) ndarray
        Next state column vector.
    """

    if pk.shape != (2, 1):
        raise ValueError(f"pk must have shape (2,1); got {pk.shape}")

    p_k, v_k = pk[0, 0], pk[1, 0]

    p_next = p_k + dt * v_k
    v_next = v_k + dt * (-b / m * v_k + uk / m)

    return np.array([[p_next], [v_next]])


def plant_h(
    pk: NDArray,
) -> NDArray:
    """
    Measurement model.

    Parameters
    ----------
    pk : (2, 1) ndarray
        State column vector [position; velocity].

    Returns
    -------
    (1, 1) ndarray
        Velocity measurement.
    """

    if pk.shape != (2, 1):
        raise ValueError(f"pk must have shape (2,1); got {pk.shape}")

    v_k = pk[1, 0]

    return v_k


plant_block = DynamicalSystem(f=plant_f, h=plant_h)

# ---- KF BLOCK -------------------
from numpy.typing import NDArray
from typing import Callable

#### ARROWS IN ####
uk = float
yk = NDArray  # (since kf expects an array and yk is a float from plant_block, we will recast yk when we call the kf block)

#### PARAMETERS ############
zk = List  #  zk[0]=pk, zk[1] = Pk (covariance matrix)

kf_const_params = {
    "f": Callable,  # plant evolution function
    "h": Callable,  # plant output function
    "Fk": NDArray,  # plant evolution function Jacobian
    "Hk": NDArray,  # plant output function Jacobian
    "Qk": NDArray,  # process noise covariance matrix
    "Rk": NDArray,  # measurement noise covariance matrix
}

f_h_params = Dict


#### ARROWS OUT ####
xhatk = NDArray  # vel prediction measurement


# ---- $(f,h)$-representation --------------

from pykal.algorithm_library.estimators import kf

kf_block = DynamicalSystem(f=kf.f, h=kf.h)

We can now simulate the full system.

In [None]:
import numpy as np

rng = np.random.default_rng()
# ---- SCENARIO 1 SETPOINTBLOCK -------------------
#### PARAMETERS #######
Sk = {
    "lk": {
        "button_mode": "neutral",
        "time_since_hit_max_rk": 0,
        "bk": 0,
    },
    "sk": 20,
    "rk": None,
}

scen_1_const_params = {
    "button_rate": 3,
    "max_rk": 80,
    "final_rk": 20,
    "time_at_which_we_ramp_up": 5,
    "time_at_max_rk": 5,
}

# ---- PID BLOCK -------------------

#### PARAMETERS #############

ck = (0.0, 0.0, 0.0)  # (ek, Ik, ek_prev)
pid_const_params = {"KP": 100.0, "KI": 10, "KD": 20}

xhatk = Sk["sk"]  # our initial guess is spot on

# ---------PLANT BLOCK---------------
#### PARAMETERS #############
pk = np.array([[0], [20]])
car_const_params = {"m": 1500, "b": 50, "dt": 1}

# ---------KF BLOCK---------------
#### PARAMETERS #############
zk = [
    pk,  # [xhatk,Pk] - full state estimate [position, velocity]
    np.array([[0.5, 0.0], [0.0, 1.0]]),
]


def plant_F(**car_const_params) -> np.ndarray:
    """
    Compute Jacobian of plant_f with respect to state x = [x_k, v_k].
    """
    m = car_const_params["m"]
    b = car_const_params["b"]
    dt = car_const_params["dt"]
    return np.array(
        [
            [1.0, dt],
            [0.0, 1.0 - (b / m) * dt],
        ]
    )


def plant_H() -> np.ndarray:
    """
    Compute Jacobian of plant_h with respect to state x = [x_k, v_k].
    """
    return np.array([[0.0, 1.0]])


Fk = plant_F(**car_const_params)
Hk = plant_H()
Qk = np.array([[1e-1, 0], [0, 2e-1]])
Rk = np.array([[3]])

kf_const_params = {
    "f": plant_block.f,
    "h": plant_block.h,
    "Fk": Fk,
    "Hk": Hk,
}
# ----------------------------------------

### SIMULATION VARIABLES ############
dt = 1
sim_time = np.arange(0, 60, dt)

Qk = np.array([[1e-1, 0], [0, 2e-1]])  # add process and measurement noise
Rk = np.array([[3]])

rk_hist = []
uk_hist = []
error_hist = []
xhatk_hist = []
xk_hist = []
yk_hist = []

### SIMULATION ###################################################
for tk in sim_time:
    Sk_next, rk = scen_1_setpoint_block.step(
        params={"Sk": Sk, "tk": tk, **scen_1_const_params}
    )

    ck_next, uk = pid_block.step(
        params={"ck": ck, "rk": rk, "xhatk": xhatk, **pid_const_params}
    )

    pk_next, yk = plant_block.step(params={"pk": pk, "uk": uk, **car_const_params})

    # Apply process noise and measurement noise
    process_noise = rng.multivariate_normal(mean=np.zeros(Qk.shape[0]), cov=Qk).reshape(
        -1, 1
    )  # (2,1) vector
    measurement_noise = rng.multivariate_normal(mean=np.zeros(Rk.shape[0]), cov=Rk)[
        0
    ]  # scalar

    pk_next = pk_next + process_noise
    yk = yk + measurement_noise

    zk_next, phatk = kf_block.step(
        params={
            "zk": zk,
            "uk": uk,
            "yk": np.array([[yk]]),
            "Qk": Qk,
            "Rk": Rk,
            **kf_const_params,
            "f_h_params": {
                "pk": pk,
                "uk": uk,
                **car_const_params,
            },
        }
    )

    xhatk = phatk[1, 0]

    rk_hist.append(rk)
    uk_hist.append(uk)
    xhatk_hist.append(xhatk)
    xk_hist.append(pk[1, 0])  # true velocity
    error_hist.append(ck[0])
    yk_hist.append(yk)

    Sk = Sk_next
    ck = ck_next
    pk = pk_next
    zk = zk_next

In [None]:
fig, axs = plt.subplots(1, 3, figsize=(15, 4))

# Plot 1: Setpoint and State Estimate
axs[0].plot(sim_time, rk_hist, "k--", linewidth=2, label="Setpoint (r)")
axs[0].plot(sim_time, xhatk_hist, "b-", linewidth=1.5, label="State Estimate (x̂)")
axs[0].plot(
    sim_time, xk_hist, "r-", linewidth=1.5, alpha=0.5, label="True Velocity (x)"
)
axs[0].scatter(
    sim_time[::10], yk_hist[::10], c="gray", s=10, alpha=0.3, label="Measurements (y)"
)
axs[0].set_xlabel("Time (s)", fontsize=11)
axs[0].set_ylabel("Speed (mph)", fontsize=11)
axs[0].set_title("Velocity Tracking", fontsize=12, fontweight="bold")
axs[0].legend(loc="best")
axs[0].grid(True, alpha=0.3)

# Plot 2: Control Input
axs[1].plot(sim_time, uk_hist, "g-", linewidth=1.5)
axs[1].set_xlabel("Time (s)", fontsize=11)
axs[1].set_ylabel("Control Input (u)", fontsize=11)
axs[1].set_title("Controller Output", fontsize=12, fontweight="bold")
axs[1].grid(True, alpha=0.3)

# Plot 3: Tracking Error
axs[2].plot(sim_time, error_hist, "m-", linewidth=1.5)
axs[2].axhline(y=0, color="k", linestyle=":", alpha=0.5)
axs[2].set_xlabel("Time (s)", fontsize=11)
axs[2].set_ylabel("Error (mph)", fontsize=11)
axs[2].set_title("Tracking Error", fontsize=12, fontweight="bold")
axs[2].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

### Wrapping the Complete System

Just as we wrapped the button logic into a `DynamicalSystem` earlier, we can wrap the **entire cruise control system** into a single `DynamicalSystem` object. All system parameters can be configured at initialization, and the simulation loop becomes trivial.

<div style="text-align: center;">
<img src="../../../_static/tutorial/theory_to_python/wrapped_cruise_control_system.svg"
    width="900">
</div>

The diagram above shows the complete cruise control system in two views. The **top** view shows all five subsystems (Button, Setpoint, PID, Plant, and Observer) wrapped inside a blue box, with all internal signals visible. The **bottom** view shows the same system from the outside—a single `DynamicalSystem` block with a clean interface that accepts parameters and returns the desired variables.



In [None]:
# --------- CRUISE CONTROL BLOCK---------------
#### ARROWS IN ####
# None

#### PARAMETERS #############
Ck = {
    "Sk": Dict,
    "zk": List,
    "pk": NDArray,
    "ck": List,
    "xhatk": float,
}  # whole system state
tk = float  # current time
scen_1_const_params = Dict
car_const_params = Dict
kf_const_params = Dict
pid_const_params = Dict
Qk = NDArray
Rk = NDArray

#### ARROWS OUT ####
xhatk = float

# ---- $(f,h)$-representation --------------


def cruise_control_f(
    Ck: Dict,
    tk: float,
    Qk: NDArray,
    Rk: NDArray,
    scen_1_const_params: Dict,
    pid_const_params: Dict,
    car_const_params: Dict,
    kf_const_params: Dict,
) -> Dict:
    Sk = Ck["Sk"]
    zk = Ck["zk"]
    pk = Ck["pk"]
    ck = Ck["ck"]
    xhatk = Ck["xhatk"]

    Sk_next, rk = scen_1_setpoint_block.step(
        params={"Sk": Sk, "tk": tk, **scen_1_const_params}
    )

    ck_next, uk = pid_block.step(
        params={"ck": ck, "rk": rk, "xhatk": xhatk, **pid_const_params}
    )

    pk_next, yk = plant_block.step(params={"pk": pk, "uk": uk, **car_const_params})

    # Apply process noise and measurement noise
    process_noise = rng.multivariate_normal(mean=np.zeros(Qk.shape[0]), cov=Qk).reshape(
        -1, 1
    )  # (2,1) vector
    measurement_noise = rng.multivariate_normal(mean=np.zeros(Rk.shape[0]), cov=Rk)[
        0
    ]  # scalar

    pk_next = pk_next + process_noise
    yk = yk + measurement_noise

    zk_next, phatk = kf_block.step(
        params={
            "zk": zk,
            "uk": uk,
            "yk": np.array([[yk]]),
            "Qk": Qk,
            "Rk": Rk,
            **kf_const_params,
            "f_h_params": {
                "pk": pk,
                "uk": uk,
                **car_const_params,
            },
        }
    )

    xhatk = phatk[1, 0]

    Ck_next = Ck
    Ck["Sk"] = Sk_next
    Ck["zk"] = zk_next
    Ck["pk"] = pk_next
    Ck["ck"] = ck_next
    Ck["xhatk"] = xhatk

    return Ck_next


def cruise_control_h(Ck: Dict) -> float:
    return Ck["xhatk"]


cruise_control_block = DynamicalSystem(f=cruise_control_f, h=cruise_control_h)

In [None]:
import numpy as np

rng = np.random.default_rng()

# ---- CRUISE CONTROL BLOCK -------------------

#### PARAMETERS #######

# defining variables for state Ck
Sk = {
    "lk": {
        "button_mode": "neutral",
        "time_since_hit_max_rk": 0,
        "bk": 0,
    },
    "sk": 20,
    "rk": None,
}
pk = np.array([[0], [20]])
ck = (0.0, 0.0, 0.0)
zk = [pk, np.array([[0.5, 0.0], [0.0, 1.0]])]
xhatk = Sk["sk"]

# state
Ck = {"Sk": Sk, "zk": zk, "pk": pk, "ck": ck, "xhatk": xhatk}

scen_1_const_params = {
    "button_rate": 3,
    "max_rk": 80,
    "final_rk": 20,
    "time_at_which_we_ramp_up": 5,
    "time_at_max_rk": 5,
}

pid_const_params = {"KP": 100.0, "KI": 10, "KD": 20}
car_const_params = {"m": 1500, "b": 50, "dt": 1}


# defining variables for kf_const_params
def plant_F(**car_const_params) -> np.ndarray:
    """
    Compute Jacobian of plant_f with respect to state x = [x_k, v_k].
    """
    m = car_const_params["m"]
    b = car_const_params["b"]
    dt = car_const_params["dt"]
    return np.array(
        [
            [1.0, dt],
            [0.0, 1.0 - (b / m) * dt],
        ]
    )


def plant_H() -> np.ndarray:
    """
    Compute Jacobian of plant_h with respect to state x = [x_k, v_k].
    """
    return np.array([[0.0, 1.0]])


Fk = plant_F(**car_const_params)
Hk = plant_H()

kf_const_params = {
    "f": plant_block.f,
    "h": plant_block.h,
    "Fk": Fk,
    "Hk": Hk,
}
# ----------------------------------------

### SIMULATION VARIABLES ############
dt = 1
sim_time = np.arange(0, 60, dt)

Qk = np.array([[1e-1, 0], [0, 2e-1]])  # add process and measurement noise
Rk = np.array([[3]])

rk_hist = []
uk_hist = []
error_hist = []
xhatk_hist = []
xk_hist = []
yk_hist = []

### SIMULATION ###################################################
for tk in sim_time:
    Ck_next, xhatk = cruise_control_block.step(
        params={
            "Ck": Ck,
            "tk": tk,
            "Qk": Qk,
            "Rk": Rk,
            "scen_1_const_params": scen_1_const_params,
            "pid_const_params": pid_const_params,
            "car_const_params": car_const_params,
            "kf_const_params": kf_const_params,
        }
    )

    xhatk_hist.append(xhatk)

In [None]:
fig, axs = plt.subplots(1, 1, figsize=(15, 4))


axs.plot(sim_time, xhatk_hist, "b-", linewidth=1.5, label="State Estimate (x̂)")

axs.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

:::{note}

## Experimentation

The sacrifice for increasing levels of abstraction is necessarily a loss of information. We only have access to the final state estimate here. It is better to experiment with the complete system before it is wrapped, and then wrapping it if higher abstraction is necessary. So, in the spirit of experimentation, have some fun!

**Controller tuning**:
- Adjust PID gains (KP, KI, KD) to reduce overshoot
- What happens if you increase KP too much? (oscillations)
- What happens if KD is too small? (slow settling)
- Find optimal gains for minimal overshoot and settling time

**Setpoint policy**:
- Change button policy parameters (button_rate, hold_seconds_at_max)
- Try different target speeds and ramp rates
- Design a smooth sinusoidal setpoint trajectory

**Plant variations**:
- Modify physical parameters (m, b) to simulate different vehicles
  - Heavy truck: m=5000 kg, b=100 kg/s
  - Sports car: m=1000 kg, b=30 kg/s
- Add measurement noise to make the problem more realistic
- Tune Kalman filter covariances (Q, R) for different noise scenarios

**Challenge**: Can you design PID gains that work well for both a heavy truck and a sports car?

[← Control Systems as Dynamical Systems](../../../getting_started/theory_to_python/control_systems_as_dynamical_systems.rst)