In [None]:
!pip install mediapipe opencv-python
import mediapipe as mp
import cv2
import numpy as np
import uuid
import os
from matplotlib import pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from mediapipe import solutions as mp_solutions
import re
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Bidirectional
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
mp_drawing = mp.solutions.drawing_utils
mp_holistic = mp.solutions.holistic

In [None]:
# Initialize MediaPipe Hands model
mp_hands = mp_solutions.hands
mp_drawing = mp_solutions.drawing_utils

### DEPTH IMAGE DATA

#extract timestamp from filename
def extract_timestamp_from_filename(file_name):
    """Extract timestamp from file name using regex."""
    match = re.search(r'_(\d+)', file_name)
    return match.group(1) if match else None

#preprocess depth images
def preprocess_depth_image(depth_image, min_depth=10, max_depth=1000):
    """Preprocess depth image by applying filtering and normalization."""
    #normalize depth values to [0, 1]
    depth_image_normalized = cv2.normalize(depth_image, None, alpha=0, beta=1,
                                           norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F)

    #remove noisy values outside the valid depth range --> huge issue with noisy data
    depth_image_filtered = np.where((depth_image > min_depth) & (depth_image < max_depth), depth_image, 0)

    #apply median filtering to smooth noise
    depth_image_smoothed = cv2.medianBlur(depth_image_filtered.astype(np.uint8), ksize=5)

    #fill missing regions using inpainting
    mask = (depth_image_smoothed == 0).astype(np.uint8)
    depth_image_inpainted = cv2.inpaint(depth_image_smoothed, mask, inpaintRadius=5, flags=cv2.INPAINT_TELEA)

    #apply bilateral filtering to refine edges
    depth_image_bilateral = cv2.bilateralFilter(depth_image_inpainted, d=9, sigmaColor=75, sigmaSpace=75)

    return depth_image_bilateral

def match_rgb_and_depth_files(folder_path):
    """
    Match RGB and depth files based on exact timestamps.
    Returns a dictionary with timestamp keys and {'rgb': ..., 'depth': ...} values.
    """
    file_names = os.listdir(folder_path)
    rgb_files = [f for f in file_names if f.startswith("rgbFrame")]
    depth_files = [f for f in file_names if f.startswith("depthFrame")]

    # Build dictionaries by timestamp
    rgb_dict = {extract_timestamp(f): f for f in rgb_files}
    depth_dict = {extract_timestamp(f): f for f in depth_files}

    # Only keep matching pairs
    matched_timestamps = set(rgb_dict.keys()).intersection(depth_dict.keys())

    matched_frames = {
        ts: {'rgb': rgb_dict[ts], 'depth': depth_dict[ts]}
        for ts in matched_timestamps
    }

    print(f" Matched {len(matched_frames)} frame pairs.")
    return matched_frames



#validate landmarks using depth data
def validate_landmarks(rgb_image, depth_image, hand_landmarks):
    """Validate hand landmarks using depth data."""
    h, w = depth_image.shape
    for landmark in hand_landmarks.landmark:
        cx, cy = int(landmark.x * w), int(landmark.y * h)
        if not (0 <= cx < w and 0 <= cy < h and depth_image[cy, cx] > 0):
            return False
    return True

#compute angles between three points
def compute_angle(a, b, c):
    """Compute the angle formed by three points."""
    ab = np.array(b) - np.array(a)
    bc = np.array(c) - np.array(b)
    cos_theta = np.dot(ab, bc) / (np.linalg.norm(ab) * np.linalg.norm(bc))
    angle = np.degrees(np.arccos(np.clip(cos_theta, -1.0, 1.0)))
    return angle

def compute_vector_angle(a, b, c):
    """
    Compute the angle between vectors ab and bc using the dot product.
    Parameters:
        a, b, c: 3D coordinates of points (list or ndarray).
    """
    ab = np.array(b) - np.array(a)
    bc = np.array(c) - np.array(b)
    cos_theta = np.dot(ab, bc) / (np.linalg.norm(ab) * np.linalg.norm(bc))
    return np.degrees(np.arccos(np.clip(cos_theta, -1.0, 1.0)))

def compute_abduction_distance(point1, point2):
    """
    Compute the Euclidean distance between two points.
    Parameters:
        point1, point2: 3D coordinates of points.
    """
    return np.linalg.norm(np.array(point1) - np.array(point2))

def process_frames_and_compute_angles(folder_path, matched_frames, finger_indices, finger_colors, use_preprocessed=True):
    """
    Process RGB and depth frames, compute joint angles, and visualize 3D hand poses.
    Uses functions from the referenced paper to calculate flexion and abduction angles.
    """
    hands = mp_hands.Hands(static_image_mode=True, max_num_hands=2, min_detection_confidence=0.5)

    for i, (timestamp, files) in enumerate(matched_frames.items()):
        rgb_path = os.path.join(folder_path, files['rgb'])
        depth_path = os.path.join(folder_path, files['depth'])

        # Load RGB and depth images
        rgb_image = cv2.imread(rgb_path)
        depth_image = cv2.imread(depth_path, cv2.IMREAD_GRAYSCALE)

        if use_preprocessed:
            # Apply preprocessing to depth image
            depth_image = preprocess_depth_image(depth_image)

        # Resize RGB to match depth dimensions
        depth_height, depth_width = depth_image.shape
        rgb_image_resized = cv2.resize(rgb_image, (depth_width, depth_height))
        rgb_image_rgb = cv2.cvtColor(rgb_image_resized, cv2.COLOR_BGR2RGB)

        # Process with MediaPipe Hands
        results = hands.process(rgb_image_rgb)
        if not results.multi_hand_landmarks:
            continue

        # Process each detected hand
        for hand_landmarks in results.multi_hand_landmarks:
            landmarks_3d = []
            for landmark in hand_landmarks.landmark:
                cx, cy = int(landmark.x * depth_width), int(landmark.y * depth_height)
                z = depth_image[cy, cx] if (0 <= cx < depth_width and 0 <= cy < depth_height) else 0
                landmarks_3d.append([cx, cy, z])

            # Compute angles for each finger
            for finger, indices in finger_indices.items():
                if len(landmarks_3d) >= max(indices) + 1:
                    # Flexion Angles
                    mcp_angle = compute_vector_angle(landmarks_3d[indices[0]],
                                                     landmarks_3d[indices[1]],
                                                     landmarks_3d[indices[2]])
                    pip_angle = compute_vector_angle(landmarks_3d[indices[1]],
                                                     landmarks_3d[indices[2]],
                                                     landmarks_3d[indices[3]])
                    dip_angle = compute_vector_angle(landmarks_3d[indices[2]],
                                                     landmarks_3d[indices[3]],
                                                     landmarks_3d[indices[4]])

                    # Abduction Distances (between adjacent fingertips)
                    if finger != "thumb":  # Thumb abduction is handled separately
                        prev_tip = landmarks_3d[finger_indices[finger][4] - 4]
                        current_tip = landmarks_3d[indices[4]]
                        abduction_distance = compute_abduction_distance(prev_tip, current_tip)

                        print(f"{finger.capitalize()} Finger Abduction Distance - Frame {i}: {abduction_distance:.2f} mm")

                    print(f"{finger.capitalize()} Finger Angles - Frame {i}:")
                    print(f"  MCP: {mcp_angle:.2f}°")
                    print(f"  PIP: {pip_angle:.2f}°")
                    print(f"  DIP: {dip_angle:.2f}°")

            # Visualize the 3D Hand Pose
            visualize_3d_hand_pose_with_rgb(landmarks_3d, finger_indices, finger_colors, i)

    hands.close()




In [None]:
# Main Execution
folder_path = "Desktop/study ROM/Nelson/nelson_1"
finger_indices = {
    "thumb": [0, 1, 2, 3, 4],
    "index": [0, 5, 6, 7, 8],
    "middle": [0, 9, 10, 11, 12],
    "ring": [0, 13, 14, 15, 16],
    "pinky": [0, 17, 18, 19, 20],
}
finger_colors = {
    "thumb": "red",
    "index": "green",
    "middle": "blue",
    "ring": "orange",
    "pinky": "purple",
}

matched_frames = match_rgb_and_depth_files(folder_path)

In [None]:
folder_path = "Desktop/study ROM/Nelson/nelson_1"
matched_frames = match_rgb_and_depth_files(folder_path)

In [None]:
def visualize_3d_hand_pose_with_rgb(landmarks_3d, rgb_frame, frame_index):
    """
    Visualize 3D hand pose alongside the corresponding RGB frame using matplotlib.

    Parameters:
        landmarks_3d (list): List of 3D landmarks as [x, y, z].
        rgb_frame (numpy.ndarray): RGB frame as a numpy array (from cv2.imread or similar).
        frame_index (int): Current frame index for visualization.
    """
    import matplotlib.pyplot as plt

    # Ensure RGB frame is valid
    if rgb_frame is None or rgb_frame.size == 0:
        print("Error: RGB frame is empty or invalid.")
        return

    # External definitions of finger_indices and finger_colors
    global finger_indices, finger_colors

    fig = plt.figure(figsize=(12, 6))

    # Plot the RGB frame
    ax1 = fig.add_subplot(1, 2, 1)  # 1 row, 2 columns, 1st subplot
    try:
        ax1.imshow(cv2.cvtColor(rgb_frame, cv2.COLOR_BGR2RGB))  # Convert BGR to RGB for display
        ax1.set_title(f"RGB Frame - Frame {frame_index}")
        ax1.axis('off')  # Hide axis for the image
    except Exception as e:
        print(f"Error displaying RGB frame: {e}")
        return

    # Plot the 3D hand pose
    ax2 = fig.add_subplot(1, 2, 2, projection='3d')  # 1 row, 2 columns, 2nd subplot
    try:
        xs = [pt[0] for pt in landmarks_3d]
        ys = [pt[1] for pt in landmarks_3d]
        zs = [pt[2] for pt in landmarks_3d]

        # Plot all landmarks
        ax2.scatter(xs, ys, zs, c='black', s=20, label='Landmarks')

        # Connect landmarks for each finger
        for finger, indices in finger_indices.items():
            finger_points = [landmarks_3d[i] for i in indices]
            x_finger = [pt[0] for pt in finger_points]
            y_finger = [pt[1] for pt in finger_points]
            z_finger = [pt[2] for pt in finger_points]

            # Use predefined color for the finger
            color = finger_colors[finger]
            ax2.plot(x_finger, y_finger, z_finger, label=finger.capitalize(), color=color)

        # Set axis labels and title
        ax2.set_xlabel('X (pixels)')
        ax2.set_ylabel('Y (pixels)')
        ax2.set_zlabel('Z (depth)')
        ax2.set_title(f"3D Hand Pose - Frame {frame_index}")
        ax2.legend()
    except Exception as e:
        print(f"Error displaying 3D plot: {e}")
        return

    # Display both subplots
    plt.tight_layout()
    plt.show()



def process_frames_and_debug_compute_angles(folder_path, matched_frames, finger_indices, finger_colors, use_preprocessed=True):
    """
    Process RGB and depth frames, debug raw depth data loading, compute joint angles,
    and visualize 3D hand poses.
    """
    mp_hands = mp.solutions.hands
    hands = mp_hands.Hands(static_image_mode=True, max_num_hands=2, min_detection_confidence=0.5)

    for i, (timestamp, files) in enumerate(matched_frames.items()):
        rgb_path = os.path.join(folder_path, files['rgb'])
        depth_path = os.path.join(folder_path, files['depth'])

        print(f"\nProcessing Frame {i}: Timestamp {timestamp}")
        print(f"RGB Path: {rgb_path}")
        print(f"Depth Path: {depth_path}")

        # Load RGB and depth images
        rgb_image = cv2.imread(rgb_path)
        if rgb_image is None:
            print(f"Error: Failed to load RGB image at {rgb_path}")
            continue

        try:
            with open(depth_path, 'rb') as f:
                depth_image = np.fromfile(f, dtype=np.float32)
                print(f"Loaded depth image of size: {depth_image.size}")  # Should be 230400
            
                # Calculate dimensions
                height = depth_image.size // 640
                depth_image = depth_image.reshape((height, 640))
           
        except Exception as e:
            print(f"Error loading depth file {depth_path}: {e}")
            continue

        if use_preprocessed:
            # Apply preprocessing to depth image
            try:
                depth_image = preprocess_depth_image(depth_image)
            except Exception as e:
                print(f"Error preprocessing depth image: {e}")
                continue

        # Resize RGB to match depth dimensions
        depth_height, depth_width = depth_image.shape
        try:
            rgb_image_resized = cv2.resize(rgb_image, (depth_width, depth_height))
            rgb_image_rgb = cv2.cvtColor(rgb_image_resized, cv2.COLOR_BGR2RGB)
        except Exception as e:
            print(f"Error resizing or converting RGB image: {e}")
            continue

        # Process with MediaPipe Hands
        results = hands.process(rgb_image_rgb)
        if not results.multi_hand_landmarks:
            print("No hands detected in this frame.")
            continue

        # Process each detected hand
        for hand_landmarks in results.multi_hand_landmarks:
            landmarks_3d = []
            for landmark in hand_landmarks.landmark:
                cx, cy = int(landmark.x * depth_width), int(landmark.y * depth_height)
                z = depth_image[cy, cx] if (0 <= cx < depth_width and 0 <= cy < depth_height) else 0
                landmarks_3d.append([cx, cy, z])

            # Compute angles for each finger
            for finger, indices in finger_indices.items():
                if len(landmarks_3d) >= max(indices) + 1:
                    try:
                        # Flexion Angles
                        mcp_angle = compute_vector_angle(landmarks_3d[indices[0]],
                                                         landmarks_3d[indices[1]],
                                                         landmarks_3d[indices[2]])
                        pip_angle = compute_vector_angle(landmarks_3d[indices[1]],
                                                         landmarks_3d[indices[2]],
                                                         landmarks_3d[indices[3]])
                        dip_angle = compute_vector_angle(landmarks_3d[indices[2]],
                                                         landmarks_3d[indices[3]],
                                                         landmarks_3d[indices[4]])

                        print(f"{finger.capitalize()} Finger Angles - Frame {i}:")
                        print(f"  MCP: {mcp_angle:.2f}°")
                        print(f"  PIP: {pip_angle:.2f}°")
                        print(f"  DIP: {dip_angle:.2f}°")

                        # Abduction Distances
                        if finger != "thumb":  # Thumb abduction is handled separately
                            prev_tip = landmarks_3d[finger_indices[finger][4] - 4]
                            current_tip = landmarks_3d[indices[4]]
                            abduction_distance = compute_abduction_distance(prev_tip, current_tip)
                            print(f"{finger.capitalize()} Finger Abduction Distance - Frame {i}: {abduction_distance:.2f} mm")
                    except Exception as e:
                        print(f"Error computing angles or distances for {finger}: {e}")

            # Visualize the 3D Hand Pose
            try:
                visualize_3d_hand_pose_with_rgb(landmarks_3d, rgb_image_rgb, i)
            except Exception as e:
                print(f"Error visualizing 3D hand pose: {e}")

    hands.close()


In [None]:
process_frames_and_debug_compute_angles(folder_path, matched_frames, finger_indices, finger_colors, use_preprocessed=True)

In [None]:
import cv2
import os
import numpy as np
import matplotlib.pyplot as plt

def process_and_save_video_output(folder_path, matched_frames, finger_indices, finger_colors, use_preprocessed=True, output_path='hand_pose_output.mp4'):
    """
    Process frames and save a video of RGB and 3D hand pose side-by-side.
    """
    mp_hands = mp.solutions.hands
    hands = mp_hands.Hands(static_image_mode=True, max_num_hands=2, min_detection_confidence=0.5)

    frames = []

    for i, (timestamp, files) in enumerate(matched_frames.items()):
        rgb_path = os.path.join(folder_path, files['rgb'])
        depth_path = os.path.join(folder_path, files['depth'])

        rgb_image = cv2.imread(rgb_path)
        if rgb_image is None:
            continue

        try:
            with open(depth_path, 'rb') as f:
                depth_image = np.fromfile(f, dtype=np.float32)
                depth_image = depth_image.reshape((depth_image.size // 640, 640))
        except:
            continue

        if use_preprocessed:
            try:
                depth_image = preprocess_depth_image(depth_image)
            except:
                continue

        depth_height, depth_width = depth_image.shape
        try:
            rgb_image_resized = cv2.resize(rgb_image, (depth_width, depth_height))
            rgb_image_rgb = cv2.cvtColor(rgb_image_resized, cv2.COLOR_BGR2RGB)
        except:
            continue

        results = hands.process(rgb_image_rgb)
        if not results.multi_hand_landmarks:
            continue

        for hand_landmarks in results.multi_hand_landmarks:
            landmarks_3d = []
            for landmark in hand_landmarks.landmark:
                cx, cy = int(landmark.x * depth_width), int(landmark.y * depth_height)
                z = depth_image[cy, cx] if (0 <= cx < depth_width and 0 <= cy < depth_height) else 0
                landmarks_3d.append([cx, cy, z])

            # Create figure with side-by-side plots
            fig = plt.figure(figsize=(12, 6))
            ax1 = fig.add_subplot(1, 2, 1)
            ax1.imshow(rgb_image_rgb)
            ax1.set_title(f"RGB Frame {i}")
            ax1.axis('off')

            ax2 = fig.add_subplot(1, 2, 2, projection='3d')
            xs = [pt[0] for pt in landmarks_3d]
            ys = [pt[1] for pt in landmarks_3d]
            zs = [pt[2] for pt in landmarks_3d]
            ax2.scatter(xs, ys, zs, c='black', s=20)

            for finger, indices in finger_indices.items():
                finger_points = [landmarks_3d[i] for i in indices]
                x_f = [p[0] for p in finger_points]
                y_f = [p[1] for p in finger_points]
                z_f = [p[2] for p in finger_points]
                ax2.plot(x_f, y_f, z_f, color=finger_colors[finger], label=finger)

            ax2.set_xlim([0, depth_width])
            ax2.set_ylim([depth_height, 0])
            ax2.set_zlim([0, np.max(zs)*1.2 if len(zs) > 0 else 1])
            ax2.set_title(f"3D Hand Pose - Frame {i}")
            ax2.axis('off')

            plt.tight_layout()
            fig.canvas.draw()

            # Convert figure to image array
            img_np = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
            img_np = img_np.reshape(fig.canvas.get_width_height()[::-1] + (3,))
            frames.append(img_np)

            plt.close(fig)

    hands.close()

    # Save video
    if frames:
        height, width, _ = frames[0].shape
        out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), 5, (width, height))
        for frame in frames:
            bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            out.write(bgr)
        out.release()
        print(f" Video saved to {output_path}")
    else:
        print(" No frames were processed. Video not saved.")


In [None]:


def create_rgb_video_from_frames(folder_path, output_filename="rgb_video_output.mp4", fps=30):
    """
    Creates a 30 FPS video from RGB frames using their original resolution.
    """
    file_names = sorted([f for f in os.listdir(folder_path) if f.startswith('rgbFrame') and f.endswith('.png')])

    if not file_names:
        print(" No RGB frames found.")
        return

    output_path = os.path.join(os.path.dirname(folder_path), output_filename)

    # Get frame size from the first image
    first_frame = cv2.imread(os.path.join(folder_path, file_names[0]))
    if first_frame is None:
        print(" Could not read the first frame.")
        return

    height, width, _ = first_frame.shape
    video_writer = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))

    for filename in file_names:
        path = os.path.join(folder_path, filename)
        frame = cv2.imread(path)
        if frame is not None:
            video_writer.write(frame)
        else:
            print(f" Skipped: {filename}")

    video_writer.release()
    print(f" 30 FPS RGB video saved to: {output_path}")


In [None]:
folder_path = "Desktop/study ROM/Andrew/2"
create_rgb_video_from_frames(folder_path)

In [None]:


DATA_PATH = "Desktop/Classification_ROM"  

CLASSES = {'healthy': 0, 'limited': 1}

for class_name in CLASSES:
    class_dir = os.path.join(DATA_PATH, class_name)
    subjects = sorted([
        d for d in os.listdir(class_dir)
        if os.path.isdir(os.path.join(class_dir, d)) and not d.startswith('.')
    ])
    print(f"\nClass: {class_name}")
    print("Subjects found:", subjects)


In [None]:


def extract_number(file_name):
    match = re.search(r'_(\d+)', file_name)
    return int(match.group(1)) if match else None

subject_path = os.path.join(DATA_PATH, 'healthy', 'S1')
all_files = sorted(os.listdir(subject_path))

rgb_files = [f for f in all_files if f.startswith('rgbFrame')]
depth_files = [f for f in all_files if f.startswith('rawDepth')]

frame_ids = sorted(set(extract_number(f) for f in rgb_files if extract_number(f) is not None))

print("Example RGB files:", rgb_files[:5])
print("Example Depth files:", depth_files[:5])
print("Extracted frame numbers:", frame_ids[:10])


In [None]:
def extract_timestamp(filename):
    match = re.search(r'_(\d+)', filename)
    return int(match.group(1)) if match else None

subject_path = os.path.join(DATA_PATH, 'healthy', 'S1')
all_files = sorted(os.listdir(subject_path))

rgb_files = [f for f in all_files if f.startswith('rgbFrame')]
depth_files = [f for f in all_files if f.startswith('rawDepth')]

# Match files by timestamp
timestamps_rgb = {extract_timestamp(f): f for f in rgb_files}
timestamps_depth = {extract_timestamp(f): f for f in depth_files}

# Find common timestamps
matched_timestamps = sorted(set(timestamps_rgb.keys()) & set(timestamps_depth.keys()))

if matched_timestamps:
    print("Matched timestamps:", matched_timestamps[:10])
    print("Example matched pair:",
          timestamps_rgb[matched_timestamps[0]],
          timestamps_depth[matched_timestamps[0]])
else:
    print(" No matched depth files found for healthy/S1.")
    print("You can proceed with RGB-only model first.")


In [None]:

WINDOW_SIZE = 10
X_rgb = []
y = []

mp_hands = mp.solutions.hands

def extract_timestamp(filename):
    match = re.search(r'_(\d+)', filename)
    return int(match.group(1)) if match else None

def extract_rgb_sequence_features(image_paths):
    coords = []

    with mp_hands.Hands(static_image_mode=True, max_num_hands=1) as hands:
        for path in image_paths:
            img = cv2.imread(path)
            if img is None:
                continue

            img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            result = hands.process(img_rgb)

            if not result.multi_hand_landmarks:
                continue

            hand = result.multi_hand_landmarks[0]
            coords.append(np.array([(lm.x, lm.y) for lm in hand.landmark]).flatten())

    if len(coords) < WINDOW_SIZE:
        return None

    coords = np.array(coords[:WINDOW_SIZE])
    return np.concatenate([np.mean(coords, axis=0), np.std(coords, axis=0)])

# Loop through all classes and subjects
for class_name, label in CLASSES.items():
    class_dir = os.path.join(DATA_PATH, class_name)
    subjects = sorted([
        d for d in os.listdir(class_dir)
        if os.path.isdir(os.path.join(class_dir, d)) and not d.startswith('.')
    ])

    for subj in subjects:
        subj_path = os.path.join(class_dir, subj)
        files = sorted(os.listdir(subj_path))
        rgb_files = [f for f in files if f.startswith('rgbFrame')]
        timestamps = sorted([extract_timestamp(f) for f in rgb_files if extract_timestamp(f)])

        for i in range(0, len(timestamps) - WINDOW_SIZE + 1, WINDOW_SIZE):
            window_ts = timestamps[i:i + WINDOW_SIZE]
            rgb_paths = [os.path.join(subj_path, f"rgbFrame_{ts}.png") for ts in window_ts]
            feat = extract_rgb_sequence_features(rgb_paths)
            if feat is not None:
                X_rgb.append(feat)
                y.append(label)

print(f"Extracted {len(X_rgb)} RGB-only sequences.")


In [None]:

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(
    X_rgb, y, test_size=0.2, stratify=y, random_state=42
)

# Train Random Forest
clf = RandomForestClassifier(n_estimators=100, random_state=42)
clf.fit(X_train, y_train)

# Predict
y_pred = clf.predict(X_test)

# Report
print("\n🎯 Classification Report (RGB only):\n")
print(classification_report(y_test, y_pred, target_names=['Healthy', 'Limited']))

# Confusion matrix
cm = confusion_matrix(y_test, y_pred)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['Healthy', 'Limited'], yticklabels=['Healthy', 'Limited'])
plt.xlabel("Predicted")
plt.ylabel("True")
plt.title("Confusion Matrix (RGB Only)")
plt.show()


In [None]:


WINDOW_SIZE = 10
SEQ_X = []
SEQ_Y = []

mp_hands = mp.solutions.hands

def extract_timestamp(filename):
    match = re.search(r'_(\d+)', filename)
    return int(match.group(1)) if match else None

def extract_raw_sequence(image_paths):
    coords = []
    with mp_hands.Hands(static_image_mode=True, max_num_hands=1) as hands:
        for path in image_paths:
            img = cv2.imread(path)
            if img is None:
                continue
            img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            result = hands.process(img_rgb)
            if not result.multi_hand_landmarks:
                return None  # skip sequences with missing hands
            hand = result.multi_hand_landmarks[0]
            coords.append(np.array([(lm.x, lm.y) for lm in hand.landmark]).flatten())
    if len(coords) < WINDOW_SIZE:
        return None
    return np.array(coords[:WINDOW_SIZE])  # shape: (10, 42)

# Main loop
SEQ_X = []
SEQ_Y = []

for class_name, label in CLASSES.items():
    class_dir = os.path.join(DATA_PATH, class_name)
    subjects = sorted([
        d for d in os.listdir(class_dir)
        if os.path.isdir(os.path.join(class_dir, d)) and not d.startswith('.')
    ])
    for subj in subjects:
        subj_path = os.path.join(class_dir, subj)
        files = sorted(os.listdir(subj_path))
        rgb_files = [f for f in files if f.startswith('rgbFrame')]
        timestamps = sorted([extract_timestamp(f) for f in rgb_files if extract_timestamp(f)])

        for i in range(0, len(timestamps) - WINDOW_SIZE + 1, WINDOW_SIZE):
            window_ts = timestamps[i:i + WINDOW_SIZE]
            rgb_paths = [os.path.join(subj_path, f"rgbFrame_{ts}.png") for ts in window_ts]
            sequence = extract_raw_sequence(rgb_paths)
            if sequence is not None:
                SEQ_X.append(sequence)  # (10, 42)
                SEQ_Y.append(label)

SEQ_X = np.array(SEQ_X)  # shape: (num_sequences, 10, 42)
SEQ_Y = np.array(SEQ_Y)

print(" Shape of sequence data:", SEQ_X.shape)


In [None]:
from sklearn.decomposition import PCA

# Reshape to apply PCA: (3420, 42)
X_flat = SEQ_X.reshape(-1, 42)

# Fit PCA
pca = PCA(n_components=10)  # Choose based on explained variance or course alignment
X_pca_flat = pca.fit_transform(X_flat)

# Reshape back to sequences: (342, 10, 10)
X_seq_pca = X_pca_flat.reshape(SEQ_X.shape[0], SEQ_X.shape[1], -1)

print(" After PCA, shape is:", X_seq_pca.shape)


In [None]:
!pip install tensorflow

In [None]:
from sklearn.decomposition import PCA

# Reshape to apply PCA: (3420, 42)
X_flat = SEQ_X.reshape(-1, 42)

# Fit PCA
pca = PCA(n_components=10)  # Choose based on explained variance or course alignment
X_pca_flat = pca.fit_transform(X_flat)

# Reshape back to sequences: (342, 10, 10)
X_seq_pca = X_pca_flat.reshape(SEQ_X.shape[0], SEQ_X.shape[1], -1)

print(" After PCA, shape is:", X_seq_pca.shape)


In [None]:


# Train-test split
X_train, X_test, y_train, y_test = train_test_split(
    X_seq_pca, SEQ_Y, test_size=0.2, stratify=SEQ_Y, random_state=42
)

# One-hot encode labels
y_train_cat = to_categorical(y_train)
y_test_cat = to_categorical(y_test)

# Define model
model = Sequential([
    Bidirectional(LSTM(64, return_sequences=False), input_shape=(10, 10)),
    Dropout(0.3),
    Dense(32, activation='relu'),
    Dropout(0.2),
    Dense(2, activation='softmax')
])

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Train
history = model.fit(
    X_train, y_train_cat,
    epochs=100,
    batch_size=16,
    validation_split=0.2,
    callbacks=[EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)],
    verbose=1
)


In [None]:
from sklearn.metrics import classification_report, confusion_matrix
import numpy as np

# Predict on test set
y_pred_prob = model.predict(X_test)
y_pred = np.argmax(y_pred_prob, axis=1)
y_true = np.argmax(y_test_cat, axis=1)

# Report
print("\n📊 LSTM Classification Report:\n")
print(classification_report(y_true, y_pred, target_names=['Healthy', 'Limited']))


In [None]:
from sklearn.random_projection import GaussianRandomProjection

# Flatten all frames: shape (3420, 42)
X_flat = SEQ_X.reshape(-1, 42)

# Apply Random Projection
rp = GaussianRandomProjection(n_components=10, random_state=42)
X_rp_flat = rp.fit_transform(X_flat)

# Reshape back: (342, 10, 10)
X_seq_rp = X_rp_flat.reshape(SEQ_X.shape[0], SEQ_X.shape[1], -1)

print(" After Random Projection, shape is:", X_seq_rp.shape)


In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Bidirectional
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split

# Train-test split
X_train_rp, X_test_rp, y_train_rp, y_test_rp = train_test_split(
    X_seq_rp, SEQ_Y, test_size=0.2, stratify=SEQ_Y, random_state=42
)

# One-hot encode labels
y_train_rp_cat = to_categorical(y_train_rp)
y_test_rp_cat = to_categorical(y_test_rp)

# Define LSTM model
model_rp = Sequential([
    Bidirectional(LSTM(64, return_sequences=False), input_shape=(10, 10)),
    Dropout(0.3),
    Dense(32, activation='relu'),
    Dropout(0.2),
    Dense(2, activation='softmax')
])

model_rp.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Train with early stopping
history_rp = model_rp.fit(
    X_train_rp, y_train_rp_cat,
    epochs=100,
    batch_size=16,
    validation_split=0.2,
    callbacks=[EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)],
    verbose=1
)


In [None]:
from sklearn.metrics import classification_report

y_pred_rp = model_rp.predict(X_test_rp)
y_pred_rp_class = np.argmax(y_pred_rp, axis=1)
y_true_rp = np.argmax(y_test_rp_cat, axis=1)

print("\n LSTM with Random Projection:\n")
print(classification_report(y_true_rp, y_pred_rp_class, target_names=["Healthy", "Limited"]))


In [None]:
def load_depth_raw(path):
    with open(path, 'rb') as f:
        depth = np.fromfile(f, dtype=np.float32)
    return depth.reshape((480, 640))  # Confirm this matches your image resolution

def extract_3d_sequence(rgb_paths, depth_paths):
    coords = []
    with mp_hands.Hands(static_image_mode=True, max_num_hands=1) as hands:
        for rgb_path, depth_path in zip(rgb_paths, depth_paths):
            img = cv2.imread(rgb_path)
            if img is None:
                continue
            img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            result = hands.process(img_rgb)
            if not result.multi_hand_landmarks:
                return None
            hand = result.multi_hand_landmarks[0]

            # Load depth
            depth_img = load_depth_raw(depth_path)
            h, w = img_rgb.shape[:2]

            # 3D landmarks
            coords_3d = []
            for lm in hand.landmark:
                cx, cy = int(lm.x * w), int(lm.y * h)
                z = depth_img[cy, cx] if (0 <= cx < 640 and 0 <= cy < 480) else 0
                coords_3d.append([lm.x, lm.y, z])
            coords.append(np.array(coords_3d).flatten())

    if len(coords) < WINDOW_SIZE:
        return None
    return np.array(coords[:WINDOW_SIZE])  # shape: (10, 63)


In [None]:
# Init
mp_hands = mp.solutions.hands
WINDOW_SIZE = 10
DATA_PATH = "Desktop/Classification_ROM"  # <-- update this
CLASSES = {'healthy': 0, 'limited': 1}

# Extract timestamp from filenames like rgbFrame_1745435851837.png
def extract_timestamp(filename):
    match = re.search(r'_(\d+)', filename)
    return int(match.group(1)) if match else None

# Load depth from .raw file
def load_depth_raw(path):
    with open(path, 'rb') as f:
        depth = np.fromfile(f, dtype=np.float32)
    return depth.reshape((360, 640))  # Adjust if your depth resolution is different

# Extract 3D (x, y, z) sequence from a 10-frame window
def extract_3d_sequence(rgb_paths, depth_paths):
    coords = []
    with mp_hands.Hands(static_image_mode=True, max_num_hands=1) as hands:
        for rgb_path, depth_path in zip(rgb_paths, depth_paths):
            img = cv2.imread(rgb_path)
            if img is None:
                return None

            img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            result = hands.process(img_rgb)
            if not result.multi_hand_landmarks:
                return None

            hand = result.multi_hand_landmarks[0]
            depth_img = load_depth_raw(depth_path)
            h, w = img_rgb.shape[:2]

            coords_3d = []
            for lm in hand.landmark:
                cx, cy = int(lm.x * w), int(lm.y * h)
                z = depth_img[cy, cx] if (0 <= cx < 640 and 0 <= cy < 480) else 0
                coords_3d.append([lm.x, lm.y, z])

            coords.append(np.array(coords_3d).flatten())

    if len(coords) < WINDOW_SIZE:
        return None
    return np.array(coords[:WINDOW_SIZE])  # Shape: (10, 63)

# MAIN: Loop over data and extract RGBD 3D sequences
SEQ_X_3D = []
SEQ_Y_3D = []

for class_name, label in CLASSES.items():
    class_dir = os.path.join(DATA_PATH, class_name)
    subjects = sorted([
        d for d in os.listdir(class_dir)
        if os.path.isdir(os.path.join(class_dir, d)) and not d.startswith('.')
    ])
    for subj in subjects:
        subj_path = os.path.join(class_dir, subj)
        files = sorted(os.listdir(subj_path))
        rgb_files = [f for f in files if f.startswith('rgbFrame')]
        depth_files = [f for f in files if f.startswith('depthFrame')]

        timestamps_rgb = {extract_timestamp(f): f for f in rgb_files}
        timestamps_depth = {extract_timestamp(f): f for f in depth_files}
        matched_ts = sorted(set(timestamps_rgb) & set(timestamps_depth))

        for i in range(0, len(matched_ts) - WINDOW_SIZE + 1, WINDOW_SIZE):
            window_ts = matched_ts[i:i + WINDOW_SIZE]
            rgb_paths = [os.path.join(subj_path, timestamps_rgb[ts]) for ts in window_ts]
            depth_paths = [os.path.join(subj_path, timestamps_depth[ts]) for ts in window_ts]

            seq_3d = extract_3d_sequence(rgb_paths, depth_paths)
            if seq_3d is not None:
                SEQ_X_3D.append(seq_3d)
                SEQ_Y_3D.append(label)

SEQ_X_3D = np.array(SEQ_X_3D)
SEQ_Y_3D = np.array(SEQ_Y_3D)

print(" RGBD 3D Sequences Extracted:", SEQ_X_3D.shape)


In [None]:
from sklearn.decomposition import PCA

# Flatten for PCA: (342 × 10, 63)
X_3D_flat = SEQ_X_3D.reshape(-1, 63)

# Apply PCA
pca_3D = PCA(n_components=10)
X_3D_pca_flat = pca_3D.fit_transform(X_3D_flat)

# Reshape back to sequence format: (342, 10, 10)
X_seq_3D_pca = X_3D_pca_flat.reshape(SEQ_X_3D.shape[0], SEQ_X_3D.shape[1], -1)

print(" Shape after PCA (RGBD):", X_seq_3D_pca.shape)


In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Bidirectional
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split

# Split the RGBD data
X_train_3D, X_test_3D, y_train_3D, y_test_3D = train_test_split(
    X_seq_3D_pca, SEQ_Y_3D, test_size=0.2, stratify=SEQ_Y_3D, random_state=42
)

# One-hot encode labels
y_train_3D_cat = to_categorical(y_train_3D)
y_test_3D_cat = to_categorical(y_test_3D)

# Define LSTM model
model_3D = Sequential([
    Bidirectional(LSTM(64, return_sequences=False), input_shape=(10, 10)),
    Dropout(0.3),
    Dense(32, activation='relu'),
    Dropout(0.2),
    Dense(2, activation='softmax')
])

model_3D.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Train with early stopping
history_3D = model_3D.fit(
    X_train_3D, y_train_3D_cat,
    epochs=100,
    batch_size=16,
    validation_split=0.2,
    callbacks=[EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)],
    verbose=1
)


In [None]:
from sklearn.metrics import classification_report

y_pred_3D = model_3D.predict(X_test_3D)
y_pred_3D_class = np.argmax(y_pred_3D, axis=1)
y_true_3D = np.argmax(y_test_3D_cat, axis=1)

print("\nRGBD LSTM Classification Report:\n")
print(classification_report(y_true_3D, y_pred_3D_class, target_names=["Healthy", "Limited"]))


In [None]:
# Split without PCA
X_train_3D_raw, X_test_3D_raw, y_train_3D_raw, y_test_3D_raw = train_test_split(
    SEQ_X_3D, SEQ_Y_3D, test_size=0.2, stratify=SEQ_Y_3D, random_state=42
)

# One-hot encode
y_train_3D_raw_cat = to_categorical(y_train_3D_raw)
y_test_3D_raw_cat = to_categorical(y_test_3D_raw)

# Define new LSTM model (adjusted for 63 input features)
model_3D_raw = Sequential([
    Bidirectional(LSTM(64, return_sequences=False), input_shape=(10, 63)),
    Dropout(0.3),
    Dense(32, activation='relu'),
    Dropout(0.2),
    Dense(2, activation='softmax')
])

model_3D_raw.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Train
history_3D_raw = model_3D_raw.fit(
    X_train_3D_raw, y_train_3D_raw_cat,
    epochs=100,
    batch_size=16,
    validation_split=0.2,
    callbacks=[EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)],
    verbose=1
)


In [None]:
from sklearn.metrics import classification_report

y_pred_3D_raw = model_3D_raw.predict(X_test_3D_raw)
y_pred_3D_raw_class = np.argmax(y_pred_3D_raw, axis=1)
y_true_3D_raw = np.argmax(y_test_3D_raw_cat, axis=1)

print("\nRGBD LSTM (No PCA) Classification Report:\n")
print(classification_report(y_true_3D_raw, y_pred_3D_raw_class, target_names=["Healthy", "Limited"]))


In [None]:
def create_jittered_sequences_3d(sequences, labels, jitter_std=0.005, num_copies=1):
    synthetic_X = []
    synthetic_y = []

    for _ in range(num_copies):
        for seq, label in zip(sequences, labels):
            noise = np.random.normal(loc=0.0, scale=jitter_std, size=seq.shape)
            jittered = seq + noise
            synthetic_X.append(jittered)
            synthetic_y.append(label)
    
    return np.array(synthetic_X), np.array(synthetic_y)


In [None]:
# Create 1 synthetic version per real sequence
synthetic_X_3D, synthetic_y_3D = create_jittered_sequences_3d(SEQ_X_3D, SEQ_Y_3D, jitter_std=0.005, num_copies=1)

# Combine real + synthetic
X_3D_augmented = np.concatenate([SEQ_X_3D, synthetic_X_3D], axis=0)
y_3D_augmented = np.concatenate([SEQ_Y_3D, synthetic_y_3D], axis=0)

print("Augmented RGBD dataset shape:", X_3D_augmented.shape)


In [None]:
from sklearn.decomposition import PCA

# Flatten: (684 × 10, 63)
X_3D_aug_flat = X_3D_augmented.reshape(-1, 63)

# Fit and apply PCA
pca_aug = PCA(n_components=10)
X_3D_aug_pca_flat = pca_aug.fit_transform(X_3D_aug_flat)

# Reshape back: (684, 10, 10)
X_seq_3D_aug_pca = X_3D_aug_pca_flat.reshape(X_3D_augmented.shape[0], X_3D_augmented.shape[1], -1)

print("Shape after PCA on augmented data:", X_seq_3D_aug_pca.shape)


In [None]:

# Train-test split
X_train_aug, X_test_aug, y_train_aug, y_test_aug = train_test_split(
    X_seq_3D_aug_pca, y_3D_augmented, test_size=0.2, stratify=y_3D_augmented, random_state=42
)

# One-hot encode labels
y_train_aug_cat = to_categorical(y_train_aug)
y_test_aug_cat = to_categorical(y_test_aug)

# LSTM model
model_aug = Sequential([
    Bidirectional(LSTM(64, return_sequences=False), input_shape=(10, 10)),
    Dropout(0.3),
    Dense(32, activation='relu'),
    Dropout(0.2),
    Dense(2, activation='softmax')
])

model_aug.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Train
history_aug = model_aug.fit(
    X_train_aug, y_train_aug_cat,
    epochs=100,
    batch_size=16,
    validation_split=0.2,
    callbacks=[EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)],
    verbose=1
)


In [None]:
from sklearn.metrics import classification_report

y_pred_aug = model_aug.predict(X_test_aug)
y_pred_aug_class = np.argmax(y_pred_aug, axis=1)
y_true_aug = np.argmax(y_test_aug_cat, axis=1)

print("\nAugmented RGBD LSTM Classification Report:\n")
print(classification_report(y_true_aug, y_pred_aug_class, target_names=["Healthy", "Limited"]))


In [None]:
import matplotlib.pyplot as plt

models = [
    "RGB + PCA", "RGB + RP", "RGBD + PCA", "RGBD (no PCA)", 
    "RGBD + PCA + Jitter"
]
accuracies = [0.75, 0.64, 0.77, 0.64, 0.93]

plt.figure(figsize=(10, 5))
plt.bar(models, accuracies, color='skyblue')
plt.ylim(0, 1)
plt.ylabel("Accuracy")
plt.title("Classification Accuracy Across Models")
plt.xticks(rotation=30)
plt.grid(axis='y')
plt.tight_layout()
plt.show()


In [None]:
from sklearn.metrics import confusion_matrix
import seaborn as sns

cm = confusion_matrix(y_true_aug, y_pred_aug_class)
plt.figure(figsize=(5, 4))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
            xticklabels=["Healthy", "Limited"], 
            yticklabels=["Healthy", "Limited"])
plt.xlabel("Predicted")
plt.ylabel("True")
plt.title("Confusion Matrix (RGBD + Augmented)")
plt.tight_layout()
plt.show()


In [None]:
from mpl_toolkits.mplot3d import Axes3D

def plot_hand_pose_3d(landmarks_3d):
    fig = plt.figure(figsize=(6, 5))
    ax = fig.add_subplot(111, projection='3d')
    landmarks_3d = np.array(landmarks_3d).reshape(21, 3)
    ax.scatter(landmarks_3d[:, 0], landmarks_3d[:, 1], landmarks_3d[:, 2], color='blue')
    ax.set_title("3D Hand Pose from RGBD")
    ax.set_xlabel("x")
    ax.set_ylabel("y")
    ax.set_zlabel("z")
    plt.tight_layout()
    plt.show()


In [None]:
pca_temp = PCA(n_components=30)
pca_temp.fit(X_3D_aug_flat)
plt.figure(figsize=(6, 4))
plt.plot(np.cumsum(pca_temp.explained_variance_ratio_), marker='o')
plt.xlabel("Number of Components")
plt.ylabel("Cumulative Explained Variance")
plt.title("PCA Variance Explained (RGBD)")
plt.grid(True)
plt.tight_layout()
plt.show()
