In [55]:
import torch
from torch.utils.data import Dataset
import torch.nn as nn
from torch.optim.lr_scheduler import ReduceLROnPlateau
from sklearn.metrics import f1_score
import torch.nn.functional as F
import json
from glob import glob
from tqdm import tqdm
from torch.nn.utils.rnn import pad_sequence
class SquatRepDataset(Dataset):
    def __init__(self, data_list, label_list):
        self.data_list = data_list  # list of dicts like the one you provided
        self.label_list = label_list

    def __len__(self):
        return len(self.data_list)

    def __getitem__(self, idx):
        data = self.data_list[idx]
        sequence = torch.tensor([
            data["knee_angle"],
            data["torso_angle"],
            data["hip_angle"],
            data["symmetry_score"],
            data["alignment_score"],
            data["head_angle"],
            data["inter_thigh_angle"],
            data["heel_angle"],
            data["back_angle"]
        ], dtype=torch.float).T  # shape: [seq_len, num_features]

        label = torch.tensor(self.label_list[idx], dtype=torch.long)
        return sequence, label


In [56]:
class PureConvClassifier(nn.Module):
    def __init__(self, in_ch=9, conv_ch=[64,128,256], kernel=3, num_classes=6):
        super().__init__()
        layers = []
        prev = in_ch
        for ch in conv_ch:
            layers += [
                nn.Conv1d(prev, ch, kernel, padding=kernel//2),
                nn.BatchNorm1d(ch),
                nn.ReLU(),
                nn.MaxPool1d(2)
            ]
            prev = ch
        self.net = nn.Sequential(*layers)
        self.global_pool = nn.AdaptiveMaxPool1d(1)
        self.fc1 = nn.Linear(prev, prev//2)
        self.fc2 = nn.Linear(prev//2, num_classes)
        
    def forward(self, x_packed):
        x, lengths = nn.utils.rnn.pad_packed_sequence(x_packed, batch_first=True)
        x = x.transpose(1,2)              
        x = self.net(x)                   
        x = self.global_pool(x).squeeze(2) 
        x = F.relu(self.fc1(x))
        return self.fc2(x)              


In [57]:
# Example data prep


def squat_collate_fn(batch):
    sequences, labels = zip(*batch)
    lengths = torch.tensor([seq.shape[0] for seq in sequences])  
    padded_sequences = pad_sequence(sequences, batch_first=True)
    labels = torch.stack(labels)
    return padded_sequences, lengths, labels



bad_data_list=glob('temp_data/*')
future = (bad_data_list)
bad_list_new_list=[]
labels_list=[]
for i in range(len(bad_data_list)):
    bad_list_new_list.extend(glob(bad_data_list[i]+'/*.json'))
    one_hot=[0]*6
    one_hot[i]=1
    # print(one_hot)
    for i in range(len(glob(bad_data_list[i]+'/*.json'))):
        labels_list.append(one_hot)
    # break
# print(labels_list[:2])

bad_data_list_new=[json.load(open(file)) for file in bad_list_new_list]
# print(bad_list_new_list[0])
with open(bad_list_new_list[0]) as f:
    data = json.load(f)
# print(data[1])
bad_data_list_new_actual=[]
label_list_actual=[]
for i in bad_data_list_new:
    for k in range(len(i)):
        label_list_actual.append(labels_list[bad_data_list_new.index(i)])
    for j in i:
        bad_data_list_new_actual.append(j)
bad_data_list=bad_data_list_new_actual
bad_label_list=label_list_actual
# print(data_list[0])
# print(bad_data_list[0])
# print(bad_label_list[0])

device = "cuda" if torch.cuda.is_available() else "cpu"
    
# data_list = [your_json_dict]  # can add more samples here
# bad_label_list = [0]*len(bad_data_list_new)  # class index for this sample

from torch.utils.data import random_split, DataLoader

dataset = SquatRepDataset(bad_data_list, bad_label_list)

train_size = int(0.8 * len(dataset))
val_size   = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

train_loader = DataLoader(
    train_dataset,
    batch_size=8,
    shuffle=True,
    collate_fn=squat_collate_fn
)
val_loader = DataLoader(
    val_dataset,
    batch_size=8,
    shuffle=False,
    collate_fn=squat_collate_fn
)




In [58]:


model = PureConvClassifier().to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)
criterion = nn.BCEWithLogitsLoss()

scheduler = ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.5,
    patience=5,
    verbose=True
)

best_val_loss = float('inf')

num_epochs = 100
for epoch in range(1, num_epochs+1):
    model.train()
    train_loss = 0.0
    for X_batch, lengths, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device).float()
        optimizer.zero_grad()

        packed  = nn.utils.rnn.pack_padded_sequence(
                      X_batch, lengths, batch_first=True, enforce_sorted=False
                  )
        logits  = model(packed)
        loss    = criterion(logits, y_batch)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    train_loss /= len(train_loader)

    model.eval()
    val_loss = 0.0

    all_preds = []
    all_targets = []

    with torch.no_grad():
        for X_batch, lengths, y_batch in val_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device).float()

            packed = nn.utils.rnn.pack_padded_sequence(
                         X_batch, lengths,
                         batch_first=True,
                         enforce_sorted=False
                     )
            logits = model(packed)
            loss   = criterion(logits, y_batch)
            val_loss += loss.item()

            probs = torch.sigmoid(logits)
            preds = (probs >= 0.5).long()
            all_preds.append(preds.cpu())
            all_targets.append(y_batch.cpu().long())

    val_loss /= len(val_loader)

    all_preds   = torch.cat(all_preds, dim=0).numpy()
    all_targets = torch.cat(all_targets, dim=0).numpy()
    val_f1 = f1_score(all_targets, all_preds, average='macro')

    print(
        f"Epoch {epoch:3d} | "
        f"Train Loss: {train_loss:.4f} | "
        f"Val Loss: {val_loss:.4f} | "
        f"Val F1: {val_f1:.3f}"
    )

    scheduler.step(val_loss)

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'best_model.pth')
        print(f"  ↳ New best model saved (val_loss = {val_loss:.4f})\n")


Epoch   1 | Train Loss: 0.4379 | Val Loss: 0.3972 | Val F1: 0.194
  ↳ New best model saved (val_loss = 0.3972)

Epoch   2 | Train Loss: 0.4080 | Val Loss: 0.3690 | Val F1: 0.238
  ↳ New best model saved (val_loss = 0.3690)

Epoch   3 | Train Loss: 0.3633 | Val Loss: 0.3901 | Val F1: 0.347
Epoch   4 | Train Loss: 0.3441 | Val Loss: 0.3038 | Val F1: 0.420
  ↳ New best model saved (val_loss = 0.3038)

Epoch   5 | Train Loss: 0.3046 | Val Loss: 0.3277 | Val F1: 0.370
Epoch   6 | Train Loss: 0.2958 | Val Loss: 0.3386 | Val F1: 0.461
Epoch   7 | Train Loss: 0.2703 | Val Loss: 0.2643 | Val F1: 0.605
  ↳ New best model saved (val_loss = 0.2643)

Epoch   8 | Train Loss: 0.2197 | Val Loss: 0.2571 | Val F1: 0.615
  ↳ New best model saved (val_loss = 0.2571)

Epoch   9 | Train Loss: 0.2120 | Val Loss: 0.2910 | Val F1: 0.605
Epoch  10 | Train Loss: 0.2140 | Val Loss: 0.2550 | Val F1: 0.648
  ↳ New best model saved (val_loss = 0.2550)

Epoch  11 | Train Loss: 0.1492 | Val Loss: 0.2275 | Val F1: 0.71

In [61]:
pedofile = json.load(open("rep_metrics.json"))
actual_list = []
for j in pedofile:
    actual_list.append(j)
pedo_input = [torch.tensor([
            data["knee_angle"],
            data["torso_angle"],
            data["hip_angle"],
            data["symmetry_score"],
            data["alignment_score"],
            data["head_angle"],
            data["inter_thigh_angle"],
            data["heel_angle"],
            data["back_angle"]
        ], dtype=torch.float).T  
        for data in actual_list]

In [62]:
model.eval()
with torch.no_grad():
    for seq in pedo_input:
        seq = seq.to(device)

        seq = seq.unsqueeze(0)

        lengths = [seq.size(1)]    

        packed = nn.utils.rnn.pack_padded_sequence(
            seq,
            lengths,
            batch_first=True,
            enforce_sorted=False
        )

        logits = model(packed)     
        probs  = torch.sigmoid(logits)
        preds  = (probs >= 0.5).long()
        print(torch.argmax(probs))
        print(f"preds: {preds.squeeze(0)}")
print(future)


tensor(4, device='cuda:0')
preds: tensor([0, 0, 0, 0, 1, 0], device='cuda:0')
tensor(4, device='cuda:0')
preds: tensor([0, 0, 0, 0, 1, 0], device='cuda:0')
tensor(4, device='cuda:0')
preds: tensor([0, 0, 0, 0, 1, 0], device='cuda:0')
tensor(4, device='cuda:0')
preds: tensor([0, 0, 0, 0, 1, 0], device='cuda:0')
tensor(3, device='cuda:0')
preds: tensor([0, 0, 0, 1, 0, 0], device='cuda:0')
['temp_data/bad_inner_thigh', 'temp_data/bad_shallow', 'temp_data/good', 'temp_data/bad_head', 'temp_data/bad_back_warp', 'temp_data/bad_toe']
