# Classic human-in-the-loop tracking with n4sid and Machine learning

Rarely do I read articles about system identification and machine learning. When I learned system identification, it was sort of like the less exciting cousin of control theory, not as exciting as optimal control theory but a practical numerical tool.

Classic human-in-the-loop tracking problems, like how to model the behavior of a human operator, can be modeled by using a 'black-box' modeling algorithm called n4sid that predicts the human operator behavior from information that they saw and information that they did. The signal that the person sees is considered the input and the signal that the person performs is considered the output. n4sid projects the input to the output, this is called a transfer function matrix, such that significant ode coefficients are identified and estimated, thus creating a model that represents how the human used the visual information to produce their output.

Thinking about machine learning and n4sid, a ML regression model could be used to predict the same human operator behavior, where the features are a clever mixture of what the person saw and did.

In this post, I outline how the n4sid model works from a numerical and state-space control theory perspective. This formulation may not give exact results like MATLAB's n4sid function, because of the way that I estimate the ode, but the steps are correct. There are many different formulations of n4sid, the most popular formulations are from a matrix analysis and state-space control theory perspective (System Identification Theory for the User by Lennart Ljung).

<img src="splash_n4sid.png" alt="Drawing" style="width: 500px;"/>

In [1]:
import numpy as np
import math

# Plotting
import plotly.graph_objects as go

# Personal python functions
import sys
sys.path.insert(1, 'C:\\Users\\jamilah\\Documents\\Subfunctions_python')

from make_a_properlist import *
from n4sid_prediction.optimize_n4sid_settings import *

import warnings
warnings.filterwarnings('ignore')

## Create a input and output signals

Let's say that a human operator sees a ball bouncing on a computer screen like the input signal, they try to copy the same ball bouncing trajectory and create the output signal.  We then want to build a model describing how the human copied the bouncing ball with respect to the original ball bounce, so that we can test if the human can copy other ball bouncing trajectories without having to test the operator again in real-life.

In the figure below, the green line in the ball bouncing and the blue line is the human operator copying the bouncing ball.

In [3]:
# ----------------
# Toy problem
# ----------------
fs = 10  # sampling frequency
ts = 1/fs  # sampling time

start_val = 0  # secs
stop_val = 10  # secs

N = int(fs*stop_val)  # number of sample points
t = np.multiply(range(start_val, N), ts)

# --------------------
# Let's make-up the input signal
A = 2  # peak amplitude

# The larger omega the more waves
# Find omega per cycle
omega = (2*math.pi)/fs  # carrier frequency OR natural frequency (degrees)

# Choose how many cycles you want per time t
num_of_cycles = 4
omega = num_of_cycles*omega

phi = 0  # degrees
c = 0  # vertical shift
inputs = A * np.sin(omega*(t - phi)) + c
inputs0 = np.array(make_a_properlist(inputs))
# --------------------

# --------------------
# Let's make-up the output signal
A = 1  # peak amplitude

# The larger omega the more waves
# Find omega per cycle
omega = (2*math.pi)/fs  # carrier frequency OR natural frequency (degrees)

# Choose how many cycles you want per time t
num_of_cycles = 2
omega = num_of_cycles*omega

phi = 0  # degrees
c = 0  # vertical shift
outputs = A * np.sin(omega*(t - phi)) + c
outputs0 = np.array(make_a_properlist(outputs))
# --------------------

fig = go.Figure()
config = dict({'scrollZoom': True, 'displayModeBar': True, 'editable': True})

fig.add_trace(go.Scatter(x=t, y=inputs0, name='ball bounce', line = dict(color='green', width=2, dash='dash'), showlegend=True))
fig.add_trace(go.Scatter(x=t, y=outputs0, name='human operator', line = dict(color='blue', width=2, dash='dash'), showlegend=True))

fig.update_layout(title='signals', xaxis_title='time', yaxis_title='signal')
fig.show(config=config)

## Step 1: normalize input and output signals for the coefficient prediction

In [None]:
# Individual input and output normalization
inputs_norm = inputs_org/max(abs(inputs_org))
outputs_norm = outputs_org/max(abs(outputs_org))

## Step 2 : Estimate the projection coefficients

In this step we map the input points (u(s)) onto the output points (y(s)).

y(s) = g(s)u(s)
where g(s) = h1*s^-1 + h2*s^-2 + h3*s^-3 + ... + hN*s^-N

The projection is not multiplying a previous input point by a scalar to get a future output point.  The projection is dilating/stretching each input point by some scalar and summing them all up to get each output point. 

We use the Markov parameters to compute the Hankel matrix parameters, which are the coefficients of the transfer function matrix (g(s)); represented above as h1, h2, ..., hN and called H in total.  H is the first row of the Hankel matrix and they are the Markov parameters, the Hankel matrix is another way to solve for the h1, h2, ..., hN projection coefficients.

In [None]:
m = N  # the number of markov parameters
H = control.markov(outputs_norm, inputs_norm, m)
H = make_a_properlist(H)
mapping_coefs = H

## Step 3: Transform the power series (projection coefficients) into a transfer function

Put the coefficients into a state-space/transfer function form, we assume that the transfer function order is 2nd order and we use the already proved canonical controllable form numerical method to transform power series to a transfer function.

There are other ways to do this, but using the canonical controllable form is straight-forward.

In [None]:
# Symbolic math Library
import sympy as sym
from sympy import Matrix, symbols

# Simplification of full form, by assuming the form is canonical controllable form :
a1 = 0
a2 = 1

c1 = 1
c2 = 0

a3, a4, b1, b2 = symbols('a3 a4 b1 b2')

A = Matrix([[a1, a2],[a3, a4]])
# print('A', str(A))
# print('size of A : ', str(A.shape))

B = Matrix([b1, b2]) # 2x1 matrix
# print('B', str(B))
# print('size of B : ', str(B.shape))

C = Matrix([c1, c2])# 2x1 matrix, but also Matrix([[c1], [c2]]) creates a 2x1 matrix
# print('C.T', str(C.T)) # change to a 1x2
# print('size of C.T : ', str(C.T.shape))

# Only four matrix equations are needed in algebraic form :
# print('C.T*B : ', str(C.T*B))

# print('C.T*A*B : ', str(C.T*A*B))

# print('C.T*A*A*B : ', str(C.T*A*A*B))

# print('C.T*A*A*A*B : ', str(C.T*A*A*A*B))

# C.T*B :  Matrix([[b1]])
# C.T*A*B :  Matrix([[b2]])
# C.T*A*A*B :  Matrix([[a3*b1 + a4*b2]])
# C.T*A*A*A*B :  Matrix([[a3*a4*b1 + b2*(a3 + a4**2)]])

# --------------------

T00, T01, T02, T03 = sym.symbols('T00 T01 T02 T03')

out = sym.solve(( b1 - T00, b2 - T01, a3*b1 + a4*b2 - T02, a3*a4*b1 + b2*(a3 + a4**2) - T03 ), (b1, b2, a3, a4))

After solving the the 
out: [(T00, 

T01, 

-(T01*T03 - T02**2)/(T00*T02 - T01**2), 

(T00*T03 - T01*T02)/(T00*T02 - T01**2)

)]

Which are the coefficients of the 2nd order transfer function in terms of state-space
b1 = T00

b2 = T01

a3 = -(T01*T03 - T02**2)/(T00*T02 - T01**2)

a4 = (T00*T03 - T01*T02)/(T00*T02 - T01**2)

We will see them below!  Below is not the real working code, but important code snippets to understand how the algorithm works.

## Step 4: Tune several parameters to find the best fitting transfer function prediction model

In [None]:
# Next, plug in the Markov coefficient parameters.  
# Start using coefficient values when they are not zero.
ist = 0  # Initialize 
flag = 0  # flag is redundant : just to ensure that the break works
for i in range(len(mapping_coefs)):
    if mapping_coefs[i] != 0 and flag == 0:
        ist = i
        flag = 1
        break
# print('ist : ' + str(ist))
T00 = mapping_coefs[ist+1]
T01 = mapping_coefs[ist+2]
T02 = mapping_coefs[ist+3]
T03 = mapping_coefs[ist+4]

# --------------------

# You can tune the gain of the choosen Markov coefficient parameters, 
# to get a better model fit.  Generate a list of gains to multiply with the 
# numerator : to try to get a better fit
test_loop = 10
endK_tune = 1  # we choose the end gain value as 1 because the coefficients are always smaller than needed

# It is unclear what value to multiply by the coefficients, 
# but from trial-and-error, a larger value than the height of 
# the input and output is required.

# Let's choose 100 times larger than the max abs input output
k = max([max(np.abs(inputs_generic)), max(np.abs(outputs_generic))])

Kincrement = (k-endK_tune)/test_loop

# Generate positive gain values from k to endK_tune
kk = [k]  # initial value of the k vector
for i in range(test_loop):
    kk = kk + [kk[i-1]-Kincrement]
kk = np.ravel(kk)
# print('kk : ' + str(kk))

# Generate equivalent positive and negative gain values
kout = np.concatenate((kk, -kk), axis=0)
kout = np.array(kout)

# --------------------

for i in range(test_loop):
    T00 = kout[i]*T00
    T01 = kout[i]*T01
    T02 = kout[i]*T02
    T03 = kout[i]*T03
    
    # --------------------
    
    b1 = T00
    b2 = T01
    a3 = -(T01*T03 - T02**2)/(T00*T02 - T01**2)
    a4 = (T00*T03 - T01*T02)/(T00*T02 - T01**2)

    # --------------------
    
    # Let's fill in the A, B, C, D state-space matricies
    A = np.array([[a1, a2],[a3, a4]])
    if np.isnan(A).any() == True:
        A = np.array([[a1, a2],[0.01, 0.05]])   # some values that the algorithm can compute
    B = np.reshape([b1, b2], (2,1))
    C = np.reshape([[c1],[c2]], (1,2))
    D = np.zeros((1,1))

    # --------------------
    
    (num, den) = ss2tf(A, B, C, D)

    num = np.array(make_a_properlist(num))
    num = np.round(num, 5)
    # print('num : ' + str(num))

    den = np.array(make_a_properlist(den))
    den = np.round(den, 5)
    
    # --------------------
    
    # Technically, all signals are discrete.  But, you can "get away" with using a 
    # continuous time formulation if the system is over-sampled.  So both discrete 
    # and continuous time calculations are performed, just to understand if the signal 
    # is over-sampled or not.
    
    # --------------------
    
    # I only show the Discrete system here, because the code is repetative
    # See github for more
    tf_dis = (num, den, ts)
    # print('tf_dis : ' + str(tf_dis))

    pred_t_dis, pred_output_dis  = signal.dlsim(tf_dis, inputs_generic, t=time)

    # Make sure signal in a list
    pred_output_dis = make_a_properlist(pred_output_dis)
    pred_t_dis = make_a_properlist(pred_t_dis)

    # Make sure it is an array so we can cut the data in the next step
    pred_output_dis = np.array(pred_output_dis)
    pred_output_dis = pred_output_dis.real

    # Ensure predicted signal and outputs are the same length : 
    # sometimes pred_output has one point less than outputs
    minlen = min([len(pred_output_dis), len(outputs_generic)])
    pred_output_dis = np.ravel(np.reshape(pred_output_dis[0:minlen], (1, minlen)))
    outputs_generic = np.ravel(np.reshape(outputs_generic[0:minlen], (1, minlen)))
    
    # --------------------
    
    # I calculate the R-squared error, adjusted R-squared error, and absolute error.
    # R-squared error was decided to be the best metric for determing if a predicted model
    # output signal was similar to the actual human operator output signal
    # See github for this function
    r_squared_dis = rsquared_abserror(outputs_generic, pred_output_dis, time, ts, 1)
    metric_dis = r_squared_dis

    # --------------------
    
    # Plotting
    # See github for this function
    plot_best_coefmeth(time, inputs_generic, outputs_generic, pred_output_dis, way_coeff, metric_dis, plot_ALL_predictions)

    # --------------------
    
    # Update output parameters : I compare the previous best r-squared value with the current
    # r-squared value, and choose the model with the best r-squared value
    # See github for this function
    methcho1, metric_out = choose_best_regression_metric(metric_dis_best, metric_dis, metric_type)
    if methcho1 == 1:
        tf_dis_best = tf_dis
        pred_output_dis_best = pred_output_dis
        metric_dis_best = metric_dis

# --------------------

# After the loop, we have the best predicting transfer function model
return tf_dis_best, pred_output_dis_best, metric_dis_best

## Success: Let's see the fit

Putting all four steps into a single function, see github, we can test all possible combinations of tunning parameters and find the best model that can predict the output using the input signal.

In [4]:
pred_output, tf_pred, r_squared, sysmeth, discon, which_setting = optimize_n4sid_settings(inputs0, outputs0, t, ts)

Here we can see that the function found a model that can predict the output with R-squared equal 0.75, where R-squared =1 is perfect prediction.  Thus, the model fit is not perfect but also not too bad at predicting what the human operation can do.

I think that n4sid could be used for finding causality between machine learning features.  If the fit is very poor, it means that it is not likely to find an ordinary differential
equation that connects the input to the output.  Thus the input and output are likely to be 
generated by different processes, or the input does not cause the output to occur.
Which is the definition of non-causality.

I hope everyone enjoys n4sid as much as I do...