In [78]:
import os
from pathlib import Path
from typing import List, Tuple

import pandas as pd
from sklearn.tree import DecisionTreeClassifier  # Import Decision Tree Classifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.model_selection import train_test_split  # Import train_test_split function
from sklearn import metrics  # Import scikit-learn metrics module for accuracy calculation
from sklearn.metrics import classification_report
from sklearn.preprocessing import LabelEncoder
from xgboost import XGBClassifier


# Seed shared by every estimator so repeated runs give the same fitted models.
# The value matches the random_state that create_model passes to train_test_split.
RANDOM_STATE = 1

# Candidate models, keyed by the short name that check_all_configs uses in its
# report keys. Each entry is a single estimator instance that is re-fitted for
# every window configuration.
classifiers = {
    "xg": XGBClassifier(random_state=RANDOM_STATE),
    "tree": DecisionTreeClassifier(
        max_depth=3,
        min_samples_split=2,
        min_samples_leaf=1,
        max_features=None,
        random_state=RANDOM_STATE,
    ),
    "random_forest": RandomForestClassifier(n_estimators=100, random_state=RANDOM_STATE),
    "gb": GradientBoostingClassifier(random_state=RANDOM_STATE),
}


def load_data(file_path: Path | str) -> pd.DataFrame:
    """
    Load the data from the specified file path.
    """
    data = pd.read_csv(file_path)

    return data

def merge_selected_columns_from_dfs(df1: pd.DataFrame, df2: pd.DataFrame, columns_to_merge: List[str]) -> pd.DataFrame:
    """
    Inner-join selected columns of ``df2`` onto ``df1`` by cyclist and date.

    Args:
        df1: Left frame; must contain the ``cyclist_id`` and ``date`` columns.
        df2: Right frame; must contain ``cyclist_id``, ``date`` and every
            column named in ``columns_to_merge``.
        columns_to_merge: Columns of ``df2`` to bring across. The key columns
            may be listed here too; duplicates are ignored.

    Returns:
        A new frame holding the rows whose (``cyclist_id``, ``date``) pair
        occurs in both inputs. The merged columns follow ``df1``'s columns in
        the order given by ``columns_to_merge``.

    Raises:
        KeyError: If ``df2`` lacks a key column or a requested column.
    """
    key_columns = ['cyclist_id', 'date']

    # dict.fromkeys removes duplicates but keeps first-seen order. A set would
    # make the output column order depend on string hashing, and downstream
    # code selects columns by position.
    selected_columns = list(dict.fromkeys(key_columns + list(columns_to_merge)))

    # Restrict df2 to the keys plus the requested columns before joining.
    df2_selected = df2[selected_columns]

    return pd.merge(df1, df2_selected, on=key_columns, how='inner')

def prepare_data(df: pd.DataFrame, window_size_past: int, window_size_future: int) -> pd.DataFrame:
    """
    Build a windowed feature/label table, one cyclist at a time.

    For every row of a cyclist, the output holds:

    * ``<feature><i>`` for ``i`` in ``1..window_size_past``: the value of
      ``<feature>`` found ``i`` rows further down in that cyclist's data.
    * ``label_disrupt<i>`` / ``label_score<i>`` for ``i`` in
      ``1..window_size_future``: the ``disrupt`` / ``score`` value found
      ``window_size_past + i`` rows further down.

    Feature columns are all columns of ``df`` whose name contains none of the
    substrings in ``remove_keys``. This also drops ``cyclist_id``, via ``_id``.

    NOTE(review): the shifts assume each cyclist's rows are already in
    chronological order; nothing here sorts them. TODO confirm against caller.

    Args:
        df: Input frame with at least ``cyclist_id``, ``disrupt`` and ``score``.
        window_size_past: Number of consecutive rows used as features.
        window_size_future: Number of following rows used as labels.

    Returns:
        A frame with ``cyclist_id`` first, then the feature columns, then the
        label columns, with a fresh 0..n-1 index. Rows containing any NaN are
        dropped, whether it comes from shifting or from the source data. An
        empty frame is returned when ``df`` contains no cyclists.
    """
    remove_keys = ["day", "year", "month", "workout_week", "workout_title", "workout_type", "_id"]
    feature_cols = [col for col in df.columns if not any(key in col for key in remove_keys)]

    per_cyclist_frames = []
    # Handle each cyclist separately so shifts never cross from one cyclist to the next.
    for cyclist_id in df['cyclist_id'].unique():
        cyclist_data = df[df['cyclist_id'] == cyclist_id]

        # Collect every shifted column first and build the frame once; inserting
        # columns one at a time fragments the frame as the window grows.
        columns = {}
        for i in range(1, window_size_past + 1):
            for feature in feature_cols:
                columns[f'{feature}{i}'] = cyclist_data[feature].shift(-i)

        # Labels come from the rows directly after the feature window.
        for i in range(1, window_size_future + 1):
            offset = -i - window_size_past
            columns[f'label_disrupt{i}'] = cyclist_data['disrupt'].shift(offset)
            columns[f'label_score{i}'] = cyclist_data['score'].shift(offset)

        windowed = pd.DataFrame(columns)
        # Keep the identifier as the first column.
        windowed.insert(0, 'cyclist_id', cyclist_id)

        # The tail rows of each cyclist have incomplete windows and hold NaN.
        per_cyclist_frames.append(windowed.dropna())

    if not per_cyclist_frames:
        # pd.concat raises on an empty list; an empty frame is easier for callers.
        return pd.DataFrame()

    return pd.concat(per_cyclist_frames).reset_index(drop=True)


def evaluate_and_visualize_model(classifier: DecisionTreeClassifier, X_test: pd.DataFrame, y_test: pd.DataFrame):
    """
    Score a fitted classifier on held-out data and print the results.

    Prints the accuracy and the text classification report. No plot is
    produced, despite the function's name.

    Args:
        classifier: Fitted estimator exposing ``predict``.
        X_test: Held-out feature rows.
        y_test: True labels for ``X_test``.

    Returns:
        The classification report in its ``output_dict=True`` form.
    """
    predictions = classifier.predict(X_test)

    accuracy = metrics.accuracy_score(y_test, predictions)
    print("Accuracy:", accuracy)

    # The report is computed twice: as a dict for the caller and as text for display.
    report_dict = classification_report(y_test, predictions, output_dict=True)
    report_text = classification_report(y_test, predictions)
    print(report_text)

    return report_dict


def filter_relevant_cols_for_model(df: pd.DataFrame, label_idx=-2):
    """
    Choose the feature columns and the label column used for modelling.

    The last two columns of ``df`` are treated as label columns (per the
    original note: -2 is "disrupt", -1 is "score") and are never features.
    Among the remaining columns, any whose name contains one of the
    ``remove_keys`` substrings is dropped, and so is the selected label
    column itself.

    Args:
        df: Frame whose last two columns are the labels.
        label_idx: Positional index of the label column to predict.

    Returns:
        ``(feature_cols, [label_col])``: the list of feature column names and
        a one-element list with the label column name.
    """
    label_col = df.columns[label_idx]
    remove_keys = ["date", "day", "year", "month", "workout_week", "workout_title", "workout_type", "_id"]

    candidate_cols = df.columns[:-2]  # everything except the two trailing label columns
    feature_cols = [
        col
        for col in candidate_cols
        # If label_idx points outside the last two columns, the label would
        # otherwise be handed to the model as a feature.
        if col != label_col and not any(key in col for key in remove_keys)
    ]
    return feature_cols, [label_col]


def label_encoding(df: pd.DataFrame, column_names: List[str]):
    """
    Return a copy of ``df`` with the given columns label-encoded.

    Each listed column is replaced by the integer codes that
    ``LabelEncoder.fit_transform`` assigns to its values. The encoder is fitted
    separately for every column.

    Args:
        df: Source frame. It is left unmodified.
        column_names: Names of the columns to encode.

    Returns:
        A new DataFrame with the encoded columns; all other columns are copied
        unchanged.
    """
    # Work on a copy: callers reuse the same frame across several models, and
    # encoding in place would silently overwrite their original values.
    encoded = df.copy()
    for column in column_names:
        encoded[column] = LabelEncoder().fit_transform(encoded[column])
    return encoded


def create_model(df: pd.DataFrame, clf, test_size: float = 0.4, random_state: int = 1):
    """
    Fit ``clf`` on a train split of ``df`` and evaluate it on the test split.

    Steps: pick feature and label columns, label-encode every column whose name
    contains ``"tss_calculation_method"``, coerce all columns to numeric
    (unparseable values become NaN), split, fit, evaluate.

    Args:
        df: Windowed dataset whose last two columns are the labels.
        clf: Estimator exposing ``fit`` and ``predict``. It is fitted in place.
        test_size: Fraction of rows held out for evaluation. The default of 0.4
            gives a 60% train / 40% test split.
        random_state: Seed for the train/test split.

    Returns:
        The classification report dict from ``evaluate_and_visualize_model``.
    """
    feature_cols, label_cols = filter_relevant_cols_for_model(df)
    tss_method_cols = [col for col in df.columns if "tss_calculation_method" in col]
    df = label_encoding(df, tss_method_cols)
    df = df.apply(pd.to_numeric, errors='coerce')

    X = df[feature_cols]
    # Use a 1-D target: a single-column DataFrame makes scikit-learn estimators
    # warn that a column vector was passed where a 1-D array was expected.
    y = df[label_cols[0]]

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )

    clf = clf.fit(X_train, y_train)
    return evaluate_and_visualize_model(clf, X_test, y_test)


def process_report(report: dict):
    """Placeholder for report post-processing; it currently does nothing and returns None."""
    return None



In [79]:
def get_all_windows_config_from_dir(window_dir: str, prediction_type: str) -> List[Tuple[int, int]]:
    """
    Get all the window configurations available in a directory.

    Only files named exactly ``"<prediction_type>TimeSeries_x<X>_y<Y>.csv"``
    are considered, e.g. "IllnessesTimeSeries_x3_y1.csv" or
    "InjuriesTimeSeries_x3_y7.csv". Any other entry is skipped, including files
    that share the prefix but not the pattern.

    Args:
        window_dir: The directory containing the time-series files.
        prediction_type: The type of prediction, either Illnesses or Injuries.

    Returns:
        The distinct ``(x, y)`` window sizes found, sorted ascending.
    """
    import re  # local import so this cell does not depend on an edit to the imports cell

    # fullmatch anchors both ends, so stray files such as
    # "IllnessesTimeSeries_notes.txt" are ignored instead of crashing int().
    pattern = re.compile(rf"{re.escape(prediction_type)}TimeSeries_x(\d+)_y(\d+)\.csv")

    window_configs = set()
    for file in os.listdir(window_dir):
        match = pattern.fullmatch(file)
        if match is None:
            continue
        window_configs.add((int(match.group(1)), int(match.group(2))))
    return sorted(window_configs)

    

In [80]:
def check_all_configs(predict_type: str, data_dir: str = "../Data"):
    """
    Train and evaluate every classifier on every window configuration.

    Args:
        predict_type: Prefix of the time-series files to use, e.g. "Illnesses"
            or "Injuries".
        data_dir: Directory holding the
            ``<predict_type>TimeSeries_x<X>_y<Y>.csv`` files.

    Returns:
        A dict mapping ``(x, y, classifier_name)`` to the classification report
        produced by ``create_model`` for that combination.
    """
    window_configs = get_all_windows_config_from_dir(data_dir, predict_type)
    reports = {}
    for x, y in window_configs:
        time_series_df = load_data(os.path.join(data_dir, f"{predict_type}TimeSeries_x{x}_y{y}.csv"))
        for clf_name, clf in classifiers.items():
            print(f"Config: x={x}, y={y}, clf={clf_name}")
            reports[(x, y, clf_name)] = create_model(time_series_df, clf=clf)
    # Return only after all configurations and classifiers have run. An early
    # return inside the loop used to stop after the very first model.
    return reports


In [81]:
# Read the two CSV inputs (workouts and rider injuries, judging by the file names).
df1 = load_data(rf"../Data/Cleaned_Agg_Workouts_2023.csv")
df2 = load_data(rf"../Data/Cleaned_riderInjuries.csv")
# Inner-join df2's "disrupt" and "score" columns onto df1 by (cyclist_id, date).
df_merged = merge_selected_columns_from_dfs(df1, df2, ["disrupt", "score"])


In [82]:
# Build the windowed per-cyclist table with window_size_past=4 and window_size_future=2.
prepared_data = prepare_data(df_merged, 4, 2)