In [None]:
#Part 1: Importing Main Libraries
import pandas as pd #used for data manipulation
import os #used for directory operations
import re #used for regular expression handling
import numpy as np #used for numerical computing
import scipy.fft as fft  # used for frequency domain computing
import scipy.stats as stats #used for statistical computing
import matplotlib.pyplot as plt #used for visualizations

In [5]:
# Part 2: Defining the main directory contatining exercise data.
# data_dir = '/Users/Vonara/Library/CloudStorage/GoogleDrive-vonara.queens@gmail.com/.shortcut-targets-by-id/1U60BMD1q6t41t1odOePZDMBR7ypHkU2N/BME 1570 Project/PHYTMO/inertial/inertial_exercise_data'
data_dir = "G:/My Drive/BME 1570 Project/PHYTMO/inertial/inertial_exercise_data"

metadata = [] #initializes an empty list to store the metadata and data from each file

#checks to make sure the directory exists: 
if os.path.exists(data_dir):
    print("Main directory exists.")
    
    # looks for any exercise folders within the directory
    exercise_folders = [folder for folder in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, folder))]
    
    if exercise_folders:
        print(f"Found {len(exercise_folders)} exercise folders in the main directory.")
        
        for exercise_folder in exercise_folders:
            exercise_path = os.path.join(data_dir, exercise_folder)
            print(f"\nProcessing folder: {exercise_folder}")
            
            #check for any .csv files within the folder
            csv_files = [f for f in os.listdir(exercise_path) if f.endswith('.csv')]
            
            #if there are .csv files in the folder, print out the number of files present
            if csv_files:
                print(f"Found {len(csv_files)} CSV files in {exercise_folder}.")
                
                #test to read the first csv file present (to ensure the files are accessed and readable)
                test_file_path = os.path.join(exercise_path, csv_files[0])
                print(f"Attempting to read file: {test_file_path}")
                
                try:
                    test_df = pd.read_csv(test_file_path)
                    print("CSV file read successfully! Displaying the first few rows:")
                    print(test_df.head())  # displays the first few rows to verify contents
                    
                except Exception as e:
                    print(f"Error reading {test_file_path}: {e}")
            else:
                print(f"No CSV files found in folder {exercise_folder}.")
    else:
        print("No exercise folders found in the main directory.")
else:
    print("Main directory does not exist.")


Main directory exists.
Found 10 exercise folders in the main directory.

Processing folder: zMislabeled
No CSV files found in folder zMislabeled.

Processing folder: Extension Arms Over Head
Found 464 CSV files in Extension Arms Over Head.
Attempting to read file: G:/My Drive/BME 1570 Project/PHYTMO/inertial/inertial_exercise_data\Extension Arms Over Head\A01EAHXLAY_1.csv
CSV file read successfully! Displaying the first few rows:
    Time (s)  Gyroscope X (deg/s)  Gyroscope Y (deg/s)  Gyroscope Z (deg/s)  \
0  11.350807             40.48140            -46.52059            -5.424869   
1  11.360693             50.55264            -53.34631            -0.232101   
2  11.370578             58.51016            -63.40838             6.182496   
3  11.380463             62.17809            -73.23508            13.330190   
4  11.390348             61.61857            -80.94344            20.783340   

   Accelerometer X (g)  Accelerometer Y (g)  Accelerometer Z (g)  \
0            -0.080877 

In [6]:
# Part 3: Incorporating data into the metadata list. 

# loops through exercise folders in main directory
for exercise_folder in os.listdir(data_dir): 
    exercise_path = os.path.join(data_dir, exercise_folder) #creates path for exercise folder


    #checking to see if the path is a directory (we are only looking at folders):
    if os.path.isdir(exercise_path):
        print(f"Processing Folder: {exercise_folder}") #checkpoint to show which folder you are in


        # check to see if there are .csv files in the folder 
        csv_files = [f for f in os.listdir(exercise_path) if f.endswith(".csv")]
        if not csv_files:
            print(f"  Skipping {exercise_folder} (no CSV files found).")
            continue  # skips to the next folder if no CSV files are present

        # Process each CSV file in the folder
        for filename in csv_files:
            #match the filename to the nomenclature implemented
            pattern = r'([A-E])(\d{2})(KFE|HAA|SQT|EAH|EFE|SQZ|GAT|GIS|GHT)([LRX])([LR])([STAF])([YN])_(\d)\.csv'

            match = re.match(pattern, filename)

             #if statement to extract metadata, if filename matches pattern
            if match: 
                #metadata extractions from filename: 
                patient_age, patient_id, exercise_code, LRX_sensor, LR_body, muscle_group, correctness, series = match.groups()

                #defining the file path to extract data from csv
                file_path = os.path.join(exercise_path, filename)

                try:
                    data = pd.read_csv(file_path)
                   # print(f" Successfully read file: {filename}") #checkpoint to show which file you are reading
                    # Add metadata columns to the DataFrame
                    data['Patient_Age'] = patient_age          # Store exercise code (from filename)
                    data['Patient_ID'] = patient_id            # Store patient ID
                    data['Exercise'] = exercise_code           # Store exercise code (from filename)
                    data['Exercise_Type'] = exercise_folder    # Store exercise type (from folder name)
                    data['Sensor_Location'] = LRX_sensor       # Store exercise code (from filename)
                    data['Left_Right_Side'] = LR_body          # Store exercise code (from filename)
                    data['Muscle_Group'] = muscle_group        # Store muscle group
                    data['Correctness'] = correctness          # Store Correctness
                    data['Series'] = int(series)               # Store series number as an integer
                        
                    # Append the DataFrame to the metadata list for future concatenation
                    metadata.append(data)
                except Exception as e:
                    print(f"  Error reading {file_path}: {e}")  # Error message if reading fails
            else:
                print(f"  Filename {filename} does not match the expected pattern.")
 




Processing Folder: zMislabeled
  Skipping zMislabeled (no CSV files found).
Processing Folder: Extension Arms Over Head
Processing Folder: Squeezing
Processing Folder: Elbow Flex Extension
Processing Folder: Knee Flex Extension
Processing Folder: Gait
Processing Folder: Hip Abduction
Processing Folder: Squat
Processing Folder: Heel Tiptoe Gait
  Skipping Heel Tiptoe Gait (no CSV files found).
Processing Folder: Gait Describing
  Skipping Gait Describing (no CSV files found).


In [7]:
# Part 4: Concatenate all the dataframes into one single dataframe

if metadata:
    all_data = pd.concat(metadata, ignore_index=True)
    print("\nAll files successfully concatenated.")
    # print(all_data.head())  # Display the first few rows to verify data organization

    # reordering dataframe to a more logical order (for readability)
    column_order = [
        'Patient_Age', 'Patient_ID', 'Exercise', 'Sensor_Location', 'Left_Right_Side', 'Muscle_Group', 'Correctness', 'Series',
        'Gyroscope X (deg/s)', 'Gyroscope Y (deg/s)', 'Gyroscope Z (deg/s)',
        'Accelerometer X (g)', 'Accelerometer Y (g)', 'Accelerometer Z (g)',
        'Magnetometer X (uT)', 'Magnetometer Y (uT)', 'Magnetometer Z (uT)'
    ]

    all_data = all_data[column_order]

    # printing first few lines to verify structure
    print(all_data.head())

else:
    print("No data files were loaded.")



All files successfully concatenated.
  Patient_Age Patient_ID Exercise Sensor_Location Left_Right_Side  \
0           A         01      EAH               X               L   
1           A         01      EAH               X               L   
2           A         01      EAH               X               L   
3           A         01      EAH               X               L   
4           A         01      EAH               X               L   

  Muscle_Group Correctness  Series  Gyroscope X (deg/s)  Gyroscope Y (deg/s)  \
0            A           Y       1             40.48140            -46.52059   
1            A           Y       1             50.55264            -53.34631   
2            A           Y       1             58.51016            -63.40838   
3            A           Y       1             62.17809            -73.23508   
4            A           Y       1             61.61857            -80.94344   

   Gyroscope Z (deg/s)  Accelerometer X (g)  Accelerometer Y (g)  

In [8]:
# Part 5: Add in columns to turn the categorical data into numerical data (one-hot encoding)
# Defining exercise mapping for one-hot encoding
exercise_mapping = {
    'KFE': 'Knee_Flexion_Extension',
    'HAA': 'Hip_Abduction_Adduction',
    'SQT': 'Squat',
    'EAH': 'Elbow_Abduction_Horizontal',
    'EFE': 'Elbow_Flexion_Extension',
    'SQZ': 'Squeeze',
    'GAT': 'Gait',
    'GIS': 'Gait_with_Step',
    'GHT': 'Gait_with_Head_Turn'
}

# Adding One-Hot Encoded Columns to `all_data`
for code, name in exercise_mapping.items():
    # Create a new column for each exercise type
    all_data[name] = (all_data['Exercise'] == code).astype(int)

# One-Hot Encoding the muscle group (for use later on)
muscle_type = {
    'S': 'Shin',
    'T': 'Thigh',
    'A': 'Arm',
    'F': 'Forearm',
}

# Adding One-Hot Encoded Columns to `all_data`
for code, name in muscle_type.items():
    # Create a new column for each exercise type
    all_data[name] = (all_data['Muscle_Group'] == code).astype(int)
    
# One-Hot Encoding the Right/Left Column
all_data['Right'] = (all_data['Left_Right_Side'] == 'R').astype(int)
all_data['Left'] = (all_data['Left_Right_Side'] == 'L').astype(int)

# One-Hot Encoding the Correctness Column
all_data['Correct'] = (all_data['Correctness'] == 'Y').astype(int)
all_data['Incorrect'] = (all_data['Correctness'] == 'N').astype(int)

# Print the DataFrame to check the added columns
print(all_data.head())


  Patient_Age Patient_ID Exercise Sensor_Location Left_Right_Side  \
0           A         01      EAH               X               L   
1           A         01      EAH               X               L   
2           A         01      EAH               X               L   
3           A         01      EAH               X               L   
4           A         01      EAH               X               L   

  Muscle_Group Correctness  Series  Gyroscope X (deg/s)  Gyroscope Y (deg/s)  \
0            A           Y       1             40.48140            -46.52059   
1            A           Y       1             50.55264            -53.34631   
2            A           Y       1             58.51016            -63.40838   
3            A           Y       1             62.17809            -73.23508   
4            A           Y       1             61.61857            -80.94344   

   ...  Gait_with_Step  Gait_with_Head_Turn  Shin  Thigh  Arm  Forearm  Right  \
0  ...               0 

In [None]:
# Part 6: Feature Extraction
# This section will extract important features from the gyroscope, accelerometer and magnetomater data. 
# it will extract statistical, fft and two other types of features (described more in the writing)

# Feature extraction functions
def extract_stats_features(window_data):
    features = [
        np.mean(window_data),            # Mean: Average value of the window data
        np.std(window_data),             # Standard Deviation: Measures the spread of the data
        np.max(window_data),             # Max: Maximum value in the window
        np.min(window_data),             # Min: Minimum value in the window
        np.sum(window_data ** 2),        # Energy (sum of squares): Sum of squared values in the window
        stats.skew(window_data),         # Skewness: Measures asymmetry of the data distribution
        stats.kurtosis(window_data),     # Kurtosis: Measures the "tailedness" of the data distribution
        np.sum(np.abs(window_data)),     # Signal Magnitude Area (SMA): Sum of absolute values of data
        np.sqrt(np.mean(window_data ** 2)) # Root Mean Square (RMS): Square root of the mean of squared data values
       
    ]
    return features

def extract_fft_features(window_data):
    # Applying fast fourier transform to the window data (can only apply to one axis of data)

    n = len(window_data) # gets the length of the windowed data
    freqs = fft.fftfreq(n) # gets the frequency values of the fft
    fft_vals = np.abs(fft.fft(window_data)) #applies fast fourier transform to the data and obtains the absolute value of it


    fft_features = [
        np.mean(fft_vals),                    # Mean FFT
        np.std(fft_vals),                     # Standard Deviation of FFT
        np.max(fft_vals),                     # Max FFT value
        np.min(fft_vals),                     # Min FFT value
        np.sum(fft_vals ** 2)                 # FFT energy
    ]
    return fft_features

def extract_other_features(window_data):
    # Extracts features such as Mean Absolute Value and Zero Crossing Rate

    mav = np.mean(np.abs(window_data))                                         # Mean Absolute Value (MAV)                                     
    zcr = np.sum(np.abs(np.diff(np.sign(window_data)))) / len(window_data)     # Zero Crossing Rate (ZCR): Counts how many times the signal crosses zero (indicates frequency of signal oscillation)
    return [mav, zcr]


# Calculate features for each axis in the window
def calculate_features(window):
    combined_features = []  # Initialize an empty list to store all features for the current window

    for i in range(window.shape[1]):  # Loop over each axis (e.g., X, Y, Z)
        axis_data = window[:, i]  # Extract data for the current axis (e.g., X, Y, or Z) for the entire window

        # Extract all Features for the current axis
        stats_features = extract_stats_features(axis_data) # Extract statistical features for this axis (mean, std, max, min, etc.)
        fft_features = extract_fft_features(axis_data) # Extract frequency-domain features for this axis using FFT (mean, std, max of FFT values, etc.)
        other_features = extract_other_features(axis_data) # Extract other features for this axis, such as Mean Absolute Value (MAV) and Zero Crossing Rate (ZCR)
        
        # Concatenate all features from this axis into the combined features list for the window
        combined_features.extend(stats_features + fft_features + other_features)

    return combined_features  # Return the combined list of features for all axes in the window


# Extract features for each sensor
def extract_features_with_windowing(data, window_size, overlap):
    step_size = int(window_size * (1 - overlap)) # calculates the stepsize for moving the window with a 50% overlap
    features = []
    
    for start in range(0, len(data) - window_size + 1, step_size):
        # for loop that will go through the data using a sliding window
        window = data[start:start + window_size]
        feature_vector = calculate_features(window)
        
        # Verify all elements in feature vector are scalars
        if all(isinstance(f, (float, int)) for f in feature_vector):
            features.append(feature_vector)
        else:
            print(f"Skipping window {start} due to unexpected feature structure or types.")
    
    # Ensure consistent feature arrays
    consistent_features = [f for f in features if len(f) == len(features[0])]
    print(f"Consistent feature count: {len(consistent_features)} / {len(features)}")

    if consistent_features:
        return np.vstack(consistent_features)
    else:
        print("No valid features to stack. Returning empty array.")
        return np.array([])


# Example usage
window_size = 100  # Window size in samples
overlap = 0.5      # 50% overlap

# Extract features for each sensor
gyro_features = extract_features_with_windowing(gyro_data, window_size, overlap)
accel_features = extract_features_with_windowing(accel_data, window_size, overlap)
mag_features = extract_features_with_windowing(mag_data, window_size, overlap)

# Combine features from all sensors
if gyro_features.size > 0 and accel_features.size > 0 and mag_features.size > 0:
    combined_features = np.hstack([gyro_features, accel_features, mag_features])
    print("Combined feature matrix shape:", combined_features.shape)
else:
    print("No features extracted for one or more sensors.")

# Print the first few rows of combined features for inspection
print("\nFirst few examples of the combined extracted features (first 3 windows):")
for i in range(min(3, len(combined_features))):  # Print up to 3 rows
    print(f"Window {i+1}: {combined_features[i, :10]} ...")  # Print first 10 features as example


Consistent feature count: 281901 / 281901
Consistent feature count: 281901 / 281901
Consistent feature count: 281901 / 281901
Combined feature matrix shape: (281901, 144)

First few examples of the combined extracted features (first 3 windows):
Window 1: [ 2.31308819e+01  5.33746557e+01  1.21797300e+02 -8.60308200e+01
  3.38389157e+05 -2.54547999e-01 -8.42358523e-01  4.99310469e+03
  5.81712263e+01  2.48763147e+02] ...
Window 2: [-5.06073825e-01  3.07785747e+01  6.08725500e+01 -7.49027200e+01
  9.47576768e+04 -4.90869517e-01 -3.41200437e-02  2.33441449e+03
  3.07827349e+01  1.68250509e+02] ...
Window 3: [ 1.20790268e+00  1.47992738e+01  5.01174600e+01 -1.67133100e+01
  2.20477535e+04  1.89270190e+00  2.87428983e+00  9.81300553e+02
  1.48484860e+01  8.44035164e+01] ...


In [None]:
# Part 6; Data processing

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Define feature and target columns
sensor_columns = [
    'Gyroscope X (deg/s)', 'Gyroscope Y (deg/s)', 'Gyroscope Z (deg/s)',
    'Accelerometer X (g)', 'Accelerometer Y (g)', 'Accelerometer Z (g)',
    'Magnetometer X (uT)', 'Magnetometer Y (uT)', 'Magnetometer Z (uT)'
]
X = all_data[sensor_columns]
y_exercise = all_data[[col for col in all_data.columns if col in exercise_mapping.values()]]  # One-hot encoded exercise columns
y_correct = all_data['Correct']  # Binary column for correct/incorrect

# Split data into train and test sets
X_train, X_test, y_exercise_train, y_exercise_test, y_correct_train, y_correct_test = train_test_split(
    X, y_exercise, y_correct, test_size=0.2, random_state=42
)

# Standardize the features based on training data
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Reshape data for sequential input if using a 1D CNN
# Example assumes 10 timesteps with 9 features per timestep
X_train_reshaped = X_train.reshape((-1, 10, 9))
X_test_reshaped = X_test.reshape((-1, 10, 9))
# look into timesteps 