In [1]:
import time
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
from pybounds import Simulator, SlidingEmpiricalObservabilityMatrix, FisherObservability, SlidingFisherObservability, ObservabilityMatrixImage, colorline



# Define system dynamics and measurements
This example uses a model of an insect flying in the presence of wind.

See the following reference for details:

Floris van Breugel
A Nonlinear Observability Analysis of Ambient Wind Estimation with Uncalibrated Sensors, Inspired by Insect Neural Encoding
2021 60th IEEE Conference on Decision and Control (CDC)
DOI: 10.1109/CDC45484.2021.9683219

The system dynamics are described by seven primary states:
* altitude $z$
* parallel velocity $v_{\parallel}$
* perpendicular velocity $v_{\perp}$
* heading $\phi$
* angular velocity $\dot{\phi}$
* wind speed $w$
* wind direction $\zeta$

And the system dynamics are given by
$$
\dot{\mathbf{x}} = \begin{bmatrix} \dot{z} \\ \dot{v}_{\parallel} \\ \dot{v}_{\perp} \\ \dot{\phi} \\ \ddot{\phi} \\ \dot{w}  \\ \dot{\zeta} \end{bmatrix} = 
f(\mathbf{x}) = \begin{bmatrix} 
\dot{z} \\
\frac{1}{m}(k_{m_1}u_{\parallel} - C_{\parallel} a_{\parallel}) + v_{\perp} \dot{\phi} \\
 \frac{1}{m}(k_{m_3}u_{\perp} - C_{\perp} a_{\perp}) - v_{\parallel} \dot{\phi} \\
  \dot{\phi} \\
   \frac{1}{I}(k_{m_4}u_{\phi} - C_{\phi} \dot{\phi} + k_{m_2} u_{\perp}) \\
    \dot{w} \\
     \dot{\zeta} \\
\end{bmatrix}
$$

where the air velocity is given by

$$
\begin{bmatrix} a_{\parallel} \\ a_{\perp} \end{bmatrix} =  \begin{bmatrix} v_{\parallel} - w \cos(\phi - \zeta) \\ v_{\perp} + w \sin(\phi - \zeta) \end{bmatrix}
$$
The inputs $u_{\bullet}$ are
* parallel thrust force $u_{\parallel}$
* perpendicular thrust force $u_{\perp}$
* turning torque $u_{\phi}$

The inertia parameters (mass $m$ and inertia $I$), damping terms $C_{\bullet}$, and motor calibration coefficients $k_{m_{\bullet}}$ can also be considered states. Other auxiliary states, like the $x$ and $y$ position can also be added.

The putative system measurements are:
* heading $\phi$
* ground speed angle $\psi$
* apparent airflow angle $\gamma$
* apparent airflow magnitude $a$
* ground speed magnitude $g$
* optic flow $g/z$

Where the measurement function is given by:

$$
\mathbf{y} = h(\mathbf{x}) = \begin{bmatrix} \phi \\ \psi \\ \gamma  \\ a \\ g \\ r \end{bmatrix} = 
\begin{bmatrix} \phi \\
\arctan(v_{\perp}/ v_{\parallel}) \\
\arctan(a_{\perp} / a_{\parallel}) \\
\sqrt{a_{\parallel}^2 + a_{\perp}^2} \\
\sqrt{v_{\parallel}^2 + v_{\perp}^2} \\
\sqrt{v_{\parallel}^2 + v_{\perp}^2} / z \\
\end{bmatrix}
$$



## Define dynamics function
The dynamics function takes in a list of states $X$ and a list of inputs $U$ and outputs the derivative of the states.

The optional state & input names must be in the same order as the states & inputs in $X$ & $U$.

In [5]:
log_fname = '/src/data/wind_sensing/apparent_wind_visual_feedback/sw_dist_logstep_wind_0.001_debug_yes_vec_norm_train_actor_std/eval/plume_14421_37e2cd4be4c96943d0341849b40f81eb/constantx5b5.pkl'
# load pkl file
import pickle
# with open(log_fname, 'rb') as f_handle:
    # episode_logs = pickle.load(f_handle)
print('Loaded episode logs from', log_fname)
print('Number of episodes:', len(episode_logs))
print('Episodes contain:', episode_logs[0].keys())
print('For more info on pkl content see /src/JH_boilerplate/dev_test/env/explore_pkl.ipynb')

Loaded episode logs from /src/data/wind_sensing/apparent_wind_visual_feedback/sw_dist_logstep_wind_0.001_debug_yes_vec_norm_train_actor_std/eval/plume_14421_37e2cd4be4c96943d0341849b40f81eb/constantx5b5.pkl
Number of episodes: 240
Episodes contain: dict_keys(['trajectory', 'observations', 'actions', 'rewards', 'infos', 'activity'])


In [None]:
# an example of how actions are resolved
# # head_direction_t0 = head_direction[0] # norm'd by pi
# head_direction_t1 = head_direction[1] # norm'd by pi
# agent_angle_complex_vec_t0 = episode_log['infos'][0][0]['angle']
# agent_angle_complex_vec_t1 = episode_log['infos'][1][0]['angle']
# print(f"agent_angle_t0: {agent_angle_complex_vec_t0}")
# print(f"agent_angle_t1: {agent_angle_complex_vec_t1}")

# agent_angle_rad_t0 = np.angle(agent_angle_complex_vec_t0[0] + 1j*agent_angle_complex_vec_t0[1], deg=False) # not norm'd by pi 0.30103736300307615
# agent_angle_rad_t1 = np.angle(agent_angle_complex_vec_t1[0] + 1j*agent_angle_complex_vec_t1[1], deg=False) # not norm'd by pi 0.6844864019832495
# print(f"t0 angle rad = {agent_angle_rad_t0}")
# print(f"t1 angle rad = {agent_angle_rad_t1}")


# action_t0 = episode_log['actions'][0]
# action_t1 = episode_log['actions'][1]
# action_t0_sq = np.clip((np.tanh(action_t0) + 1)/2, 0.0, 1.0) # squash action
# action_t1_sq = np.clip((np.tanh(action_t1) + 1)/2, 0.0, 1.0) # squash action
# turn_action_t0 = action_t0_sq[1]
# turn_action_t1 = action_t1_sq[1]
# # turn_action = 1 - turn_action # invert turn action
# # print(f"turn_action: {turn_action}")
# # # print(f"action_t0_sq: {action_t0_sq}")
# turn_capacity = 6.25*np.pi
# turnx = 1
# dt = env_dt = 0.04
# # print(f"turn by {turn_capacity*turnx*(turn_action_t1 - 0.5)*dt} rad")
# my_agent_angle_rad_t1 = agent_angle_rad_t0 + turn_capacity*turnx*(turn_action_t1 - 0.5)*dt # in radians; (Turn~[0, 1], with 0.5 = no turn, <0.5 turn cw, >0.5 turn ccw)
# print(f"sim new angle rad = {my_agent_angle_rad_t1}")




In [14]:
# episode_logs[0]['trajectory'] # x, y locations 
# episode_logs[0]['observations'] # 
# episode_logs[0]['actions'] # needs to be squashed 
episode_logs[0]['infos'][0][0].keys() # needs to be squashed 


dict_keys(['t_val', 'tidx', 'flipx', 'location', 'location_last', 'location_initial', 'stray_distance', 'ambient_wind', 'angle', 'reward', 'r_radial_step', 'movex', 'done', 'radiusx', 'air_velcity', 'ground_velocity', 'wind_obs', 'odor_obs', 'allocentric_head_direction', 'egocentric_course_direction'])

In [2]:
state_names = ['x',  # x position [m]
               'y',  # y position [m]
               'v_para',  # parallel ground velocity [m/s]
               'v_perp',  # perpendicular ground velocity [m/s]
               'phi', # heading [rad]
               'phi_dot',  # angular velocity [rad/s]
               'w',  # ambient wind speed [m/s]
               'zeta',  # ambient wind angle [rad]
               ]

input_names = ['u_para',  # translational speed [m/s]
               'u_phi'  # angular velocity [rad/s]
               ] 
def f(X, U):
    '''
    Return Xdot given X and U
    
    X: state vector
        v_para: ground velocity in the direction parallel to head direction (egocentric frame) [m/s]
        v_perp: ground velocity perpendicular to head direction (egocentric frame) [m/s]
        phi: heading [rad]
        w: wind speed [m/s]
        zeta: wind angle [rad]
    U: input vector
        u_para: translational speed [m/s]
        u_phi: angular velocity [rad/s]
        u_para_dot: translational acceleration [m/s^2]
    '''
    v_para, v_perp, phi, phi_dot, w, zeta = X # TODO calculate phi_dot from agent trajectory
    
    # Inputs
    u_para, u_phi, u_para_dot = U 
    
    # Air velocity - known from agent trajectory no need to calculate
    # a_para = v_para - w * np.cos(phi - zeta)
    # a_perp = v_perp + w * np.sin(phi - zeta)
    # air_velocity_para = u_speed
    
    # movement due to wind, in the agent frame
    Vx_wind_ego = w*np.sin(zeta - phi)  # wind speed perpendicular to HD = W*sin(zeta - phi)
    Vy_wind_ego = w*np.cos(zeta - phi)  # wind speed parallel to HD = W*cos(zeta - phi)
    
    # Ground velocity in the agent frame
    Vx_ego = Vx_wind_ego
    Vy_ego = u_speed + Vy_wind_ego # ground speed parallel to HD = U_s + W*sin(zeta) 
    Vx_ego_dot = -w * np.cos(phi - zeta) * phi_dot + w * np.cos(phi - zeta) * zeta_dot
    Vy_ego_dot = -w * np.sin(phi - zeta) * phi_dot + w * np.cos(phi - zeta) * zeta_dot + u_speed_dot
    # Agent action
    x_old, y_old = x, y
    u_x = u_para * np.cos(phi) * 2.0 # Max agent speed in m/s
    x = x_old + a_para * np.cos(phi)
    y = y_old + a_para * np.sin(phi)
    a = phi * 6.25*np.pi # Max agent CW/CCW turn per second
    
    # Agent heading
    phi_dot = u_phi

    # Other dynamics
    x_dot = v_para * np.cos(phi) - v_perp * np.sin(phi)
    y_dot = v_para * np.sin(phi) + v_perp * np.cos(phi)
    w_dot = 0*x # wind speed is constant
    zeta_dot = 0*x # wind angle is constant

    # Package and return xdot
    x_dot = [x_dot, y_dot, phi_dot, w_dot, zeta_dot]

    return x_dot

## Define measurement function
The measurement function takes in a list of states $X$ and a list of inputs $U$ and outputs the measurements $Y$.

The optional measurement names must be in the same order as the measurements in $Y$.

In [4]:
measurement_names = ['phi', 'psi', 'gamma']
def h(X, U):
    '''
    Measurement functions - input is the state and control input; output is the measurement
    v_para: ground velocity in the direction parallel to head direction (egocentric frame) [m/s]
    v_perp: ground velocity perpendicular to head direction (egocentric frame) [m/s]
    phi: heading [rad]
    w: wind speed [m/s]
    zeta: wind angle [rad]
    '''
    # States
    v_para, v_perp, phi, w, zeta = X
    # x, y, v_para, v_perp, phi, phi_dot, w, zeta = X # not sure why I'd include x and y here 

    # Inputs
    u_para, u_phi, u_para_dot = U
    
    # Air velocity
    # a_para = v_para - w * np.cos(phi - zeta)
    # a_perp = v_perp + w * np.sin(phi - zeta)
    # a = np.sqrt(a_para ** 2 + a_perp ** 2) # air velocity magnitude
    # gamma = np.arctan2(a_perp, a_para)  # air velocity angle
    
    # Apparent airflow
    A_para = -u_para # equal and opposite to translational speed which is in line with head direction and thus have only a parallel component
    
    # Course direction in fly reference frame
    # g = np.sqrt(v_para ** 2 + v_perp ** 2) # ground speed
    psi = np.arctan2(v_perp, v_para) # drift angle / egocentric course angle # TODO numerically check if this is actually the drift angle - yes according to Ben
    
    # Unwrap angles TODO what is this?
    if np.array(phi).ndim > 0:
        if np.array(phi).shape[0] > 1:
            phi = np.unwrap(phi)
            psi = np.unwrap(psi)
            # gamma = np.unwrap(gamma)

    # Measurements
    Y  = [phi, A_para, psi]

    # Return measurement
    return Y


## Set time-step

In [5]:
dt = 0.04  # [s]

# Create simulator object

In [6]:
simulator = Simulator(f, h, dt=dt, state_names=state_names, input_names=input_names, measurement_names=measurement_names)

# Can also set the number of state (n) & inputs (m0 instead of state & input names)
# simulator = Simulator(f, h, dt=dt, n=len(state_names), m=len(input_names))

# Set up model predictive control

In [7]:
# # Parameters in SI units
# m = 0.25e-6  # [kg]
# I = 5.2e-13  # [N*m*s^2] yaw mass moment of inertia: 10.1242/jeb.02369
# # I = 4.971e-12  # [N*m*s^2] yaw mass moment of inertia: 10.1242/jeb.038778
# C_phi = 27.36e-12  # [N*m*s] yaw damping: 10.1242/jeb.038778
# C_para = m / 0.170  # [N*s/m] calculate using the mass and time constant reported in 10.1242/jeb.098665
# C_perp = C_para  # assume same as C_para

# # Scale Parameters
# m = m * 1e6  # [mg]
# I = I * 1e6 * (1e3) ** 2  # [mg*mm/s^2 * mm*s^2]
# C_phi = C_phi * 1e6 * (1e3) ** 2  # [mg*mm/s^2 *m*s]
# C_para = C_para * 1e6  # [mg/s]
# C_perp = C_perp * 1e6  # [mg/s]

In [8]:
# # Define the set-point(s) to follow
# tsim = np.arange(0, 0.4, step=dt)

# setpoint = {'x': 0.0 * np.ones_like(tsim),
#             'y': 0.0 * np.ones_like(tsim),
#             'z': 0.2 * np.ones_like(tsim),
#             'v_para': 0.3 * np.ones_like(tsim) + 0.01*tsim,
#             'v_perp': 0.01 * np.ones_like(tsim) + 0.01*tsim,
#             'phi': (np.pi/4) * np.ones_like(tsim),
#             'phi_dot': 0.0*np.ones_like(tsim),
#             'w': 0.4 * np.ones_like(tsim),
#             'zeta': (np.pi) * np.ones_like(tsim),
#             'm': m * np.ones_like(tsim),
#             'I': I * np.ones_like(tsim),
#             'C_para': C_para * np.ones_like(tsim),
#             'C_perp': C_perp * np.ones_like(tsim),
#             'C_phi': C_phi * np.ones_like(tsim),
#             'km1': 1.0 * np.ones_like(tsim),
#             'km2': 0.0 * np.ones_like(tsim),
#             'km3': 1.0 * np.ones_like(tsim),
#             'km4': 1.0 * np.ones_like(tsim),
#             }

# # Add a turn
# setpoint['phi'][20:] = setpoint['phi'][20] - np.pi/2

# # Update the simulator set-point
# simulator.update_dict(setpoint, name='setpoint')

In [9]:
# # Define cost function: penalize the squared error between parallel & perpendicular velocity and heading
# cost = ((simulator.model.x['v_para'] - simulator.model.tvp['v_para_set']) ** 2 + 
#         (simulator.model.x['v_perp'] - simulator.model.tvp['v_perp_set']) ** 2 +
#         (simulator.model.x['phi'] - simulator.model.tvp['phi_set']) ** 2)

# # Set cost function
# simulator.mpc.set_objective(mterm=cost, lterm=cost)

In [10]:
# # Set input penalty: make this small for accurate state following
# simulator.mpc.set_rterm(u_para=1e-6, u_perp=1e-6, u_phi=1e-6)

# Run model predictive control

In [None]:
# st = time.time()
# t_sim, x_sim, u_sim, y_sim = simulator.simulate(x0=None, mpc=True, return_full_output=True)
# et = time.time()
# print('elapsed time:', et-st)

In [None]:
# # Plot set-point tracking
# # Note that the cost function defined above & the controllability of each state will determine if a set-point is tracked or not
# simulator.plot('setpoint')

In [None]:
# # Plot inputs
# simulator.plot('u')

# Re-run simulator with MPC inputs
This step isn't required, but it shows how the closed-loop MOC inputs can be replayed in open-loop to get the same state trajectory

In [13]:
t_sim, x_sim, u_sim, y_sim = simulator.simulate(x0=None, mpc=False, u=u_sim, return_full_output=True)

In [None]:
# Plot state
simulator.plot('x')

# Observability

## Construct observability matrix in sliding windows

In [15]:
w = 4  # window size, set to None to use entire time-series as one window

In [None]:
# Construct O in sliding windows
st = time.time()
SEOM = SlidingEmpiricalObservabilityMatrix(simulator, t_sim, x_sim, u_sim, w=w, eps=1e-4)
et = time.time()
print('elapsed time:', et-st)

In [17]:
# Get O's
O_sliding = SEOM.get_observability_matrix()

In [None]:
n_window = len(O_sliding)
print(n_window, 'windows')

In [None]:
# Visualize first sliding observability matrix, this will throw errors if O is too big
OI = ObservabilityMatrixImage(O_sliding[0], cmap='bwr')
OI.plot(scale=1.0)

## Compute Fisher information matrix & inverse for first window


In [20]:
# Set each sensor noise level
sensor_noise = {'phi': 0.1, 'psi': 0.1, 'gamma': 0.1, 'a': 0.1, 'g': 0.1, 'r': 0.1}

In [21]:
# Compute the Fisher information & Chernoff inverse
FO = FisherObservability(SEOM.O_df_sliding[0], R=None, sensor_noise_dict=sensor_noise, lam=1e-6)

# Can also set R directly as matrix or as scalar
# FO = FisherObservability(SEOM.O_df_sliding[0], R=0.1*np.eye(O_sliding[0].shape[0]), lam=1e-6)
# FO = FisherObservability(SEOM.O_df_sliding[0], R=0.1, lam=1e-6)

In [None]:
# Get the Fisher information, inverse, and R matrix
F, F_inv, R = FO.get_fisher_information()
F_inv

## Compute Fisher information matrix & inverse for each sliding window

In [23]:
# Choose sensors to use from O
o_sensors = ['phi', 'psi', 'gamma']

# Chose states to use from O
o_states = ['v_para', 'v_perp', 'phi', 'phi_dot', 'w', 'zeta', 'z', 'm', 'I', 'C_para', 'C_perp', 'C_phi']

# Choose time-steps to use from O
window_size = 4
o_time_steps = np.arange(0, window_size, step=1)

In [24]:
# Compute the Fisher information & inverse for each window and store the minimum error variance
SFO = SlidingFisherObservability(SEOM.O_df_sliding, time=SEOM.t_sim, lam=1e-6, R=None, sensor_noise_dict=sensor_noise,
                                 states=o_states, sensors=o_sensors, time_steps=o_time_steps, w=None)

In [25]:
# Pull out minimum error variance, 'time' column is the time vector shifted forward by w/2 and 'time_initial' is the original time
EV_aligned = SFO.get_minimum_error_variance()

In [None]:
EV_aligned

In [None]:
# Visualize observability matrix subset
OI = ObservabilityMatrixImage(SFO.FO[-1].O)
OI.plot()

# Plot error variance as color on state time-series

In [None]:
EV_no_nan = EV_aligned.fillna(method='bfill').fillna(method='ffill')

In [None]:
states = list(SFO.FO[0].O.columns)
n_state = len(states)

fig, ax = plt.subplots(n_state, 2, figsize=(6, n_state*2), dpi=150)
ax = np.atleast_2d(ax)

cmap = 'inferno_r'

min_ev = np.min(EV_no_nan.iloc[:, 2:].values)
max_ev = np.max(EV_no_nan.iloc[:, 2:].values)

log_tick_high = int(np.ceil(np.log10(max_ev)))
log_tick_low = int(np.floor(np.log10(min_ev)))
cnorm = mpl.colors.LogNorm(10**log_tick_low, 10**log_tick_high)

for n, state_name in enumerate(states):
    # colorline(t_sim, x_sim[state_name], EV_no_nan[state_name].values, ax=ax[n, 0], cmap=cmap, norm=cnorm)
    colorline(x_sim['x'], x_sim['y'], EV_no_nan[state_name].values, ax=ax[n, 0], cmap=cmap, norm=cnorm)
    colorline(t_sim, EV_no_nan[state_name].values, EV_no_nan[state_name].values, ax=ax[n, 1], cmap=cmap, norm=cnorm)

    # Colorbar
    cax = ax[n, -1].inset_axes([1.03, 0.0, 0.04, 1.0])
    cbar = fig.colorbar(mpl.cm.ScalarMappable(norm=cnorm, cmap=cmap), cax=cax,
                        ticks=np.logspace(log_tick_low, log_tick_high, log_tick_high-log_tick_low + 1))
    cbar.set_label('min. EV: ' + state_name, rotation=270, fontsize=7, labelpad=8)
    cbar.ax.tick_params(labelsize=6)
    
    ax[n, 0].set_ylim(np.min(x_sim['y']) - 0.01, np.max(x_sim['y']) + 0.01)
    ax[n, 0].set_xlim(np.min(x_sim['x']) - 0.01, np.max(x_sim['x']) + 0.01)
    ax[n, 0].set_ylabel('y', fontsize=7)
    ax[n, 0].set_xlabel('x', fontsize=7)
    ax[n, 0].set_aspect(1.0)

    ax[n, 1].set_ylim(10**log_tick_low, 10**log_tick_high)
    ax[n, 1].set_yscale('log')
    ax[n, 1].set_ylabel('min. EV: ' + state_name, fontsize=7)
    ax[n, 1].set_yticks(np.logspace(log_tick_low, log_tick_high, log_tick_high-log_tick_low + 1))


for a in ax.flat:
    a.tick_params(axis='both', labelsize=6)
    
for a in ax[:, 1]:
    a.set_xlabel('time (s)', fontsize=7)
    a.set_xlim(-0.1, t_sim[-1] + 0.1)
    
# for a in ax[:, 1]:
#     a.set_xlim(-0.1, t_sim[-1] + 0.1)

fig.subplots_adjust(left=None, bottom=None, right=None, top=None, wspace=0.3, hspace=0.4)

plt.show()