In [1]:
import numpy as np
import pandas as pd
import glob
import tensorflow as tf
from scipy.signal import find_peaks, peak_prominences, peak_widths
import matplotlib.pyplot as plt

columns = ['lJPos', 'rJPos', 'lJVel', 'rJVel', 'lJTorque', 'rJTorque',
           'eulerX', 'eulerY', 'eulerZ', 'gyroX', 'gyroY', 'gyroZ', 'accX', 'accY', 'accZ',
           'batt', 'cpu', 'mem', 'lBttn', 'rBttn', 'time', 'lJVelFilt', 'rJVelFilt',
           'lJPosReset', 'rJPosReset', 'lGC', 'rGC', 'stand', 'lCmdTorque', 'rCmdTorque',
           'lRecvTorque', 'rRecvTorque', 'lStanceSwing', 'rStanceSwing', 'nWalk', 'lWalk', 'rWalk', 'none']

ST_columns = ['lJPos', 'rJPos', 'lJVel', 'rJVel', 'lJTorque', 'rJTorque',
           'eulerX', 'eulerY', 'eulerZ', 'gyroX', 'gyroY', 'gyroZ', 'accX', 'accY', 'accZ',
           'lBttn', 'rBttn', 'time', 'lJVelFilt', 'rJVelFilt',
           'lJPosReset', 'rJPosReset', 'lGC', 'rGC', 'stand', 'lCmdTorque', 'rCmdTorque',
           'lRecvTorque', 'rRecvTorque', 'lStanceSwing', 'rStanceSwing', 'nWalk', 'lWalk', 'rWalk', 'none']

channel = ['lJPos', 'rJPos', 'lJVel', 'rJVel', 'gyroX', 'gyroY', 'gyroZ', 'accX', 'accY', 'accZ', 'lWalk', 'rWalk']
    
def find_peak_idx(joint_positions):
    peaks, _ = find_peaks(joint_positions)
    prominences = peak_prominences(joint_positions, peaks)[0]
    maximas, _ = find_peaks(joint_positions, prominence=np.median(prominences)+np.var(prominences), distance=150)
    return maximas

def label_ground_truth(joint_positions):
    maximas = find_peak_idx(joint_positions)
    maximas = np.append(0, maximas)
    end_idx = maximas[-1]
    
    y = pd.Series(np.nan, index=range(0, joint_positions.shape[0]))  
    for maxima in maximas:
        y[maxima] = 1
        y[maxima+1] = 0
    y.interpolate(inplace=True)
    y.fillna(0, inplace=True)
    y_theta = y * 2 * np.pi
    
    cartesian_output = np.stack([np.cos(y_theta), np.sin(y_theta)], axis=1)
    return y, end_idx, cartesian_output

def custom_rmse(y_true, y_pred):
    #Raw values and Prediction are in X,Y
    labels, theta, gp = {}, {}, {}

    #Separate legs
    left_true = y_true[:, :2]
    right_true = y_true[:, 2:]
    left_pred = y_pred[:, :2]
    right_pred = y_pred[:, 2:]
    
    #Calculate cosine distance
    left_num = np.sum(np.multiply(left_true, left_pred), axis=1)
    left_denom = np.linalg.norm(left_true, axis=1) * np.linalg.norm(left_pred, axis=1)
    right_num = np.sum(np.multiply(right_true, right_pred), axis=1)
    right_denom = np.linalg.norm(right_true, axis=1) * np.linalg.norm(right_pred, axis=1)

    left_cos = left_num / left_denom
    right_cos = right_num / right_denom
    
    #Clip large values and small values
    left_cos = np.minimum(left_cos, np.zeros(left_cos.shape)+1)
    left_cos = np.maximum(left_cos, np.zeros(left_cos.shape)-1)
    
    right_cos = np.minimum(right_cos, np.zeros(right_cos.shape)+1)
    right_cos = np.maximum(right_cos, np.zeros(right_cos.shape)-1)
    
    # What if denominator is zero (model predicts 0 for both X and Y)
    left_cos[np.isnan(left_cos)] = 0
    right_cos[np.isnan(right_cos)] = 0
    
    #Get theta error
    left_theta = np.arccos(left_cos)
    right_theta = np.arccos(right_cos)
    
    #Get gait phase error
    left_gp_error = left_theta * 100 / (2*np.pi)
    right_gp_error = right_theta * 100 / (2*np.pi)
    
    #Get rmse
    left_rmse = np.sqrt(np.mean(np.square(left_gp_error)))
    right_rmse = np.sqrt(np.mean(np.square(right_gp_error)))

    #Separate legs
    labels['left_true'] = left_true
    labels['right_true'] = right_true
    labels['left_pred'] = left_pred
    labels['right_pred'] = right_pred

    for key, value in labels.items(): 
        #Convert to polar
        theta[key] = np.arctan2(value[:, 1], value[:, 0])
        
        #Bring into range of 0 to 2pi
        theta[key] = np.mod(theta[key] + 2*np.pi, 2*np.pi)

        #Interpolate from 0 to 100%
        gp[key] = 100*theta[key] / (2*np.pi)

    return left_rmse, right_rmse

def custom_rmse_uni(left_true, left_pred):
    #Raw values and Prediction are in X,Y
    labels, theta, gp = {}, {}, {}
    
    #Calculate cosine distance
    left_num = np.sum(np.multiply(left_true, left_pred), axis=1)
    left_denom = np.linalg.norm(left_true, axis=1) * np.linalg.norm(left_pred, axis=1)

    left_cos = left_num / left_denom
    
    #Clip large values and small values
    left_cos = np.minimum(left_cos, np.zeros(left_cos.shape)+1)
    left_cos = np.maximum(left_cos, np.zeros(left_cos.shape)-1)
    
    # What if denominator is zero (model predicts 0 for both X and Y)
    left_cos[np.isnan(left_cos)] = 0
    
    #Get theta error
    left_theta = np.arccos(left_cos)
    
    #Get gait phase error
    left_gp_error = left_theta * 100 / (2*np.pi)
    
    #Get rmse
    left_rmse = np.sqrt(np.mean(np.square(left_gp_error)))

    #Separate legs
    labels['left_true'] = left_true
    labels['left_pred'] = left_pred

    for key, value in labels.items(): 
        #Convert to polar
        theta[key] = np.arctan2(value[:, 1], value[:, 0])
        
        #Bring into range of 0 to 2pi
        theta[key] = np.mod(theta[key] + 2*np.pi, 2*np.pi)

        #Interpolate from 0 to 100%
        gp[key] = 100*theta[key] / (2*np.pi)

    return left_rmse

In [3]:
#-- Load model & OA data --#

# for file_path in glob.glob(f'data/raw/AB08_CCW_BT.txt'):
for file_path in glob.glob(f'data/OA_TEST_OVERGROUND.txt'):
    data = pd.read_csv(file_path, sep=" ", header=1)
    data.columns = ST_columns
    input_data = pd.DataFrame(data, columns=channel).to_numpy()

input_data, eval_data = input_data[:int(input_data.shape[0]*0.8), :], input_data[int(input_data.shape[0]*0.8):, :]

print(input_data.shape, eval_data.shape)
# data = data.to_numpy()
# input_data = data[:,:-1]
# print(input_data)
# plt.plot(input_data[:2000,-1])
# plt.plot(input_data[:2000,-2])

# plt.show()

model_adap_left = tf.keras.models.load_model('GP_Left_WS80_noBN.h5')
model_stat_left = tf.keras.models.load_model('GP_Left_WS80_noBN.h5')

# model_adap_right = keras.models.load_model('final_model_right_WS80.h5')
# model_stat_right = keras.models.load_model('final_model_right_WS80.h5')

# model_adap_left.summary()
# for layer in model_adap_left.layers[:8]:
#     layer.trainable = False
#     print(layer.trainable)

(30528, 12) (7633, 12)


In [26]:
#-- Configs --#

window_size = 80
channel_number = 12
buffer_size = 1000

In [27]:
#-- Process Eval Data --#

peak_idx = find_peak_idx(eval_data[:,0])[-1]
eval_l_gp = eval_data[:,-2] 
eval_x = eval_data[:,:-2]

eval_gait_phase, end_idx, eval_y = label_ground_truth(eval_x[:,0])
eval_x = eval_x[:end_idx,:]
eval_y = eval_y[:end_idx,:]      
eval_gait_phase = eval_gait_phase[:end_idx]
eval_l_gp = eval_l_gp[:end_idx]


shape = (eval_x.shape[0] - window_size + 1, window_size, eval_x.shape[1])
strides = (eval_x.strides[0], eval_x.strides[0], eval_x.strides[1])
eval_x = np.lib.stride_tricks.as_strided(eval_x, shape=shape, strides=strides)
eval_y = eval_y[window_size - 1:]

print(eval_x.shape, eval_y.shape)

(7313, 80, 10) (7313, 2)


In [28]:
#-- OA --#

y_new_eval = model_stat_left.predict(eval_x)
y_old_eval = model_adap_left.predict(eval_x)

eval_rmse_o = custom_rmse_uni(eval_y, y_old_eval)
eval_rmse_n = custom_rmse_uni(eval_y, y_new_eval)

print(f'Starting Eval RMSE: Static({eval_rmse_o:.2f}) | Adap ({eval_rmse_n:.2f})')

old_buffer = np.empty((0, channel_number))
current_buffer = np.empty((0, channel_number))

for ii, stream_vec in enumerate(input_data):

    stream_vec = np.expand_dims(stream_vec, axis=0)
    current_buffer = np.concatenate([current_buffer, stream_vec], axis=0)
        
    if current_buffer.shape[0] == buffer_size:
        peak_idx = find_peak_idx(current_buffer[:,0])[-1]
        x = np.concatenate([old_buffer, current_buffer[:peak_idx, :]], axis=0)
#         l_mode = x[:,-2] 
        x = x[:,:-2]

        gait_phase, end_idx, y = label_ground_truth(x[:,0])
        x = x[:end_idx,:]
        y = y[:end_idx,:]      
#         gait_phase = gait_phase[:end_idx]
#         l_mode = l_mode[:end_idx]
        
#         new_idx = np.where((l_mode != 0))[0].tolist()
#         x = x[new_idx,:]
#         y = y[new_idx,:]
#         gait_phase = gait_phase[new_idx]
#         l_mode = l_mode[new_idx]
        
#         plt.plot(x[:,0])
#         plt.plot(gait_phase)
#         plt.plot(l_mode)
#         plt.show()

        window_size = 80
        shape = (x.shape[0] - window_size + 1, window_size, x.shape[1])
        strides = (x.strides[0], x.strides[0], x.strides[1])
        x = np.lib.stride_tricks.as_strided(x, shape=shape, strides=strides)
        y = y[window_size - 1:]
#         y = np.expand_dims(y[window_size - 1:], axis=1)
    
        y_new_train = model_adap_left.predict(x)
        y_old_train = model_stat_left.predict(x)
#         y = np.squeeze(y, axis=1)

        left_rmse_o = custom_rmse_uni(y, y_old_train)
        left_rmse_n = custom_rmse_uni(y, y_new_train)
        model_adap_left.fit(x, y, epochs=1, batch_size=256, verbose=0)

        
#         y_new_eval = model_adap_left.predict(eval_x)
#         y_old_eval = model_stat_left.predict(eval_x)
        
#         eval_rmse_o = custom_rmse_uni(eval_y, y_old_eval)
#         eval_rmse_n = custom_rmse_uni(eval_y, y_new_eval)

        print(f"Mode RMSE: ({left_rmse_o:.2f}) | ({left_rmse_n:.2f})")
#         print(f"Training RMSE: ({left_rmse_o:.2f}) | ({left_rmse_n:.2f}) -- Eval RMSE: ({eval_rmse_o:.2f}) | ({eval_rmse_n:.2f})")        
#         plt.plot(y[:,1])
#         plt.plot(y_old_test[:,1])
#         plt.plot(y_new_test[:,1])
#         plt.show()
        
        old_buffer = current_buffer[peak_idx:, :]
        current_buffer = np.empty((0, channel_number))

Starting Eval RMSE: Static(1.48) | Adap (7.74)
Mode RMSE: (27.05) | (17.53)
Mode RMSE: (9.54) | (1.27)
Mode RMSE: (9.71) | (1.07)
Mode RMSE: (8.26) | (1.11)
Mode RMSE: (8.72) | (1.26)
Mode RMSE: (6.99) | (1.16)
Mode RMSE: (7.32) | (1.01)
Mode RMSE: (6.84) | (1.15)
Mode RMSE: (6.98) | (1.06)
Mode RMSE: (9.69) | (1.16)
Mode RMSE: (7.66) | (1.18)
Mode RMSE: (6.76) | (0.91)
Mode RMSE: (8.29) | (0.82)
Mode RMSE: (8.04) | (0.95)
Mode RMSE: (9.67) | (1.17)
Mode RMSE: (4.44) | (1.38)
Mode RMSE: (4.59) | (1.37)
Mode RMSE: (4.83) | (1.20)
Mode RMSE: (4.70) | (1.35)
Mode RMSE: (5.42) | (0.86)
Mode RMSE: (6.28) | (1.22)
Mode RMSE: (7.57) | (1.12)
Mode RMSE: (10.32) | (1.01)
Mode RMSE: (11.28) | (1.00)
Mode RMSE: (8.97) | (1.02)
Mode RMSE: (12.27) | (1.14)
Mode RMSE: (8.93) | (1.08)
Mode RMSE: (12.76) | (1.31)
Mode RMSE: (10.81) | (1.09)
Mode RMSE: (9.44) | (1.35)


In [6]:
# model_adap_left = tf.keras.models.load_model('final_model_left_WS80.h5')

# # model_adap_left.summary()
# for layer in model_adap_left.layers[:9]:
#     layer.trainable = False
# for layer in model_adap_left.layers:
#     print(layer.trainable)