In [2]:
from ultralytics import YOLO
import torch
import torch.nn as nn
import sys
sys.path.append("../Scrpits/")
from yololstm import YOLO_LSTM
from dataloader import ActionLoad
from torch.utils.data import DataLoader
import os

In [3]:
from sklearn.model_selection import train_test_split
import pandas as pd
samples_df = pd.read_csv('../Dataset_2/EduAction-A_8Frames.csv')

train, test = train_test_split(samples_df, test_size=0.2, random_state=42, shuffle=True,stratify=samples_df["class"])
train, val = train_test_split(train, test_size=0.2, random_state=42, stratify=train["class"])


train_dataset = ActionLoad(root_dir="../Dataset_2", df_path=train)
val_dataset = ActionLoad(root_dir="../Dataset_2", df_path=val)
test_dataset = ActionLoad(root_dir="../Dataset_2", df_path=test)

train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=True)

In [10]:
# x = next(iter(train_dataloader))
# x[0].shape, x[1].shape

(torch.Size([32, 8, 3, 640, 640]), torch.Size([32, 1]))

In [15]:
from collections import Counter
label_counts = Counter(samples_df["class"])
num_classes = 7
total = sum(label_counts.values())
class_weights = [total / (num_classes * label_counts[i]) for i in label_counts.keys()]

In [16]:
from ultralytics import YOLO
from torchinfo import summary
yolo_model = YOLO('../Models/yoloactiondata2.pt') 

backbone = yolo_model.model.model[:10]  # nn.ModuleList
model = YOLO_LSTM(yolo_backbone=backbone, hidden_size = 512, num_classes=7)
# Verify by printing model info
summary(model, (1, 8, 3, 640, 640), device='cpu')

YOLO backbone trainable params: 0
YOLO backbone extracted. Its last layer is: SPPF(
  (cv1): Conv(
    (conv): Conv2d(256, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
    (bn): BatchNorm2d(128, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
    (act): SiLU(inplace=True)
  )
  (cv2): Conv(
    (conv): Conv2d(512, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
    (bn): BatchNorm2d(256, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
    (act): SiLU(inplace=True)
  )
  (m): MaxPool2d(kernel_size=5, stride=1, padding=2, dilation=1, ceil_mode=False)
)
DEBUG: Shape of dummy_backbone_output: torch.Size([1, 256, 20, 20])
Dynamically determined YOLO backbone feature shape: (256, 20, 20)
Calculated LSTM input dimension: 256 (C*H*W)
LSTM initialized with hidden_size=512, num_layers=2
Fully connected layer initialized with 512 -> 7 outputs


Layer (type:depth-idx)                                       Output Shape              Param #
YOLO_LSTM                                                    [1, 7]                    --
├─Sequential: 1-1                                            [1, 256, 20, 20]          --
│    └─Conv: 2-1                                             [1, 16, 320, 320]         --
│    │    └─Conv2d: 3-1                                      [1, 16, 320, 320]         (432)
│    │    └─BatchNorm2d: 3-2                                 [1, 16, 320, 320]         (32)
│    └─SPPF: 2-503                                           --                        (recursive)
│    │    └─Conv: 3-580                                      --                        (recursive)
│    └─Conv: 2-3                                             [1, 32, 160, 160]         --
│    │    └─Conv2d: 3-4                                      [1, 32, 160, 160]         (4,608)
│    │    └─BatchNorm2d: 3-5                                 [1, 32

In [17]:
total = 0
for name, param in model.named_parameters():
    if param.requires_grad:
        print(f"{name}: {param.numel():,}")
        total += param.numel()
print(f"Total trainable params: {total:,}")

lstm.weight_ih_l0: 524,288
lstm.weight_hh_l0: 1,048,576
lstm.bias_ih_l0: 2,048
lstm.bias_hh_l0: 2,048
lstm.weight_ih_l1: 1,048,576
lstm.weight_hh_l1: 1,048,576
lstm.bias_ih_l1: 2,048
lstm.bias_hh_l1: 2,048
fc.weight: 3,584
fc.bias: 7
Total trainable params: 3,681,799


In [18]:
def calculate_accuracy(preds, labels):
    _, predicted = torch.max(preds, dim=1)
    correct = (predicted == labels).sum().item()
    total = labels.size(0)
    return correct / total
from torch.utils.tensorboard import SummaryWriter

In [23]:
from time import time, strftime, localtime
from IPython.display import clear_output
def train(model, train_loader, val_loader, criterion, optimizer, device, epochs=10):
    now = localtime(time())
    now = strftime("%m-%d_%H_%M", now)
    writer = SummaryWriter(f"Logs/{now}/")
    model.to(device)
    best_val_acc = 0.0
    def train_one_epoch(epoch):
        model.train()
        total_loss, total_acc = 0.0, 0.0
        for step, (x_seq, label) in enumerate(train_loader):
            # Ensure input shape: (B, 8, 3, 640, 640)
            x_seq = x_seq.unsqueeze(0).to(device) if x_seq.ndim == 4 else x_seq.to(device)  # (B=1, 8, 3, 640, 640)
            label = label.squeeze().to(device)

            optimizer.zero_grad()
            outputs = model(x_seq)
            loss = criterion(outputs, label)
            loss.backward()
            optimizer.step()

            acc = calculate_accuracy(outputs, label)
            total_loss += loss.item()
            total_acc += acc

            print(f"[Train Step {step}/{len(train_loader)}] Loss: {loss.item():.4f}, Acc: {acc:.4f}")
            clear_output(wait=True)
            writer.add_scalars("Train", {
                "Loss": loss.item(),
                "Accuracy": acc
            }, epoch*len(train_loader)+step)
            writer.flush()

        avg_loss = total_loss / len(train_loader)
        avg_acc = total_acc / len(train_loader)
        writer.add_scalars("Epoch", {
            "Loss": avg_loss,
            "Accuracy": avg_acc
        }, epoch)
        writer.flush()

        print(f"[Epoch {epoch}/{epochs}] Train Loss: {avg_loss:.4f}, Accuracy: {avg_acc:.4f}")
        
    def validate(epoch, best_val_acc):

        model.eval()
        val_loss, val_acc = 0.0, 0.0

        with torch.no_grad():
            for x_seq, label in val_loader:
                x_seq = x_seq.unsqueeze(0).to(device) if x_seq.ndim == 4 else x_seq.to(device)
                label = label.squeeze().to(device)

                outputs = model(x_seq)
                loss = criterion(outputs, label)
                acc = calculate_accuracy(outputs, label)

                val_loss += loss.item()
                val_acc += acc.item() if isinstance(acc, torch.Tensor) else acc
        # Save best model
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), '../Models/best_yololstm2.pth')
            print(f"✅ Saved new best model with val_acc: {val_acc:.4f}")
        avg_loss = val_loss / len(val_loader)
        avg_acc = val_acc / len(val_loader)
        writer.add_scalars("Validation", {
            "Loss": avg_loss,
            "Accuracy": avg_acc
        }, epoch)
        writer.flush()

        print(f"[Epoch {epoch}/{epochs}] Val Loss: {avg_loss:.4f}, Accuracy: {avg_acc:.4f}")

    for epoch in range(epochs):
        train_one_epoch(epoch)
        validate(epoch, best_val_acc)

    writer.close()


In [24]:
import torch.nn as nn
import torch.optim as optim

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class_weights_tensor = torch.tensor(class_weights, dtype=torch.float).to(device)

criterion = nn.CrossEntropyLoss(weight=class_weights_tensor)
optimizer = optim.Adam(model.parameters(), lr=1e-4)
try:
    train(model, train_dataloader, val_dataloader, criterion, optimizer, device, epochs=10)
except KeyboardInterrupt:
    print("Training interrupted by user.")

[Epoch 9/10] Train Loss: 0.0654, Accuracy: 0.9790
✅ Saved new best model with val_acc: 183.4375
[Epoch 9/10] Val Loss: 0.0510, Accuracy: 0.9862


In [25]:
from sklearn.metrics import f1_score

def calculate_f1_score(outputs, labels):
    preds = torch.argmax(outputs, dim=1).cpu().numpy()
    targets = labels.cpu().numpy()
    return f1_score(targets, preds, average='weighted')  # or 'macro' if preferred

def test_model(model, test_loader, criterion, device):
    model.eval()
    test_loss, test_acc, test_f1 = 0.0, 0.0, 0.0

    with torch.no_grad():
        for x_seq, label in test_loader:
            x_seq = x_seq.unsqueeze(0).to(device) if x_seq.ndim == 4 else x_seq.to(device)
            label = label.squeeze().to(device)

            outputs = model(x_seq)
            loss = criterion(outputs, label)
            acc = calculate_accuracy(outputs, label)
            f1 = calculate_f1_score(outputs, label)

            test_loss += loss.item()
            test_acc += acc
            test_f1 += f1

    avg_loss = test_loss / len(test_loader)
    avg_acc = test_acc / len(test_loader)
    avg_f1 = test_f1 / len(test_loader)

    print(f"[Test] Loss: {avg_loss:.4f}, Accuracy: {avg_acc:.4f}, F1 Score: {avg_f1:.4f}")

In [26]:
test_model(model, test_dataloader, criterion, device)

[Test] Loss: 0.0552, Accuracy: 0.9853, F1 Score: 0.9851


In [27]:
len(train_dataset), len(val_dataset), len(test_dataset)

(23752, 5938, 7423)

In [28]:
from time import time
x = torch.rand((1, 8, 3, 640, 640)).to(device)
times = []
for i in range(50):
    start = time()
    y = model(x)
    end = time()
    times.append(end-start)
print(f"Average time: {sum(times)/len(times):.4f} Seconds")

Average time: 0.1096 Seconds


In [29]:
from time import time
x = torch.rand((1, 8, 3, 640, 640)).to("cpu")
model_cpu = model.to("cpu")
times = []
for i in range(50):
    start = time()
    y = model_cpu(x)
    end = time()
    times.append(end-start)
print(f"Average time: {sum(times)/len(times):.4f} Seconds")

Average time: 0.7743 Seconds
