In [None]:
% pip install torch torchvision torchaudio

UsageError: Line magic function `%` not found.


In [None]:
# LIBRARIES

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import json
from sklearn.metrics import classification_report

In [None]:
# CONSTANTS

NUM_KEYPOINTS = 33  # MediaPipe's gives 33 landmakrs
FEATURES_PER_KEYPOINT = 2  # Only using (x, y) coordinates for now
SEQUENCE_LENGTH = 16  # Number of frames analyzed at a time
NUM_CLASSES = 3  # "eccentric", "concentric", "none"
LABEL_MAP = {"none": 0, "eccentric": 1, "concentric": 2}
INPUT_SIZE = NUM_KEYPOINTS * FEATURES_PER_KEYPOINT  # 66


In [None]:
class RepStageDataset(Dataset):
    def __init__(self, npy_path, json_path, sequence_length=16):
        self.pose_data = np.load(npy_path)[:, :66]  # (x, y) only
        with open(json_path) as f:
            labels = json.load(f)["rep_stages"]
        self.labels = [LABEL_MAP[l] for l in labels]
        self.seq_len = sequence_length
        assert len(self.pose_data) == len(self.labels)

    def __len__(self):
        return len(self.pose_data) - self.seq_len

    def __getitem__(self, idx):
        x = self.pose_data[idx : idx + self.seq_len]
        y = self.labels[idx + self.seq_len // 2]  # label for center frame
        return torch.tensor(x, dtype=torch.float32), torch.tensor(y)


In [16]:
import numpy as np
import json

pose_data = np.load("../data/pose_np/sldl_israel_npy.npy")
with open("../data/label_data/sldl_israel_data.json") as f:
    labels = json.load(f)["rep_stages"]

print("Pose frames:", len(pose_data))
print("Labels:", len(labels))


Pose frames: 3322
Labels: 3322


In [17]:
dataset = RepStageDataset(
    npy_path="../data/pose_np/sldl_israel_npy.npy",
    json_path="../data/label_data/sldl_israel_data.json",
    sequence_length=SEQUENCE_LENGTH
)

loader = DataLoader(dataset, batch_size=32, shuffle=True)

In [18]:
# LSTM-based Rep Stage Classifier
# This model takes a short sequence of pose keypoints (e.g., 16 frames)
# and predicts the current stage of the exercise: eccentric or concentric.

class RepStageClassifier(nn.Module):
    def __init__(self, input_size, hidden_size=128, num_classes=NUM_CLASSES):
        super(RepStageClassifier, self).__init__()
        
        # LSTM processes the input sequence of keypoints over time
        # input_size: features per frame (e.g., 66 for 33 keypoints × 2D)
        # hidden_size: size of LSTM's internal memory (can be tuned)
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        
        # Final linear layer maps LSTM output to class scores (eccentric/concentric)
        self.fc = nn.Linear(hidden_size, num_classes)
    
    def forward(self, x):
        # Input x shape: (batch_size, sequence_length, input_size)
        
        # LSTM returns:
        # - out: LSTM outputs for each time step
        # - _: (hidden_state, cell_state), not used here
        out, _ = self.lstm(x)
        
        # We use the output from the last time step (i.e., the last frame in the sequence)
        # to make the final prediction
        out = self.fc(out[:, -1, :])  # shape: (batch_size, num_classes)
        return out

# Instantiate the model with the correct input size
model = RepStageClassifier(INPUT_SIZE)


In [19]:
# Define loss function — CrossEntropyLoss good for classification
criterion = nn.CrossEntropyLoss()

# Define optimizer — Adam widely used for sequence models like LSTM
optimizer = optim.Adam(model.parameters(), lr=0.001)


In [20]:
# TRAINING LOOP
EPOCHS = 10
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = RepStageClassifier(INPUT_SIZE).to(device)

for epoch in range(EPOCHS):
    model.train()
    total_loss, correct, total = 0, 0, 0

    for x, y in loader:
        x, y = x.to(device), y.to(device)

        optimizer.zero_grad()
        outputs = model(x)
        loss = criterion(outputs, y)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        correct += (outputs.argmax(dim=1) == y).sum().item()
        total += y.size(0)

    print(f"Epoch {epoch+1} | Loss: {total_loss:.4f} | Accuracy: {correct / total:.2%}")


Epoch 1 | Loss: 121.9628 | Accuracy: 13.67%
Epoch 2 | Loss: 121.9852 | Accuracy: 13.67%
Epoch 3 | Loss: 121.9658 | Accuracy: 13.67%
Epoch 4 | Loss: 122.0047 | Accuracy: 13.67%
Epoch 5 | Loss: 121.9658 | Accuracy: 13.67%
Epoch 6 | Loss: 121.9720 | Accuracy: 13.67%
Epoch 7 | Loss: 121.9687 | Accuracy: 13.67%
Epoch 8 | Loss: 121.9764 | Accuracy: 13.67%
Epoch 9 | Loss: 121.9891 | Accuracy: 13.67%
Epoch 10 | Loss: 121.9535 | Accuracy: 13.67%


In [None]:
# After training, evaluate
all_preds, all_labels = [], []

model.eval()
with torch.no_grad():
    for x, y in loader:
        x, y = x.to(device), y.to(device)
        preds = model(x).argmax(dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(y.cpu().numpy())

print(classification_report(all_labels, all_preds, target_names=["none", "eccentric", "concentric"]))


              precision    recall  f1-score   support

        none       0.00      0.00      0.00      2379
   eccentric       0.14      1.00      0.24       452
  concentric       0.00      0.00      0.00       475

    accuracy                           0.14      3306
   macro avg       0.05      0.33      0.08      3306
weighted avg       0.02      0.14      0.03      3306



  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])
  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])
  _warn_prf(average, modifier, f"{metric.capitalize()} is", result.shape[0])
