# Double integrator

The goal is to control the movement of a object on the $x$-axis in order to bring it to a stop at $x = 0$. The state of the object at time $t$ is given by its position $x$ and its velocity $\dot{x}$, i.e. the vector $\begin{bmatrix} x_t \\ \dot{x}_t \end{bmatrix}$. The control input is the acceleration $a$ applied to the object. 

The dynamics of the object are given by the following differential equation:
$$ x_{t+1} = x_t + \dot{x}_t \text{d}t $$
$$ \dot{x}_{t+1} = \dot{x}_t + a_t \text{d}t$$

This example serves to illustrate how to discretize a model with continuous state and action spaces in a way that is compatible with the *madupite* library in order to solve for its optimal control policy.

In [None]:
import numpy as np
from scipy.sparse import csr_matrix
from matplotlib import pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML

## Preprocessing

We start by declaring a few constants and model parameters.

In [None]:
NUM_X = 61
NUM_XD = 71
NUM_A = 41
MAX_X = 3.0
MAX_XD = 4.0
MAX_A = 1.0
DT = 0.1

# derived constants - do not modify
NUM_STATES = NUM_X * NUM_XD
NUM_ACTIONS = NUM_A

x_vals = np.linspace(-MAX_X, MAX_X, NUM_X)
xd_vals = np.linspace(-MAX_XD, MAX_XD, NUM_XD)
a_vals = np.linspace(-MAX_A, MAX_A, NUM_A)

Some helper functions that we need later. We use subscript `i` to denote indices (int) and `v` to denote values (float) as well as `t` ($t$) and `tpp` ($t+1$) for better readability.

In [None]:
def x2s(x, xd):
    """Convert [x, xd] to 1-dim state (row-major)."""
    return int(x * NUM_XD + xd)

def s2x(s):
    """Convert 1-dim state to [x, xd]."""
    return s // NUM_XD, s % NUM_XD

def interpolate(x, y, grid_x, grid_y):
    """
    Bilinear interpolation of a 2D grid at point (x, y).
    Returns the indices of the 4 grid points surrounding (x, y) and their weights.
    """
    x_i = np.searchsorted(grid_x, x, side='right')
    y_i = np.searchsorted(grid_y, y, side='right')

    x_i = np.clip(x_i, 1, len(grid_x) - 1)
    y_i = np.clip(y_i, 1, len(grid_y) - 1)

    xl_v, xr_v = grid_x[x_i-1], grid_x[x_i]
    yl_v, yr_v = grid_y[y_i-1], grid_y[y_i]

    wx1 = (x - xl_v) / (xr_v - xl_v)
    wx0 = 1 - wx1
    wy1 = (y - yl_v) / (yr_v - yl_v)
    wy0 = 1 - wy1

    indices = [(x_i-1, y_i-1), (x_i, y_i-1), (x_i-1, y_i), (x_i, y_i)]
    weights = [wx0*wy0, wx1*wy0, wx0*wy1, wx1*wy1]

    return indices, weights

Next, we define the model dynamics, i.e. calculate the next state given the current state and action.

In [None]:
def step(s, a):
    """
    Calculate the next state for a given state-action pair.
    
    Parameters:
    s: int, current state index(!) (value in [0, NUM_STATES))
    a: int, action index (value in [0, NUM_ACTIONS))

    Returns:
    x_next, xd_next: float(!), coordinate values of the next state in [-MAX_X, MAX_X] and [-MAX_XD, MAX_XD]
    """
    x_t_i, xd_t_i = s2x(s)
    x_t_v = x_vals[x_t_i]
    xd_t_v = xd_vals[xd_t_i]
    a_t_v = a_vals[a]

    x_tpp_v = np.clip(x_t_v + xd_t_v * DT, -MAX_X, MAX_X)
    xd_tpp_v = np.clip(xd_t_v + a_t_v * DT, -MAX_XD, MAX_XD)

    return x_tpp_v, xd_tpp_v   

Now we can construct the transition probability matrix $P$. As described in [TODO], the following indexing scheme is used:
$$ (P)_{s\cdot |\mathcal{A}|+a, s'} = P(s' | s, a) $$

That is, the matrix has the shape $(|\mathcal{S}|\cdot |\mathcal{A}|) \times |\mathcal{S}|$. For the sake of efficiency, we will use a sparse matrix representation.

In [None]:
data = []
row_idx = []
col_idx = []

for s_t_i in range(NUM_STATES):
    for a_t_i in range(NUM_ACTIONS):
        x_tpp_v, xd_tpp_v = step(s_t_i, a_t_i)
        indices, weights = interpolate(x_tpp_v, xd_tpp_v, x_vals, xd_vals)

        for (x_tpp_i, xd_tpp_i), w in zip(indices, weights):
            s_tpp_i = x2s(x_tpp_i, xd_tpp_i)
            data.append(w)
            row_idx.append(s_t_i * NUM_ACTIONS + a_t_i)
            col_idx.append(s_tpp_i)

        # Print progress percentage
        # if((s * NUM_ACTIONS + a) % 1000 == 0):
            # print(f'{(s * NUM_ACTIONS + a) / (NUM_STATES * NUM_ACTIONS) * 100:.2f}%')
            # print(f"{(s * NUM_ACTIONS + a) / (NUM_STATES * NUM_ACTIONS) * 100:.2f}%, ({s * NUM_ACTIONS + a}/{NUM_STATES * NUM_ACTIONS} rows)", end="\r", flush=True)

P = csr_matrix((data, (row_idx, col_idx)), shape=(NUM_STATES * NUM_ACTIONS, NUM_STATES))

print(f"{min(data)=}, {max(data)=}")

Next, we define the stage cost function. In this case, the goal is to get the object to stop at $x = 0$, i.e. $x = 0$ and $\dot{x} = 0$. We can define the stage cost as the squared distance to the goal, which makes the function independent of the action.

The stage cost matrix $g$ is of dimension $|\mathcal{S}| \times |\mathcal{A}|$.

In [None]:
def stage_cost(x_i, xd_i):
    return x_vals[x_i]**2 + xd_vals[xd_i]**2

# def stage_cost(x_i, xd_i):
#     return 0.0 if (x_vals[x_i]**2 + xd_vals[xd_i]**2) < 0.05 else 1.0

g_dense = np.empty((NUM_STATES, NUM_ACTIONS))
for s in range(NUM_STATES):
    x_t_i, xd_t_i = s2x(s)
    for a in range(NUM_ACTIONS):
        g_dense[s, a] = stage_cost(x_t_i, xd_t_i)

    # Print progress
    # print(f"{(s) / (NUM_STATES) * 100:.2f}%, ({s}/{NUM_STATES} rows)", end="\r", flush=True)

# convert to CSR (TODO, only until madupite can only load sparse matrices)
g = csr_matrix(g_dense)

Now we save the matrices to file in order to load them in madupite:

In [None]:
def write_sparse_matrix_as_petsc_binary(sparse_matrix, filename):
    """write dense matrix in petsc binary format

    Args:
        sparse_matrix (csr_matrix): sparse matrix
        filename (string): filename
    """
    sparse_matrix.sort_indices()

    with open(filename, "wb") as f:
        f.write(b"\x00\x12\x7b\x50")  # PETSc binary matrix class id
        f.write(np.array(sparse_matrix.shape, dtype=">i").tobytes())  # rows and cols
        f.write(np.array(sparse_matrix.nnz, dtype=">i").tobytes())  # nnz
        f.write(np.array(np.diff(sparse_matrix.indptr), dtype=">i").tobytes())  # row pointer
        f.write(np.array(sparse_matrix.indices, dtype=">i").tobytes())  # column indices
        f.write(np.array(sparse_matrix.data, dtype=">d").tobytes())  # values

write_sparse_matrix_as_petsc_binary(P, f"di_P_{NUM_STATES}_{NUM_ACTIONS}.bin")
write_sparse_matrix_as_petsc_binary(g, f"di_g_{NUM_STATES}_{NUM_ACTIONS}.bin")

print(f"saved to di_P_{NUM_STATES}_{NUM_ACTIONS}.bin and di_g_{NUM_STATES}_{NUM_ACTIONS}.bin")

# Optionally save as txt to debug/inspect (not recommended for large matrices):
# np.savetxt(f"di_P_{num_states}_{num_actions}.txt", P.toarray(), fmt="%.2f")
# np.savetxt(f"di_g_{NUM_STATES}_{NUM_ACTIONS}.txt", g.toarray(), fmt="%.2f")

# [Compute with madupite]

In [None]:
import subprocess
import os

flags = ["-file_stats", "di_stats.json", "-file_cost", "di_cost.out", "-file_policy", "di_policy.out", "-num_states", str(NUM_STATES), "-num_actions", str(NUM_ACTIONS)]
build_dir = "/home/robin/repos/madupite/build"
try:
    result = subprocess.run(["./continuous", *flags], cwd=build_dir, capture_output=True, check=True, text=True)
    print("Output:", result.stdout)
except subprocess.CalledProcessError as e:
    print("Error occurred:")
    print("Return code:", e.returncode)
    print("Output:", e.output)
    print("Error output:", e.stderr)

## Postprocessing

In [None]:
def reshape_data(data, shape):
    return np.rot90(data.reshape(shape), k=1)

def create_tick_labels(values, num_ticks):
    indices = np.linspace(0, len(values) - 1, num_ticks, dtype=int)
    return indices, [f"{val:.2f}" for val in values[indices]]

def plot_heatmap(ax, data, title, x_val, xd_val):
    im = ax.imshow(data, cmap='jet', interpolation='nearest')
    ax.set_title(title)
    ax.set_ylabel("$\dot{x}$")
    ax.set_xlabel("$x$")
    
    yticks, yticklabels = create_tick_labels(xd_val[::-1], 5)
    xticks, xticklabels = create_tick_labels(x_val, 5)
    
    ax.set_yticks(yticks)
    ax.set_xticks(xticks)
    ax.set_yticklabels(yticklabels)
    ax.set_xticklabels(xticklabels)
    
    return im

In [None]:
costs = reshape_data(np.loadtxt("../../build/di_cost.out"), (NUM_X, NUM_XD))
policy = np.loadtxt("../../build/di_policy.out", dtype=int)
user_input = reshape_data(a_vals[policy], (NUM_X, NUM_XD))

# Plotting
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 5))

im1 = plot_heatmap(ax1, costs, "Costs", x_vals, xd_vals)
im2 = plot_heatmap(ax2, user_input, "Policy", x_vals, xd_vals)

fig.colorbar(im1, ax=ax1)
fig.colorbar(im2, ax=ax2)

plt.show()

## Simulate state trajectory
using the optimal policy induced Markov chain.

In [None]:
# create P_pi
row_indices = np.arange(NUM_STATES) * NUM_ACTIONS + policy
P_pi = P[row_indices, :]

def create_initial_state(x_0_v, xd_0_v):
    indices, weights = interpolate(x_0_v, xd_0_v, x_vals, xd_vals)
    s0 = np.zeros(NUM_STATES)
    for (x_i, xd_i), w in zip(indices, weights):
        s0[x2s(x_i, xd_i)] = w
    return s0

def simulate(s0, P_pi, T):
    trajectory = []
    s = s0
    trajectory.append(s2x(np.argmax(s)))
    for t in range(T):
        s = s.T @ P_pi
        trajectory.append(s2x(np.argmax(s))) # round to next discrete state for visualization
    return trajectory

def create_animation(traj, ax1, ax2):
    line1, = ax1.plot([], [], 'r-', lw=2)
    line2, = ax2.plot([], [], 'r-', lw=2)
    
    def init():
        line1.set_data([], [])
        line2.set_data([], [])
        return line1, line2

    def animate(i):
        x, y = zip(*traj[:i+1])
        line1.set_data(x, y)
        line2.set_data(x, y)
        return line1, line2

    return animation.FuncAnimation(ax1.figure, animate, init_func=init, 
                                   frames=len(traj), interval=500, blit=True)


In [None]:
x_0_v = 1.5
xd_0_v = -2.75
T = 100

s0 = create_initial_state(x_0_v, xd_0_v)
trajectory = simulate(s0, P_pi, T)

In [None]:
ani = create_animation(trajectory, ax1, ax2)
HTML(ani.to_jshtml())