## IMU Tibia Left & Right Processing

Specs:
- IMeasureU Blue Trident Sensors
- Sampling freq for lowg = 1125hz
- Sampling freq for highg = 1600hz
- 5 minutes of data = 337,500 rows (1125hz) or 480,000 rows (1600hz)

In [6]:
# Import custom functions ---
import functions.file_import_gui as gui
import functions.data_prep as prep
import functions.custom_plots as plots
import functions.peak_detection as peaks
import functions.stats as stats
import functions.stride_variables as stride

# For saving files
import os

# For dataframes ---
import pandas as pd

# For plotting ---
import plotly.io as pio
import plotly.graph_objects as go
import matplotlib.pyplot as plt

# For DFA analysis
from sklearn.linear_model import LinearRegression

In [None]:
# Load the IMU CSV files for one subject ---

# Subject to process
sub_id = 'run014'

# Starting folders handed to the CSV import helper, in request order:
# lowg left, lowg right, highg left, highg right
import_dirs = (
    f"data/five_min_runs/{sub_id}/lowg_1125hz/left_tibia",
    f"data/five_min_runs/{sub_id}/lowg_1125hz/right_tibia",
    f"data/five_min_runs/{sub_id}/highg_1600hz/left_tibia",
    f"data/five_min_runs/{sub_id}/highg_1600hz/right_tibia",
)

# Import each folder in turn; keys_list is overwritten on every call,
# so after the loop it holds the keys of the last import only.
imported_sets = []
for initialdir in import_dirs:
    imported_dfs, keys_list = gui.read_csv_files_gui(initialdir)
    imported_sets.append(imported_dfs)

# Unpack in the same order as import_dirs
dfs_lt_lowg, dfs_rt_lowg, dfs_lt_highg, dfs_rt_highg = imported_sets

Prep

In [None]:
# Data prep ---
# The same four steps are applied to the lowg pair and then to the highg pair.
# The helpers' return values are not used here, so they presumably modify the
# dicts they are given in place (confirm in functions.data_prep).

# Axis columns combined into the resultant, and the name of the new column
resultant_kwargs = dict(
    column_x='ax_m/s/s', column_y='ay_m/s/s', column_z='az_m/s/s',
    name_of_res_column='res_m/s/s',
)

# (sampling frequency in Hz, (left tibia dfs, right tibia dfs))
prep_groups = (
    (1125, (dfs_lt_lowg, dfs_rt_lowg)),    # lowg
    (1600, (dfs_lt_highg, dfs_rt_highg)),  # highg
)

for sample_freq, tibia_pair in prep_groups:
    # crop data for 5 mins (removes extra rows at the beginning so that there are exactly 5 mins of data)
    for imu_dfs in tibia_pair:
        prep.crop_df_five_mins(imu_dfs, sample_freq=sample_freq)
    # calculate and add resultant column
    for imu_dfs in tibia_pair:
        prep.add_resultant_column(imu_dfs, **resultant_kwargs)
    # convert accel columns to gs
    for imu_dfs in tibia_pair:
        prep.accel_to_gs_columns(imu_dfs)
    # shift time scale to start at 0
    for imu_dfs in tibia_pair:
        prep.shift_time_s_to_zero(imu_dfs)

Create plots to save in folder

In [None]:
# Build line plots of every acceleration column against the zero-based time column ---

x_col = 'time_s_scaled'
y_cols = ['ax_g', 'ay_g', 'az_g', 'res_g']

# One set of plots per sensor range and tibia, created in the order:
# lowg left, lowg right, highg left, highg right
(
    line_plots_lt_lowg,
    line_plots_rt_lowg,
    line_plots_lt_highg,
    line_plots_rt_highg,
) = (
    plots.create_line_plots(imu_dfs, x_col, y_cols)
    for imu_dfs in (dfs_lt_lowg, dfs_rt_lowg, dfs_lt_highg, dfs_rt_highg)
)

In [None]:
# Save plots to folder ---
# Every figure is written as a standalone HTML file named after its key.

# (figures, output folder) in the order: lowg left, lowg right, highg left, highg right
plot_exports = (
    (line_plots_lt_lowg, f"plots/{sub_id}/left_tibia/lowg_1125hz/"),
    (line_plots_rt_lowg, f"plots/{sub_id}/right_tibia/lowg_1125hz/"),
    (line_plots_lt_highg, f"plots/{sub_id}/left_tibia/highg_1600hz/"),
    (line_plots_rt_highg, f"plots/{sub_id}/right_tibia/highg_1600hz/"),
)

for figures, output_dir in plot_exports:
    # create the folder if needed; exist_ok avoids the separate (race-prone) existence check
    os.makedirs(output_dir, exist_ok=True)
    # loop through all plots and save them
    for key, fig in figures.items():
        file_path = os.path.join(output_dir, f"{key}.html")
        pio.write_html(fig, file_path)

### Tibial Acceleration of Resultant
- AVG Peak Tibial Acceleration - Finds peaks of the resultant and then calculates the average from these peaks

Find Peaks

In [None]:
# Set parameters for functions below
# Minimum spacing between peaks (i.e. footstrikes), expressed as a number of samples.
# A 0.50 s gap is converted at each sampling rate and truncated to a whole sample.
min_seconds_between_peaks = 0.50
lowg_time_between_peaks = int(1125 * min_seconds_between_peaks)   # 562 samples at 1125 Hz
highg_time_between_peaks = int(1600 * min_seconds_between_peaks)  # 800 samples at 1600 Hz

In [None]:
# STEP 1:

# Find peaks of the resultant with no height thresholds
# NOTE: These peaks are then used to determine individual thresholds for max peak height


def _res_peaks_no_threshold(imu_dfs, samples_between_peaks):
    """Run peak detection on 'res_g' with both height limits disabled.

    Returns whatever peaks.calc_avg_positive_peaks returns; the calls below
    unpack it as a pair and keep only the second element.
    """
    return peaks.calc_avg_positive_peaks(
        imu_dfs, columns=['res_g'], time_column='time_s_scaled',
        min_peak_height=None, max_peak_height=None,
        min_samples_between_peaks=samples_between_peaks,
    )


# lowg --- (left, then right)
_, dfs_lt_res_peak_values_lowg_no_threshold = _res_peaks_no_threshold(dfs_lt_lowg, lowg_time_between_peaks)
_, dfs_rt_res_peak_values_lowg_no_threshold = _res_peaks_no_threshold(dfs_rt_lowg, lowg_time_between_peaks)

# highg --- (left, then right)
_, dfs_lt_res_peak_values_highg_no_threshold = _res_peaks_no_threshold(dfs_lt_highg, highg_time_between_peaks)
_, dfs_rt_res_peak_values_highg_no_threshold = _res_peaks_no_threshold(dfs_rt_highg, highg_time_between_peaks)

In [None]:
# STEP 2:

# Determine subject's individual thresholds for max_peak_height and min_peak_height
# from the peaks identified w/ no threshold

k = 4  # IQR
z = 4  # SDs

# One summary table per peak set, built in the order:
# lowg left, lowg right, highg left, highg right
(
    summary_tbl_lt_lowg,
    summary_tbl_rt_lowg,
    summary_tbl_lt_highg,
    summary_tbl_rt_highg,
) = (
    stats.create_summary_tbl(peak_dfs, ['peak_values'], k=k, z=z)
    for peak_dfs in (
        dfs_lt_res_peak_values_lowg_no_threshold,
        dfs_rt_res_peak_values_lowg_no_threshold,
        dfs_lt_res_peak_values_highg_no_threshold,
        dfs_rt_res_peak_values_highg_no_threshold,
    )
)

In [None]:
# STEP 3:

# Find peaks again, this time bounded by the individualized min and max peak heights
# NOTE: This uses a different peak function that takes the summary table as input;
# per the original note it steps through the individual rows of each run/sensor


def _res_peaks_from_summary(imu_dfs, summary_tbl, samples_between_peaks):
    """Run peak detection on 'res_g' using the k-based bounds in summary_tbl.

    The lower/upper limits are read from the 'lower_bound_k' and
    'upper_bound_k' columns, matched to runs through the 'id' column.
    """
    return peaks.calc_avg_positive_peaks_from_tbl(
        imu_dfs, ['res_g'], time_column='time_s_scaled',
        summary_table=summary_tbl, id_column="id",
        min_peak_height_column="lower_bound_k", max_peak_height_column="upper_bound_k",
        min_samples_between_peaks=samples_between_peaks,
    )


# lowg ---
# left
lt_res_peak_accel_lowg_df, dfs_lt_res_peak_values_lowg_threshold = _res_peaks_from_summary(
    dfs_lt_lowg, summary_tbl_lt_lowg, lowg_time_between_peaks)
# right
rt_res_peak_accel_lowg_df, dfs_rt_res_peak_values_lowg_threshold = _res_peaks_from_summary(
    dfs_rt_lowg, summary_tbl_rt_lowg, lowg_time_between_peaks)

# highg ---
# left
lt_res_peak_accel_highg_df, dfs_lt_res_peak_values_highg_threshold = _res_peaks_from_summary(
    dfs_lt_highg, summary_tbl_lt_highg, highg_time_between_peaks)
# right
rt_res_peak_accel_highg_df, dfs_rt_res_peak_values_highg_threshold = _res_peaks_from_summary(
    dfs_rt_highg, summary_tbl_rt_highg, highg_time_between_peaks)

Visually Inspect Peaks

In [None]:
# Choose the runs to inspect visually and build their resultant plots ---

# Keys of the runs to visualize below
# lowg ---
plot_keys_lt_lowg = [  # left
    f'{sub_id}_light_prs_pre_00917_lowg',
    f'{sub_id}_heavy_prs_pre_00917_lowg',
]
plot_keys_rt_lowg = [  # right
    f'{sub_id}_light_prs_pre_00925_lowg',
    f'{sub_id}_heavy_prs_pre_00925_lowg',
]

# highg ---
plot_keys_lt_highg = [  # left
    f'{sub_id}_light_prs_pre_00917_highg',
    f'{sub_id}_heavy_prs_pre_00917_highg',
]
plot_keys_rt_highg = [  # right
    f'{sub_id}_light_prs_pre_00925_highg',
    f'{sub_id}_heavy_prs_pre_00925_highg',
]


def _select_runs(all_dfs, wanted_keys):
    """Return the entries of all_dfs whose key is in wanted_keys; keys that are absent are skipped."""
    return {run_key: all_dfs[run_key] for run_key in wanted_keys if run_key in all_dfs}


# Dictionaries holding only the dfs to plot
# lowg ---
dfs_lt_lowg_plots = _select_runs(dfs_lt_lowg, plot_keys_lt_lowg)
dfs_rt_lowg_plots = _select_runs(dfs_rt_lowg, plot_keys_rt_lowg)
# highg ---
dfs_lt_highg_plots = _select_runs(dfs_lt_highg, plot_keys_lt_highg)
dfs_rt_highg_plots = _select_runs(dfs_rt_highg, plot_keys_rt_highg)

# Create and store plots of the resultant only ---
x_col = 'time_s_scaled'
y_cols = ['res_g']

# NOTE: these names are rebound here, replacing any plots stored under them earlier
(
    line_plots_lt_lowg,
    line_plots_rt_lowg,
    line_plots_lt_highg,
    line_plots_rt_highg,
) = (
    plots.create_line_plots(selected_dfs, x_col, y_cols)
    for selected_dfs in (dfs_lt_lowg_plots, dfs_rt_lowg_plots, dfs_lt_highg_plots, dfs_rt_highg_plots)
)

In [None]:
# Now actually plot the data ---
# Each selected figure gets the thresholded peaks overlaid as black markers, then is shown.

# lowg --- (run keys, figures, peaks) for the left tibia, then the right tibia
lowg_overlays = (
    (plot_keys_lt_lowg, line_plots_lt_lowg, dfs_lt_res_peak_values_lowg_threshold),
    (plot_keys_rt_lowg, line_plots_rt_lowg, dfs_rt_res_peak_values_lowg_threshold),
)

for run_keys, figures, peaks_to_plot in lowg_overlays:
    for key in run_keys:
        fig = figures.get(key)
        if fig is None:
            print(f"No plot found with key {key}")
            continue

        # Drop any existing 'peaks' trace before adding a new one,
        # so repeated runs of this cell do not stack duplicate markers
        fig.data = [trace for trace in fig.data if trace.name != 'peaks']

        # Only overlay markers when peaks were found for this run
        if key in peaks_to_plot:
            df_lowg_peaks = peaks_to_plot[key]
            peak_markers = go.Scatter(
                x=df_lowg_peaks['time_s_scaled'],
                y=df_lowg_peaks['peak_values'],
                mode='markers',
                marker=dict(size=8, color='black'),  # a color that stands out
                name='peaks',  # trace name shown in the legend and used for the cleanup above
            )
            fig.add_trace(peak_markers)

        # The figure is shown whether or not peak markers were added
        fig.show()

In [None]:
# highg ---
# Each selected figure gets the thresholded peaks overlaid as black markers, then is shown.

# (run keys, figures, peaks) for the left tibia, then the right tibia
highg_overlays = (
    (plot_keys_lt_highg, line_plots_lt_highg, dfs_lt_res_peak_values_highg_threshold),
    (plot_keys_rt_highg, line_plots_rt_highg, dfs_rt_res_peak_values_highg_threshold),
)

for run_keys, figures, peaks_to_plot in highg_overlays:
    for key in run_keys:
        fig = figures.get(key)
        if fig is None:
            print(f"No plot found with key {key}")
            continue

        # Drop any existing 'peaks' trace before adding a new one,
        # so repeated runs of this cell do not stack duplicate markers
        fig.data = [trace for trace in fig.data if trace.name != 'peaks']

        # Only overlay markers when peaks were found for this run
        if key in peaks_to_plot:
            df_highg_peaks = peaks_to_plot[key]
            peak_markers = go.Scatter(
                x=df_highg_peaks['time_s_scaled'],
                y=df_highg_peaks['peak_values'],
                mode='markers',
                marker=dict(size=8, color='black'),  # a color that stands out
                name='peaks',  # trace name shown in the legend and used for the cleanup above
            )
            fig.add_trace(peak_markers)

        # The figure is shown whether or not peak markers were added
        fig.show()

Export Variables to Excel Table

In [None]:
# Add suffix to variable names
# The suffix is only appended to names that do not already end with it, so
# re-running this cell cannot produce doubled suffixes such as
# '..._lt_1600hz_lt_1600hz' in the exported table.

# highg --- (left, then right)
for peak_accel_df, suffix in (
    (lt_res_peak_accel_highg_df, '_lt_1600hz'),
    (rt_res_peak_accel_highg_df, '_rt_1600hz'),
):
    variable_names = peak_accel_df['variable']
    # na=False: missing names count as "not yet tagged" and fall through to the
    # concatenation, which leaves them missing just as a plain `+` would
    already_tagged = variable_names.str.endswith(suffix, na=False)
    peak_accel_df['variable'] = variable_names.where(already_tagged, variable_names + suffix)

In [None]:
# Build the export tables and append their rows to the processed-variables workbook ---

# Destination workbook and sheet (shared by both exports)
file_path = "data/processed_variables/imu_training_load_variables.xlsx"
sheet_name = "variables"

# highg -- (left, then right)
for peak_accel_df in (lt_res_peak_accel_highg_df, rt_res_peak_accel_highg_df):
    df_to_export = prep.export_tbl(peak_accel_df)
    prep.append_df_to_excel(df_to_export, file_path, sheet_name)

### Stride Time

Calculate Stride Times (STs) as the time between consecutive footstrikes of a single leg (time of step 2 - time of step 1, time of step 3 - time of step 2, etc.)
 - Mean
 - Standard Deviation (SD)
 - Coefficient of Variation (CV)
 - Strides per min (SPM)
 - Fractal Scaling Index (FSI) via Detrended Fluctuation Analysis (DFA)

**Note:** Using HighG data for left and right tibia

In [None]:
# From the time at which each peak occurred, calculate the difference between consecutive foot strikes
# (per the original note, this creates a new column called stride_times for each run)

# highg --- (left, then right)
dfs_lt_stride_times_highg, dfs_rt_stride_times_highg = (
    stride.calc_stride_times(dfs=peak_dfs, time_column="time_s_scaled")
    for peak_dfs in (
        dfs_lt_res_peak_values_highg_threshold,
        dfs_rt_res_peak_values_highg_threshold,
    )
)

In [None]:
# Summarize the stride times of each run; the bounds are used below to remove outliers

k = 3  # IQR
z = 3  # SDs

# highg --- (left, then right)
st_summary_tbl_lt_highg, st_summary_tbl_rt_highg = (
    stats.create_summary_tbl(stride_dfs, ['stride_times'], k=k, z=z)
    for stride_dfs in (dfs_lt_stride_times_highg, dfs_rt_stride_times_highg)
)

In [None]:
# Remove stride time outliers using the lower and upper bounds from the summary tables above


def _drop_stride_time_outliers(stride_dfs, summary_tbl):
    """Filter 'stride_times' with the k-based bounds stored in summary_tbl.

    Bounds are read from 'lower_bound_k' / 'upper_bound_k' and matched to runs
    through the 'id' column. Returns whatever stats.remove_outliers returns;
    the calls below unpack it as (filtered dfs, counts).
    """
    return stats.remove_outliers(
        stride_dfs, 'stride_times',
        summary_tbl, id_column='id',
        lower_threshold_column='lower_bound_k',
        upper_threshold_column='upper_bound_k',
    )


# highg ---
# left
dfs_lt_stride_times_highg_no_outliers, counts_dfs_lt_stride_times_highg = _drop_stride_time_outliers(
    dfs_lt_stride_times_highg, st_summary_tbl_lt_highg)
# right
dfs_rt_stride_times_highg_no_outliers, counts_dfs_rt_stride_times_highg = _drop_stride_time_outliers(
    dfs_rt_stride_times_highg, st_summary_tbl_rt_highg)


In [None]:
# Calculate stride time variables
# Returns a single table with the columns key, variable, and value
#
# The variables are computed from the outlier-filtered stride times. Previously the
# unfiltered dicts were passed, so the outlier removal step had no effect on the results.
# NOTE(review): if strides-per-minute is derived from the number of stride rows divided
# by total_run_time_mins, dropping outlier rows lowers that count slightly - confirm
# this is intended for SPM.

# highg ---
# left
st_vars_lt_highg_df = stride.calc_stride_times_vars(
    dfs_lt_stride_times_highg_no_outliers, 'stride_times', total_run_time_mins=5)
# right
st_vars_rt_highg_df = stride.calc_stride_times_vars(
    dfs_rt_stride_times_highg_no_outliers, 'stride_times', total_run_time_mins=5)

Export Variables to Excel Table

In [None]:
# Add suffix to variable names
# The suffix is only appended to names that do not already end with it, so
# re-running this cell cannot produce doubled suffixes such as
# '..._lt_1600hz_lt_1600hz' in the exported table.

# highg --- (left, then right)
for stride_vars_df, suffix in (
    (st_vars_lt_highg_df, '_lt_1600hz'),
    (st_vars_rt_highg_df, '_rt_1600hz'),
):
    variable_names = stride_vars_df['variable']
    # na=False: missing names count as "not yet tagged" and fall through to the
    # concatenation, which leaves them missing just as a plain `+` would
    already_tagged = variable_names.str.endswith(suffix, na=False)
    stride_vars_df['variable'] = variable_names.where(already_tagged, variable_names + suffix)

In [None]:
# Build the export tables and append their rows to the processed-variables workbook ---

# Destination workbook and sheet (shared by both exports)
file_path = "data/processed_variables/imu_training_load_variables.xlsx"
sheet_name = "variables"

# highg -- (left, then right)
for stride_vars_df in (st_vars_lt_highg_df, st_vars_rt_highg_df):
    df_to_export = prep.export_tbl(stride_vars_df)
    prep.append_df_to_excel(df_to_export, file_path, sheet_name)