# Imports & Config

In [1]:
# Standard library imports
import os
import tempfile

# Third-party imports for data manipulation
import pandas as pd
import numpy as np

# Third-party imports for machine learning
import joblib
from sklearn.impute import SimpleImputer

# Third-party imports for data visualization
import matplotlib.pyplot as plt
import plotly.graph_objects as go

# Third-party imports for video processing and computer vision
import cv2
import mediapipe as mp

# IPython imports for interactive widgets and display utilities
import ipywidgets as widgets
from IPython.display import Video, HTML, display, clear_output

# Local imports for video processing pipeline
from src.pipeline.tracking.video_processing_pipeline import process_video, body_parts

# Configuration: working locations shared by the cells below.
tracking_data_dir = './src/pipeline/tracking_data'  # tracking CSVs are read from here
output_video_dir = './src/pipeline/output_videos'  # videos with drawn landmarks
predictions_path = "./src/pipeline/predictions"  # prediction CSVs, one subfolder per recording
models_dir = './data/models/sequence-4f'  # pickled models (and optional imputer.pkl)
tracking_data_path = '' # will be set by video tracking cell, otherwise define csv file here

# Ensure the output directories exist.
# The loop variable is intentionally not named `dir`: at module level that
# would shadow the builtin `dir()` for the rest of the session.
dirs_to_check = [tracking_data_dir, output_video_dir]
for directory in dirs_to_check:
    if not os.path.exists(directory):
        # exist_ok covers the case where the directory appears between the
        # existence check and the creation call.
        os.makedirs(directory, exist_ok=True)
        print(f"Directory created: {directory}")



# Video Tracking

In [2]:
# Upload widget for choosing the video to analyse
file_selector = widgets.FileUpload(
    description='Select a video file',
    accept='.mp4, .mov',  # file types offered in the file dialog
    multiple=False,  # only a single file can be selected
    layout={'width': '400px'},
)

# Button that starts processing of the uploaded video
button = widgets.Button(
    icon='play',  # Button icon
    description='Process Uploaded File',
    tooltip='Click to process the uploaded file',
    button_style='',  # one of 'success', 'info', 'warning', 'danger' or ''
    layout={'width': '400px'},
)

def save_uploaded_file(uploaded_file):
    """Persist an uploaded file to a temporary file on disk.

    Args:
        uploaded_file: Mapping with a ``'name'`` entry (the original file
            name) and a ``'content'`` entry (the bytes-like file content).

    Returns:
        Tuple ``(path, directory)``: the path of the temporary file and the
        directory that contains it. The file is created with
        ``delete=False``, so it is not removed automatically.
    """
    # Keep only the final path component. A name containing directory
    # separators would otherwise make the temporary path point into a
    # directory that does not exist.
    file_name = os.path.basename(uploaded_file['name'])
    file_content = uploaded_file['content']

    # The complete file name is used as the suffix so that the temporary file
    # still ends with the original name and extension.
    with tempfile.NamedTemporaryFile(delete=False, suffix=file_name) as tmp_file:
        tmp_file.write(file_content)

    return tmp_file.name, os.path.dirname(tmp_file.name)

def save_landmarks_on_video(video_path, output_video_path, tracking_csv_path=None):
    """Write a copy of a video with the tracked pose landmarks drawn on it.

    Landmark positions come from a tracking CSV with one row per frame and
    columns named ``"<Landmark name>_x"`` / ``"<Landmark name>_y"`` (for
    example ``"Left hip_x"``). The values are multiplied by the frame width
    and height to obtain pixel positions. Both hips are drawn as large red
    dots, every other landmark as a small green dot.

    Args:
        video_path: Path of the source video.
        output_video_path: Path of the annotated video to create.
        tracking_csv_path: Optional path of the tracking CSV. When omitted,
            the module-level ``tracking_data_path`` is used.

    Returns:
        None. Progress and errors are reported with ``print``.
    """
    print("Creating video with landmarks... (this might take a while)")

    csv_path = tracking_data_path if tracking_csv_path is None else tracking_csv_path

    # Open the video file
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error: Could not open video.")
        cap.release()
        return

    out = None
    try:
        # Frame geometry and frame rate are copied to the output video
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)

        tracking_data = pd.read_csv(csv_path)

        # Resolve the CSV column names once instead of once per frame.
        # Enum names such as LEFT_EYE_INNER become "Left eye inner".
        pose_landmarks = mp.solutions.pose.PoseLandmark
        hip_landmarks = (pose_landmarks.LEFT_HIP, pose_landmarks.RIGHT_HIP)
        available_columns = set(tracking_data.columns)
        drawable = []
        for landmark in pose_landmarks:
            landmark_name = landmark.name.lower().replace('_', ' ').capitalize()
            x_column = f'{landmark_name}_x'
            y_column = f'{landmark_name}_y'
            if x_column in available_columns and y_column in available_columns:
                drawable.append((x_column, y_column, landmark in hip_landmarks))

        # Define the codec and create VideoWriter object
        fourcc = cv2.VideoWriter_fourcc(*'avc1')  # or 'XVID'
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
        if not out.isOpened():
            # Without this check a missing codec would go unnoticed and the
            # success message below would be printed for a file never written.
            print("Error: Could not open video writer.")
            return

        # One tracking row per frame; stop at whichever runs out first
        for _, row in tracking_data.iterrows():
            ret, frame = cap.read()
            if not ret:
                break

            for x_column, y_column, is_hip in drawable:
                x_value = row[x_column]
                y_value = row[y_column]
                if pd.isna(x_value) or pd.isna(y_value):
                    continue
                center = (int(x_value * width), int(y_value * height))
                if is_hip:
                    # Highlight hips in red
                    cv2.circle(frame, center, 15, (0, 0, 255), -1)
                else:
                    # Draw other landmarks in green
                    cv2.circle(frame, center, 5, (0, 255, 0), -1)

            # Write the frame with landmarks to the output video file
            out.write(frame)
    finally:
        # Release the capture and writer even if reading or drawing failed
        cap.release()
        if out is not None:
            out.release()

    cv2.destroyAllWindows()
    print(f"Video saved to {output_video_path}")

def process_and_display_tracking_data(tmp_file_path, tmp_file_dir):
    """Track a saved video, preview the tracking data and show the annotated video.

    Calls ``process_video`` for the file, reads the CSV
    ``Tracking_<name>.csv`` from ``tracking_data_dir`` (``process_video`` is
    expected to have written it), displays its first rows, stores the CSV path
    in the module-level ``tracking_data_path``, renders the video with
    landmarks and embeds it in the output.

    Args:
        tmp_file_path: Path of the video file to process.
        tmp_file_dir: Directory that contains the video file.
    """
    global tracking_data_path

    print("Processing video... (this might take a while)")
    video_file_name = os.path.basename(tmp_file_path)
    process_video(video_file_name, tmp_file_dir, tracking_data_dir)

    # File name without its last four characters (the extension)
    video_stem = os.path.basename(tmp_file_path[:-4])
    tracking_data_file = f'Tracking_{video_stem}.csv'
    tracking_csv = os.path.join(tracking_data_dir, tracking_data_file)

    preview = pd.read_csv(tracking_csv).head()
    display(preview)

    # Publish the CSV path for the later cells, then render the annotated video
    output_video_path = os.path.join(output_video_dir, f"{video_stem}_landmarks.mp4")
    tracking_data_path = tracking_csv
    save_landmarks_on_video(tmp_file_path, output_video_path)

    player_markup = f'''
    Video with landmarks saved to 
    <a href='{output_video_path}' target='_blank'>{output_video_path}</a> <br><br>
    <video height="500" controls allowfullscreen>
      <source src="{output_video_path}" type="video/mp4">
    Your browser does not support the video tag.
    </video>
    '''
    display(HTML(player_markup))

# Click handler for the "Process Uploaded File" button
def on_button_clicked(b):
    """Save the first uploaded file to disk and run the tracking pipeline on it.

    Prints a message and does nothing else when no file has been uploaded.

    Args:
        b: The clicked button (unused).
    """
    uploads = file_selector.value
    if not uploads:
        print("No file uploaded.")
        return

    first_upload = next(iter(uploads))
    saved_path, saved_dir = save_uploaded_file(first_upload)
    process_and_display_tracking_data(saved_path, saved_dir)

# Register the handler, then render the upload widget and the button
button.on_click(on_button_clicked)
display(file_selector, button)


FileUpload(value=(), accept='.mp4, .mov', description='Select a video file', layout=Layout(width='400px'))

Button(description='Process Uploaded File', icon='play', layout=Layout(width='400px'), style=ButtonStyle(), to…

INFO:root:Processing video: tmp1zomwqajIMG_9340.MOV


Processing video... (this might take a while)


INFO: Created TensorFlow Lite XNNPACK delegate for CPU.
INFO:root:Tracking data for tmp1zomwqajIMG_9340.MOV stored in ./src/pipeline/tracking_data/Tracking_tmp1zomwqajIMG_9340.csv


Unnamed: 0,Label,Timestamp,Nose_x,Nose_y,Nose_z,Left eye inner_x,Left eye inner_y,Left eye inner_z,Left eye_x,Left eye_y,...,Left heel_z,Right heel_x,Right heel_y,Right heel_z,Left foot index_x,Left foot index_y,Left foot index_z,Right foot index_x,Right foot index_y,Right foot index_z
0,,0,0.447181,0.186625,-0.454502,0.466466,0.167357,-0.481824,0.478228,0.166733,...,0.208295,0.527764,0.884256,0.747109,0.546632,0.973369,-0.081372,0.389055,0.922492,0.568898
1,,1,0.44744,0.189193,-0.463861,0.467123,0.170478,-0.498253,0.478337,0.169845,...,0.25073,0.527312,0.884385,0.692368,0.548656,0.978809,-0.046427,0.390017,0.923248,0.52004
2,,2,0.447379,0.191168,-0.478758,0.467114,0.172948,-0.515375,0.478157,0.17224,...,0.243222,0.527092,0.884469,0.687063,0.551741,0.980843,-0.052328,0.390481,0.923249,0.517358
3,,3,0.447204,0.191382,-0.487665,0.467088,0.173184,-0.523442,0.478086,0.172532,...,0.245198,0.527091,0.885141,0.690187,0.553133,0.981776,-0.04631,0.390478,0.923281,0.52312
4,,4,0.446104,0.191954,-0.475087,0.465854,0.173861,-0.520072,0.476621,0.173205,...,0.256576,0.527125,0.885262,0.690394,0.55415,0.982438,-0.032706,0.390477,0.923279,0.525715


Creating video with landmarks... (this might take a while)
Video saved to ./src/pipeline/output_videos/tmp1zomwqajIMG_9340_landmarks.mp4


# Model Usage - Squat Phase Analysis

In [None]:
print("Analysing Squat Phase of Tracked Data: " + tracking_data_path)

def plot_movement_data(csv_file_path, title="Average Hip Height over Time"):
    """Plot the average hip height over time from a tracking CSV.

    The CSV must contain the columns ``"Timestamp"``, ``"Left hip_y"`` and
    ``"Right hip_y"``. When a ``"Label"`` column with at least one value is
    present, the points are grouped and coloured per label; otherwise all
    points are drawn in blue. Vertical lines mark the first and last
    timestamp.

    Args:
        csv_file_path: Path of the CSV file to plot.
        title: Title shown above the figure.

    Returns:
        None. The figure is shown with ``fig.show()``. When the CSV has no
        rows, a message is printed and nothing is plotted.
    """
    # Read the data from the CSV file
    data = pd.read_csv(csv_file_path, delimiter=",")

    # Without rows there is no first/last timestamp to mark
    if data.empty:
        print(f"No data to plot in {csv_file_path}")
        return

    # Calculate the average hip height
    data["Average_Hip_Height"] = (data["Left hip_y"] + data["Right hip_y"]) / 2

    fig = go.Figure()

    def add_markers(rows, name, color=None):
        # One marker trace per group; the colour is left to plotly when None
        marker_options = {} if color is None else {"marker_color": color}
        fig.add_trace(
            go.Scatter(
                x=rows["Timestamp"],
                y=rows["Average_Hip_Height"],
                mode="markers",
                name=name,
                hovertemplate="x: %{x}",
                **marker_options,
            )
        )

    # Check if 'Label' column exists and holds values, and plot accordingly
    if 'Label' in data.columns and data['Label'].notna().any():
        # Define custom color mapping for movement types
        custom_color_mapping = {
            "Pause": "orange",
            "Ascending": "green",
            "Descending": "red",
            "Transition": "yellow",
            "Unknown": "grey",
        }

        for label, color in custom_color_mapping.items():
            add_markers(data[data["Label"] == label], label, color)

        # Labels without a configured colour are still plotted (with a
        # plotly default colour) instead of being silently left out.
        for label in data["Label"].dropna().unique():
            if label not in custom_color_mapping:
                add_markers(data[data["Label"] == label], str(label))
    else:
        # Plot the average hip height over time in blue if there are no labels
        add_markers(data, "Average Hip Height", "blue")

    # Mark the first and last data point with a vertical line and a caption
    timestamps = data["Timestamp"]
    boundary_markers = (
        (timestamps.iloc[0], "Green", "Start"),
        (timestamps.iloc[-1], "Red", "End"),
    )
    for x_position, line_color, caption in boundary_markers:
        fig.add_shape(
            type="line",
            x0=x_position,
            y0=0,
            x1=x_position,
            y1=1,
            yref="paper",
            line=dict(color=line_color, width=2),
        )
        fig.add_annotation(
            x=x_position,
            y=0.05,
            text=f"{caption}: {x_position}",
            showarrow=False,
            yref="paper",
        )

    # Light grey grid lines; the y axis is reversed
    # (presumably because image y coordinates grow downwards - confirm)
    fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor="LightGrey")
    fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor="LightGrey", autorange="reversed")

    # Transparent plot and paper background, fixed height, automatic width
    fig.update_layout(
        autosize=True,
        height=600,  # Height of the figure in pixels
        hovermode="x",  # Hovermode for the cursor
        plot_bgcolor="rgba(0,0,0,0)",
        paper_bgcolor="rgba(0,0,0,0)",
        title=str(title),
    )

    fig.show()

# Model Application Logic
def create_sequences(features, window_size):
    feature_sequences = []
    for i in range(len(features) - window_size + 1):
        feature_sequences.append(features.iloc[i:(i + window_size)].values)
    return np.array(feature_sequences)

# Files in the models directory that must not be offered as a model
# (the imputer is loaded separately by the analysis handler)
blacklist = ['imputer.pkl']  # Add model filenames to exclude

# Get all the .pkl files from the models directory.
# Sorted because os.listdir returns entries in arbitrary order, which would
# make the dropdown order (and its default selection) vary between runs.
if os.path.isdir(models_dir):
    all_model_files = sorted(
        f for f in os.listdir(models_dir) if f.endswith('.pkl') and f not in blacklist
    )
else:
    # Keep the cell usable (with an empty dropdown) when no models are present
    all_model_files = []
    print(f"Models directory not found: {models_dir}")

# Dropdown for Model Selection: shows the file name, yields the full path
model_dropdown = widgets.Dropdown(
    options=[(file, os.path.join(models_dir, file)) for file in all_model_files],
    description='Model:',
    disabled=False,
)

# Button for Starting Analysis
start_analysis_button = widgets.Button(
    description='Start Analysis',
    button_style='info',
    tooltip='Click to start the analysis with the selected model',
    icon='play'
)

# Display the widgets
display(model_dropdown, start_analysis_button)

# Event Handler for Analysis Button
def on_analysis_button_clicked(b):
    """Label the tracked frames with the selected model, save and plot them.

    Reads the CSV at the module-level ``tracking_data_path``, builds sliding
    windows of 4 frames over the two hip height columns, predicts one label
    per window and assigns it to the last frame of that window. The labelled
    data is written to
    ``<predictions_path>/<tracking name>/<tracking name>_<model name>_predicted.csv``
    and plotted with ``plot_movement_data``.

    A message is printed and nothing is done when no model is selected, no
    tracking data path is set, or the recording has fewer frames than one
    window.

    Args:
        b: The clicked button (unused).
    """
    # Clear the previous outputs and render the controls again
    clear_output(wait=True)
    display(model_dropdown, start_analysis_button)

    selected_model_path = model_dropdown.value
    if not selected_model_path:
        print("No model selected.")
        return
    if not tracking_data_path:
        print("No tracking data available. Run the video tracking cell or set tracking_data_path.")
        return
    print("Starting analysis with model:", selected_model_path)

    # Load the tracking data
    new_data = pd.read_csv(tracking_data_path)

    # Frame-to-frame change of the hip heights. These columns are not part of
    # required_features below; they only end up in the saved CSV.
    new_data['Left hip_y_diff'] = new_data['Left hip_y'].diff().fillna(0)
    new_data['Right hip_y_diff'] = new_data['Right hip_y'].diff().fillna(0)

    # Existing labels are replaced by the predictions
    if "Label" in new_data.columns:
        new_data = new_data.drop("Label", axis=1)

    required_features = ['Left hip_y', 'Right hip_y']
    new_data_for_sequences = new_data[required_features]

    window_size = 4  # Ensure this matches the window size used during training
    if len(new_data) < window_size:
        print(f"Not enough frames for analysis: need at least {window_size}, got {len(new_data)}.")
        return

    # Create sequences and flatten each one into a single feature row
    new_data_sequences = create_sequences(new_data_for_sequences, window_size)
    new_data_flat = new_data_sequences.reshape(new_data_sequences.shape[0], -1)

    # Apply the fitted imputer when one is stored next to the models
    # NOTE: joblib.load unpickles arbitrary objects; only load trusted files
    imputer_path = os.path.join(models_dir, 'imputer.pkl')
    if os.path.exists(imputer_path):
        imputer = joblib.load(imputer_path)
        new_data_imputed = imputer.transform(new_data_flat)
    else:
        new_data_imputed = new_data_flat

    # Load the selected model and make predictions
    loaded_model = joblib.load(selected_model_path)
    predictions = loaded_model.predict(new_data_imputed)

    # Assign each prediction to the last frame of its window. Index alignment
    # leaves the first window_size - 1 frames without a label.
    labelled_rows = new_data.index[window_size - 1:]
    label_count = min(len(predictions), len(labelled_rows))
    new_data['Label'] = pd.Series(predictions[:label_count], index=labelled_rows[:label_count])

    # Save the labelled data in a subfolder named after the tracking file
    tracking_name = os.path.splitext(os.path.basename(tracking_data_path))[0]
    model_name = os.path.splitext(os.path.basename(selected_model_path))[0]
    predicted_file_name = f"{tracking_name}_{model_name}_predicted.csv"
    predicted_file_dir = os.path.join(predictions_path, tracking_name)
    predicted_file_path = os.path.join(predicted_file_dir, predicted_file_name)

    os.makedirs(predicted_file_dir, exist_ok=True)
    new_data.to_csv(predicted_file_path, index=False)

    print(f"Predictions using {os.path.basename(selected_model_path)} saved to {predicted_file_path}")

    # Plot the new data with predictions
    plot_movement_data(predicted_file_path, title=f"Prediction - {os.path.basename(selected_model_path)}")

# Attach the event handler to the button
start_analysis_button.on_click(on_analysis_button_clicked)



Dropdown(description='Model:', index=6, options=(('GradientBoostingClassifier.pkl', './data/models/sequence-4f…

Button(button_style='info', description='Start Analysis', icon='play', style=ButtonStyle(), tooltip='Click to …

Starting analysis with model: ./data/models/sequence-4f/SVC.pkl
Predictions using SVC.pkl saved to ./src/pipeline/predictions/Tracking_tmp1zomwqajIMG_9340/Tracking_tmp1zomwqajIMG_9340_SVC_predicted.csv


# Model Training (sequential, time distributed)