# TBL Model Development: Linear Regression (Elastic Net)

In [1]:
# imports
import os
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.model_selection import GridSearchCV, train_test_split, KFold
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.ensemble import RandomForestRegressor
from joblib import Parallel, delayed


In [2]:
# Processing Data Function (switching to using radius)

def read_and_process_data(directory_path):
    data_frames = []
    columns_to_extract = ['radius_X', 'radius_Y', 'radius_Z', 'radius_Ox', 'radius_Oy', 'radius_Oz', 'Fx', 'Fy', 'Fz', 'Tx', 'Ty', 'Tz']

    # Assuming each cycle has exactly 356 data points
    total_data_points = 356

    for file_name in os.listdir(directory_path):
        if file_name.endswith(".csv"):
            file_path = os.path.join(directory_path, file_name)
            participant = int(os.path.basename(file_path).split('_')[0])
            cycle_id = os.path.basename(file_path).split('_')[1].split('.')[0]  # Extract cycle_id
            intensity = cycle_id[:4]  # Extract the first four characters of cycle_id as intensity

            # Read data from CSV and select only the desired columns
            df = pd.read_csv(file_path, usecols=columns_to_extract)

            # Add participant ID, cycle_id, and participant_cycle_id as features
            df['Participant'] = participant
            df['Cycle_ID'] = cycle_id
            df['Participant_Cycle_ID'] = f"{participant}_{cycle_id}"
            #df['Intensity'] = intensity

            # Since Data will get shuffled (idk why but shuffling makes the model so much better.. ??)
            # So thus, need to store original index values 
            df['Original_Index'] = df.index

            # # Add normalized_cycle_position
            # df['Normalized_Cycle_Position'] = df.index / (total_data_points - 1)

            if (intensity == "HIIT"):
                df['Intensity'] = 0.9
            else:
                df['Intensity'] = 0.5
            
            # df['Intensity'] = intensity  # this is either "HIIT" or "MICT"

            data_frames.append(df)

    # Concatenate all data frames
    processed_data = pd.concat(data_frames, ignore_index=True)

    # Merge with participant weights
    weights_df = pd.read_csv("Participant Weights.csv")
    weights_df['Weight'] = weights_df['Weight'].astype(float)
    weights_df['Wingspan'] = weights_df['Wingspan'].astype(float)
    processed_data = pd.merge(processed_data, weights_df, left_on='Participant', right_on='Participant')

    return processed_data


In [3]:
#  # Set up paths
# data_directory = "Processed Data for ML"

# # Read and process data
# data = read_and_process_data(data_directory)

# # Shuffle the data based on 'Participant_Cycle_ID'
# data = data.sample(frac=1, random_state=42).reset_index(drop=True)

# # Get unique participants
# participants = data['Participant'].unique()

# # Randomly select 16 participants for the train set and 4 participants for the test set
# train_participants = np.random.choice(participants, size=16, replace=False)
# test_participants = np.setdiff1d(participants, train_participants)

# # Split the data into train and test based on the selected participants
# train_data = data[data['Participant'].isin(train_participants)]
# test_data = data[data['Participant'].isin(test_participants)]

# # Specify the output columns
# output_columns = ['Fx', 'Fy', 'Fz', 'Tx', 'Ty', 'Tz']

# # Create X (input) and y (output) for train/validation/test
# X_train = train_data.drop(output_columns, axis=1)  # Dropping the output columns to create input
# y_train = train_data[output_columns]  # Creating output, each column will be a separate y

# X_test = test_data.drop(output_columns, axis=1)  # Dropping the output columns to create input
# y_test = test_data[output_columns]  # Creating output, each column will be a separate y

In [4]:
# Set up paths
data_directory = "Processed Data for ML"
output_folder = "Train and Test Data 10 Fold"

# Read and process data
data = read_and_process_data(data_directory)

# Shuffle the data
data = data.sample(frac=1, random_state=42).reset_index(drop=True)

# Get unique participants
participants = data['Participant'].unique()

# Initialize KFold cross-validator with 10 folds
kf = KFold(n_splits=10, shuffle=True, random_state=42)

# Create lists to store train and test sets for all folds
all_X_train_sets = []
all_y_train_sets = []
all_X_test_sets = []
all_y_test_sets = []

fold_number = 1

# Iterate over each fold
for train_index, test_index in kf.split(participants):
    train_participants = participants[train_index]
    test_participants = participants[test_index]
    
    # Split the data into train and test based on the selected participants
    train_data = data[data['Participant'].isin(train_participants)]
    test_data = data[data['Participant'].isin(test_participants)]
    
    # Specify the output columns
    output_columns = ['Fx', 'Fy', 'Fz', 'Tx', 'Ty', 'Tz']
    
    # Create X (input) and y (output) for train/validation/test
    X_train = train_data.drop(output_columns, axis=1)  # Dropping the output columns to create input
    y_train = train_data[output_columns]  # Creating output, each column will be a separate y
    
    X_test = test_data.drop(output_columns, axis=1)  # Dropping the output columns to create input
    y_test = test_data[output_columns]  # Creating output, each column will be a separate y
    
    # Store train and test data
    fold_folder = os.path.join(output_folder, f"Fold_{fold_number}")
    os.makedirs(fold_folder, exist_ok=True)
    
    # Save train and test data to CSV
    train_data.to_csv(os.path.join(fold_folder, "train_data.csv"), index=False)
    test_data.to_csv(os.path.join(fold_folder, "test_data.csv"), index=False)
    
    # Save participant numbers to text files
    np.savetxt(os.path.join(fold_folder, "train_participants.txt"), train_participants, fmt='%d')
    np.savetxt(os.path.join(fold_folder, "test_participants.txt"), test_participants, fmt='%d')
    
    # Append train and test sets to lists
    all_X_train_sets.append(X_train)
    all_y_train_sets.append(y_train)
    all_X_test_sets.append(X_test)
    all_y_test_sets.append(y_test)
    
    fold_number += 1

In [5]:
# from sklearn.pipeline import Pipeline
# from sklearn.compose import ColumnTransformer
# from sklearn.impute import SimpleImputer
# from sklearn.preprocessing import StandardScaler, PolynomialFeatures
# from sklearn.linear_model import ElasticNet
# from sklearn.model_selection import GridSearchCV
# import gc

# # Define numerical features
# numeric_features = ['radius_X', 'radius_Y', 'radius_Z', 'radius_Ox', 'radius_Oy', 'radius_Oz', 'Weight', 'Wingspan', 'Intensity']

# # Create transformers for numerical and categorical features
# numeric_transformer = Pipeline(steps=[
#     ('imputer', SimpleImputer(strategy='median')),
#     ('scaler', StandardScaler())
# ])

# # Create a preprocessor that applies transformers to specific columns 
# preprocessor = ColumnTransformer(
#     transformers=[
#         ('num', numeric_transformer, numeric_features)
#     ]
# )

# # Initialize a dictionary to store best models for each fold
# best_models = {}

# # Define the parameter grid for GridSearchCV
# param_grid = {
#     'regressor__alpha': [0.001, 0.005, 0.01, 0.1],  # values for alpha
#     'regressor__l1_ratio': [0, 0.5, 1.0],  # values for l1_ratio
# }

# # Iterate over each fold
# for fold_number in range(1, 2):
#     X_train = all_X_train_sets[fold_number - 1]
#     y_train = all_y_train_sets[fold_number - 1]
    
#     # Clear memory before starting a new fold
#     gc.collect()
    
#     # Create a pipeline with the preprocessor, polynomial features, and the regressor
#     pipeline = Pipeline(steps=[
#         ('preprocessor', preprocessor),
#         ('poly', PolynomialFeatures(degree=2, include_bias=False)),  # Add polynomial features of degree 2
#         ('regressor', ElasticNet(max_iter=7000))  # ElasticNet regressor with default alpha and l1_ratio
#     ])
    
#     # Create a GridSearchCV object
#     grid_search = GridSearchCV(pipeline, param_grid, cv=5, scoring='neg_mean_squared_error', n_jobs=-1) 
    
#     # Fit the GridSearchCV object to your training data
#     grid_search.fit(X_train, y_train)
    
#     # Store the best model for this output column in the corresponding fold's dictionary
#     print(f"\nFold {fold_number} completed.")
    
#     best_models[str(fold_number)] = grid_search.best_estimator_
    
#     # Print the best parameters found by GridSearchCV
#     print(f"Best parameters found by GridSearchCV for Fold {fold_number}: {grid_search.best_params_}")
    
#     # Clear memory after completing a fold
#     del X_train, y_train, pipeline
#     gc.collect()


In [6]:
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import ElasticNet
import gc  # Import the garbage collector module

# Define numerical features
numeric_features = ['radius_X', 'radius_Y', 'radius_Z', 'radius_Ox', 'radius_Oy', 'radius_Oz', 'Weight', 'Wingspan', 'Intensity']

# Create transformers for numerical and categorical features
numeric_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

# Create a preprocessor that applies transformers to specific columns 
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features)
    ]
)

# Initialize a dictionary to store best models for each fold
best_models = {}

# Iterate over each fold
for fold_number in range(1, 11):
    X_train = all_X_train_sets[fold_number - 1]
    y_train = all_y_train_sets[fold_number - 1]
    
    # Clear memory before starting a new fold
    gc.collect()
    
    # Create a pipeline with the preprocessor and the regressor
    pipeline = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('regressor', ElasticNet(alpha=0.003, l1_ratio=1.0, max_iter=2000))  # Set parameters directly
    ])
    
    # Fit the pipeline to your training data
    pipeline.fit(X_train, y_train)
    
    # Store the model for this output column in the corresponding fold's dictionary
    print(f"\nFold {fold_number} completed.")
    
    best_models[str(fold_number)] = pipeline
    
    # Clear memory after completing a fold
    del X_train, y_train, pipeline
    gc.collect()


Fold 1 completed.

Fold 2 completed.

Fold 3 completed.

Fold 4 completed.

Fold 5 completed.

Fold 6 completed.

Fold 7 completed.

Fold 8 completed.

Fold 9 completed.

Fold 10 completed.


In [7]:
def calculate_mdpe_by_participant_cycle_ids(y_pred, y_test, X_test):
    """
    Calculate Median Percentage Error (MDPE) for each unique participant_cycle_id for a specific output column.
    
    Args:
    - y_pred (numpy array): Predicted values.
    - y_test (numpy array): True values.
    - X_test (DataFrame): DataFrame containing the test data including 'Participant_Cycle_ID'.
    - output_column (str): Name of the output column.
    
    Returns:
    - mdpe_scores (dict): Dictionary containing MDPE scores for each unique participant_cycle_id.
    """
    mdpe_scores = []
    
    # Get unique participant_cycle_ids
    unique_participant_cycle_ids = X_test['Participant_Cycle_ID'].unique()
    
    # Calculate MDPE for each unique participant_cycle_id
    for unique_id in unique_participant_cycle_ids:
        mask = X_test['Participant_Cycle_ID'] == unique_id
        y_pred_id = y_pred[mask]
        y_test_id = y_test[mask]
        
        # Exclude NaN values
        mask_valid = ~np.isnan(y_test_id)
        y_pred_id = y_pred_id[mask_valid]
        y_test_id = y_test_id[mask_valid]
        
        mdpe = np.median((y_pred_id - y_test_id) / y_test_id * 100)
        mdpe_scores.append(mdpe)
        
    return mdpe_scores

In [10]:
import os
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Initialize y_pred as a dictionary with keys for each fold number
y_pred = {fold_number: [] for fold_number in range(1, 11)}

# Create a directory to store the y_pred for each fold and each output
output_directory = "Linear Regression Outputs"
os.makedirs(output_directory, exist_ok=True)

mdpe_scores_outputs = {}

# Accumulate MDPE scores across all folds for each output variable
combined_mdpe_scores = {output_col: [] for output_col in output_columns}

# Iterate over each fold
for fold_number in range(1, 11):
    X_test = all_X_test_sets[fold_number - 1]
    y_test = all_y_test_sets[fold_number - 1]
    
    print(f"\nFold {fold_number}:")
    
    # Predict y_pred for all output columns in this fold
    y_pred[fold_number] = best_models[str(fold_number)].predict(X_test)

    # Save selected columns from X_test, y_test, and y_pred for this fold as CSV
    fold_output_directory = os.path.join(output_directory, f"Fold_{fold_number}")
    os.makedirs(fold_output_directory, exist_ok=True)

    # Convert the NumPy array to a pandas DataFrame
    y_pred_df = pd.DataFrame(y_pred[fold_number], columns=output_columns)
    
    # Save train and test data to CSV
    X_test.to_csv(os.path.join(fold_output_directory, "X_test.csv"), index=False)
    y_test.to_csv(os.path.join(fold_output_directory, "y_test.csv"), index=False)
    y_pred_df.to_csv(os.path.join(fold_output_directory, "y_pred.csv"), index=False)

    # Calculate MDPE scores for this fold and each output column
    mdpe_scores_list = []
    for i, output_col in enumerate(output_columns):
        y_pred_fold_output = y_pred[fold_number][:, i]
        
        mdpe_scores = calculate_mdpe_by_participant_cycle_ids(y_pred_fold_output, y_test[output_col], X_test)
        mdpe_scores_outputs[(fold_number, output_col)] = mdpe_scores
        mdpe_scores_list.append(pd.DataFrame({'Output': [output_col]*len(mdpe_scores), 'MDPE': mdpe_scores}))
        
        # Append MDPE scores to combined_mdpe_scores
        combined_mdpe_scores[output_col].extend(mdpe_scores)
        
        average = np.mean(mdpe_scores)
        std = np.std(mdpe_scores)
        print(f"Output Column: {output_col}")
        print("Average of MdPEs:", average)
        print("Standard Deviation of MdPEs:", std)
        
        # Save average and standard deviation of MdPEs to the output directory
        with open(os.path.join(output_directory, f"Fold_{fold_number}_mdpe_scores.txt"), 'a') as f:
            f.write(f"Output Column: {output_col}\n")
            f.write(f"Average of MdPEs: {average}\n")
            f.write(f"Standard Deviation of MdPEs: {std}\n\n")
    
    # Combine MDPE scores for all output columns in this fold
    mdpe_df = pd.concat(mdpe_scores_list)
    
    # Plot MDPE scores for this fold and all output columns
    plt.figure(figsize=(12, 8))
    sns.boxplot(x='Output', y='MDPE', data=mdpe_df, palette='husl')  # Using 'husl' palette for more colorful plots
    plt.title(f'Box-and-Whisker Plot of MDPE Scores for Fold {fold_number}')
    plt.ylabel('MDPE')
    plt.xlabel('Output Column')
    plt.xticks(rotation=45)
    plt.savefig(os.path.join(fold_output_directory, f"MDPE_plot.png"))
    plt.close()

# Calculate the mean and standard deviation of MDPE scores for each output variable across all folds
average_mdpe_scores = {}
std_mdpe_scores = {}

for output_col in output_columns:
    average_mdpe_scores[output_col] = np.mean(combined_mdpe_scores[output_col])
    std_mdpe_scores[output_col] = np.std(combined_mdpe_scores[output_col])

# Write mean and standard deviation of MDPEs to a txt file
with open(os.path.join(output_directory, "average_mdpe_scores.txt"), 'w') as f:
    for output_col in output_columns:
        f.write(f"Output Column: {output_col}\n")
        f.write(f"Average of MdPEs across 10 folds: {average_mdpe_scores[output_col]}\n")
        f.write(f"Standard Deviation of MdPEs across 10 folds: {std_mdpe_scores[output_col]}\n\n")

# Combine MDPE scores for all output columns across all folds
combined_mdpe_df = pd.concat([pd.DataFrame({'Output': [output_col]*len(combined_mdpe_scores[output_col]), 'MDPE': combined_mdpe_scores[output_col]}) for output_col in output_columns], ignore_index=True)

# Save combined MDPE scores to CSV
combined_mdpe_df.to_csv(os.path.join(output_directory, "combined_mdpe_scores.csv"), index=False)

# Plot combined MDPE scores for all output columns
plt.figure(figsize=(12, 8))
sns.boxplot(x='Output', y='MDPE', data=combined_mdpe_df, palette='husl')  # Using 'husl' palette for more colorful plots
plt.title('Box-and-Whisker Plot of Combined MDPE Scores for All Folds')
plt.ylabel('MDPE')
plt.xlabel('Output Column')
plt.xticks(rotation=45)
plt.savefig(os.path.join(output_directory, "combined_MDPE_plot.png"))
plt.close()


Fold 1:
Feature Importance for Output Fx:
radius_Oy: 21.4423
radius_Y: -14.2142
radius_X: 13.6648
radius_Z: -6.9091
radius_Oz: -4.8629
radius_Ox: -4.8548
Wingspan: 4.5900
Weight: -2.6043
Intensity: -0.4231
Feature Importance for Output Fy:
radius_X: 19.3985
radius_Y: -12.8815
radius_Z: -9.2857
radius_Oy: 5.0630
Wingspan: 3.4930
radius_Oz: -2.5790
Weight: -1.9682
Intensity: 1.8994
radius_Ox: 0.9675
Feature Importance for Output Fz:
radius_X: 2.8758
Wingspan: -2.8638
Intensity: -1.3846
radius_Oy: -1.0007
radius_Ox: -0.4146
radius_Y: 0.3222
Weight: 0.0802
radius_Oz: 0.0406
radius_Z: -0.0188
Feature Importance for Output Tx:
radius_X: -0.9726
radius_Y: 0.5574
radius_Z: 0.4556
radius_Ox: -0.1384
Intensity: -0.1268
radius_Oz: 0.1192
Wingspan: -0.1074
Weight: 0.0938
radius_Oy: 0.0379
Feature Importance for Output Ty:
radius_Oy: 0.8201
radius_X: 0.6839
radius_Y: -0.6155
Wingspan: 0.3744
radius_Z: -0.3385
radius_Oz: -0.2204
radius_Ox: -0.1370
Weight: -0.1326
Intensity: 0.0452
Feature Importanc

In [12]:
import numpy as np

# Prepare a dictionary to store coefficients for each feature across all outputs
feature_coefficients = {feature: [] for feature in numeric_features}

# Iterate over each fold to accumulate coefficients
for fold_number in range(1, 11):
    # Extracting and accumulating feature importance
    coefficients_matrix = best_models[str(fold_number)].named_steps['regressor'].coef_
    
    # Accumulate coefficients for averaging later
    for output_index, coefficients in enumerate(coefficients_matrix):
        for feature, coeff in zip(numeric_features, coefficients):
            feature_coefficients[feature].append(coeff)

# Calculate average importance for each feature
average_feature_importance = {feature: np.mean(np.abs(coeffs)) for feature, coeffs in feature_coefficients.items()}

# Calculate the sum of these averages to normalize
total_importance = sum(average_feature_importance.values())

# Normalize the average importance values so they add up to 1
normalized_feature_importance = {feature: importance / total_importance for feature, importance in average_feature_importance.items()}

# Sort features by their normalized average importance
sorted_normalized_importance = sorted(normalized_feature_importance.items(), key=lambda x: x[1], reverse=True)

# Print normalized averaged feature importance rankings
print("Normalized Averaged Feature Importance Rankings:")
for feature, norm_coeff in sorted_normalized_importance:
    print(f"{feature}: {norm_coeff:.4f}")

Normalized Averaged Feature Importance Rankings:
radius_X: 0.2549
radius_Y: 0.1894
radius_Oy: 0.1772
radius_Z: 0.1147
radius_Oz: 0.0740
Wingspan: 0.0676
radius_Ox: 0.0619
Weight: 0.0351
Intensity: 0.0251
