## Imports


In [None]:
import pandas as pd
import numpy as np

## Loading Dataset


In [None]:
# Load each annotated JSON export into its own DataFrame (df_1 .. df_6).
# The shared directory is named once so it can be repointed in a single place.
ANNOTATED_JSON_DIR = (
    "/home/g03-s2025/Desktop/SquashCoachingCopilot/cv-module/digitization/"
    "event-recognition/stroke-detection/implementation/annotated_jsons"
)

df_1 = pd.read_json(f"{ANNOTATED_JSON_DIR}/AO_Output.json")
df_2 = pd.read_json(f"{ANNOTATED_JSON_DIR}/FA Output.json")
df_3 = pd.read_json(f"{ANNOTATED_JSON_DIR}/video_4_annotated.json")
df_4 = pd.read_json(f"{ANNOTATED_JSON_DIR}/integrated_data.json")
df_5 = pd.read_json(f"{ANNOTATED_JSON_DIR}/nadines_video_annotated.json")
df_6 = pd.read_json(f"{ANNOTATED_JSON_DIR}/video_1_annotated.json")

In [None]:
# Tag every frame with the number of the DataFrame it was loaded into
# (stored in the "index" column: df_1 -> 1, ..., df_6 -> 6).
for source_number, source_df in enumerate(
    (df_1, df_2, df_3, df_4, df_5, df_6), start=1
):
    source_df["index"] = source_number

In [None]:
# Print how many rows carry each event label, one report per loaded frame.
dfs = [df_1, df_2, df_3, df_4, df_5, df_6]
separator = "-" * 40
for number, frame in enumerate(dfs, start=1):
    counts = frame["event"].value_counts()
    print(f"df_{number} event value counts:")
    print(counts)
    print(separator)

In [None]:
# df_3 is held out as the test set; the other five frames are stacked
# (in this order) into one training frame with a fresh 0..n-1 index.
training_sources = [df_5, df_2, df_1, df_4, df_6]
train_df = pd.concat(training_sources, ignore_index=True)
test_df = df_3.copy()

In [None]:
# (rows, columns) of the combined training frame.
train_df.shape

In [None]:
# Number of rows per event label in the training frame (missing labels are not counted).
train_df["event"].value_counts()

In [None]:
# (rows, columns) of the held-out test frame.
test_df.shape

In [None]:
# Number of rows per event label in the test frame (missing labels are not counted).
test_df["event"].value_counts()

In [None]:
def expand_df(df):
    """
    Flatten the nested "keypoints" column into one column per coordinate.

    Each "keypoints" entry is expected to map a body-part name to a mapping
    with "x" and "y" entries. For every part two columns are produced,
    x_<part> and y_<part>.

    Args:
        df: DataFrame containing a "keypoints" column.

    Returns:
        A new DataFrame with the original columns (minus "keypoints")
        followed by the flattened coordinate columns. The input is not modified.
    """

    def _flatten(person):
        # All x_ entries come first, then all y_ entries, which fixes the
        # order of the generated columns.
        x_values = {f"x_{part}": person[part]["x"] for part in person}
        y_values = {f"y_{part}": person[part]["y"] for part in person}
        return x_values | y_values

    coordinate_columns = df["keypoints"].apply(_flatten).apply(pd.Series)

    # Put the flat columns next to the original ones, then discard the
    # nested column they were derived from.
    combined = pd.concat([df, coordinate_columns], axis=1)
    return combined.drop(columns=["keypoints"])


# Flatten the keypoints of both splits.
train_df_expanded, test_df_expanded = (
    expand_df(split) for split in (train_df, test_df)
)

In [None]:
# (rows, columns) of the training frame after the keypoints were flattened.
train_df_expanded.shape

In [None]:
def normalize_keypoints_df(df):
    """
    Normalize all keypoints in the dataframe using body-relative normalization.

    For every row the hip center (midpoint of the two hips) becomes the origin
    and every coordinate is divided by the torso length (distance between the
    shoulder midpoint and the hip midpoint). The original x_, y_ columns are
    REPLACED with the normalized values; all other columns are left as they are.

    The computation is done on whole columns at once instead of row by row,
    so it does not depend on the frame's index labels being unique.

    Args:
        df: DataFrame with columns x_<name> and y_<name> for each of the
            twelve keypoints listed below (x_left_shoulder, y_left_shoulder, ...).

    Returns:
        df_normalized: copy of df with normalized keypoint columns (same
        column names). The input frame is not modified.

    Raises:
        KeyError: if one of the expected keypoint columns is missing.
    """
    # Keypoint names
    keypoint_names = [
        "left_shoulder",
        "right_shoulder",
        "left_elbow",
        "right_elbow",
        "left_wrist",
        "right_wrist",
        "left_hip",
        "right_hip",
        "left_knee",
        "right_knee",
        "left_ankle",
        "right_ankle",
    ]
    x_cols = [f"x_{name}" for name in keypoint_names]
    y_cols = [f"y_{name}" for name in keypoint_names]
    position = {name: pos for pos, name in enumerate(keypoint_names)}

    # (n_rows, 12) float matrices, one column per keypoint.
    xs = df[x_cols].to_numpy(dtype=float)
    ys = df[y_cols].to_numpy(dtype=float)

    # Hip center: origin of the normalized coordinate system.
    hip_center_x = (xs[:, position["left_hip"]] + xs[:, position["right_hip"]]) / 2
    hip_center_y = (ys[:, position["left_hip"]] + ys[:, position["right_hip"]]) / 2

    # Shoulder center: only needed to measure the torso.
    shoulder_center_x = (
        xs[:, position["left_shoulder"]] + xs[:, position["right_shoulder"]]
    ) / 2
    shoulder_center_y = (
        ys[:, position["left_shoulder"]] + ys[:, position["right_shoulder"]]
    ) / 2

    # Torso length: the per-row scale factor.
    torso_length = np.sqrt(
        (shoulder_center_x - hip_center_x) ** 2
        + (shoulder_center_y - hip_center_y) ** 2
    )
    # A (near-)zero torso would blow up the division; fall back to scale 1.
    # NaN lengths compare False here and therefore stay NaN.
    torso_length[torso_length < 1e-6] = 1.0

    # REPLACE original columns with normalized values
    df_norm = df.copy()
    df_norm[x_cols] = (xs - hip_center_x[:, None]) / torso_length[:, None]
    df_norm[y_cols] = (ys - hip_center_y[:, None]) / torso_length[:, None]

    return df_norm


# Apply the body-relative normalization to both splits.
train_df_expanded, test_df_expanded = (
    normalize_keypoints_df(split)
    for split in (train_df_expanded, test_df_expanded)
)

In [None]:
def add_sequence(df_expanded, window=15, max_neither_segments=100):
    """
    Turn single-row event annotations into fixed-length labelled sequences.

    Step 1: every row that already has an "event" label keeps it, and the
    label is copied onto up to ``window // 2`` rows before and after it.
    Spreading in a direction stops at the frame boundary, at a row that is
    already labelled, or at a row whose "index" value differs from the event
    row's (rows with different "index" values are never mixed).

    Step 2: the rows that are still unlabelled are grouped into runs of
    consecutive rows sharing one "index" value. Each run is cut into
    back-to-back batches of ``window`` rows; every complete batch is labelled
    "neither" until ``max_neither_segments`` batches have been produced.
    Incomplete batches stay unlabelled.

    Step 3: only the rows selected in steps 1 and 2 are returned.

    Args:
        df_expanded: DataFrame with at least an "event" column (missing value
            means "not annotated") and an "index" column used as group id.
        window: Target sequence length in rows. Default 15, i.e. 7 rows on
            each side of an annotated row.
        max_neither_segments: Upper bound on the number of "neither" batches.

    Returns:
        A new DataFrame holding the selected rows in their original order,
        re-indexed 0..m-1. The input frame is not modified.

    Raises:
        ValueError: if ``window`` is smaller than 1.
    """
    if window < 1:
        raise ValueError("window must be a positive integer")

    # Work on a re-indexed copy: the neighbour arithmetic below (idx +/- offset)
    # needs labels 0..n-1, and the caller's frame must not be relabelled.
    df_expanded = df_expanded.reset_index(drop=True)
    # Object dtype lets string labels be written even when the column arrives
    # as an all-missing numeric column.
    df_expanded["event"] = df_expanded["event"].astype(object)

    n_rows = len(df_expanded)
    # Integer division: range() rejects floats, and 7 + 1 + 7 gives 15 rows.
    half_window = window // 2
    selected_indices = set()

    def _spread_label(event_idx, label, group_id, step):
        """Copy `label` onto unlabelled same-group neighbours in direction `step`."""
        # Offsets start at 1: offset 0 is the event row itself, which is
        # already labelled and would stop the spread immediately.
        for offset in range(1, half_window + 1):
            neighbour = event_idx + step * offset
            if not 0 <= neighbour < n_rows:
                break
            if (
                pd.notnull(df_expanded.at[neighbour, "event"])
                or df_expanded.at[neighbour, "index"] != group_id
            ):
                break
            selected_indices.add(neighbour)
            df_expanded.at[neighbour, "event"] = label

    # Step 1: annotated rows plus their neighbours on both sides.
    # The annotated rows are snapshotted first so that rows labelled by
    # spreading are not themselves treated as events.
    event_indices = df_expanded.index[df_expanded["event"].notnull()]
    for idx in event_indices:
        label = df_expanded.at[idx, "event"]
        group_id = df_expanded.at[idx, "index"]
        selected_indices.add(idx)
        _spread_label(idx, label, group_id, step=-1)
        _spread_label(idx, label, group_id, step=1)

    # Step 2: label remaining null runs in window-sized "neither" batches.
    neither_count = 0
    null_indices = sorted(df_expanded.index[df_expanded["event"].isnull()])

    i = 0
    while i < len(null_indices) and neither_count < max_neither_segments:
        start_idx = null_indices[i]
        group_id = df_expanded.at[start_idx, "index"]

        # Collect consecutive nulls in the same group.
        segment = [start_idx]
        for j in range(i + 1, len(null_indices)):
            current_idx = null_indices[j]
            if (
                current_idx == null_indices[j - 1] + 1
                and df_expanded.at[current_idx, "index"] == group_id
            ):
                segment.append(current_idx)
            else:
                break

        # Only complete batches are used; the stop value skips a partial tail.
        for k in range(0, len(segment) - window + 1, window):
            if neither_count >= max_neither_segments:
                break
            for idx in segment[k : k + window]:
                selected_indices.add(idx)
                df_expanded.at[idx, "event"] = "neither"
            neither_count += 1

        i += len(segment)

    # Step 3: keep only the selected rows, in order.
    df_expanded = df_expanded.loc[sorted(selected_indices)].reset_index(drop=True)

    print(f"Number of 'neither' segments (window-frame batches): {neither_count}")

    return df_expanded


# Build the labelled sequences for both splits (default window).
train_df_expanded, test_df_expanded = (
    add_sequence(split) for split in (train_df_expanded, test_df_expanded)
)

In [None]:
# Rows per label in the training frame after add_sequence.
train_df_expanded["event"].value_counts()

In [None]:
# Rows per label in the test frame after add_sequence.
test_df_expanded["event"].value_counts()

## Better Approach (LSTM - 15 Frames per event)


In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.utils import to_categorical

In [None]:
# Read the combined CSV export into lstm_df.
combined_csv_path = "/home/g03-s2025/Desktop/SquashCoachingCopilot/cv-module/digitization/event-recognition/stroke-detection/implementation/annotated_jsons/combined_data.csv"
lstm_df = pd.read_csv(combined_csv_path)

In [None]:
# # ----------------------------
# # Step 1: Normalize coordinates
# # ----------------------------
# coord_cols = [
#     col
#     for col in train_df_expanded.columns
#     if col.startswith("x") or col.startswith("y")
# ]
# scaler = MinMaxScaler()
# train_df_expanded[coord_cols] = scaler.fit_transform(train_df_expanded[coord_cols])

In [None]:
# Remove the non-feature columns from both splits, in place.
metadata_columns = ["player_id", "time", "index"]
for split in (train_df_expanded, test_df_expanded):
    split.drop(columns=metadata_columns, inplace=True)

In [None]:
# Step 2: Label encoding (training split)
# ----------------------------
# Rows whose event is not one of the three known labels are dropped,
# then the remaining labels are replaced by their integer codes.
label_map = {"forehand": 0, "backhand": 1, "neither": 2}
has_known_label = train_df_expanded["event"].isin(label_map.keys())
train_df_expanded = train_df_expanded[has_known_label].copy()
encoded_events = train_df_expanded["event"].map(label_map)
train_df_expanded["event"] = encoded_events.astype(int)

In [None]:
# # ----------------------------
# # Step 1: Normalize coordinates
# # ----------------------------
# coord_cols = [
#     col
#     for col in test_df_expanded.columns
#     if col.startswith("x") or col.startswith("y")
# ]
# scaler = MinMaxScaler()
# test_df_expanded[coord_cols] = scaler.fit_transform(test_df_expanded[coord_cols])

In [None]:
# Feature columns: every column of the test frame whose name starts with
# "x" or "y".
coord_cols = [
    name for name in test_df_expanded.columns if name.startswith(("x", "y"))
]

In [None]:
# Step 2: Label encoding (test split)
# ----------------------------
# Rows whose event is not one of the three known labels are dropped,
# then the remaining labels are replaced by their integer codes.
label_map = {"forehand": 0, "backhand": 1, "neither": 2}
has_known_label = test_df_expanded["event"].isin(label_map.keys())
test_df_expanded = test_df_expanded[has_known_label].copy()
encoded_events = test_df_expanded["event"].map(label_map)
test_df_expanded["event"] = encoded_events.astype(int)

In [None]:
# Data preparation with non-overlapping windows (training split).
# The frame is cut into back-to-back chunks of `window_size` rows; a chunk is
# kept only when all of its rows carry the same label.
window_size = 15
X_train = []
y_train = []

# The stop value leaves out a trailing chunk shorter than window_size.
for i in range(0, len(train_df_expanded) - window_size + 1, window_size):
    window = train_df_expanded.iloc[i : i + window_size]

    # Chunks that mix labels are discarded.
    if window["event"].nunique() != 1:
        continue

    # One sample: (window_size timesteps, len(coord_cols) features).
    X_train.append(window[coord_cols].values)
    # The label shared by every row of the chunk.
    y_train.append(window["event"].iloc[0])

X_train = np.array(X_train)  # (num_windows, window_size, num_features)
y_train = np.array(y_train)  # (num_windows,)

print(f"X_train shape: {X_train.shape}")
print(f"y_train shape: {y_train.shape}")
print(f"Label distribution: {np.unique(y_train, return_counts=True)}")

In [None]:
# Data preparation with non-overlapping windows (test split).
# The frame is cut into back-to-back chunks of `window_size` rows; a chunk is
# kept only when all of its rows carry the same label.
window_size = 15
X_test = []
y_test = []

# The stop value leaves out a trailing chunk shorter than window_size.
for i in range(0, len(test_df_expanded) - window_size + 1, window_size):
    window = test_df_expanded.iloc[i : i + window_size]

    # Chunks that mix labels are discarded.
    if window["event"].nunique() != 1:
        continue

    # One sample: (window_size timesteps, len(coord_cols) features).
    X_test.append(window[coord_cols].values)
    # The label shared by every row of the chunk.
    y_test.append(window["event"].iloc[0])

X_test = np.array(X_test)  # (num_windows, window_size, num_features)
y_test = np.array(y_test)  # (num_windows,)

print(f"X_test shape: {X_test.shape}")
print(f"y_test shape: {y_test.shape}")
print(f"Label distribution: {np.unique(y_test, return_counts=True)}")

In [None]:
# from sklearn.model_selection import train_test_split

# X_train, X_val, y_train, y_val = train_test_split(
#     X, y, test_size=0.2, random_state=42, stratify=y
# )

In [None]:
# Single-layer LSTM classifier.
# Input shape: (window_size timesteps, len(coord_cols) features).
model = Sequential(
    [
        LSTM(
            16,
            input_shape=(window_size, len(coord_cols)),
            return_sequences=False,
        ),
        Dropout(0.1),
        Dense(3, activation="softmax"),  # 3 classes (0, 1, 2)
    ]
)

# Integer labels are used directly, hence the sparse loss.
model.compile(
    loss="sparse_categorical_crossentropy",
    optimizer="adam",
    metrics=["accuracy"],
)

In [None]:
# ----------------------------
# Step 6: Training
# ----------------------------
from sklearn.utils.class_weight import compute_class_weight

# Class weights (used instead of SMOTE) penalize mistakes on minority classes.
# compute_class_weight returns the weights in the order of `classes`, so they
# are keyed by the actual label values. Keying by position would attach the
# weights to the wrong labels whenever a class is missing from y_train.
classes = np.unique(y_train)
class_weights = compute_class_weight("balanced", classes=classes, y=y_train)

class_weight_dict = dict(zip(classes.tolist(), class_weights))
print(f"Class weights: {class_weight_dict}")
# Example: {0: 0.5, 1: 5.0, 2: 5.0}

# Early stopping: stop after 10 epochs without val_loss improvement and roll
# back to the best weights seen.
early_stop = EarlyStopping(monitor="val_loss", patience=10, restore_best_weights=True)

# Training the model
# NOTE(review): the test split doubles as the validation set, so early
# stopping is tuned on the same data the model is later evaluated on.
# Consider carving a validation split out of the training data.
history = model.fit(
    X_train,
    y_train,
    validation_data=(X_test, y_test),
    epochs=100,
    batch_size=8,  # You can adjust this batch size for training
    callbacks=[early_stop],
    verbose=1,
    class_weight=class_weight_dict,
)

In [None]:
# Print the classification report for the test windows.
from sklearn.metrics import classification_report

# Run inference once and derive the predicted class from the stored
# probabilities (highest-probability class per window).
y_pred_probs = model.predict(X_test)
y_pred = np.argmax(y_pred_probs, axis=1)

print(classification_report(y_test, y_pred))

In [None]:
# Plot training vs. validation accuracy for each completed epoch.
import matplotlib.pyplot as plt

training_acc = history.history["accuracy"]
val_acc = history.history["val_accuracy"]

# Number of epochs actually run (early stopping may end training early).
EPOCHS = len(training_acc)
epoch_count = range(1, EPOCHS + 1)

plt.figure()
# Training curve dashed red, validation curve solid blue.
for accuracy_series, line_format in ((training_acc, "r--"), (val_acc, "b-")):
    plt.plot(epoch_count, accuracy_series, line_format)
plt.legend(["LSTM Training Accuracy", "LSTM Val Accuracy"])
plt.xlabel("Epoch")
plt.ylabel("LSTM Accuracy")
plt.show()

In [None]:
# train_df_expanded[coord_cols].values

In [None]:
# X = train_df_expanded.drop(columns=["event"])
# y = train_df_expanded["event"]

# test_x = test_df_expanded.drop(columns=["event"])
# test_y = test_df_expanded["event"]

In [None]:
# # import xgboost
# from xgboost import XGBClassifier

# xgb = XGBClassifier()
# xgb.fit(X, y)
# y_pred_xgb = xgb.predict(test_x)
# print(classification_report(test_y, y_pred_xgb))

In [None]:
# comparison_df = pd.DataFrame(
#     {
#         "y_true": test_y,
#         "y_pred": y_pred_xgb,
#     }
# )

In [None]:
# Save the trained LSTM model to "lstm_model.h5"; the path is relative, so
# the file lands in the current working directory.
model.save("lstm_model.h5")