# Matlab testbench model state-space matrices

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import scienceplots
import scipy.linalg as LA
from scipy.signal import lsim, dlsim
import opentorsion as ot
import cvxpy as cp

In [None]:
plt.style.use(['science', 'ieee'])
plt.rcParams["figure.figsize"] = (6,4)

In [None]:
def c2d(A, B, Ts):
    """
    C2D computes a discrete-time model of a system (A_c,B_c) with sample time T_s.
    The function returns matrices A,B of the discrete-time system.
    """
    m, n = A.shape
    nb = B.shape[1]

    s = np.concatenate([A,B], axis=1)
    s = np.concatenate([s, np.zeros((nb, n+nb))], axis=0)
    S = LA.expm(s*Ts)
    Ad = S[0:n,0:n]
    Bd = S[0:n,n:n+nb+1]
    
    return Ad, Bd

In [None]:
time_steps = np.genfromtxt('../../data/ice_excitation/times.csv', delimiter=',')
motor = np.genfromtxt('../../data/ice_excitation/motor.csv', delimiter=',')
propeller = np.genfromtxt('../../data/ice_excitation/propeller.csv', delimiter=',')
angular_speeds = np.genfromtxt('../../data/ice_excitation/speed_measurements.csv', delimiter=',')
shaft_torque = np.genfromtxt('../../data/ice_excitation/torque_measurements.csv', delimiter=',')

In [None]:
A_mat = np.genfromtxt('../../data/testbench_A.csv', delimiter=',')
B_mat = np.genfromtxt('../../data/testbench_B.csv', delimiter=',')
C_mat = np.genfromtxt('../../data/testbench_C.csv', delimiter=',')
D_mat = np.genfromtxt('../../data/testbench_D.csv', delimiter=',')

In [None]:
ts = np.mean(np.diff(time_steps))
A_mat_d, B_mat_d = c2d(A_mat, B_mat, ts)

# Input and state estimation functions

In [None]:
def O(A, C, n):
    '''
    Create the extended observability matrix used in the data equation.

    Parameters:

    A : numpy.ndarray
        The state matrix of the state-space system
    C : numpy.ndarray
        The observation matrix of the state-space system
    n : float
        number of measurements

    Returns:

    O : numpy.ndarray, shape(n, number of state variables)
        The extended observability matrix
    '''
    A_power = np.copy(A)
    O = np.vstack((np.copy(C), C @ A))
    
    for k in range(n-2):
        A_power = A_power @ A
        O = np.vstack((O, C @ A_power))

    return O

In [None]:
def gamma(A, B, C, n):
    '''
    Create the impulse response matrix used in the data equation.

    Parameters:

    A : numpy.ndarray
        The state matrix of the state-space system
    B : numpy.ndarray
        The input matrix of the state-space system
    C : numpy.ndarray
        The observation matrix of the state-space system
    n : float
        number of measurements

    Returns:

    gamma : numpy.ndarray, shape(n*number of state variables, n*number of state variables)
        The impulse response matrix
    '''
    A_power = np.copy(A)
    Z = np.zeros((C @ B).shape)
    
    # first column
    gamma_column_first = np.vstack((
        Z,
        C @ B, 
        C @ A @ B
    ))
    for _ in range(n-3):
        A_power = A_power @ A
        gamma_column_first = np.vstack((gamma_column_first, C @ A_power @ B))

    # build complete matrix, column by column, from left to right
    gamma = np.copy(gamma_column_first)
    current_column = 1
    for _ in range(1, n):
        gamma_rows = Z
        
        # first add zero matrices
        for _ in range(current_column):
            gamma_rows = np.vstack((gamma_rows, Z))
        
        # then add the impulse responses
        A_power2 = np.copy(A)
        
        if current_column < (n-2):
            gamma_rows = np.vstack((
                gamma_rows,
                C @ B,
                C @ A @ B # these must not be added to the last and the second to last columns
            ))
            
        if current_column == (n-2):
            gamma_rows = np.vstack((
                gamma_rows,
                C @ B # this has to be added to the end of the second to last column
            ))
        
        for _ in range(n-current_column-3):
            A_power2 = A_power2 @ A
            gamma_rows = np.vstack((gamma_rows, C @ A_power2 @ B))

        # add column on the right hand side
        gamma = np.hstack((gamma, gamma_rows))
        current_column += 1
    
    return gamma

In [None]:
def L(input_shape):
    '''
    The regularization matrix L.

    Parameters:

    input_shape : float
        Input vector shape used to determine L matrix shape.

    Returns:

    L : ndarray
        The regularization matrix
    '''
    I = np.eye(input_shape)
    L = -I + np.diag(np.ones(input_shape-1), 1)

    return I

In [None]:
def second_difference_matrix(n, m):
    D2 = np.eye(n*m) - 2*np.eye(n*m, k=2) + np.eye(n*m, k=4)
    
    # delete incomplete rows
    D2 = D2[:-2*m, :]
    
    return D2

In [None]:
def tikhonov_problem(meas, obsrv, gamm, regu, initial_state=None, lam=1, cmplx=False):
    '''
    This function uses the cvxpy library to solve a Tikhonov regularization problem.
    '''
    d = cp.Variable((gamm.shape[1], 1), complex=cmplx)
    
    if initial_state is None:
        x = cp.Variable((obsrv.shape[1], 1), complex=cmplx)
    else:
        x = initial_state
        
    measurements = cp.Parameter(meas.shape)
    measurements.value = meas
    
    objective = cp.Minimize(cp.sum_squares(measurements - obsrv @ x - gamm @ d) + lam * cp.sum_squares(regu @ d))
    
    prob = cp.Problem(objective)
    prob.solve()

    if initial_state is None:
        x_value = x.value
    else:
        x_value = initial_state

    return d.value, x_value

In [None]:
def lasso_problem(meas, obsrv, gamm, regu, initial_state=None, lam=1, cmplx=False):
    '''
    This function uses the cvxpy library to solve a LASSO problem.
    '''
    d = cp.Variable((gamm.shape[1], 1), complex=cmplx)
    
    if initial_state is None:
        x = cp.Variable((obsrv.shape[1], 1), complex=cmplx)
    else:
        x = initial_state
        
    measurements = cp.Parameter(meas.shape)
    measurements.value = meas
    
    objective = cp.Minimize(cp.sum_squares(measurements - obsrv @ x - gamm @ d) + lam * cp.pnorm(regu @ d, 1))
    
    prob = cp.Problem(objective)
    prob.solve()
    
    if initial_state is None:
        x_value = x.value
    else:
        x_value = initial_state

    return d.value, x_value

# TODO: Input and state estimation with laboratory testbench measurments

## Load and preprocess data
* motor sampling rate ~1kHz
* sensor sampling rate ~3kHz

In [None]:
# time | MotorVelocitySet | MotorVelocity | MotorTorqueSet | MotorTorque | PropellerVelocitySet | PropellerVelocity | PropellerTorqueSet | PropellerTorque
m_data = np.genfromtxt('../../data/testbench_data/motor/baseline_500rpm_Constant_Torque_11%_motor_GP1.csv', delimiter=',')
motor_data = np.delete(m_data, 0, 0) # delete header row

# time | enc1_ang | enc1_time | enc2_ang | enc2_time | enc3_ang | enc3_time | enc4_ang | enc4_time | enc5_ang | enc5_time | acc1 | acc2 | acc3 | acc4 | Torq1 | Torq2
s_data = np.genfromtxt('../../data/testbench_data/sensor/baseline_500rpm_Constant_Torque_11%_GP1.csv', delimiter=',')
sensor_data = np.delete(s_data, 0, 0) # delete header row

In [None]:
# time | MotorTorqueSet | MotorTorque | MotorVelocitySet | MotorVelocity | PropellerTorqueSet | PropellerTorque | PropellerVelocitySet | PropellerVelocity
#m_data = np.genfromtxt('../../data/testbench_data/motor/baseline_500rpm_Sin_torque_11%_motor_GP1.csv', delimiter=',')
#motor_data = np.delete(m_data, 0, 0) # delete header row

# time | enc1_ang | enc1_time | enc2_ang | enc2_time | enc3_ang | enc3_time | enc4_ang | enc4_time | enc5_ang | enc5_time | acc1 | acc2 | acc3 | acc4 | Torq1 | Torq2
#s_data = np.genfromtxt('../../data/testbench_data/sensor/baseline_500rpm_Sin_Torque_11%_GP1.csv', delimiter=',')
#sensor_data = np.delete(s_data, 0, 0) # delete header row

In [None]:
# time | MotorTorqueSet | MotorTorque | MotorVelocitySet | MotorVelocity | PropellerTorqueSet | PropellerTorque | PropellerVelocitySet | PropellerVelocity
#m_data = np.genfromtxt('../../data/testbench_data/motor/Virtual_Sensor_2.4%_0_motor.csv', delimiter=',')
#motor_data = np.delete(m_data, 0, 0) # delete header row

# time | enc1_ang | enc1_time | enc2_ang | enc2_time | enc3_ang | enc3_time | enc4_ang | enc4_time | enc5_ang | enc5_time | acc1 | acc2 | acc3 | acc4 | Torq1 | Torq2
#s_data = np.genfromtxt('../../data/testbench_data/sensor/Virtual_Sensor_2.4%_0.csv', delimiter=',')
#sensor_data = np.delete(s_data, 0, 0) # delete header row

In [None]:
t_motor = motor_data[:,0]
tau_motor = motor_data[:,2+2]
tau_propeller = motor_data[:,6+2]

In [None]:
print(len(t_motor))
plt.plot(t_motor, tau_motor, label='motor')
plt.plot(t_motor, tau_propeller, label='propeller')
#plt.xlim(0,10)
plt.legend()
plt.show()

# Simulation with measured inputs

In [None]:
dt = np.mean(np.diff(t_motor))
mot_pot = np.vstack((tau_motor, tau_propeller)).T

A_mat_d, B_mat_d = c2d(A_mat, B_mat, dt)

In [None]:
t_out, y_out, x_out = dlsim((A_mat_d, B_mat_d, C_mat, D_mat, dt), t=t_motor, u=mot_pot)

## Experimental measurements

In [None]:
# motor sampling rate is ~1/3 of sensor sampling rate so every third sensor sample is used
t_sensor = sensor_data[:,0]
i_max = len(t_sensor) - (len(t_sensor) % len(t_motor))
sensor_tmp = sensor_data[:i_max,:] # len(sensor_data) == 3*len(motor_data)
sensor_data_third = sensor_tmp[::3,:] # len(sensor_data) == len(motor_data)

In [None]:
t_sensor_third = sensor_data_third[:,0]
torq_sensor1 = sensor_data_third[:,-2]
torq_sensor2 = sensor_data_third[:,-1]

In [None]:
omega1 = np.gradient(sensor_data_third[:,1]*(np.pi/180), sensor_data_third[:,2])
omega2 = np.gradient(sensor_data_third[:,3]*(np.pi/180), sensor_data_third[:,4])
omega3 = np.gradient(sensor_data_third[:,5]*(np.pi/180), sensor_data_third[:,6])
omega4 = -np.gradient(sensor_data_third[:,7]*(np.pi/180), sensor_data_third[:,8])
omega5 = np.gradient(sensor_data_third[:,9]*(np.pi/180), sensor_data_third[:,10])
omega1[np.isnan(omega1)] = 0
omega2[np.isnan(omega2)] = 0
omega3[np.isnan(omega3)] = 0
omega4[np.isnan(omega4)] = 0
omega5[np.isnan(omega5)] = 0
plt.plot(t_sensor_third, omega1, label='enc1')
plt.plot(t_sensor_third, omega2, label='enc2')
plt.plot(t_sensor_third, omega3, label='enc3')
plt.plot(t_sensor_third, omega4, label='enc4')
plt.plot(t_sensor_third, omega5, label='enc5')
plt.legend()
plt.show()

In [None]:
# torque sensor data is mixed up: V -> Nm conversion is redone
torq1 = (torq_sensor1/10)*4
torq2 = (torq_sensor2/4)*10

plt.plot(t_sensor_third, torq1, label='Torque sensor 1')
#plt.plot(t_sensor_third, torq2, label='Torque sensor 2', alpha=0.5)
plt.plot(t_out, y_out[:,-1], label='Simulated torque sensor 1')
plt.legend()
plt.show()

## Input estimation

In [None]:
measurements_noise = np.vstack((omega1[40000:], omega2[40000:], tau_motor[40000:], torq1[40000:])).T
print(measurements_noise.shape)

In [None]:
c_mat1 = np.insert(C_mat, 2, np.zeros((1, C_mat.shape[1])), 0)
c_mat1[2,22] += 1.9e5
print(c_mat.shape)

In [None]:
B_mat_d1 = B_mat_d
#B_mat_d1 = np.delete(B_mat_d, 0, 1)
m = B_mat_d1.shape[1]
#n = len(t_sensor_third)
n = 2e4 # corresponds to datapoints in over a 2 second period (1 kHz sampling frequency)

bs = 500 # batch size
loop_len = int(n/bs)

D2_mat = second_difference_matrix(bs, m)
L_mat = L(bs*m)
O_mat = O(A_mat_d, c_mat, bs)
G_mat = gamma(A_mat_d, B_mat_d1, c_mat, bs)
x_tikhonov = np.zeros((O_mat.shape[1], 1))
x_lasso = np.zeros((O_mat.shape[1], 1))

tikh_estimates = []
lasso_estimates = []

In [None]:
print(O_mat.shape)
print(B_mat_d1.shape)
print(m)
print(loop_len)

In [None]:
# for initial state estimation
C_full = np.eye(B_mat_d1.shape[0])
omat = O(A_mat_d, C_full, bs)
gmat = gamma(A_mat_d, B_mat_d1, C_full, bs)

In [None]:
for i in range(loop_len):
    batch = measurements_noise[i*bs:(i+1)*bs,:]
    y_noise = batch.reshape(-1,1)
    
    #print('Initial state estimate (Tikhonov):\n', x_tikhonov.T)
    #print('Initial state estimate (LASSO):\n', x_lasso.T)
    
    tikhonov_estimate, x_tikhonov = tikhonov_problem(y_noise, O_mat, G_mat, D2_mat, initial_state=x_tikhonov, lam=0.05)
    lasso_estimate, x_lasso = lasso_problem(y_noise, O_mat, G_mat, D2_mat, initial_state=x_lasso, lam=0.05)
    
    x_est_t = omat @ x_tikhonov + gmat @ tikhonov_estimate
    x_tikhonov = x_est_t[-A_mat_d.shape[0]:,:]
    
    x_est_l = omat @ x_lasso + gmat @ lasso_estimate
    x_lasso = x_est_l[-A_mat_d.shape[0]:,:]
    
    #print('New initial state estimate (Tikhonov):\n', x_tikhonov.T)
    #print('New initial state estimate (LASSO):\n', x_lasso.T)
    
    tikh_estimates.append(tikhonov_estimate)
    lasso_estimates.append(lasso_estimate)

In [None]:
#%matplotlib widget
for i in range(loop_len):
    plt.plot(t_motor[i*bs:(i+1)*bs], tau_propeller[i*bs:(i+1)*bs], linestyle='solid', color='black')
    plt.plot(t_sensor_third[i*bs:(i+1)*bs], lasso_estimates[i][1::2], linestyle='dashed', color='blue')

plt.legend(('known input', 'LASSO estimate'))
plt.xlabel('Time (s)')
plt.ylabel('Torque (Nm)')
plt.xlim(4,5)
plt.title('Propeller side input estimates (H-P trend filtering)')
plt.show()

In [None]:
for i in range(loop_len):
    plt.plot(t_motor[i*bs:(i+1)*bs], tau_motor[i*bs:(i+1)*bs], linestyle='solid', color='black')
    plt.plot(t_sensor_third[i*bs:(i+1)*bs], tikh_estimates[i][::2], linestyle='dotted', alpha=0.2, color='red')
    plt.plot(t_sensor_third[i*bs:(i+1)*bs], lasso_estimates[i][::2], linestyle='dashed', color='blue')

plt.legend(('known input', 'Tikhonov estimate', 'LASSO estimate'))
plt.xlabel('Time (s)')
plt.ylabel('Torque (Nm)')
plt.title('Motor side input estimates (H-P trend filtering)')
#plt.ylim(4.95, 5.05)
#plt.xlim(0.4,0.6)
plt.show()

In [None]:
for i in range(loop_len):
    plt.plot(t_motor[i*bs:(i+1)*bs], tau_propeller[i*bs:(i+1)*bs], linestyle='solid', color='black')
    plt.plot(t_sensor_third[i*bs:(i+1)*bs], tikh_estimates[i][1::2], linestyle='dotted', alpha=0.5, color='red')
    plt.plot(t_sensor_third[i*bs:(i+1)*bs], lasso_estimates[i][1::2], linestyle='dashed', color='blue')

plt.legend(('known input', 'Tikhonov estimate', 'LASSO estimate'))
plt.xlabel('Time (s)')
plt.ylabel('Torque (Nm)')
plt.title('Propeller side input estimates (H-P trend filtering)')
#plt.ylim(-4, 0)
#plt.xlim(0.4,0.6)
plt.show()

# TODO:
* virtual sensor code to notebook
* debug 3dof KF example
* try estimating motor torque with no propeller torque dataset
* dc torque of propeller estimate is too high, problem with gear ratio?
* try lsim method for motor data?