# Gait steps

Reginaldo K Fukuchi 

This NB implements the "gait_steps.m" Sean Osis method to detect gait events.

In [1]:
# Prepare environment
import os, glob
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline

from detecta import detect_peaks

In [2]:
# Import data
pathname = r'../data'

### Supporting functions

In [3]:
def f7(seq):
    seen = set()
    seen_add = seen.add
    return [x for x in seq if not (x in seen or seen_add(x))]

def loadjson(filename):
    import json # import library
    with open(fn_json, 'r') as f:
        data = json.load(f)
    
    return data

def loadmat(filename):
    '''
    this function should be called instead of direct spio.loadmat
    as it cures the problem of not properly recovering python dictionaries
    from mat files. It calls the function check keys to cure all entries
    which are still mat-objects
    '''
    import scipy.io as spio
    
    def _check_keys(d):
        '''
        checks if entries in dictionary are mat-objects. If yes
        todict is called to change them to nested dictionaries
        '''
        for key in d:
            if isinstance(d[key], spio.matlab.mat_struct):
                d[key] = _todict(d[key])
        return d

    def _todict(matobj):
        '''
        A recursive function which constructs from matobjects nested dictionaries
        '''
        d = {}
        for strg in matobj._fieldnames:
            elem = matobj.__dict__[strg]
            if isinstance(elem, spio.matlab.mat_struct):
                d[strg] = _todict(elem)
            elif isinstance(elem, np.ndarray):
                d[strg] = _tolist(elem)
            else:
                d[strg] = elem
        return d

    def _tolist(ndarray):
        '''
        A recursive function which constructs lists from cellarrays
        (which are loaded as numpy ndarrays), recursing into the elements
        if they contain matobjects.
        '''
        elem_list = []
        for sub_elem in ndarray:
            if isinstance(sub_elem, spio.matlab.mat_struct):
                elem_list.append(_todict(sub_elem))
            elif isinstance(sub_elem, np.ndarray):
                elem_list.append(_tolist(sub_elem))
            else:
                elem_list.append(sub_elem)
        return elem_list
    data = spio.loadmat(filename, struct_as_record=False, squeeze_me=True)
    return _check_keys(data)

def cardanangles(r):
    '''
    %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
    # %   This function inputs the ROTATION matrix for a given joint
    # %   at one point in time and uses the cardan angles sequence to
    # %   provide the XYZ joint angles in OUT argument
    # %
    # %   The function uses the following  rotation matrix to calculate angles
    # %      | CzCy-SzSySx  SzCy+CzSySx  -SyCx |
    # %      | -SzCx        CzCx         Sx    |
    # %      | CzSy+SzCySx  SzSy-CzCySx  CyCx  |
    # %  INPUTS
    # %  --------
    # %   R (mat):    A 3x3 rotation matrix for a joint

    # %  OUTPUTS
    # %  -------
    # %  OUT (mat):      The three (1x3) planes of rotation in radians
    # %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
    # %%
    # %   angle.L_ankle(i,:) = cardanangles(R.L_ankle(:,:,i));

    # % the use of atan2 increases the stability by avoiding gimble lock in
    # % the physiologicaly posible range of joint angles.
    '''
    # Prepare Python environment
    import numpy as np
    x = np.arctan2(r[1,2], np.sqrt(r[0,2]**2+r[2,2]**2))
    y = np.arctan2(-r[0,2], r[2,2])
    z = np.arctan2(-r[1,0], r[1,1])

    out = np.array([x,y,z])

    return out

## Import data
### RIC data

In [4]:
figshare_dir = r'C:\Users\Reginaldo\OneDrive - University of Calgary\data\Figshare_SciData\new_unzip'
data_dir = r'../data'

In [5]:
fn_json=os.path.join(figshare_dir, '201225', '20140515T133244.json')
data_RIC = loadjson(fn_json)
data_RIC.keys()

dict_keys(['hz_w', 'hz_r', 'walking', 'running', 'joints', 'neutral', 'dv_w', 'dv_r'])

In [6]:
# Create dataframe column corresponding to the dataset
neutral_lbls = list(data_RIC['neutral'].keys())
xyz = list('XYZ')*len(neutral_lbls)
neutral_lbls = [ele for ele in neutral_lbls for i in range(3)]
neutral_lbls = [neutral_lbls[i]+'_'+xyz[i] for i in range(len(xyz))]

# Joint marker labels static trial
joints_lbls = list(data_RIC['joints'].keys())
xyz = list('XYZ')*len(joints_lbls)
joints_lbls = [ele for ele in joints_lbls for i in range(3)]
joints_lbls = [joints_lbls[i]+'_'+xyz[i] for i in range(len(xyz))]

# Marker labels running trial
gait_lbls = list(data_RIC['running'].keys())
xyz = list('XYZ')*len(gait_lbls)
gait_lbls = [ele for ele in gait_lbls for i in range(3)]
gait_lbls = [gait_lbls[i]+'_'+xyz[i] for i in range(len(xyz))]

In [7]:
# Convert dictionaries into pandas dfs
neutral = pd.DataFrame.from_dict(data_RIC['neutral']).values.reshape((1,len(neutral_lbls)),
                                                                     order='F')
joints  = pd.DataFrame.from_dict(data_RIC['joints']).values.reshape((1,len(joints_lbls)),
                                                                     order='F')

In [8]:
# Convert dictionaries into pandas dfs
neutral = pd.DataFrame(data=neutral, 
                           columns=neutral_lbls)
joints = pd.DataFrame(data=joints, 
                           columns=joints_lbls)

In [9]:
run_data = np.empty(shape=(5000, len(list(data_RIC['running'].keys()))*3))
for m, mkr in enumerate(list(data_RIC['running'].keys())):
    run_data[:, 3*m:3*(m+1)] = np.array(data_RIC['running'][mkr])
# Create dataframe with running data
gait = pd.DataFrame(data = run_data, columns=gait_lbls)

### Running function gait_kinematics.py to output required variables for gait_steps.py

In [10]:
import sys
sys.path.insert(1, r'../functions')
from gait_kinematics import gait_kinematics

In [11]:
# Invoking function to calculate angles
R_R_ankle, R_R_knee, R_R_hip, angle_R_foot, angle_Pelvis = gait_kinematics(joints, neutral, gait, data_RIC['hz_r'])

In [12]:
angle_R_ankle = np.empty(shape=(R_R_ankle.shape[2],3)) * np.NaN
angle_R_knee = np.empty(shape=(R_R_ankle.shape[2],3)) * np.NaN
angle_R_hip = np.empty(shape=(R_R_ankle.shape[2],3)) * np.NaN
for i in range(R_R_ankle.shape[2]):
    angle_R_ankle[i,:] = cardanangles(R_R_ankle[:,:,i])
    angle_R_knee[i,:] = cardanangles(R_R_knee[:,:,i])
    angle_R_hip[i,:] = cardanangles(R_R_hip[:,:,i])

In [13]:
angle_R_ankle = angle_R_ankle * (180/np.pi)
angle_R_knee  = angle_R_knee * (180/np.pi)
angle_R_hip   = angle_R_hip * (180/np.pi)
angle_R_foot  = angle_R_foot * (180/np.pi)
angle_Pelvis  = angle_Pelvis * (180/np.pi)

In [14]:
# Create dataframe column corresponding to the dataset
joints_lbls = ['pelvis','foot','hip','knee','ankle']
xyz = list('XYZ')*len(joints_lbls)
joints_lbls = [ele for ele in joints_lbls for i in range(3)]
joints_lbls = [joints_lbls[i]+'_'+xyz[i] for i in range(len(xyz))]

In [15]:
angs = np.hstack([angle_Pelvis, angle_R_foot, angle_R_hip, angle_R_knee, angle_R_ankle])

In [16]:
angles = pd.DataFrame(data=angs, columns=joints_lbls)
angles.head()

Unnamed: 0,pelvis_X,pelvis_Y,pelvis_Z,foot_X,foot_Y,foot_Z,hip_X,hip_Y,hip_Z,knee_X,knee_Y,knee_Z,ankle_X,ankle_Y,ankle_Z
0,-0.130881,3.62383,-0.35451,12.427321,3.441102,-87.858121,-10.735628,-6.963042,7.212884,19.450638,16.619757,51.327518,-10.925609,-7.332781,26.055983
1,-0.109653,3.092453,0.503615,12.336751,4.156125,-87.042577,-9.942102,-6.339841,6.811078,20.051316,15.919602,53.369701,-10.691751,-7.543328,24.570621
2,-0.132692,2.53905,1.368341,12.218492,4.876927,-86.157714,-9.120387,-5.842418,6.344889,20.630059,15.22679,55.438418,-10.455375,-7.734672,23.052603
3,-0.216864,1.964295,2.222881,12.066526,5.590107,-85.191288,-8.27857,-5.520388,5.807934,21.161956,14.572236,57.492671,-10.214346,-7.895527,21.521611
4,-0.372386,1.374357,3.041033,11.879516,6.286569,-84.131576,-7.42912,-5.406812,5.188237,21.626613,13.983739,59.492093,-9.967965,-8.01355,19.994906


In [17]:
#data.loc[:, data.columns.str.contains('in')]

In [18]:
#%% Determine functional measures and gait type (walk vs run)
# % movement speed comes from the A/P position time history of a heel marker
# % so we first need to identify a heel marker
# LEFT SIDE
# Determine functional measures and gait type (walk vs run) movement speed comes 
# from the A/P position time history of a heel marker so we first need to identify 
# a heel marker.

# % Combine 3 of the foot markers into one matrix (ignore the created fourth)
L_foot = neutral[['L_foot_1_X', 'L_foot_1_Y', 'L_foot_1_Z',
              'L_foot_2_X', 'L_foot_2_Y', 'L_foot_2_Z',
              'L_foot_3_X', 'L_foot_3_Y', 'L_foot_3_Z']].values.reshape((3,3))
# sort the markers from left to right
i_lf   = list(L_foot[:, 0].argsort())
L_foot = L_foot[L_foot[:, 0].argsort()]

# find the lower of the two medial markers
if L_foot[1,1] < L_foot[2,1]:
    L_marker = 'L_foot_' + str(i_lf[1]+1)
    L_heel =  gait.filter(like=L_marker).values
else:
    L_marker = 'L_foot_' + str(i_lf[2]+1)
    L_heel =  gait.filter(like=L_marker).values
    
# Find peaks location. Signal flipped because of X-axis convention difference.
locs0 = detect_peaks(np.diff(L_heel[:,2]), mpd=np.round(0.5*data_RIC['hz_r']), 
                    mph=0, show=False)
pks = np.diff(L_heel[:,2])[locs0]

locs = detect_peaks(-np.diff(L_heel[:,2]), mpd=np.round(0.5*data_RIC['hz_r']), 
                    mph=0, show=False)

# Gait velocity and cadence
vel    = data_RIC['hz_r']*np.median(pks)/1000; # gait speed
stRate = 60/(np.median(np.diff(locs))/data_RIC['hz_r']); # cadence
print('Gait velocity is '+str(vel)+' m/s')
print('Stride rate is '+str(stRate)+' strides/min')

Gait velocity is 2.904997037904531 m/s
Stride rate is 91.6030534351145 strides/min


In [19]:
#%% RIGHT SIDE
# % Combine 3 of the foot markers into one matrix (ignore the created fourth)
R_foot = neutral[['R_foot_1_X', 'R_foot_1_Y', 'R_foot_1_Z',
              'R_foot_2_X', 'R_foot_2_Y', 'R_foot_2_Z',
              'R_foot_3_X', 'R_foot_3_Y', 'R_foot_3_Z']].values.reshape((3,3))
# sort the markers from left to right
i_rf   = list(R_foot[:, 0].argsort())
R_foot = R_foot[R_foot[:, 0].argsort()]

# find the lower of the two medial markers
if R_foot[0,1] < R_foot[1,1]:
    R_marker = 'R_foot_' + str(i_rf[0]+1)
    R_heel =  gait.filter(like=R_marker).values
else:
    R_marker = 'R_foot_' + str(i_rf[1]+1)
    R_heel =  gait.filter(like=R_marker).values

### Linear discriminant analysis

In [20]:
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis

In [21]:
# Import training dataset
gaitClass = pd.read_csv(os.path.join(pathname, 'LDA_out.txt'), delimiter='\t', 
                        header=None, names=['Category','Speed','Cadence'], usecols=[0,1,2])
# Replace numerical by categorical
gaitClass['Category'] = gaitClass['Category'].replace(1, 'walk')
gaitClass['Category'] = gaitClass['Category'].replace(2, 'run')

In [22]:
# Input to the model
X = gaitClass[['Speed','Cadence']].values # training data
y = gaitClass['Category'].values.tolist() # testing data
model = LinearDiscriminantAnalysis()# define model
model.fit(X, y) # Model fit

# make a prediction
yhat = model.predict(np.array([vel,stRate]).reshape((1,2)))[0]
print('Gait category is '+yhat)

Gait category is run


### Function pca_td.py

In [23]:
# Prepare Python environment
import scipy.io
import os
from scipy import signal

In [24]:
"""
%PCA_TD applies a pre-trained algorithm to detect touchdown events.
%
%   [EVTL,EVTR] = PCA_TD(ANGLES,HZ,GAIT)
%
%   This function takes in the ANGLES output from a *_kinematics.m function
%   and the sampling frequency HZ, and outputs touchdown events as
%   determined by the Osis et al. (2014) method.  GAIT is a label for type
%   of gait (must be either 'walk' or 'run') which determines the model
%   that will be applied.  EVTL and EVTR are vectors of indices
%   corresponding to the timing of touchdowns from the original ANGLES
%   signals. NOTE: EVTL and EVTR are not rounded... rounding is completed
%   in *_steps functions.
%
%   Created By: Sean Osis on March 10, 2014
%
%   Copyright (C) 2014, Sean Osis and The Running Injury Clinic
%
%   Update, July, 2015: Code now implements event detection for both
%   walking and running using model trained for both.  Bias is currently
%   eliminated as it does not drastically improve fit.
%
%   Update, Jan, 2016: Code has been tested for running and walking with
%   full research database.  Roughly 3% of cases demonstrate issues with
%   event detection for running, mostly due to atypical movement patterns
%   and very fast speeds ~4m/s.  Roughly 7% of cases demonstrate issues
%   with detection for walking, mostly due to large variability in walking
%   strides, however, many of these can be screened for contigous blocks or
%   to have bad steps removed.
"""


# Input parameters
hz = 200
gait_mode = yhat
angles = angles
# Load PCA output from mat file
# % event_data is a .mat file containing 'coeff' which is the coefficients
# % from the pre-trained PCA and 'p' which is the list of coefficients of the
# % linear polynomial relating PCA scores with touchdown timing relative to
# % the foot acceleration peak.
mat = scipy.io.loadmat(os.path.join(pathname, 'event_data_TD.mat'))

# Future use for separate left and right coefficients if needed... right
# now there appears to be no benefit to having them separate
coeffL = mat['coeff']
coeffR = mat['coeff']

pL = mat['p'].flatten()
pR = mat['p'].flatten()

#%% Default sampling rate for which model was originally developed
defaultHz = 200

# The length of the search window when finding positive peaks in the foot
# acceleration signal
srchlgth = round(0.175*defaultHz)

# Chunk length for PCA... determined to be 35 frames on either side of the
# foot accel peak which is 35/200 = 0.175
#chnklgth = round(0.175*hz)
chnklgth = round(0.175*defaultHz)

# Minimum distance between foot acceleration peaks
if gait_mode=='walk':
    minpkdist = round(0.7*defaultHz)
else:
    minpkdist = round(0.5*defaultHz)

# Minimum peak height for finding negative acceleration peaks
negminpkht = 0.1

# Minimum peak height for finding positive acceleration peaks
posminpkht = 0.01

# Bias added to event timing to compensate for the bias evident from the
# methodology used in Osis et al. (2014).  This bias may be due to changes
# in timing when differentiating.  Bias should be added in seconds in order
# to allow for different frame rates
bias = 0*defaultHz;

#%% %% Resample signal if needed
# Original model trained on 200Hz

if hz < 100:
    print("Error! 'Sampling frequency is less than 100 Hz. Event detection may provide inconsistent results at low sampling rates.'!")
    sys.exit()

elif hz!=defaultHz:
    
    # Resample the signals to match 200 Hz for methods below
    temp = angles.values
    temp = signal.resample_poly(temp, defaultHz, hz)
    
    angles.values = temp

In [25]:
angles.head()

Unnamed: 0,pelvis_X,pelvis_Y,pelvis_Z,foot_X,foot_Y,foot_Z,hip_X,hip_Y,hip_Z,knee_X,knee_Y,knee_Z,ankle_X,ankle_Y,ankle_Z
0,-0.130881,3.62383,-0.35451,12.427321,3.441102,-87.858121,-10.735628,-6.963042,7.212884,19.450638,16.619757,51.327518,-10.925609,-7.332781,26.055983
1,-0.109653,3.092453,0.503615,12.336751,4.156125,-87.042577,-9.942102,-6.339841,6.811078,20.051316,15.919602,53.369701,-10.691751,-7.543328,24.570621
2,-0.132692,2.53905,1.368341,12.218492,4.876927,-86.157714,-9.120387,-5.842418,6.344889,20.630059,15.22679,55.438418,-10.455375,-7.734672,23.052603
3,-0.216864,1.964295,2.222881,12.066526,5.590107,-85.191288,-8.27857,-5.520388,5.807934,21.161956,14.572236,57.492671,-10.214346,-7.895527,21.521611
4,-0.372386,1.374357,3.041033,11.879516,6.286569,-84.131576,-7.42912,-5.406812,5.188237,21.626613,13.983739,59.492093,-9.967965,-8.01355,19.994906


In [26]:
#%% Right Side Touchdown Detections

#% Flip the foot in sagittal and differentiate
negsig = -np.diff(angles['foot_Z'].values, 2)

# PENDING
## CONTINUE REPRODUCING THE pca_td.m code from here

In [27]:
negsig[:5]

array([-0.06931977, -0.08156313, -0.09328528, -0.10171603, -0.10702868])

In [None]:
angles.filter(like=R_marker).values

In [None]:
#%% Identify Touch Down and Take Off events: Gait Independent
# % Use PCA touchdown detection based on updated Osis et al. (2014) for
# % both walking and running.
# % Use new PCA toeoff detection for both walking and running.
# % evt variables are NOT rounded
try
    [evtLtd,evtRtd] = pca_td(angles,hz,label);
    [evtLto,evtRto] = pca_to(angles,hz,label);
catch ME
    %For a small number of people, these functions return errors, or in the
    %case of bad data... default to use FF and FB in these cases
    
    evtLtd = [];
    evtRtd = [];
    evtLto = [];
    evtRto = [];
    
    disp('Automated event detection failed, defaulting to foot-forward foot-back')
    
    ME.message;
end

In [None]:
model.coef_

In [None]:
model.priors_

## Plot compare outputs

In [None]:
tn = np.linspace(0,angle_R_ankle.shape[0],num=angle_R_ankle.shape[0])

In [None]:
# Import foot angles calculated in Matlab
fn_f_matlab = os.path.join(pathname, 'r_foot_angle_R.txt')
angf_matlab = np.loadtxt(fn_f_matlab) * (np.pi/180)

In [None]:
fig, axs = plt.subplots(1,3, figsize=(10,6))
fig.suptitle('Comparison of foot angles between methods')
axs[0].plot(angle_R_foot[:,0], 'b', label='Python')
axs[0].plot(tn[0::5],angf_matlab[0::5,0], 'bo', label='Matlab')
axs[0].grid('on')
axs[1].plot(angle_R_foot[:,1], 'r', label='Python')
axs[1].plot(tn[0::5],angf_matlab[0::5,1], 'ro', label='Matlab')
axs[1].grid('on')
axs[2].plot(angle_R_foot[:,2], 'g', label='Python')
axs[2].plot(tn[0::5],angf_matlab[0::5,2], 'go', label='Matlab')
axs[2].grid('on')
plt.show()

In [None]:
# Import Pelvis angles calculated in Matlab
fn_p_matlab = os.path.join(pathname, 'r_pelvis_angle.txt')
angp_matlab = np.loadtxt(fn_p_matlab) * (np.pi/180)

In [None]:
fig, axs = plt.subplots(1,3, figsize=(10,6))
fig.suptitle('Comparison of foot angles between methods')
axs[0].plot(angle_Pelvis[:,0], 'b', label='Python')
axs[0].plot(tn[0::5],angp_matlab[0::5,0], 'bo', label='Matlab')
axs[0].grid('on')
axs[1].plot(angle_Pelvis[:,1], 'r', label='Python')
axs[1].plot(tn[0::5],angp_matlab[0::5,1], 'ro', label='Matlab')
axs[1].grid('on')
axs[2].plot(angle_Pelvis[:,2], 'g', label='Python')
axs[2].plot(tn[0::5],angp_matlab[0::5,2], 'go', label='Matlab')
axs[2].grid('on')
plt.show()

In [None]:
# Import ankle angles calculated in Matlab
fn_a_matlab = os.path.join(pathname, 'r_ankle_angle_R.txt')
anga_matlab = np.loadtxt(fn_a_matlab)
anga_matlab = anga_matlab

In [None]:
fig, axs = plt.subplots(3, figsize=(10,6))
fig.suptitle('Comparison of ankle angles between methods')
axs[0].plot(angle_R_ankle[:,0], 'b', label='Python')
axs[0].plot(tn[0::5],anga_matlab[0::5,0], 'b*', label='Matlab')
axs[0].grid('on')
axs[1].plot(angle_R_ankle[:,1], 'r', label='Python')
axs[1].plot(tn[0::5],anga_matlab[0::5,1], 'r*', label='Matlab')
axs[1].grid('on')
axs[2].plot(angle_R_ankle[:,2], 'g', label='Python')
axs[2].plot(tn[0::5],anga_matlab[0::5,2], 'g*', label='Matlab')
axs[2].grid('on')
plt.show()

In [None]:
# Import ankle angles calculated in Matlab
fn_k_matlab = os.path.join(pathname, 'r_knee_angle_R.txt')
angk_matlab = np.loadtxt(fn_k_matlab)
angk_matlab = angk_matlab

In [None]:
fig, axs = plt.subplots(3, figsize=(10,6))
fig.suptitle('Comparison of knee angles between methods')
axs[0].plot(angle_R_knee[:,0], 'b', label='Python')
axs[0].plot(tn[0::5],angk_matlab[0::5,0], 'b*', label='Matlab')
axs[0].grid('on')
axs[1].plot(angle_R_knee[:,1], 'r', label='Python')
axs[1].plot(tn[0::5],angk_matlab[0::5,1], 'r*', label='Matlab')
axs[1].grid('on')
axs[2].plot(angle_R_knee[:,2], 'g', label='Python')
axs[2].plot(tn[0::5],angk_matlab[0::5,2], 'g*', label='Matlab')
axs[2].grid('on')
plt.show()

In [None]:
# Import ankle angles calculated in Matlab
fn_h_matlab = os.path.join(pathname, 'r_hip_angle_R.txt')
angh_matlab = np.loadtxt(fn_h_matlab)
angh_matlab = angh_matlab

In [None]:
fig, axs = plt.subplots(3, figsize=(10,6))
fig.suptitle('Comparison of knee angles between methods')
axs[0].plot(tn,angle_R_hip[:,0], 'b', label='Python')
axs[0].plot(tn[0::5],angh_matlab[0::5,0], 'bo', label='Matlab')
axs[0].grid('on')
axs[1].plot(angle_R_hip[:,1], 'r', label='Python')
axs[1].plot(tn[0::5],angh_matlab[0::5,1], 'ro', label='Matlab')
axs[1].grid('on')
axs[2].plot(angle_R_hip[:,2], 'g', label='Python')
axs[2].plot(tn[0::5],angh_matlab[0::5,2], 'go', label='Matlab')
axs[2].grid('on')
plt.show()