In [71]:
import os
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import mediapipe as mp
import matplotlib.pyplot as plt
from torch_geometric.data import Data, Dataset
from torch_geometric.loader import DataLoader
from torch_geometric.nn import GCNConv, global_mean_pool

# ✅ Initialize MediaPipe Pose
# A single module-level estimator is created here and reused by every call to
# extract_keypoints() below.
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(
    min_detection_confidence=0.5,
    static_image_mode=True,
)

# ✅ Skeleton topology over MediaPipe Pose's 33 landmarks.
# Indices follow MediaPipe's landmark numbering:
#   0 nose, 1-3 left eye, 4-6 right eye, 7/8 ears, 9/10 mouth,
#   11/12 shoulders, 13/14 elbows, 15/16 wrists, 17-22 hand points,
#   23/24 hips, 25/26 knees, 27/28 ankles, 29/30 heels, 31/32 foot index.
# Each pair is listed once; the list mirrors MediaPipe's POSE_CONNECTIONS.
edges = [
    # Face
    (0, 1), (1, 2), (2, 3), (3, 7),  # Nose -> left eye -> left ear
    (0, 4), (4, 5), (5, 6), (6, 8),  # Nose -> right eye -> right ear
    (9, 10),  # Mouth
    # Torso
    (11, 12), (11, 23), (12, 24), (23, 24),  # Shoulders and hips
    # Left arm and hand
    (11, 13), (13, 15),  # Shoulder -> elbow -> wrist
    (15, 17), (15, 19), (15, 21), (17, 19),  # Wrist -> pinky / index / thumb
    # Right arm and hand
    (12, 14), (14, 16),  # Shoulder -> elbow -> wrist
    (16, 18), (16, 20), (16, 22), (18, 20),  # Wrist -> pinky / index / thumb
    # Left leg
    (23, 25), (25, 27),  # Hip -> knee -> ankle
    (27, 29), (29, 31), (27, 31),  # Ankle -> heel -> foot index
    # Right leg
    (24, 26), (26, 28),  # Hip -> knee -> ankle
    (28, 30), (30, 32), (28, 32),  # Ankle -> heel -> foot index
]

# ✅ Function to Extract Keypoints and Create Graph
def extract_keypoints(image_path):
    """Run MediaPipe Pose on one image and return its landmarks in pixel units.

    The result always has one row per detected landmark, in landmark order,
    so that row ``i`` is landmark ``i``. Landmarks whose visibility is not
    above 0.5 are kept as ``(0, 0)`` placeholders instead of being removed;
    removing them would renumber the remaining rows and invalidate any
    index-based edge list built on top of this tensor.

    Args:
        image_path: Path of the image file to read with OpenCV.

    Returns:
        A float tensor of shape ``(num_landmarks, 2)`` holding ``(x, y)``
        scaled by the image width and height, or ``None`` when the image
        cannot be read, no pose is detected, or no landmark is visible.
    """
    image = cv2.imread(image_path)
    if image is None:
        print(f"❌ ERROR: Cannot load {image_path}")
        return None

    results = pose.process(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    if not results.pose_landmarks:
        print(f"⚠️ WARNING: No keypoints detected for {image_path}")
        return None

    height, width = image.shape[:2]
    landmarks = results.pose_landmarks.landmark

    # Pre-fill with zeros so low-visibility landmarks keep their row.
    keypoints = np.zeros((len(landmarks), 2), dtype=np.float32)
    visible_count = 0
    for index, landmark in enumerate(landmarks):
        if landmark.visibility > 0.5:
            keypoints[index] = (landmark.x * width, landmark.y * height)
            visible_count += 1

    if visible_count == 0:
        # Nothing usable: treat the same way as a failed detection.
        print(f"⚠️ WARNING: No keypoints detected for {image_path}")
        return None

    return torch.tensor(keypoints, dtype=torch.float)

# ✅ Convert Keypoints into Graph
def create_graph_from_keypoints(keypoints, label):
    """Build a PyTorch Geometric ``Data`` graph from a keypoint tensor.

    Node features are the rows of ``keypoints``; connectivity comes from the
    module-level ``edges`` list, whose indices refer to row positions.

    Args:
        keypoints: Tensor with one row per node, or ``None``.
        label: Integer class index stored as the graph target ``y``.

    Returns:
        A ``Data`` object with ``x``, ``edge_index`` and ``y``, or ``None``
        when ``keypoints`` is ``None`` or empty.
    """
    if keypoints is None or keypoints.numel() == 0:
        return None  # Skip images with no keypoints

    num_nodes = keypoints.size(0)
    pairs = torch.tensor(edges, dtype=torch.long).reshape(-1, 2)

    # Drop any edge that points past the last node; an out-of-range index
    # would otherwise fail inside the graph convolution.
    pairs = pairs[(pairs < num_nodes).all(dim=1)]

    # A skeleton is undirected: add the reverse of every edge so messages
    # propagate both ways along each bone.
    edge_index = torch.cat([pairs, pairs.flip(1)], dim=0).t().contiguous()

    return Data(x=keypoints, edge_index=edge_index, y=torch.tensor(label, dtype=torch.long))


I0000 00:00:1741708560.560706  321053 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5
I0000 00:00:1741708560.593784  332558 gl_context.cc:369] GL version: 3.2 (OpenGL ES 3.2 NVIDIA 550.120), renderer: NVIDIA GeForce GTX 1650/PCIe/SSE2
W0000 00:00:1741708560.651367  332550 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1741708560.682489  332553 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


In [73]:
class YogaPoseDataset(Dataset):
    """Graph dataset built from a folder of pose images.

    Expected layout: ``<root_dir>/<split>/<class_name>/<image files>``.
    Each sub-directory of the split folder is one class; its position in the
    sorted directory listing is the numeric label.

    Pose estimation is run once per image while the dataset is constructed.
    Images that cannot be read or yield no keypoints are dropped, and the
    keypoints of the remaining images are cached so ``get`` does not have to
    re-run the estimator on every access.

    Attributes:
        root_dir: Path of the split folder that was scanned.
        classes: Sorted list of class (directory) names.
        image_paths: Paths of the images that produced keypoints.
        labels: Class index for each entry of ``image_paths``.
    """

    def __init__(self, root_dir, split="train"):
        # Initialise the PyG Dataset base class so its bookkeeping (used by
        # len(dataset), indexing and DataLoader) is in place.
        super().__init__()

        self.root_dir = os.path.join(root_dir, split)

        # ✅ Dynamically detect classes. Only directories count, so stray
        # files in the split folder do not shift the label indices.
        self.classes = sorted(
            name
            for name in os.listdir(self.root_dir)
            if os.path.isdir(os.path.join(self.root_dir, name))
        )

        self.image_paths = []
        self.labels = []
        self._keypoints = []  # cached keypoint tensor per kept image

        for idx, pose_class in enumerate(self.classes):
            class_path = os.path.join(self.root_dir, pose_class)
            # Sorted for a deterministic sample order across runs.
            for img_name in sorted(os.listdir(class_path)):
                img_path = os.path.join(class_path, img_name)

                # ✅ Remove bad images during dataset initialization
                keypoints = extract_keypoints(img_path)
                if keypoints is None or keypoints.numel() == 0:
                    continue

                self.image_paths.append(img_path)
                self.labels.append(idx)  # Assign numerical label
                self._keypoints.append(keypoints)

    def len(self):
        """Return the number of usable images."""
        return len(self.image_paths)

    def get(self, idx):
        """Return the graph for sample ``idx`` built from its cached keypoints."""
        # Clone so that anything modifying the returned graph in place
        # cannot corrupt the cache.
        keypoints = self._keypoints[idx].clone()
        return create_graph_from_keypoints(keypoints, self.labels[idx])


In [74]:
# ✅ Define Graph Convolutional Network
class ST_GCN(nn.Module):
    """Three stacked GCNConv layers followed by per-graph mean pooling.

    The first two layers map to ``hidden_channels`` and are each followed by
    ReLU; the third maps to ``num_classes``. Node outputs are averaged per
    graph, giving one row of class scores per graph in the batch.
    """

    def __init__(self, in_channels, hidden_channels, num_classes):
        super().__init__()
        self.gcn1 = GCNConv(in_channels, hidden_channels)
        self.gcn2 = GCNConv(hidden_channels, hidden_channels)
        self.gcn3 = GCNConv(hidden_channels, num_classes)
        self.relu = nn.ReLU()

    def forward(self, data):
        """Return pooled class scores for the graphs contained in ``data``."""
        features = data.x
        connectivity = data.edge_index
        graph_ids = data.batch

        # Hidden layers: convolution followed by ReLU.
        for hidden_layer in (self.gcn1, self.gcn2):
            features = self.relu(hidden_layer(features, connectivity))

        # Output layer produces per-node class scores (no activation).
        node_scores = self.gcn3(features, connectivity)

        # ✅ Use mean pooling for classification
        return global_mean_pool(node_scores, graph_ids)


In [75]:
# Seed the RNGs before the loaders and model are created so weight
# initialisation and batch shuffling are repeatable across runs.
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

# ✅ Move Model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Dataset root. Set the YOGA_DATA_DIR environment variable to point at a
# different location; the original path is kept as the default.
BASE_DIR = os.environ.get("YOGA_DATA_DIR", "/home/raichuboy/Projects/yoga/archive")

train_dataset = YogaPoseDataset(BASE_DIR, split="train")
valid_dataset = YogaPoseDataset(BASE_DIR, split="valid")
test_dataset = YogaPoseDataset(BASE_DIR, split="test")

# Labels are indices into each split's own class list, so the lists must be
# identical for a label to mean the same pose in every split.
assert list(valid_dataset.classes) == list(train_dataset.classes), \
    "valid split has a different class list than train"
assert list(test_dataset.classes) == list(train_dataset.classes), \
    "test split has a different class list than train"

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=16, shuffle=False)

# in_channels=2: each node carries an (x, y) coordinate pair.
model = ST_GCN(in_channels=2, hidden_channels=16, num_classes=len(train_dataset.classes)).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.01)
loss_fn = nn.CrossEntropyLoss()

# ✅ Train Function
def train_model(model, train_loader, valid_loader, epochs=50):
    """Train ``model`` and report the mean train/validation loss each epoch.

    Uses the module-level ``optimizer``, ``loss_fn`` and ``device``.

    Args:
        model: Network called as ``model(batch)`` and returning class scores.
        train_loader: Iterable of training batches.
        valid_loader: Iterable of validation batches.
        epochs: Number of passes over ``train_loader``.

    Returns:
        None. One summary line is printed per epoch.
    """
    for epoch in range(epochs):
        # Re-enter training mode every epoch: the validation pass below
        # switches the model to eval mode.
        model.train()
        total_loss = 0.0
        train_batches = 0
        for batch_idx, data in enumerate(train_loader):
            if data is None or not hasattr(data, "x") or not hasattr(data, "edge_index"):
                print(f"⚠️ Skipping Invalid Batch {batch_idx}...")
                continue

            data = data.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = loss_fn(output, data.y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            train_batches += 1

        # ✅ Validate Model
        valid_loss = 0.0
        valid_batches = 0
        model.eval()
        with torch.no_grad():
            for batch_idx, data in enumerate(valid_loader):
                if data is None or not hasattr(data, "x") or not hasattr(data, "edge_index"):
                    print(f"⚠️ Skipping Invalid Validation Batch {batch_idx}...")
                    continue

                data = data.to(device)
                output = model(data)
                loss = loss_fn(output, data.y)
                valid_loss += loss.item()
                valid_batches += 1

        # Average over the batches that actually contributed; max(..., 1)
        # keeps an empty loader from raising ZeroDivisionError.
        mean_train_loss = total_loss / max(train_batches, 1)
        mean_valid_loss = valid_loss / max(valid_batches, 1)
        print(f"Epoch {epoch}, Train Loss: {mean_train_loss:.4f}, Valid Loss: {mean_valid_loss:.4f}")

# ✅ Train the model
train_model(
    model=model,
    train_loader=train_loader,
    valid_loader=valid_loader,
    epochs=50,
)


❌ ERROR: Cannot load /home/raichuboy/Projects/yoga/archive/train/Akarna_Dhanurasana/Akarna_Dhanurasana_image_17.jpg
❌ ERROR: Cannot load /home/raichuboy/Projects/yoga/archive/train/Akarna_Dhanurasana/Akarna_Dhanurasana_image_36.jpg




❌ ERROR: Cannot load /home/raichuboy/Projects/yoga/archive/train/Bound_Angle_Pose_or_Baddha_Konasana_/Bound_Angle_Pose_or_Baddha_Konasana__image_186.jpg






KeyboardInterrupt: 