# MPC Derivation

In [8]:
import numpy as np
from scipy.optimize import minimize

class MPC:
    def __init__(self, A, B, Q, R, Np, Nc, x0):
        """
        Initialize the MPC object.
        
        Args:
        A (np.ndarray): State transition matrix.
        B (np.ndarray): Input matrix.
        Q (np.ndarray): State cost matrix.
        R (np.ndarray): Input cost matrix.
        Np (int): Prediction horizon.
        Nc (int): Control horizon.
        x0 (np.ndarray): Initial state.
        """
        self.A = A
        self.B = B
        self.Q = Q
        self.R = R
        self.Np = Np  # Prediction horizon
        self.Nc = Nc  # Control horizon
        self.x0 = x0  # Initial state

    def predict_state(self, x, u_sequence):
        """
        Predict the future state trajectory based on the current state and a sequence of control inputs.
        
        Args:
        x (np.ndarray): Current state.
        u_sequence (np.ndarray): Control input sequence.

        Returns:
        np.ndarray: Predicted state trajectory.
        """
        n = self.A.shape[0]
        m = self.B.shape[1]
        state_trajectory = np.zeros((self.Np + 1, n))
        state_trajectory[0] = x
        
        for i in range(self.Np):
            u = u_sequence[i] if i < len(u_sequence) else u_sequence[-1]
            state_trajectory[i + 1] = self.A @ state_trajectory[i] + self.B @ u
        
        return state_trajectory

    def cost_function(self, u_sequence, x, x_ref):
        """
        The cost function to minimize.
        
        Args:
        u_sequence (np.ndarray): Control input sequence (flattened).
        x (np.ndarray): Current state of the system.
        x_ref (np.ndarray): Reference trajectory for the state.

        Returns:
        float: The cost for the given control input sequence.
        """
        # Reshape the control input sequence back into 2D form
        u_sequence = u_sequence.reshape(self.Nc, self.B.shape[1])
        
        state_trajectory = self.predict_state(x, u_sequence)
        cost = 0

        for i in range(self.Np):
            x_diff = state_trajectory[i] - x_ref
            u = u_sequence[i] if i < len(u_sequence) else u_sequence[-1]
            cost += x_diff.T @ self.Q @ x_diff + u.T @ self.R @ u
        
        return cost

    def solve(self, x, x_ref):
        """
        Solve the MPC optimization problem using scipy.optimize.minimize.
        
        Args:
        x (np.ndarray): Current state of the system.
        x_ref (np.ndarray): Reference trajectory for the state.

        Returns:
        np.ndarray: Optimal control input sequence.
        """
        # Initial guess for the control input (a flat input sequence)
        u_initial = np.zeros(self.Nc * self.B.shape[1])

        # Minimize the cost function using SciPy
        result = minimize(
            self.cost_function, 
            u_initial, 
            args=(x, x_ref), 
            method='SLSQP', 
            bounds=[(-1, 1)] * self.Nc * self.B.shape[1]
        )

        if not result.success:
            raise ValueError("MPC optimization problem did not converge")

        # Return the first control input
        u_optimal = result.x.reshape(self.Nc, self.B.shape[1])
        return u_optimal[0]

    def apply_control(self, x, x_ref):
        """
        Apply control by solving the MPC and updating the state.
        
        Args:
        x (np.ndarray): Current state of the system.
        x_ref (np.ndarray): Reference trajectory for the state.
        
        Returns:
        np.ndarray: New state after applying the control.
        """
        u_opt = self.solve(x, x_ref)
        x_next = self.A @ x + self.B @ u_opt
        return x_next, u_opt

# Example problem to demonstrate the MPC class
def example():
    # System matrices (for a simple 1D system)
    A = np.array([[1]])
    B = np.array([[1]])
    Q = np.array([[1]])  # State cost
    R = np.array([[1]])  # Input cost
    Np = 10  # Prediction horizon
    Nc = 3   # Control horizon
    x0 = np.array([5])  # Initial state
    x_ref = np.array([0])  # Desired state

    # Initialize the MPC controller
    mpc = MPC(A, B, Q, R, Np, Nc, x0)

    # Simulation loop
    x = x0
    for t in range(10):
        print(f"Time step {t}:")
        print(f"Current state: {x}")
        x, u = mpc.apply_control(x, x_ref)
        print(f"Control applied: {u}")
        print(f"New state: {x}")
        print("-" * 30)

# Run the example
example()


Time step 0:
Current state: [5]
Control applied: [-1.]
New state: [4.]
------------------------------
Time step 1:
Current state: [4.]
Control applied: [-1.]
New state: [3.]
------------------------------
Time step 2:
Current state: [3.]
Control applied: [-1.]
New state: [2.]
------------------------------
Time step 3:
Current state: [2.]
Control applied: [-1.]
New state: [1.]
------------------------------
Time step 4:
Current state: [1.]
Control applied: [-0.63368983]
New state: [0.36631017]
------------------------------
Time step 5:
Current state: [0.36631017]
Control applied: [-0.23212701]
New state: [0.13418316]
------------------------------
Time step 6:
Current state: [0.13418316]
Control applied: [-0.08503041]
New state: [0.04915275]
------------------------------
Time step 7:
Current state: [0.04915275]
Control applied: [-0.0311968]
New state: [0.01795595]
------------------------------
Time step 8:
Current state: [0.01795595]
Control applied: [-0.0114397]
New state: [0.00651

In [9]:
import numpy as np
from scipy.optimize import minimize

class MPC:
    def __init__(self, A, B, Q, R, Np, Nc, x0):
        """
        Initialize the MPC object.
        
        Args:
        A (np.ndarray): State transition matrix.
        B (np.ndarray): Input matrix.
        Q (np.ndarray): State cost matrix.
        R (np.ndarray): Input cost matrix.
        Np (int): Prediction horizon.
        Nc (int): Control horizon.
        x0 (np.ndarray): Initial state.
        """
        self.A = A
        self.B = B
        self.Q = Q
        self.R = R
        self.Np = Np  # Prediction horizon
        self.Nc = Nc  # Control horizon
        self.x0 = x0  # Initial state

    def predict_state(self, x, u_sequence):
        """
        Predict the future state trajectory based on the current state and a sequence of control inputs.
        
        Args:
        x (np.ndarray): Current state.
        u_sequence (np.ndarray): Control input sequence.

        Returns:
        np.ndarray: Predicted state trajectory.
        """
        n = self.A.shape[0]
        state_trajectory = np.zeros((self.Np + 1, n))
        state_trajectory[0] = x
        
        for i in range(self.Np):
            u = u_sequence[i] if i < len(u_sequence) else u_sequence[-1]
            state_trajectory[i + 1] = self.A @ state_trajectory[i] + self.B @ u
        
        return state_trajectory

    def cost_function(self, u_sequence, x, x_ref):
        """
        The cost function to minimize.
        
        Args:
        u_sequence (np.ndarray): Control input sequence (flattened).
        x (np.ndarray): Current state of the system.
        x_ref (np.ndarray): Reference trajectory for the state.

        Returns:
        float: The cost for the given control input sequence.
        """
        # Reshape the control input sequence back into 2D form
        u_sequence = u_sequence.reshape(self.Nc, self.B.shape[1])
        
        state_trajectory = self.predict_state(x, u_sequence)
        cost = 0

        for i in range(self.Np):
            x_diff = state_trajectory[i] - x_ref
            u = u_sequence[i] if i < len(u_sequence) else u_sequence[-1]
            cost += x_diff.T @ self.Q @ x_diff + u.T @ self.R @ u
        
        return cost

    def solve(self, x, x_ref):
        """
        Solve the MPC optimization problem using scipy.optimize.minimize.
        
        Args:
        x (np.ndarray): Current state of the system.
        x_ref (np.ndarray): Reference trajectory for the state.

        Returns:
        np.ndarray: Optimal control input sequence.
        """
        # Initial guess for the control input (a flat input sequence)
        u_initial = np.zeros(self.Nc * self.B.shape[1])

        # Minimize the cost function using SciPy
        result = minimize(
            self.cost_function, 
            u_initial, 
            args=(x, x_ref), 
            method='SLSQP', 
            bounds=[(-1, 1)] * self.Nc * self.B.shape[1]
        )

        if not result.success:
            raise ValueError("MPC optimization problem did not converge")

        # Return the first control input
        u_optimal = result.x.reshape(self.Nc, self.B.shape[1])
        return u_optimal[0]

    def apply_control(self, x, x_ref):
        """
        Apply control by solving the MPC and updating the state.
        
        Args:
        x (np.ndarray): Current state of the system.
        x_ref (np.ndarray): Reference trajectory for the state.
        
        Returns:
        np.ndarray: New state after applying the control.
        """
        u_opt = self.solve(x, x_ref)
        x_next = self.A @ x + self.B @ u_opt
        return x_next, u_opt

# Example problem for the cascaded tanks system
def cascaded_tanks_example():
    # System matrices for cascaded tanks
    A = np.array([[0.95, 0], [0.05, 0.9]])  # Example dynamics
    B = np.array([[0.1], [0]])  # Input only affects the first tank
    Q = np.diag([1, 1])  # Penalize both heights equally
    R = np.array([[0.1]])  # Penalize input effort
    Np = 10  # Prediction horizon
    Nc = 3   # Control horizon
    x0 = np.array([3, 1])  # Initial water heights in both tanks
    x_ref = np.array([1, 1])  # Desired water heights in both tanks

    # Initialize the MPC controller
    mpc = MPC(A, B, Q, R, Np, Nc, x0)

    # Simulation loop
    x = x0
    for t in range(20):
        print(f"Time step {t}:")
        print(f"Current state (Tank heights): {x}")
        x, u = mpc.apply_control(x, x_ref)
        print(f"Control applied (Input flow rate): {u}")
        print(f"New state (Tank heights): {x}")
        print("-" * 30)

# Run the example
cascaded_tanks_example()


Time step 0:
Current state (Tank heights): [3 1]
Control applied (Input flow rate): [-1.]
New state (Tank heights): [2.75 1.05]
------------------------------
Time step 1:
Current state (Tank heights): [2.75 1.05]
Control applied (Input flow rate): [-1.]
New state (Tank heights): [2.5125 1.0825]
------------------------------
Time step 2:
Current state (Tank heights): [2.5125 1.0825]
Control applied (Input flow rate): [-1.]
New state (Tank heights): [2.286875 1.099875]
------------------------------
Time step 3:
Current state (Tank heights): [2.286875 1.099875]
Control applied (Input flow rate): [-1.]
New state (Tank heights): [2.07253125 1.10423125]
------------------------------
Time step 4:
Current state (Tank heights): [2.07253125 1.10423125]
Control applied (Input flow rate): [-1.]
New state (Tank heights): [1.86890469 1.09743469]
------------------------------
Time step 5:
Current state (Tank heights): [1.86890469 1.09743469]
Control applied (Input flow rate): [-1.]
New state (Ta

In [None]:
import numpy as np
from scipy.optimize import minimize

class MPC:
    def __init__(self, model_tank1, model_tank2, Q, R, Np, Nc, x0):
        """
        Initialize the MPC object.
        
        Args:
        model_tank1 (callable): Trained model to predict Tank 1 dynamics.
        model_tank2 (callable): Trained model to predict Tank 2 dynamics.
        Q (np.ndarray): State cost matrix.
        R (np.ndarray): Input cost matrix.
        Np (int): Prediction horizon.
        Nc (int): Control horizon.
        x0 (np.ndarray): Initial state (heights of both tanks).
        """
        self.model_tank1 = model_tank1
        self.model_tank2 = model_tank2
        self.Q = Q
        self.R = R
        self.Np = Np  # Prediction horizon
        self.Nc = Nc  # Control horizon
        self.x0 = x0  # Initial state

    def predict_state(self, x, u_sequence):
        """
        Predict the future state trajectory based on the current state and a sequence of control inputs.
        
        Args:
        x (np.ndarray): Current state (heights of both tanks).
        u_sequence (np.ndarray): Control input sequence.

        Returns:
        np.ndarray: Predicted state trajectory.
        """
        n = len(x)
        state_trajectory = np.zeros((self.Np + 1, n))
        state_trajectory[0] = x
        
        h1, h2 = x[0], x[1]
        
        for i in range(self.Np):
            u = u_sequence[i] if i < len(u_sequence) else u_sequence[-1]
            
            # Use the trained models to predict the next heights
            h1_next = self.model_tank1.predict([h1, h2, u])[0]  # Model prediction for Tank 1
            h2_next = self.model_tank2.predict([h1_next, h2])[0]  # Model prediction for Tank 2

            state_trajectory[i + 1] = [h1_next, h2_next]
            h1, h2 = h1_next, h2_next  # Update for the next prediction
        
        return state_trajectory

    def cost_function(self, u_sequence, x, x_ref):
        """
        The cost function to minimize.
        
        Args:
        u_sequence (np.ndarray): Control input sequence (flattened).
        x (np.ndarray): Current state of the system.
        x_ref (np.ndarray): Reference trajectory for the state.

        Returns:
        float: The cost for the given control input sequence.
        """
        # Reshape the control input sequence back into 2D form
        u_sequence = u_sequence.reshape(self.Nc, 1)
        
        state_trajectory = self.predict_state(x, u_sequence)
        cost = 0

        for i in range(self.Np):
            x_diff = state_trajectory[i] - x_ref
            u = u_sequence[i] if i < len(u_sequence) else u_sequence[-1]
            cost += x_diff.T @ self.Q @ x_diff + u.T @ self.R @ u
        
        return cost

    def solve(self, x, x_ref):
        """
        Solve the MPC optimization problem using scipy.optimize.minimize.
        
        Args:
        x (np.ndarray): Current state of the system.
        x_ref (np.ndarray): Reference trajectory for the state.

        Returns:
        np.ndarray: Optimal control input sequence.
        """
        # Initial guess for the control input (a flat input sequence)
        u_initial = np.zeros(self.Nc)

        # Minimize the cost function using SciPy
        result = minimize(
            self.cost_function, 
            u_initial, 
            args=(x, x_ref), 
            method='SLSQP', 
            bounds=[(-1, 1)] * self.Nc
        )

        if not result.success:
            raise ValueError("MPC optimization problem did not converge")

        # Return the first control input
        u_optimal = result.x
        return u_optimal[0]

    def apply_control(self, x, x_ref):
        """
        Apply control by solving the MPC and updating the state.
        
        Args:
        x (np.ndarray): Current state of the system.
        x_ref (np.ndarray): Reference trajectory for the state.
        
        Returns:
        np.ndarray: New state after applying the control.
        """
        u_opt = self.solve(x, x_ref)
        h1_next = self.model_tank1.predict([x[0], x[1], u_opt])[0]
        h2_next = self.model_tank2.predict([h1_next, x[1]])[0]
        x_next = np.array([h1_next, h2_next])
        return x_next, u_opt

# Example problem for the cascaded tanks system using trained models
def cascaded_tanks_example(model_tank1, model_tank2):
    # System parameters
    Q = np.diag([1, 1])  # Penalize both heights equally
    R = np.array([[0.1]])  # Penalize input effort
    Np = 10  # Prediction horizon
    Nc = 3   # Control horizon
    x0 = np.array([3, 1])  # Initial water heights in both tanks
    x_ref = np.array([1, 1])  # Desired water heights in both tanks

    # Initialize the MPC controller with trained models
    mpc = MPC(model_tank1, model_tank2, Q, R, Np, Nc, x0)

    # Simulation loop
    x = x0
    for t in range(20):
        print(f"Time step {t}:")
        print(f"Current state (Tank heights): {x}")
        x, u = mpc.apply_control(x, x_ref)
        print(f"Control applied (Input flow rate): {u}")
        print(f"New state (Tank heights): {x}")
        print("-" * 30)

# Assume model_tank1 and model_tank2 are pre-trained models, replace them with actual trained models
class DummyModel:
    def predict(self, inputs):
        # This is a placeholder for your trained models, which should predict based on inputs
        return [inputs[0] * 0.9 + 0.1]  # Dummy behavior, just for illustration

# Run the example with dummy models
cascaded_tanks_example(DummyModel(), DummyModel())


In [None]:
import numpy as np
from scipy.optimize import minimize

class MPC:
    def __init__(self, model_tank1, model_tank2, Q2, R, Np, Nc, x0):
        """
        Initialize the MPC object.
        
        Args:
        model_tank1 (callable): Trained model to predict Tank 1 dynamics.
        model_tank2 (callable): Trained model to predict Tank 2 dynamics.
        Q2 (np.ndarray): State cost matrix for Tank 2 (h2).
        R (np.ndarray): Input cost matrix.
        Np (int): Prediction horizon.
        Nc (int): Control horizon.
        x0 (np.ndarray): Initial state (heights of both tanks).
        """
        self.model_tank1 = model_tank1
        self.model_tank2 = model_tank2
        self.Q2 = Q2  # Cost only for h2 (Tank 2)
        self.R = R
        self.Np = Np  # Prediction horizon
        self.Nc = Nc  # Control horizon
        self.x0 = x0  # Initial state

    def predict_state(self, x, u_sequence):
        """
        Predict the future state trajectory based on the current state and a sequence of control inputs.
        
        Args:
        x (np.ndarray): Current state (heights of both tanks).
        u_sequence (np.ndarray): Control input sequence.

        Returns:
        np.ndarray: Predicted state trajectory.
        """
        n = len(x)
        state_trajectory = np.zeros((self.Np + 1, n))
        state_trajectory[0] = x
        
        h1, h2 = x[0], x[1]
        
        for i in range(self.Np):
            u = u_sequence[i] if i < len(u_sequence) else u_sequence[-1]
            
            # Use the trained models to predict the next heights
            h1_next = self.model_tank1.predict([h1, h2, u])[0]  # Model prediction for Tank 1
            h2_next = self.model_tank2.predict([h1_next, h2])[0]  # Model prediction for Tank 2

            state_trajectory[i + 1] = [h1_next, h2_next]
            h1, h2 = h1_next, h2_next  # Update for the next prediction
        
        return state_trajectory

    def cost_function(self, u_sequence, x, x_ref):
        """
        The cost function to minimize, where we only penalize deviations of Tank 2 (h2).
        
        Args:
        u_sequence (np.ndarray): Control input sequence (flattened).
        x (np.ndarray): Current state of the system.
        x_ref (np.ndarray): Reference trajectory for Tank 2's height (h2).

        Returns:
        float: The cost for the given control input sequence.
        """
        # Reshape the control input sequence back into 2D form
        u_sequence = u_sequence.reshape(self.Nc, 1)
        
        state_trajectory = self.predict_state(x, u_sequence)
        cost = 0

        for i in range(self.Np):
            h2_diff = state_trajectory[i][1] - x_ref[1]  # Only consider Tank 2's height deviation
            u = u_sequence[i] if i < len(u_sequence) else u_sequence[-1]
            cost += h2_diff.T @ self.Q2 @ h2_diff + u.T @ self.R @ u
        
        return cost

    def solve(self, x, x_ref):
        """
        Solve the MPC optimization problem using scipy.optimize.minimize.
        
        Args:
        x (np.ndarray): Current state of the system.
        x_ref (np.ndarray): Reference trajectory for Tank 2's height.

        Returns:
        np.ndarray: Optimal control input sequence.
        """
        # Initial guess for the control input (a flat input sequence)
        u_initial = np.zeros(self.Nc)

        # Minimize the cost function using SciPy
        result = minimize(
            self.cost_function, 
            u_initial, 
            args=(x, x_ref), 
            method='SLSQP', 
            bounds=[(0, 255)] * self.Nc # Bound Input Space
        )

        if not result.success:
            raise ValueError("MPC optimization problem did not converge")

        # Return the first control input
        u_optimal = result.x
        return u_optimal[0]

    def apply_control(self, x, x_ref):
        """
        Apply control by solving the MPC and updating the state.
        
        Args:
        x (np.ndarray): Current state of the system.
        x_ref (np.ndarray): Reference trajectory for Tank 2's height.
        
        Returns:
        np.ndarray: New state after applying the control.
        """
        u_opt = self.solve(x, x_ref)
        h1_next = self.model_tank1.predict([x[0], x[1], u_opt])[0]
        h2_next = self.model_tank2.predict([h1_next, x[1]])[0]
        x_next = np.array([h1_next, h2_next])
        return x_next, u_opt

# Example problem for the cascaded tanks system controlling only Tank 2
def cascaded_tanks_example(model_tank1, model_tank2):
    # System parameters
    Q2 = np.array([[1]])  # Penalize only Tank 2's height
    R = np.array([[0.1]])  # Penalize input effort
    Np = 10  # Prediction horizon
    Nc = 3   # Control horizon
    x0 = np.array([3, 1])  # Initial water heights in both tanks
    x_ref = np.array([1, 1])  # Desired water height for Tank 2 (h2 = 1)

    # Initialize the MPC controller with trained models
    mpc = MPC(model_tank1, model_tank2, Q2, R, Np, Nc, x0)

    # Simulation loop
    x = x0
    for t in range(20):
        print(f"Time step {t}:")
        print(f"Current state (Tank heights): {x}")
        x, u = mpc.apply_control(x, x_ref)
        print(f"Control applied (Input flow rate): {u}")
        print(f"New state (Tank heights): {x}")
        print("-" * 30)

# Assume model_tank1 and model_tank2 are pre-trained models, replace them with actual trained models
class DummyModel:
    def predict(self, inputs):
        # This is a placeholder for your trained models, which should predict based on inputs
        return [inputs[0] * 0.9 + 0.1]  # Dummy behavior, just for illustration

# Run the example with dummy models
cascaded_tanks_example(DummyModel(), DummyModel())
