In [None]:
# Install the runtime dependencies: TensorFlow / TF-Hub for pose inference, OpenCV for video
# decoding, and PyTorch + PyTorch Geometric for the graph classifier.
!pip install tensorflow tensorflow-hub opencv-python numpy
!pip install torch torchvision
# NOTE(review): the wheel index below is for torch 2.0.0 + CUDA 11.8, while the captured install
# log shows nvidia-*-cu12 (12.4) packages being collected for torch. Confirm that the index
# matches the torch build that actually gets installed.
!pip install torch-scatter torch-sparse torch-cluster torch-spline-conv torch-geometric -f https://data.pyg.org/whl/torch-2.0.0+cu118.html


Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-curand-cu12==10.3.5.147 (from torch)
  Downloading nvidia_curand_cu12-10.3.5

In [None]:

import cv2
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub

def load_movenet_model():
    """Load the MoveNet single-pose "thunder" (version 4) model from TensorFlow Hub.

    Returns:
        Whatever ``hub.load`` returns for the model URL.
    """
    return hub.load("https://tfhub.dev/google/movenet/singlepose/thunder/4")
def preprocess_frame(frame, target_size=(256, 256)):
    """Turn one BGR video frame into the int32 batch that is fed to the pose model.

    Args:
        frame: Image array in BGR channel order (as produced by ``cv2.VideoCapture.read``).
        target_size: Size tuple forwarded unchanged to ``cv2.resize``.

    Returns:
        ``numpy.ndarray`` of dtype ``int32`` with a leading batch axis of length 1.
        Pixel values are cast, not normalised.

    Raises:
        ValueError: If ``frame`` is ``None`` or contains no pixels.
    """
    if frame is None or getattr(frame, "size", 0) == 0:
        raise ValueError("Cannot preprocess an empty frame.")
    # Resize first, then reorder channels from BGR to RGB.
    rgb = cv2.cvtColor(cv2.resize(frame, target_size), cv2.COLOR_BGR2RGB)
    # Explicitly convert to int32 (do not normalize) and add the batch dimension.
    # No per-frame logging here: this runs for every sampled frame of every video.
    return np.expand_dims(rgb.astype(np.int32), axis=0)

def extract_keypoints_from_frame(model, frame):
    """Run the pose model on a single frame and return its keypoint array.

    Args:
        model: Loaded model exposing ``signatures['serving_default']``.
        frame: BGR frame; it is prepared with ``preprocess_frame``.

    Returns:
        ``numpy.ndarray`` taken from ``outputs['output_0']`` with all singleton axes
        squeezed away (shape ``(17, 3)`` for this model, per the original code comment).
    """
    # One cast guarantees int32 input; it is a no-op when the batch already is int32.
    batch = np.asarray(preprocess_frame(frame), dtype=np.int32)
    input_tensor = tf.constant(batch, dtype=tf.int32)
    # Pass the tensor as a keyword argument.
    outputs = model.signatures['serving_default'](input=input_tensor)
    return outputs['output_0'].numpy().squeeze()



def extract_keypoints_from_video(model, video_path, frame_skip=5):
    """Extract keypoints from every ``frame_skip``-th frame of a video.

    Args:
        model: Pose model handed to ``extract_keypoints_from_frame``.
        video_path: Path given to ``cv2.VideoCapture``.
        frame_skip: Sampling stride; frames 0, ``frame_skip``, ``2 * frame_skip``, ...
            are processed. Must be at least 1.

    Returns:
        List with one keypoint array per successfully processed frame. The list is
        empty when the video cannot be opened or yields no frames.

    Raises:
        ValueError: If ``frame_skip`` is smaller than 1.
    """
    if frame_skip < 1:
        raise ValueError(f"frame_skip must be >= 1, got {frame_skip}")

    keypoints_sequence = []
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"Could not open video: {video_path}")
            return keypoints_sequence

        frame_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_count % frame_skip == 0:
                try:
                    keypoints_sequence.append(extract_keypoints_from_frame(model, frame))
                except Exception as e:
                    # Best effort: one bad frame must not discard the whole video.
                    print(f"Error processing frame {frame_count} in {video_path}: {e}")
            frame_count += 1
    finally:
        # Always free the decoder handle, even if something unexpected propagates.
        cap.release()
    return keypoints_sequence


In [None]:
# Smoke test: run the pose model on the first frame of one sample video.
model = load_movenet_model()
cap = cv2.VideoCapture("/video_dataset/Correct/0928_squat_000105.mp4")
try:
    ret, frame = cap.read()
finally:
    # Release the capture before inference so it is freed even if extraction fails.
    cap.release()
if ret:
    kps = extract_keypoints_from_frame(model, frame)
    print("Extracted keypoints shape:", kps.shape)
else:
    # Previously a missing or unreadable file produced no output at all.
    print("Could not read a frame from the sample video.")


Preprocessed image dtype: int32
Input tensor dtype: <dtype: 'int32'>
Extracted keypoints shape: (17, 3)


In [None]:
# graph_utils.py
import torch
from torch_geometric.data import Data

# Define the skeleton connections (indices based on COCO order).
# Each pair (i, j) links two keypoint indices inside a single frame. The pairs are listed
# once; build_graph_from_keypoints adds both directions and offsets the indices per frame.
# NOTE(review): presumably the 17-keypoint COCO layout (0 = nose ... 16 = right ankle);
# confirm against the pose model's output order.
COCO_SKELETON = [
    (0, 1), (0, 2),
    (1, 3), (2, 4),
    (0, 5), (0, 6),
    (5, 7), (7, 9),
    (6, 8), (8, 10),
    (5, 6), (5, 11),
    (6, 12), (11, 12),
    (11, 13), (13, 15),
    (12, 14), (14, 16)
]

def build_graph_from_keypoints(keypoints_sequence):
    """
    Build one spatio-temporal skeleton graph from per-frame keypoints.

    Nodes are (frame, joint) pairs laid out frame by frame, so joint ``i`` of frame ``t``
    is node ``t * num_keypoints + i``. Edges are bidirectional and ordered as follows:
    first the ``COCO_SKELETON`` links of every frame, then links between the same joint
    in consecutive frames.

    keypoints_sequence: non-empty sequence of equally shaped 2-D arrays
        (``(17, 3)`` per frame in this notebook).
    Returns a torch_geometric.data.Data object with ``x`` of shape
        ``(num_frames * num_keypoints, num_features)`` (float) and ``edge_index`` of
        shape ``(2, num_edges)`` (long). The label is expected to be attached later.
    Raises ValueError if the sequence is empty, frames differ in shape or are not 2-D,
        or the skeleton refers to a joint index the frames do not have.
    """
    if len(keypoints_sequence) == 0:
        raise ValueError("keypoints_sequence must contain at least one frame")

    # np.stack (unlike np.vstack) rejects frames with differing joint counts, which would
    # otherwise silently misalign the node indexing used for the edges below.
    frames = np.stack([np.asarray(frame) for frame in keypoints_sequence])
    if frames.ndim != 3:
        raise ValueError(
            f"each frame must be a 2-D (num_keypoints, num_features) array, got shape {frames.shape[1:]}"
        )
    num_frames, num_keypoints = frames.shape[0], frames.shape[1]
    x = torch.tensor(frames.reshape(num_frames * num_keypoints, -1), dtype=torch.float)

    skeleton = np.asarray(COCO_SKELETON, dtype=np.int64).reshape(-1, 2)
    if skeleton.size and (skeleton.min() < 0 or skeleton.max() >= num_keypoints):
        raise ValueError(
            f"COCO_SKELETON refers to joint {int(skeleton.max())} but frames have {num_keypoints} keypoints"
        )

    # Spatial edges: interleave (i, j) with (j, i), then shift the block by each frame's offset.
    both_ways = np.stack([skeleton, skeleton[:, ::-1]], axis=1).reshape(-1, 2)
    frame_offsets = np.arange(num_frames, dtype=np.int64) * num_keypoints
    spatial = (frame_offsets[:, None, None] + both_ways[None, :, :]).reshape(-1, 2)

    # Temporal edges: every node except those of the last frame connects to the node
    # num_keypoints positions later (same joint, next frame), again in both directions.
    src = np.arange((num_frames - 1) * num_keypoints, dtype=np.int64)
    forward = np.stack([src, src + num_keypoints], axis=1)
    temporal = np.stack([forward, forward[:, ::-1]], axis=1).reshape(-1, 2)

    edges = np.concatenate([spatial, temporal], axis=0)
    edge_index = torch.tensor(edges, dtype=torch.long).t().contiguous()

    # Create and return a Data object. (Assume label is provided later.)
    return Data(x=x, edge_index=edge_index)




In [None]:
# squat_dataset.py
import os
import torch
from torch_geometric.data import InMemoryDataset


class SquatDataset(InMemoryDataset):
    """In-memory graph dataset built from labelled squat videos.

    ``root`` must contain a ``Correct`` folder (label 1) and an ``Incorrect`` folder
    (label 0) holding ``.mp4`` files. Each video becomes one graph produced by
    ``build_graph_from_keypoints`` and is stored in ``squat_dataset.pt``.
    """

    def __init__(self, root, transform=None, pre_transform=None):
        super().__init__(root, transform, pre_transform)
        # The processed file contains pickled graph objects written by process() below,
        # so it cannot be read in weights-only mode. Being explicit also avoids depending
        # on torch.load's default. Only load processed files you created yourself.
        self.data, self.slices = torch.load(self.processed_paths[0], weights_only=False)

    @property
    def raw_file_names(self):
        # Since we are not using raw files for downloading,
        # we can simply return an empty list.
        return []

    @property
    def processed_file_names(self):
        return ['squat_dataset.pt']

    def download(self):
        # No download necessary because data is already present.
        pass

    def process(self):
        """Convert every video under ``root`` into a labelled graph and save the result.

        Videos that yield fewer than two keypoint frames are skipped.

        Raises:
            RuntimeError: If no video produced a usable graph.
        """
        data_list = []
        model = load_movenet_model()  # load once for all videos

        for label_name, label in [('Correct', 1), ('Incorrect', 0)]:
            folder = os.path.join(self.root, label_name)
            # os.listdir order is arbitrary; sorting keeps the processed dataset (and
            # therefore the seeded shuffle/split applied later) reproducible.
            for file in sorted(os.listdir(folder)):
                if not file.endswith('.mp4'):
                    continue
                video_path = os.path.join(folder, file)
                print(f"Processing {video_path}...")
                # Reuse the shared helper instead of duplicating the frame loop here.
                keypoints_sequence = extract_keypoints_from_video(model, video_path, frame_skip=5)
                if len(keypoints_sequence) < 2:
                    continue
                graph_data = build_graph_from_keypoints(keypoints_sequence)
                graph_data.y = torch.tensor([label], dtype=torch.long)
                graph_data.video_name = file
                data_list.append(graph_data)

        if not data_list:
            raise RuntimeError(f"No usable .mp4 videos found under {self.root}")

        data, slices = self.collate(data_list)
        torch.save((data, slices), self.processed_paths[0])



In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import TransformerConv, global_mean_pool, BatchNorm

class AdvancedGNNClassifier(nn.Module):
    """Graph classifier: three ``TransformerConv`` stages, mean pooling, linear head.

    Each stage is convolution -> ``BatchNorm`` -> ReLU. The first two stages use
    ``heads`` attention heads (output width ``hidden_channels * heads``) and are followed
    by dropout. The third uses a single head, narrowing to ``hidden_channels``, and has
    no dropout after it.
    """

    def __init__(self, in_channels=3, hidden_channels=1028, num_classes=2, heads=4, dropout=0.5):
        super().__init__()
        wide = hidden_channels * heads  # feature width after a multi-head stage

        # Stage 1: lift the raw node features.
        self.conv1 = TransformerConv(in_channels, hidden_channels, heads=heads, dropout=dropout)
        self.bn1 = BatchNorm(wide)

        # Stage 2: same width in and out.
        self.conv2 = TransformerConv(wide, hidden_channels, heads=heads, dropout=dropout)
        self.bn2 = BatchNorm(wide)

        # Stage 3: single head, so the output narrows to hidden_channels.
        self.conv3 = TransformerConv(wide, hidden_channels, heads=1, dropout=dropout)
        self.bn3 = BatchNorm(hidden_channels)

        # Graph-level classification head.
        self.lin = nn.Linear(hidden_channels, num_classes)
        self.dropout = dropout

    def forward(self, x, edge_index, batch):
        """Return per-graph class logits for node features ``x``.

        ``batch`` is the node-to-graph assignment passed to ``global_mean_pool``.
        """
        h = x
        # Stages 1 and 2 share the same conv -> norm -> ReLU -> dropout recipe.
        for conv, norm in ((self.conv1, self.bn1), (self.conv2, self.bn2)):
            h = F.relu(norm(conv(h, edge_index)))
            h = F.dropout(h, p=self.dropout, training=self.training)

        # Stage 3 has no dropout after the activation.
        h = F.relu(self.bn3(self.conv3(h, edge_index)))

        # Average node embeddings per graph, then classify.
        return self.lin(global_mean_pool(h, batch))



In [None]:
# train.py
import torch
from torch_geometric.loader import DataLoader
import torch.nn.functional as F

# Load the dataset (adjust the root path as needed)
dataset = SquatDataset(root="/video_dataset")
# Shuffle and split dataset into train and test sets (80/20 split)
torch.manual_seed(42)
dataset = dataset.shuffle()
split_idx = int(0.8 * len(dataset))
train_dataset = dataset[:split_idx]
test_dataset = dataset[split_idx:]

train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# AdvancedGNNClassifier is the only classifier defined in this notebook. The previously used
# name `GNNClassifier` has no definition here and raises NameError on a fresh kernel.
# NOTE(review): confirm this is the intended architecture for this run.
model = AdvancedGNNClassifier().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

def train():
    """Run one optimisation pass over ``train_loader``.

    Uses the module-level ``model``, ``optimizer`` and ``device``.

    Returns:
        Sum of the per-batch cross-entropy losses divided by the number of batches.
    """
    model.train()
    batch_losses = []
    for batch in train_loader:
        batch = batch.to(device)
        optimizer.zero_grad()
        logits = model(batch.x, batch.edge_index, batch.batch)
        batch_loss = F.cross_entropy(logits, batch.y)
        batch_loss.backward()
        optimizer.step()
        batch_losses.append(batch_loss.item())
    return sum(batch_losses) / len(train_loader)

def test(loader):
    """Return the fraction of graphs in ``loader`` whose predicted class equals ``y``.

    Runs the module-level ``model`` in eval mode without gradient tracking.
    """
    model.eval()
    hits, seen = 0, 0
    with torch.no_grad():
        for batch in loader:
            batch = batch.to(device)
            logits = model(batch.x, batch.edge_index, batch.batch)
            # Predicted class is the index of the largest logit.
            hits += int((logits.argmax(dim=1) == batch.y).sum())
            seen += batch.y.size(0)
    return hits / seen

# Alternate one training pass with an evaluation on both splits, logging every epoch.
num_epochs = 3000
for epoch in range(1, num_epochs + 1):
    loss = train()
    # Accuracy is re-measured over the complete train and test loaders each epoch.
    train_acc = test(train_loader)
    test_acc = test(test_loader)
    print(f"Epoch {epoch:03d}: Loss {loss:.4f}, Train Acc: {train_acc:.4f}, Test Acc: {test_acc:.4f}")

# Save the trained model's state dict.
# Only the weights as they are after the final epoch are written.
torch.save(model.state_dict(), "gnn_model.pth")


Processing...


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Preprocessed image dtype: int32
Input tensor dtype: <dtype: 'int32'>
Frame 115 dtype: uint8
Preprocessed image dtype: int32
Input tensor dtype: <dtype: 'int32'>
Frame 120 dtype: uint8
Preprocessed image dtype: int32
Input tensor dtype: <dtype: 'int32'>
Frame 125 dtype: uint8
Preprocessed image dtype: int32
Input tensor dtype: <dtype: 'int32'>
Frame 130 dtype: uint8
Preprocessed image dtype: int32
Input tensor dtype: <dtype: 'int32'>
Frame 135 dtype: uint8
Preprocessed image dtype: int32
Input tensor dtype: <dtype: 'int32'>
Frame 140 dtype: uint8
Preprocessed image dtype: int32
Input tensor dtype: <dtype: 'int32'>
Frame 145 dtype: uint8
Preprocessed image dtype: int32
Input tensor dtype: <dtype: 'int32'>
Frame 150 dtype: uint8
Preprocessed image dtype: int32
Input tensor dtype: <dtype: 'int32'>
Frame 155 dtype: uint8
Preprocessed image dtype: int32
Input tensor dtype: <dtype: 'int32'>
Frame 160 dtype: uint8
Preprocessed im

Done!
  self.data, self.slices = torch.load(self.processed_paths[0])


Epoch 001: Loss 0.6955, Train Acc: 0.4990, Test Acc: 0.5124
Epoch 002: Loss 0.6938, Train Acc: 0.5735, Test Acc: 0.5289
Epoch 003: Loss 0.6938, Train Acc: 0.5052, Test Acc: 0.5124
Epoch 004: Loss 0.6926, Train Acc: 0.5776, Test Acc: 0.5207
Epoch 005: Loss 0.6922, Train Acc: 0.5963, Test Acc: 0.5455
Epoch 006: Loss 0.6889, Train Acc: 0.4865, Test Acc: 0.4628
Epoch 007: Loss 0.6905, Train Acc: 0.6025, Test Acc: 0.5950
Epoch 008: Loss 0.6875, Train Acc: 0.5300, Test Acc: 0.5124
Epoch 009: Loss 0.6868, Train Acc: 0.5259, Test Acc: 0.5702
Epoch 010: Loss 0.6858, Train Acc: 0.5072, Test Acc: 0.5289
Epoch 011: Loss 0.6848, Train Acc: 0.5797, Test Acc: 0.5372
Epoch 012: Loss 0.6872, Train Acc: 0.5631, Test Acc: 0.5702
Epoch 013: Loss 0.6852, Train Acc: 0.5052, Test Acc: 0.5455
Epoch 014: Loss 0.6859, Train Acc: 0.5921, Test Acc: 0.6116
Epoch 015: Loss 0.6827, Train Acc: 0.6066, Test Acc: 0.5620
Epoch 016: Loss 0.6828, Train Acc: 0.5880, Test Acc: 0.5455
Epoch 017: Loss 0.6839, Train Acc: 0.604

In [None]:
def augment_data(data, noise_std=0.02):
    """Jitter the first two feature columns of ``data.x`` with Gaussian noise.

    The tensor is modified in place and the same ``data`` object is returned.

    Args:
        data: Object with a 2-D tensor attribute ``x``.
        noise_std: Standard deviation of the zero-mean noise.
    """
    coords = data.x[:, :2]  # view into data.x, so the in-place add below updates data.x
    coords.add_(torch.randn_like(coords) * noise_std)
    return data


In [None]:
import torch
from torch_geometric.loader import DataLoader
import torch.nn.functional as F
import torch.optim as optim

# Load the preprocessed dataset directly.
# Make sure that the 'root' here is the parent directory containing the 'processed' folder.
dataset = SquatDataset(root="/video_dataset")
print(f"Loaded {len(dataset)} graphs from the preprocessed dataset.")

# Shuffle and split the dataset (80% training, 20% testing).
torch.manual_seed(42)
dataset = dataset.shuffle()
split_idx = int(0.8 * len(dataset))
train_dataset = dataset[:split_idx]
test_dataset = dataset[split_idx:]
print(f"Training on {len(train_dataset)} graphs and testing on {len(test_dataset)} graphs.")

# Create DataLoaders.
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)

# Initialize the improved model.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# `ImprovedGNNClassifier` is not defined anywhere in this notebook (NameError on a fresh
# kernel). AdvancedGNNClassifier is the classifier that is defined, and it accepts the same
# keyword arguments. NOTE(review): confirm this is the intended architecture for this run.
model = AdvancedGNNClassifier(in_channels=3, hidden_channels=128, num_classes=2, dropout=0.5).to(device)

# Define the optimizer.
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=5e-4)

# Training loop.
def train():
    """Run one optimisation pass over ``train_loader`` with noise augmentation.

    Each batch is moved to ``device`` and passed through ``augment_data`` before the
    forward pass. Uses the module-level ``model`` and ``optimizer``.

    Returns:
        Sum of the per-batch cross-entropy losses divided by the number of batches.
    """
    model.train()
    batch_losses = []
    for batch in train_loader:
        # Apply on-the-fly augmentation to the batch once it is on the target device.
        batch = augment_data(batch.to(device), noise_std=0.02)
        optimizer.zero_grad()
        logits = model(batch.x, batch.edge_index, batch.batch)
        batch_loss = F.cross_entropy(logits, batch.y)
        batch_loss.backward()
        optimizer.step()
        batch_losses.append(batch_loss.item())
    return sum(batch_losses) / len(train_loader)

# Testing loop.
def test(loader):
    """Compute classification accuracy of the module-level ``model`` over ``loader``.

    The model is switched to eval mode and gradients are disabled.

    Returns:
        Number of correctly classified graphs divided by the number of graphs seen.
    """
    model.eval()
    n_correct = 0
    n_graphs = 0
    with torch.no_grad():
        for batch in loader:
            batch = batch.to(device)
            predicted = model(batch.x, batch.edge_index, batch.batch).argmax(dim=1)
            n_correct += int((predicted == batch.y).sum())
            n_graphs += batch.y.size(0)
    return n_correct / n_graphs

# Train for a set number of epochs.
# Every epoch runs one training pass and then measures accuracy on both loaders.
num_epochs = 100
for epoch in range(1, num_epochs + 1):
    loss = train()
    train_acc = test(train_loader)
    test_acc = test(test_loader)
    print(f"Epoch {epoch:03d}: Loss {loss:.4f}, Train Acc: {train_acc:.4f}, Test Acc: {test_acc:.4f}")

# Save the trained model.
# Only the weights as they are after the final epoch are written.
torch.save(model.state_dict(), "improved_gnn_model_from_pt.pth")


  self.data, self.slices = torch.load(self.processed_paths[0])


Loaded 604 graphs from the preprocessed dataset.
Training on 483 graphs and testing on 121 graphs.
Epoch 001: Loss 0.6987, Train Acc: 0.5983, Test Acc: 0.5702
Epoch 002: Loss 0.6817, Train Acc: 0.6087, Test Acc: 0.6198
Epoch 003: Loss 0.6830, Train Acc: 0.6128, Test Acc: 0.6116
Epoch 004: Loss 0.6832, Train Acc: 0.6128, Test Acc: 0.5537
Epoch 005: Loss 0.6806, Train Acc: 0.6087, Test Acc: 0.6281
Epoch 006: Loss 0.6825, Train Acc: 0.6253, Test Acc: 0.6529
Epoch 007: Loss 0.6784, Train Acc: 0.6273, Test Acc: 0.5702
Epoch 008: Loss 0.6793, Train Acc: 0.6398, Test Acc: 0.6116
Epoch 009: Loss 0.6776, Train Acc: 0.6584, Test Acc: 0.6116
Epoch 010: Loss 0.6736, Train Acc: 0.6418, Test Acc: 0.6529
Epoch 011: Loss 0.6890, Train Acc: 0.6232, Test Acc: 0.6198
Epoch 012: Loss 0.6754, Train Acc: 0.6522, Test Acc: 0.6198
Epoch 013: Loss 0.6720, Train Acc: 0.6170, Test Acc: 0.6033
Epoch 014: Loss 0.6747, Train Acc: 0.6398, Test Acc: 0.5950
Epoch 015: Loss 0.6679, Train Acc: 0.6460, Test Acc: 0.6529
E

In [None]:
# Download the weights file written by the training cell above.
# The import is only available when running inside Google Colab.
from google.colab import files
files.download("improved_gnn_model_from_pt.pth")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>