## Model Definition

### Res Block

In [23]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import os


class ResBlock(nn.Module):
    """
    A single residual layer with three Conv-BN-ReLU stacks and a skip connection.
    """
    def __init__(self, channels=256, kernel_size=3, stride=1, downsample=False):
        super(ResBlock, self).__init__()

        new_stride = 2 if downsample else 1

        self.conv1 = nn.Conv1d(channels, channels,
                               kernel_size=kernel_size,
                               stride=new_stride,
                               padding=kernel_size // 2)
        self.bn1 = nn.BatchNorm1d(channels)
        self.relu = nn.ReLU(inplace=True)

        self.conv2 = nn.Conv1d(channels, channels,
                               kernel_size=kernel_size,
                               stride=1,
                               padding=kernel_size // 2)
        self.bn2 = nn.BatchNorm1d(channels)

        self.conv3 = nn.Conv1d(channels, channels,
                               kernel_size=kernel_size,
                               stride=1,
                               padding=kernel_size // 2)
        self.bn3 = nn.BatchNorm1d(channels)

        if downsample:
          self.skip = nn.Sequential(
            nn.Conv1d(channels, channels, kernel_size=1, stride=2, padding=0),
            nn.BatchNorm1d(channels)
          )
        else:
          self.skip = nn.Identity()

    def forward(self, x):
        identity = self.skip(x)

        # First conv + BN + ReLU
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        # Second conv + BN + ReLU
        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        # Third conv + BN + ReLU
        out = self.conv3(out)
        out = self.bn3(out)
        out = self.relu(out)

        # Add skip connection
        out = out + identity

        out = self.relu(out)
        return out

In [28]:
class DilatedResBlock(nn.Module):
    def __init__(self, channels, kernel_size=3, dilation=1):
        super().__init__()
        pad = ((kernel_size - 1) * dilation) // 2
        # First dilated conv
        self.conv1 = nn.Conv1d(channels, channels,
                               kernel_size=kernel_size,
                               dilation=dilation,
                               padding=pad)
        self.bn1   = nn.BatchNorm1d(channels)
        self.relu1 = nn.ReLU(inplace=True)
        # Second dilated conv (same params)
        self.conv2 = nn.Conv1d(channels, channels,
                               kernel_size=kernel_size,
                               dilation=dilation,
                               padding=pad)
        self.bn2   = nn.BatchNorm1d(channels)
        # Final activation
        self.relu2 = nn.ReLU(inplace=True)
        self.dropout = nn.Dropout(p=0.2)
    def forward(self, x):
        identity = x
        out = self.relu1(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out = self.dropout(out)
        out = out + identity
        return self.relu2(out)

### Full Network

In [29]:
class SquatResNet(nn.Module):
    """
    Implements the network from your figure (left):
      1) conv(3, 2, 256) + BN + ReLU
      2) 4 x ResBlock (the structure on the right)
      3) AdaptiveAvgPool1d(1)
      4) conv(1, 2, 256) + BN
      5) Final output dimension is (batch_size, 256, 1) in the time axis
         (assuming the stride=2 doesn't collapse it completely).

    If you need a classification output (e.g. good vs. bad squat),
    you can add a final linear layer or conv layer to produce 2 logits.
    """
    def __init__(self,
                 input_channels=528,   # e.g., 3 if each "time step" has 3 features
                 kernel_size=3,
                 stride=2,
                 base_channels=128,
                 num_res_layers=4):
        super(SquatResNet, self).__init__()

        # 1) Initial Conv(3, 2, 256):
        #    kernel_size=3, stride=2, out_channels=256
        #    We'll pad = kernel_size//2 to maintain shape in convolution
        self.conv1 = nn.Conv1d(input_channels, base_channels,
                               kernel_size=kernel_size,
                              #  stride=stride,
                               stride=1,
                              #  padding=kernel_size // 2
                               padding=1)
        self.bn1 = nn.BatchNorm1d(base_channels)
        self.relu = nn.ReLU(inplace=True)
        self.dropout = nn.Dropout(p=0.4)

        # 2) 4 Residual Layers
        layers = []
        for i in range(num_res_layers//2):
            down = (i % 2 == 0)
            # down = True
            layers.append(ResBlock(channels=base_channels, kernel_size=kernel_size, stride=1, downsample=down))
            # if (i % 2 == 0):
            #   layers.append(nn.Dropout(p=0.3))

        for i in range(2, 4):
            layers.append(DilatedResBlock(channels=base_channels, kernel_size=kernel_size, dilation=i))

        # layers.append(nn.Dropout(p=0.3))
        self.res_layers = nn.Sequential(*layers)

        # 3) Adaptive Average Pool -> output size = 1 in the time dimension
        self.avgpool = nn.AdaptiveAvgPool1d(1)

        # Optional: final classifier
        # For a 2-class problem, you can add a linear layer or another conv:
        self.fc = nn.Linear(base_channels, 2)
        # or a final conv1d with out_channels=2, kernel_size=1, stride=1, etc.

    def forward(self, x):
        """
        x shape: (batch_size, input_channels, time_length)
        """
        # 1) Initial conv + BN + ReLU
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        # x = self.dropout(x)

        # 2) Residual layers
        x = self.res_layers(x)
        x = self.dropout(x)

        # 3) Adaptive average pooling -> (batch_size, base_channels, 1)
        x = self.avgpool(x)

        # You might want a final classifier. For example:
        x = F.relu(x)
        x = x.squeeze(-1)               # (batch_size, base_channels)
        x = self.fc(x)            # (batch_size, 2)

        return x


## Training

In [5]:
from torch.utils.data import Dataset, DataLoader, random_split
import numpy as np
from sklearn.model_selection import train_test_split

class SquatDataset(Dataset):
    def __init__(self, mats, labels, max_frames=100):
        self.tensors = []
        for mat in mats:
          if isinstance(mat, np.ndarray) and mat.dtype == np.object_:
            mat = np.array(mat.tolist(), dtype=np.float32)  # convert from object array to regular float array
          else:
            mat = np.array(mat, dtype=np.float32)
          t = np.linspace(0, 1, mat.shape[1])
          mat = np.vstack([mat, t])
          mat = torch.tensor(mat, dtype=torch.float32)
          mat = (mat - mat.mean(dim=1, keepdim=True)) / (mat.std(dim=1, keepdim=True) + 1e-6)
          self.tensors.append(mat)

        self.label_numbers = torch.tensor(labels, dtype=torch.long)
        self.max_frames = max_frames

    def __len__(self):
        return len(self.tensors)

    def __getitem__(self, idx):
        mat = self.tensors[idx]
        label_nums = self.label_numbers[idx]

        return mat, label_nums

### Set Up Training

In [6]:
mats = np.load("/content/mats_small.npy", allow_pickle=True)
labels = np.load("/content/labels_small.npy")

cnt = 0
for i in range(len(labels)):
  if labels[i] == 1:
    cnt+=1

print(cnt)

265


In [12]:
# dataset = SquatDataset(mats, labels)
# train_dataset, val_dataset, test_dataset = random_split(dataset, [0.8, 0.1, 0.1], generator=torch.Generator().manual_seed(42))
train_mats, val_mats, train_labels, val_labels = train_test_split(
    mats, labels, test_size=0.2, stratify=labels, random_state=42
)

# Then wrap each in your Dataset
train_dataset = SquatDataset(train_mats, train_labels)
val_dataset   = SquatDataset(val_mats,   val_labels)

train_dataloader = DataLoader(train_dataset, batch_size=256, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=256, shuffle=False)
# test_dataloader = DataLoader(test_dataset, batch_size=256, shuffle=False)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("device:", device)
best_val_loss = 100.0

device: cuda


### Simple Baseline Model

In [9]:
class SimpleResBlock(nn.Module):
    def __init__(self, channels):
        super().__init__()
        self.conv1 = nn.Conv1d(channels, channels, 3, padding=1)
        self.bn1   = nn.BatchNorm1d(channels)
        self.conv2 = nn.Conv1d(channels, channels, 3, padding=1)
        self.bn2   = nn.BatchNorm1d(channels)
    def forward(self, x):
        identity = x
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        return F.relu(out + identity)

class SmallCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv1d(528, 256, kernel_size=3, padding=1)
        self.bn1   = nn.BatchNorm1d(256)
        self.conv2 = nn.Conv1d(256, 256, kernel_size=3, padding=1)
        self.bn2   = nn.BatchNorm1d(256)
        self.conv3 = nn.Conv1d(256, 256, kernel_size=3, padding=1)
        self.bn3   = nn.BatchNorm1d(256)
        self.pool  = nn.AdaptiveAvgPool1d(1)
        self.fc    = nn.Linear(256, 2)
        self.dropout = nn.Dropout(p=0.3)
    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn2(self.conv3(x)))
        x = self.pool(x).squeeze(-1)
        x = self.dropout(x)
        return self.fc(x)

In [62]:
class SquatHybrid(nn.Module):
    def __init__(self, input_channels=528, base_channels=128, num_res_layers=4):
        super().__init__()
        # 1) Initial conv + ResLayers (same as yours)
        self.conv1 = nn.Conv1d(input_channels, base_channels, 3, padding=1)
        self.bn1   = nn.BatchNorm1d(base_channels)
        self.relu  = nn.ReLU(inplace=True)
        self.dropout = nn.Dropout(0.3)
        layers = []
        for i in range(num_res_layers//2):
            down = (i % 2 == 0)
            # down = True
            layers.append(ResBlock(channels=base_channels, kernel_size=3, stride=1, downsample=down))
            # if (i % 2 == 0):
            #   layers.append(nn.Dropout(p=0.3))

        for i in range(2, 4):
            layers.append(DilatedResBlock(channels=base_channels, kernel_size=3, dilation=i))

        # layers.append(nn.Dropout(p=0.3))
        self.res_layers = nn.Sequential(*layers)
        # 2) Sequence model: pick one
        # a) Bi-LSTM
        # self.lstm = nn.LSTM(base_channels, base_channels, batch_first=True,
        #                     bidirectional=True, num_layers=1)
        # self.fc   = nn.Linear(base_channels*2, 2)
        # OR b) Transformer Encoder
        encoder_layer = nn.TransformerEncoderLayer(d_model=base_channels, nhead=4, batch_first=True, dropout=0.5)
        self.transf = nn.TransformerEncoder(encoder_layer, num_layers=2)
        self.fc    = nn.Linear(base_channels, 2)

    def forward(self, x):
        # x: (B,528,T)
        x = self.relu(self.bn1(self.conv1(x)))        # (B, C, T)
        x = self.res_layers(x)                                # (B, C, T)
        x = x.permute(0,2,1)                           # (B, T, C) for seq models

        # a) LSTM path
        # out, _ = self.lstm(x)                          # (B, T, 2C)
        # out     = out.mean(1)                          # average over time → (B, 2C)

        # b) Transformer path (uncomment if using)
        # out = x.permute(1,0,2)  # (T,B,C)
        x = self.transf(x)  # (T,B,C)
        x = x.mean(dim=1)       # → (B,C)

        x = self.dropout(x)
        return self.fc(x)


### Training Cycle

In [65]:
model = SquatResNet(input_channels=529).to(device) # Define model
# model = SquatHybrid(input_channels=529).to(device)

counts = np.bincount(train_labels)           # e.g. [800, 200]
weights = 1.0 / counts                   # inverse frequency
class_weights = weights / weights.sum()  # normalize
NUM_EPOCHS = 20

optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)
loss_fn = nn.CrossEntropyLoss(label_smoothing=0.1, weight=torch.tensor(class_weights, dtype=torch.float32).to(device))
scheduler = torch.optim.lr_scheduler.OneCycleLR(
    optimizer,
    max_lr=2e-3,
    epochs=NUM_EPOCHS,
    steps_per_epoch=len(train_dataloader),
    pct_start=0.3
)

In [66]:
# ***If model already been trained, load model)
# if os.path.exists("/content/model.pth"):
#   model.load_state_dict(torch.load("/content/model.pth", map_location=device))
# else:
#   model = SquatResNet().to(device) # Define model
# Train the model
for epoch in range(NUM_EPOCHS):
    model.train()
    train_loss = 0.0
    for inputs, targets in train_dataloader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        assert targets.dtype == torch.long
        assert targets.max() <= 1
        assert targets.min() >= 0

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = loss_fn(outputs, targets)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()
        train_loss += loss.item()

    # Validate the model
    model.eval()
    with torch.no_grad():
        val_loss = 0.0
        correct, total = 0, 0
        for inputs, targets in val_dataloader:
            inputs = inputs.to(device)
            targets = targets.to(device)

            assert targets.dtype == torch.long
            assert targets.max() <= 1
            assert targets.min() >= 0

            outputs = model(inputs)
            predictions = torch.argmax(outputs, dim=1)
            correct += (predictions.cpu() == targets.cpu()).sum().item()
            total += targets.size(0)
            loss = loss_fn(outputs, targets)
            val_loss += loss.item()

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), "/content/model.pth")
            print("Best loss:", best_val_loss)
        # Test the model
        test_loss = 0.0
        # for inputs, targets in test_dataloader:
        #     inputs = inputs.to(device)
        #     targets = targets.to(device)
        #     outputs = model(inputs)
        #     loss = loss_fn(outputs, targets)
        #     test_loss += loss.item()
    val_acc = correct / total

    # if (epoch+1) % 10 == 0:
    # print(f"Epoch {epoch}, Train Loss: {train_loss / len(train_dataloader)}, Val Loss: {val_loss / len(val_dataloader)}, Test Loss: {test_loss / len(test_dataloader)}")
    print(f"Epoch {epoch}, Train Loss: {train_loss / len(train_dataloader)}, Val Loss: {val_loss / len(val_dataloader)}, Val Accuracy: {val_acc}")

# ***Save model****
# torch.save(model.state_dict(), "/content/model.pth")


Epoch 0, Train Loss: 0.8987034857273102, Val Loss: 0.6981440782546997, Val Accuracy: 0.5
Epoch 1, Train Loss: 0.7715025246143341, Val Loss: 0.6989044547080994, Val Accuracy: 0.5
Epoch 2, Train Loss: 0.5904654860496521, Val Loss: 0.6937092542648315, Val Accuracy: 0.49056603773584906
Epoch 3, Train Loss: 0.5616771876811981, Val Loss: 0.6948764324188232, Val Accuracy: 0.4811320754716981
Epoch 4, Train Loss: 0.434390589594841, Val Loss: 0.6957876682281494, Val Accuracy: 0.5283018867924528
Epoch 5, Train Loss: 0.5597544014453888, Val Loss: 0.6923713684082031, Val Accuracy: 0.6037735849056604
Epoch 6, Train Loss: 0.43800026178359985, Val Loss: 0.8162083625793457, Val Accuracy: 0.5283018867924528
Epoch 7, Train Loss: 0.4013144075870514, Val Loss: 0.7959104776382446, Val Accuracy: 0.5188679245283019
Epoch 8, Train Loss: 0.31558799743652344, Val Loss: 0.8141824007034302, Val Accuracy: 0.5283018867924528
Epoch 9, Train Loss: 0.30789582431316376, Val Loss: 0.7974138259887695, Val Accuracy: 0.5660

## Inference

In [None]:
import torch
import torch.nn.functional as F
import numpy as np

# Load trained model weights (skip this if model was just trained in this session)
# torch.save(model.state_dict(), "squat_model.pth")  <-- save after training (one-time)
model = SquatResNet().to(device)
model.load_state_dict(torch.load("/content/model.pth", map_location=device))  # make sure model is saved first
model.eval()

# Load the new pose matrix (.npy file) for inference
pose_matrix = np.load("pose_matrix.npy")  # shape: (528, T)
tensor = torch.tensor(pose_matrix, dtype=torch.float32).unsqueeze(0).to(device)  # shape: (1, 528, T)

# Run the model to get prediction
with torch.no_grad():
    logits = model(tensor)  # shape: (1, 2)
    probs = F.softmax(logits, dim=1)  # convert logits to probabilities
    pred = torch.argmax(probs, dim=1).item()  # get predicted class (0 or 1)
    conf = probs[0][pred].item()  # get confidence of the predicted class

# Map prediction to label
label_map = {0: "Bad Squat", 1: "Good Squat"}
print(f"✅ Your squat is {conf * 100:.2f}% {label_map[pred].lower()}.")
