<a href="https://www.kaggle.com/code/umerellous/dl-assignment-03?scriptVersionId=254837724" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [9]:
from glob import glob
import pandas as pd
import numpy as np
import mediapipe as mp
import cv2
import os
import json
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.preprocessing.sequence import pad_sequences

In [5]:
base_path = "/kaggle/input"
def get_pairs(base_path):
    pairs = []
    # More flexible folder discovery
    for root, dirs, files in os.walk(base_path):
        if 'Videos' in dirs and 'Annotation_files' in dirs:
            videos_path = os.path.join(root, 'Videos')
            annotations_path = os.path.join(root, 'Annotation_files')
            video_formats = ['*.avi', '*.mp4', '*.mov', '*.mkv']
            video_files = []
            for fmt in video_formats:
                video_files.extend(glob(os.path.join(videos_path, fmt)))
            for video_file in video_files:
                video_name = os.path.splitext(os.path.basename(video_file))[0]
                annotation_patterns = [
                    os.path.join(annotations_path, f"{video_name}.txt"),
                    os.path.join(annotations_path, f"{video_name.replace('video', 'Video')}.txt"),
                    os.path.join(annotations_path, f"{video_name.lower()}.txt"),
                ]
                for annotation_file in annotation_patterns:
                    if os.path.exists(annotation_file):
                        pairs.append((video_file, annotation_file))
                        break
                else:
                    print(f"Warning: No matching annotation found")
    return pairs

In [6]:
# Get pairs
dataset_pairs = get_pairs(base_path)
df = pd.DataFrame(dataset_pairs, columns=['video_path', 'annotation_path'])
df['scenario'] = df['video_path'].apply(lambda x: x.split('/')[-3])
df.to_csv('video_annotation_mapping.csv', index=False)

In [7]:
print(df.head())

                                          video_path  \
0  /kaggle/input/falldataset-imvia/Home_01/Home_0...   
1  /kaggle/input/falldataset-imvia/Home_01/Home_0...   
2  /kaggle/input/falldataset-imvia/Home_01/Home_0...   
3  /kaggle/input/falldataset-imvia/Home_01/Home_0...   
4  /kaggle/input/falldataset-imvia/Home_01/Home_0...   

                                     annotation_path scenario  
0  /kaggle/input/falldataset-imvia/Home_01/Home_0...  Home_01  
1  /kaggle/input/falldataset-imvia/Home_01/Home_0...  Home_01  
2  /kaggle/input/falldataset-imvia/Home_01/Home_0...  Home_01  
3  /kaggle/input/falldataset-imvia/Home_01/Home_0...  Home_01  
4  /kaggle/input/falldataset-imvia/Home_01/Home_0...  Home_01  


In [11]:
print(list(df.columns))

['video_path', 'annotation_path', 'scenario']


In [14]:
# Mediapipe Pose model
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=False)

# Paths
csv_path = '/kaggle/working/video_annotation_mapping.csv'
output_dir = '/kaggle/working/keypoints_output'
os.makedirs(output_dir, exist_ok=True)

# Read CSV
df = pd.read_csv(csv_path)

for idx, row in df.iterrows():
    video_path = row['video_path'] 
    annotation = row['annotation_path'] 

    cap = cv2.VideoCapture(video_path)
    
    frame_keypoints = []
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        result = pose.process(frame_rgb)
        
        if result.pose_landmarks:
            keypoints = [(lm.x, lm.y, lm.z) for lm in result.pose_landmarks.landmark]
            frame_keypoints.append(keypoints)
        else:
            frame_keypoints.append(None) 
        
    cap.release()
    
    # Save keypoints 
    save_data = {
        'annotation': annotation,
        'keypoints': frame_keypoints
    }
    
    video_name = os.path.splitext(os.path.basename(video_path))[0]
    output_path = os.path.join(output_dir, f"{video_name}_keypoints.json")
    
    with open(output_path, 'w') as f:
        json.dump(save_data, f)

W0000 00:00:1745697974.974054     146 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1745697975.027970     146 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1745697975.050006     147 landmark_projection_calculator.cc:186] Using NORM_RECT without IMAGE_DIMENSIONS is only supported for the square ROI. Provide IMAGE_DIMENSIONS or use PROJECTION_MATRIX.
[mp3float @ 0x22b86380] Header missing
[mp3float @ 0x22b86380] Header missing
[mp3float @ 0x22b86380] Header missing
[mp3float @ 0x2240afc0] Header missing
[mp3float @ 0x22545e80] Header missing
[mp3float @ 0x22416e40] Header missing
[mp3float @ 0x22416e40] Header missing
[mp3float @ 0x22416e40] Header missing
[mp3float @ 0x22416e40] Header missing
[mp3float @ 0x22416e40] Header missing
[mp3float @ 0x223f1800] Header missing


In [None]:
# Function to load JSON keypoints from a file
def load_keypoints_from_json(json_file):
    with open(json_file, 'r') as f:
        data = json.load(f)
    # Filter out frames with null keypoints
    keypoints = data["keypoints"]
    valid_keypoints = []
    for frame in keypoints:
        # Filter out frames with any null keypoints
        if frame is not None and all(kp is not None for kp in frame):
            valid_keypoints.append(frame)
    return valid_keypoints

In [None]:
# Helper functions to extract features
def calculate_angle(a, b, c):
    a = np.array(a[:3])  
    b = np.array(b[:3])
    c = np.array(c[:3])
    
    ba = a - b
    bc = c - b
    
    cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc) + 1e-6)
    angle = np.arccos(np.clip(cosine_angle, -1.0, 1.0))
    return np.degrees(angle)

In [None]:
def compute_center_of_gravity(landmarks):
    landmarks_array = np.array([lmk[:3] for lmk in landmarks])  # Only (x, y, z)
    cog = np.mean(landmarks_array, axis=0)
    return cog

def compute_velocity(p1, p2, fps):
    p1 = np.array(p1[:3])
    p2 = np.array(p2[:3])
    return (p2 - p1) * fps

def compute_acceleration(v1, v2, fps):
    v1 = np.array(v1)
    v2 = np.array(v2)
    return (v2 - v1) * fps

In [60]:
def extract_features_from_keypoints(video_keypoints, fps=25):
    features = []
    prev_frame = None
    prev_velocity = None

    for frame in video_keypoints:
        frame_features = {}

        if len(frame) != 33: 
            continue
        
        try:
            LEFT_SHOULDER = 11
            LEFT_ELBOW = 13
            LEFT_WRIST = 15
            RIGHT_SHOULDER = 12
            RIGHT_ELBOW = 14
            RIGHT_WRIST = 16
            LEFT_HIP = 23
            LEFT_KNEE = 25
            LEFT_ANKLE = 27
            RIGHT_HIP = 24
            RIGHT_KNEE = 26
            RIGHT_ANKLE = 28

            frame_features['left_elbow_angle'] = calculate_angle(frame[LEFT_SHOULDER], frame[LEFT_ELBOW], frame[LEFT_WRIST])
            frame_features['right_elbow_angle'] = calculate_angle(frame[RIGHT_SHOULDER], frame[RIGHT_ELBOW], frame[RIGHT_WRIST])
            frame_features['left_knee_angle'] = calculate_angle(frame[LEFT_HIP], frame[LEFT_KNEE], frame[LEFT_ANKLE])
            frame_features['right_knee_angle'] = calculate_angle(frame[RIGHT_HIP], frame[RIGHT_KNEE], frame[RIGHT_ANKLE])
            frame_features['left_shoulder_angle'] = calculate_angle(frame[LEFT_ELBOW], frame[LEFT_SHOULDER], frame[LEFT_HIP])
            frame_features['right_shoulder_angle'] = calculate_angle(frame[RIGHT_ELBOW], frame[RIGHT_SHOULDER], frame[RIGHT_HIP])
        except Exception as e:
            frame_features['left_elbow_angle'] = 0
            frame_features['right_elbow_angle'] = 0
            frame_features['left_knee_angle'] = 0
            frame_features['right_knee_angle'] = 0
            frame_features['left_shoulder_angle'] = 0
            frame_features['right_shoulder_angle'] = 0

        # Center of Gravity
        cog = compute_center_of_gravity(frame)
        frame_features['cog_x'] = cog[0]
        frame_features['cog_y'] = cog[1]
        frame_features['cog_z'] = cog[2]

        # Mean Confidence Score
        confidence_scores = [kp[3] for kp in frame if len(kp) == 4]  # Get confidence scores
        frame_features['mean_confidence'] = np.mean(confidence_scores) if confidence_scores else 0.0

        # Velocity
        if prev_frame is not None:
            velocities = []
            for i in range(len(frame)):
                velocities.append(compute_velocity(prev_frame[i], frame[i], fps))
            velocities = np.array(velocities)
            frame_features['mean_velocity'] = np.mean(np.linalg.norm(velocities, axis=1))
        else:
            frame_features['mean_velocity'] = 0.0

        # Acceleration
        if prev_velocity is not None:
            accelerations = []
            for i in range(len(velocities)):
                accelerations.append(compute_acceleration(prev_velocity[i], velocities[i], fps))
            accelerations = np.array(accelerations)
            frame_features['mean_acceleration'] = np.mean(np.linalg.norm(accelerations, axis=1))
        else:
            frame_features['mean_acceleration'] = 0.0

        # Save
        prev_frame = frame
        if 'velocities' in locals():
            prev_velocity = velocities

        features.append(frame_features)

    return features

In [None]:
# Process all videos in the directory
def process_videos_in_directory(directory, fps=25):
    all_video_features = {}

    for video_file in os.listdir(directory):
        if video_file.endswith(".json"):
            video_path = os.path.join(directory, video_file)
            video_keypoints = load_keypoints_from_json(video_path)

            features = extract_features_from_keypoints(video_keypoints, fps=fps)
            all_video_features[video_file] = features

    return all_video_features

In [61]:
def save_features_to_csv(features_dict, output_directory):
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)

    for video_name, features in features_dict.items():
        df = pd.DataFrame(features)
        df['time_step'] = df.index
        csv_file_path = os.path.join(output_directory, f"{video_name}_keypoints.csv")
        df.to_csv(csv_file_path, index=False)
        print(f"Saved features for {video_name} to {csv_file_path}")

# Example Usage
directory = "keypoints_output"
all_video_features = process_videos_in_directory(directory, fps=25)
save_features_to_csv(all_video_features, "features_output")

Saved features for video (42)_keypoints.json to features_output/video (42)_keypoints.json_keypoints.csv
Saved features for video (3)_keypoints.json to features_output/video (3)_keypoints.json_keypoints.csv
Saved features for video (31)_keypoints.json to features_output/video (31)_keypoints.json_keypoints.csv
Saved features for video (46)_keypoints.json to features_output/video (46)_keypoints.json_keypoints.csv
Saved features for video (18)_keypoints.json to features_output/video (18)_keypoints.json_keypoints.csv
Saved features for video (11)_keypoints.json to features_output/video (11)_keypoints.json_keypoints.csv
Saved features for video (27)_keypoints.json to features_output/video (27)_keypoints.json_keypoints.csv
Saved features for video (53)_keypoints.json to features_output/video (53)_keypoints.json_keypoints.csv
Saved features for video (26)_keypoints.json to features_output/video (26)_keypoints.json_keypoints.csv
Saved features for video (45)_keypoints.json to features_output/vi

In [62]:
# Helper function to create temporal sequences
def create_temporal_sequences(df, sequence_length=30):
    temporal_sequences = []
    for i in range(len(df) - sequence_length + 1):
        sequence = df.iloc[i:i+sequence_length].reset_index(drop=True)
        temporal_sequences.append(sequence)
    return temporal_sequences

# Process the CSV files to create temporal sequences
def process_temporal_sequences(directory, output_directory, sequence_length=30):
    os.makedirs(output_directory, exist_ok=True)
    
    all_temporal_sequences = {}

    for video_file in os.listdir(directory):
        if video_file.endswith(".csv"):
            video_path = os.path.join(directory, video_file)
            df = pd.read_csv(video_path)

            if 'time_step' not in df.columns:
                print(f"Warning: 'time_step' column not found in {video_file}. Skipping.")
                continue
            
            temporal_sequences = create_temporal_sequences(df, sequence_length)
            all_temporal_sequences[video_file] = temporal_sequences
            
            # Save temporal sequences to new folder
            output_video_path = os.path.join(output_directory, video_file.replace('.csv', '_temporal_sequences.csv'))
            temporal_df = pd.concat(temporal_sequences, ignore_index=True)
            temporal_df.to_csv(output_video_path, index=False)
            print(f"Temporal sequences saved for {video_file} to {output_video_path}")

    return all_temporal_sequences

In [63]:
# Example Usage
directory = "features_output" 
output_directory = "sequences_output" 
all_temporal_sequences = process_temporal_sequences(directory, output_directory, sequence_length=30)

Temporal sequences saved for video (27)_keypoints.json_keypoints.csv to sequences_output/video (27)_keypoints.json_keypoints_temporal_sequences.csv
Temporal sequences saved for video (24)_keypoints.json_keypoints.csv to sequences_output/video (24)_keypoints.json_keypoints_temporal_sequences.csv
Temporal sequences saved for video (20)_keypoints.json_keypoints.csv to sequences_output/video (20)_keypoints.json_keypoints_temporal_sequences.csv
Temporal sequences saved for video (35)_keypoints.json_keypoints.csv to sequences_output/video (35)_keypoints.json_keypoints_temporal_sequences.csv
Temporal sequences saved for video (47)_keypoints.json_keypoints.csv to sequences_output/video (47)_keypoints.json_keypoints_temporal_sequences.csv
Temporal sequences saved for video (18)_keypoints.json_keypoints.csv to sequences_output/video (18)_keypoints.json_keypoints_temporal_sequences.csv
Temporal sequences saved for video (52)_keypoints.json_keypoints.csv to sequences_output/video (52)_keypoints.js

In [65]:
def load_temporal_sequences(directory):
    sequences = []
    labels = [] 
    
    for video_file in os.listdir(directory):
        if video_file.endswith(".csv"):
            video_path = os.path.join(directory, video_file)
            df = pd.read_csv(video_path)
            label = 1 if "fall" in video_file else 0
            
            sequences.append(df.values)
            labels.append(label) 
            
    return sequences, labels

In [71]:
directory = "sequences_output"
temporal_sequences, labels = load_temporal_sequences(directory)

# Pad the sequences to a fixed length
max_sequence_length = 100
X_padded = pad_sequences(temporal_sequences, maxlen=max_sequence_length, dtype='float32', padding='post', truncating='post')

# Convert to numpy arrays for model input
X = np.array(X_padded)
y = np.array(labels)

In [73]:
# Standardize the data
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X.reshape(X.shape[0], -1))

In [75]:
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2, random_state=42)

In [77]:
clf = RandomForestClassifier(n_estimators=100, random_state=42)
clf.fit(X_train, y_train)

In [None]:
y_pred = clf.predict(X_test)
print(classification_report(y_test, y_pred))

              precision    recall  f1-score   support

           0       1.00      1.00      1.00        12

    accuracy                           1.00        12
   macro avg       1.00      1.00      1.00        12
weighted avg       1.00      1.00      1.00        12



In [None]:
# Function to draw keypoints on a given frame
def draw_keypoints(frame, keypoints):
    for point in keypoints:
        if point is not None:
            cv2.circle(frame, (int(point[0]), int(point[1])), 5, (0, 0, 255), -1)
    return frame

In [None]:
video_path = "/kaggle/input/falldataset-imvia/Lecture_room/Lecture room/video (1).avi"
cap = cv2.VideoCapture(video_path)

# Save the output with keypoints overlayed
output_path = "output_video.avi"
fourcc = cv2.VideoWriter_fourcc(*'XVID')
fps = int(cap.get(cv2.CAP_PROP_FPS))
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
print(f"Video saved to {output_path}")