Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #22 from jakeshirey/feature/stride-length
Feature/stride length
- Loading branch information
Showing
4 changed files
with
328 additions
and
7 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,24 +1,34 @@ | ||
absl-py | ||
altgraph | ||
colorama | ||
contourpy | ||
cycler | ||
et-xmlfile | ||
fonttools | ||
future | ||
iniconfig | ||
kiwisolver | ||
libclang | ||
matplotlib | ||
numpy | ||
opencv-python | ||
openpyxl | ||
packaging | ||
pandas | ||
pefile | ||
Pillow | ||
pluggy | ||
pyinstaller | ||
pyinstaller-hooks-contrib | ||
pyparsing | ||
PyQt5 | ||
PyQt5-Qt5 | ||
PyQt5-sip | ||
pytest | ||
python-dateutil | ||
pytz | ||
pywin32-ctypes | ||
scipy | ||
six | ||
tzdata | ||
urllib3 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
''' | ||
Reference File for the stride length and duty factor calculations. Runs as a standalone script, where a DLC excel file is given as a command line argument. Includes matplotlib plots for understanding. | ||
''' | ||
import pandas as pd | ||
import matplotlib.pyplot as plt | ||
from scipy.signal import butter, filtfilt, find_peaks | ||
import numpy as np | ||
import sys | ||
|
||
if __name__ == "__main__": | ||
# Check if the file name is provided as a command-line argument | ||
if len(sys.argv) != 2: | ||
print("Usage: python read_excel.py <excel_file_name>") | ||
else: | ||
# Get the Excel file name from the command line | ||
excel_file_name = sys.argv[1] | ||
data_frame = pd.read_excel(excel_file_name) | ||
#clean data by combining labels and reindexing | ||
bodyparts_labels = data_frame.loc[0] | ||
coords_labels = data_frame.loc[1] | ||
labels = [i + "_" + j for i, j in zip(bodyparts_labels, coords_labels)] | ||
data_frame.columns = labels | ||
data_frame = data_frame.iloc[2: , : ] | ||
data_frame.index = range(len(data_frame.index)) | ||
data_frame = data_frame.drop(columns=["bodyparts_coords"]) | ||
|
||
# Design parameters | ||
filter_order = 8 | ||
cutoff_frequency = 0.05 # Half power frequency in MATLAB | ||
|
||
# Calculate the normalized cutoff frequency for Python | ||
nyquist_frequency = 0.5 # Nyquist frequency is 0.5 in normalized frequency | ||
cutoff_frequency_python = cutoff_frequency / nyquist_frequency | ||
|
||
# Design Butterworth lowpass filter coefficients | ||
b, a = butter(filter_order, cutoff_frequency_python, btype='low', analog=False) | ||
|
||
#filter the right hock x component | ||
filtered_data = filtfilt(b, a, data_frame["righthock_x"]) | ||
|
||
#take the gradient of the filtered data | ||
hockx_gradient = np.gradient(filtered_data) | ||
|
||
#Peaks of gradient correspond to middle of stride | ||
peaks, _ = find_peaks(np.abs(hockx_gradient), prominence=1, distance=25) | ||
|
||
#Threshold between swing and stance is 0.25 the median peak gradient value | ||
threshold = 0.25 * np.median(hockx_gradient[peaks]) | ||
|
||
footstrikes = np.where((hockx_gradient[:-1] >= threshold) & (hockx_gradient[1:] < threshold))[0] | ||
|
||
# Find locations where values rise from below threshold to above | ||
toeoffs = np.where((hockx_gradient[:-1] < threshold) & (hockx_gradient[1:] >= threshold))[0] | ||
|
||
# Check if any crossings are found | ||
if toeoffs.size > 0: | ||
rise_count_threshold = 10 | ||
# Find locations where values rise above the threshold for the tenth time past the intersection | ||
ten_above = [] | ||
for toeoff in toeoffs: | ||
if hockx_gradient[toeoff+10] and hockx_gradient[toeoff+10] > threshold: | ||
ten_above.append(toeoff+10) | ||
|
||
if ten_above: | ||
toeoffs = ten_above | ||
|
||
strides = [footstrikes[:-1], footstrikes[1:]] | ||
|
||
#stride length as distance between start and end of stride | ||
stride_lengths = strides[1] - strides[0] | ||
|
||
#remove an extra toeoff if one exists before a strike | ||
if toeoffs[0] < footstrikes[0]: | ||
toeoffs = toeoffs[1:] | ||
|
||
#calculate duty factor for each stride based on start and end of stride, and the toeoff inbetween | ||
dutyfactor = np.zeros(len(strides[0])) | ||
for j in range(len(strides[0])): | ||
stridestart = strides[j][0] | ||
strideend = strides[j][1] | ||
|
||
strideToeOff = toeoff[toeoff > stridestart] | ||
|
||
# Temporal | ||
timestance = toeoffs[j] - stridestart | ||
dutyfactor[j] = timestance / (strideend - stridestart) | ||
|
||
print(strides) | ||
print(stride_lengths) | ||
print(dutyfactor) | ||
|
||
plt.figure(figsize=(10, 6)) | ||
#plt.plot(data_frame.index, data_frame["rightHhoof_x"], label='Original Data') | ||
#plt.plot(data_frame.index, filtered_data, label=f'Filtered Data (Cutoff Frequency = {cutoff_frequency} Hz)') | ||
plt.plot(data_frame.index, hockx_gradient, label="First") | ||
plt.plot(peaks, hockx_gradient[peaks], 'rx', label='Detected Peaks') | ||
plt.axhline(y=threshold, color='r', linestyle='--', label=f'Threshold') | ||
plt.plot(footstrikes, hockx_gradient[footstrikes], 'bx', label='Toe Strikes') | ||
plt.plot(toeoffs, hockx_gradient[toeoffs], 'gx', label='Toeoffs') | ||
#plt.fill_between(data_frame.index, 0, swing_stance, color='lightgray', alpha=0.5) | ||
plt.title('Butterworth Lowpass Filtered Signal') | ||
plt.xlabel('Frame') | ||
plt.ylabel('Value') | ||
plt.legend() | ||
plt.grid(True) | ||
plt.show() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
import numpy as np | ||
import pandas as pd | ||
from scipy.signal import butter, filtfilt | ||
from scipy.interpolate import interp1d | ||
from scipy.signal import find_peaks | ||
from numpy.linalg import norm, det | ||
|
||
# Placeholder for import_horse function | ||
def import_horse(file_path): | ||
# This function should be implemented to import data from the given file path. | ||
# Assuming the file is in a format that can be read as a DataFrame, like CSV. | ||
# Replace this with actual implementation. | ||
df = pd.read_csv(file_path) | ||
return df.to_numpy().T # Transposed to match MATLAB data structure | ||
|
||
# Function to find the angle | ||
def findangle(x1, y1, l1, x2, y2, l2, x3, y3, l3, frame, L, spotcheck, color, name, directionUsed=None): | ||
angle3 = [] | ||
angle = np.array([x1, y1, x2, y2, x3, y3]).T | ||
valid = (l1 > L) & (l2 > L) & (l3 > L) | ||
angle = angle[valid, :] | ||
goodframes = frame[valid] | ||
|
||
P0 = angle[:, 2:4] | ||
P1 = angle[:, 0:2] | ||
P2 = angle[:, 4:6] | ||
|
||
for i in range(len(goodframes)): | ||
n1 = (P2[i, :] - P0[i, :]) / norm(P2[i, :] - P0[i, :]) | ||
n2 = (P1[i, :] - P0[i, :]) / norm(P1[i, :] - P0[i, :]) | ||
if directionUsed is not None: | ||
angle3.append(np.arctan2(det([n2, n1]), np.dot(n2, n1))) | ||
else: | ||
angle3.append(np.arctan2(norm(det([n2, n1])), np.dot(n1, n2))) | ||
|
||
angle3 = pd.Series(angle3).rolling(window=5).median().to_numpy() | ||
return angle3, goodframes | ||
|
||
# Function to find the floor | ||
def floorfind(HTy, HMy, HHy): | ||
datas = [HTy, HMy, HHy] | ||
floory, floorx = [], [] | ||
for data in datas: | ||
floory.append(np.min(data[:len(data)//3])) | ||
floory.append(np.min(data[len(data)//3:2*len(data)//3])) | ||
floory.append(np.min(data[2*len(data)//3:])) | ||
floorx.append(np.argmin(data[:len(data)//3])) | ||
floorx.append(np.argmin(data[len(data)//3:2*len(data)//3]) + len(data)//3) | ||
floorx.append(np.argmin(data[2*len(data)//3:]) + 2*len(data)//3) | ||
floory = np.array(floory).reshape(-1) | ||
floorx = np.array(floorx).reshape(-1) | ||
P = np.polyfit(floorx, floory, 1) | ||
return P | ||
|
||
# Function to extract Y data features | ||
def pulloutYstuff(data, frames, spotcheck, color, name): | ||
pks, locs, w, p = find_peaks(data, height=4, distance=25) | ||
p = p['peak_heights'] | ||
TFp = np.abs(p - np.mean(p)) < 2 * np.std(p) # Remove outliers based on prominence | ||
TFw = np.abs(w - np.mean(w)) < 2 * np.std(w) # Remove outliers based on width | ||
TF = TFp & TFw | ||
mp = np.mean(p[TF]) | ||
stdp = np.std(p[TF]) | ||
mw = np.mean(w[TF]) | ||
stdw = np.std(w[TF]) | ||
return mw, mp, stdw, stdp | ||
|
||
# Function for finding stride-to-off (FSTO) | ||
def fsto(data, frames, directionfactor, spotcheck, graphfactor, color, name): | ||
dxdf = np.gradient(data) / np.gradient(frames) | ||
dxdf, rframes = remove_outliers(np.column_stack((dxdf, frames)), 25) | ||
pks, locs, _, _ = find_peaks(np.abs(dxdf), height=1, distance=25) | ||
|
||
# Further implementation needed based on MATLAB code | ||
# Placeholder return values | ||
dutyfactor, stridelength, numcycles, strides = 0, 0, 0, np.array([]) | ||
dutyfactorStd, stridelengthStd, byEye = 0, 0, '-' | ||
|
||
return dutyfactor, stridelength, numcycles, strides, dutyfactorStd, stridelengthStd, byEye | ||
|
||
def avgforstride(data, f2, strides, stridesXstart, spotcheck, color, name): | ||
""" | ||
Averages stride data. | ||
""" | ||
# Placeholder implementation. Adjust as per specific requirements. | ||
avg_stride = sum(data) / len(data) | ||
return avg_stride | ||
|
||
def remove_outliers(data, window): | ||
""" | ||
Removes outliers from data based on a moving average method. | ||
""" | ||
# Placeholder implementation. Adjust as per specific requirements. | ||
filtered_data = [d for d in data if abs(d - sum(data) / len(data)) < window] | ||
return filtered_data | ||
|
||
def basiccmooth(datax, datay, dataf, d1): | ||
""" | ||
Smoothes coordinate data. | ||
""" | ||
# Placeholder implementation. Adjust as per specific requirements. | ||
smoothed_datax = [sum(datax[max(i - d1, 0):min(i + d1 + 1, len(datax))]) / (2 * d1 + 1) for i in range(len(datax))] | ||
smoothed_datay = [sum(datay[max(i - d1, 0):min(i + d1 + 1, len(datay))]) / (2 * d1 + 1) for i in range(len(datay))] | ||
return smoothed_datax, smoothed_datay | ||
|
||
# Reading the existing Python script | ||
script_path = '/mnt/data/horse_analysis_script.py' | ||
with open(script_path, 'r') as file: | ||
existing_script = file.read() | ||
|
||
# Adding the new function implementations to the script | ||
new_functions = "\n\n".join([avgforstride.__doc__, remove_outliers.__doc__, basiccmooth.__doc__]) | ||
updated_script = existing_script + "\n\n# New Function Implementations\n" + new_functions | ||
|
||
# Saving the updated script | ||
updated_script_path = '/mnt/data/updated_horse_analysis_script.py' | ||
with open(updated_script_path, 'w') as file: | ||
file.write(updated_script) | ||
|
||
script_content = inspect.getsource(import_horse) + "\n" + \ | ||
inspect.getsource(findangle) + "\n" + \ | ||
inspect.getsource(floorfind) + "\n" + \ | ||
inspect.getsource(pulloutYstuff) + "\n" + \ | ||
inspect.getsource(fsto) + "\n" + \ | ||
inspect.getsource(avgforstride) + "\n" + \ | ||
inspect.getsource(basiccmooth) + "\n" + \ | ||
inspect.getsource(main) | ||
|
||
with open('/mnt/data/horse_analysis_script.py', 'w') as file: | ||
file.write(script_content) | ||
|
||
'/mnt/data/horse_analysis_script.py' | ||
|