In [None]:
# Mount Google Drive at /content/drive; the dataset paths defined below live under it.
# `google.colab` is only importable inside a Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import os
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
import cv2
from tqdm import tqdm
from sklearn.metrics import classification_report


In [None]:
# Folders holding the three dataset splits on the mounted Google Drive.
DRIVE_ROOT = '/content/drive/My Drive'
train = f'{DRIVE_ROOT}/train'
val = f'{DRIVE_ROOT}/validation'
test = f'{DRIVE_ROOT}/test'

In [None]:
# Use the GPU when one is available; tensors and models below are moved to `device`.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Frozen ImageNet-pretrained ResNet-18, used only as a per-frame feature extractor.
# `pretrained=True` is deprecated since torchvision 0.13 in favour of the `weights=`
# enum; IMAGENET1K_V1 is the checkpoint the old flag loaded. Older torchvision builds
# that predate the enum fall back to the legacy flag.
if hasattr(models, "ResNet18_Weights"):
    resnet = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
else:
    resnet = models.resnet18(pretrained=True)
for param in resnet.parameters():
    param.requires_grad = False  # the backbone is never fine-tuned
# Drop the last child (the final FC layer) so the network outputs pooled features.
resnet = nn.Sequential(*list(resnet.children())[:-1])
resnet.to(device)
resnet.eval()  # inference mode: BatchNorm layers use their stored running statistics



Sequential(
  (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (2): ReLU(inplace=True)
  (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (4): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Con

In [None]:
# Per-channel mean / std used to normalise inputs for the ImageNet-pretrained backbone.
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

# Frame -> PIL image -> 224x224 -> float tensor -> normalised tensor.
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])

def preprocess_frame(frame):
    """Convert one OpenCV frame into a normalised batch-of-one tensor on `device`.

    Args:
        frame: Image array as returned by `cv2.VideoCapture.read`, i.e. with
            its channels in BGR order.

    Returns:
        The output of the module-level `transform` with a leading batch
        dimension added, moved to `device` ([1, 3, 224, 224]).

    Note:
        Earlier versions of this function skipped the BGR -> RGB conversion, so
        LSTM weights trained on those (channel-swapped) features should be
        retrained rather than reused.
    """
    # OpenCV decodes to BGR, while ToPILImage and the ImageNet mean/std used in
    # `transform` assume RGB; without this swap red and blue are exchanged.
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    return transform(rgb_frame).unsqueeze(0).to(device)  # [1, 3, 224, 224]


In [None]:
def extract_frames(video_path, frame_rate=5, max_frames=None):
    """Decode a video and keep every `frame_rate`-th frame.

    Args:
        video_path: Path of the video file handed to `cv2.VideoCapture`.
        frame_rate: Sampling stride in frames (not frames per second): frames
            0, frame_rate, 2 * frame_rate, ... are kept. Must be >= 1.
        max_frames: Optional cap on the number of frames returned. Decoding
            stops as soon as the cap is reached. `None` (the default) reads
            the whole video, as before.

    Returns:
        List of the kept frames exactly as returned by `cap.read()`. The list
        is empty when the capture does not open or yields no frames.

    Raises:
        ValueError: If `frame_rate` is smaller than 1.
    """
    if frame_rate < 1:
        raise ValueError(f"frame_rate must be >= 1, got {frame_rate}")

    cap = cv2.VideoCapture(video_path)
    frames = []
    frame_count = 0
    try:
        while cap.isOpened():
            # Stop decoding once enough frames were collected.
            if max_frames is not None and len(frames) >= max_frames:
                break
            ret, frame = cap.read()
            if not ret:  # end of stream or decode failure
                break
            if frame_count % frame_rate == 0:
                frames.append(frame)
            frame_count += 1
    finally:
        # Release the capture even if decoding raised.
        cap.release()
    return frames


In [None]:
class VideoDataset(Dataset):
    """Dataset yielding a fixed-length sequence of per-frame CNN features per video.

    Each item is decoded with `extract_frames`, preprocessed with
    `preprocess_frame`, and embedded with the module-level `resnet`. Sequences
    shorter than `max_frames` are zero-padded at the end; longer ones are
    truncated.

    Args:
        video_files: Sequence of video file paths.
        labels: Sequence of labels, aligned with `video_files`.
        frame_rate: Sampling stride forwarded to `extract_frames`.
        max_frames: Number of time steps in every returned sequence.
        feature_dim: Width of one feature vector; only needed to shape the
            all-zero sequence returned for a video that yields no frames.

    Raises:
        ValueError: If `video_files` and `labels` differ in length.
    """

    def __init__(self, video_files, labels, frame_rate=5, max_frames=40, feature_dim=512):
        if len(video_files) != len(labels):
            raise ValueError(
                f"video_files and labels must have the same length, "
                f"got {len(video_files)} and {len(labels)}"
            )
        self.video_files = video_files
        self.labels = labels
        self.frame_rate = frame_rate
        self.max_frames = max_frames
        self.feature_dim = feature_dim

    def __len__(self):
        """Return the number of videos."""
        return len(self.video_files)

    def __getitem__(self, idx):
        """Return `(features, label)` with `features` of shape [max_frames, feature_dim].

        The features are returned on the CPU so that default collation, pinned
        memory and worker processes all work; callers move the batch to the
        training device themselves.
        """
        video_path = self.video_files[idx]
        label = self.labels[idx]

        frames = extract_frames(video_path, self.frame_rate)[:self.max_frames]

        if frames:
            # One forward pass for the whole clip instead of one per frame.
            batch = torch.cat([preprocess_frame(frame) for frame in frames], dim=0)
            with torch.no_grad():
                features = resnet(batch).flatten(1).cpu()  # [n_frames, feature_dim]
        else:
            # Unreadable or empty video: fall through to an all-padding sequence.
            features = torch.zeros(0, self.feature_dim)

        missing = self.max_frames - features.size(0)
        if missing > 0:
            padding = torch.zeros(missing, features.size(1), dtype=features.dtype)
            features = torch.cat([features, padding], dim=0)

        return features, label  # [max_frames, feature_dim], label


In [None]:
# Candidate values (presumably from an earlier search); commented out, so nothing reads it.
# param_grid = {
#     "hidden_dim": [128, 256],
#     "num_layers": [2, 3],
#     "dropout": [0.3, 0.5],
#     "learning_rate": [0.01, 0.001, 0.0001],
#     "batch_size": [1],
# }
# Values used for this run. They are also embedded in the saved checkpoint file names.
hidden_dim = 128  # default LSTM hidden size of CNNLSTM
num_layers = 2  # default number of stacked LSTM layers of CNNLSTM
dropout = 0.3  # default LSTM dropout of CNNLSTM
learning_rate = 0.0001  # Adam learning rate
batch_size = 16  # batch size of the train and validation loaders

In [None]:
# Class names; a class's position in this list is its integer label.
classes = ["A", "B1", "B2", "B4", "B5", "B6", "G"]

def extract_label(file):
    """Derive the integer class label from a video file name.

    The name is expected to contain ``label_`` followed by one or more class
    names joined with ``-`` (e.g. ``clip_label_B2-G.mp4``). When several class
    names are present, the one that comes first in `classes` wins, not the one
    that comes first in the file name.

    Args:
        file: File name, with or without an extension of any length.

    Returns:
        Index into `classes`. Falls back to 0 when the name has no ``label_``
        part or none of its tags is a known class.
    """
    # splitext instead of a fixed `[:-4]` slice, so extensions that are not
    # exactly three characters long (".mpeg", none at all) are handled too.
    stem, _ = os.path.splitext(file)
    parts = stem.split("label_")
    if len(parts) < 2:
        return 0

    tags = parts[1].split('-')
    for index, cls in enumerate(classes):
        if cls in tags:
            return index
    return 0

In [None]:
def create_dataset_from_folder(folder_path):
    """List the video files in a folder together with their labels.

    Args:
        folder_path: Directory to scan (not recursive).

    Returns:
        Tuple `(video_files, labels)` of equal-length lists: full paths of the
        video files, sorted by file name, and the label `extract_label`
        derives from each file name.
    """
    video_extensions = (".mp4", ".avi", ".mkv")  # adjust extensions if needed
    video_files = []
    labels = []

    # os.listdir order is arbitrary; sort so the dataset order is reproducible.
    for file in sorted(os.listdir(folder_path)):
        # Case-insensitive match so e.g. "CLIP.MP4" is not silently skipped.
        if not file.lower().endswith(video_extensions):
            continue
        full_path = os.path.join(folder_path, file)
        if not os.path.isfile(full_path):  # skip directories with a video-like name
            continue
        video_files.append(full_path)
        labels.append(extract_label(file))
    return video_files, labels

In [None]:
class CNNLSTM(nn.Module):
    """LSTM classifier over sequences of per-frame CNN feature vectors.

    The constructor defaults for `hidden_dim`, `num_layers` and `dropout` are
    the module-level hyperparameters as they were when this class was defined.
    """

    def __init__(self, feature_dim=512, hidden_dim=hidden_dim, num_classes=7, num_layers=num_layers, dropout=dropout):
        super().__init__()
        # Dropout is only passed on when there is more than one LSTM layer.
        lstm_dropout = 0
        if num_layers > 1:
            lstm_dropout = dropout
        self.lstm = nn.LSTM(
            input_size=feature_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=lstm_dropout,
        )
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        """Map x of shape [batch_size, seq_len, 512] to logits [batch_size, num_classes]."""
        _, (final_hidden, _) = self.lstm(x)
        last_layer_state = final_hidden[-1]  # final hidden state of the last LSTM layer
        return self.fc(last_layer_state)


In [None]:
# Collect (video paths, labels) for the train, validation and test folders, in that order.
(x_train, y_train), (x_val, y_val), (x_test, y_test) = (
    create_dataset_from_folder(split_dir) for split_dir in (train, val, test)
)

In [None]:
# Datasets for the train / validation splits, using VideoDataset's default frame_rate and max_frames.
train_dataset = VideoDataset(video_files=x_train, labels=y_train)
val_dataset = VideoDataset(video_files=x_val, labels=y_val)

In [None]:
# Only the training batches are shuffled; validation keeps the dataset order.
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=batch_size, shuffle=False)

In [None]:
from sklearn.metrics import classification_report

In [None]:
# Seed torch's RNG so weight initialisation and batch shuffling repeat across runs
# (GPU kernels may still introduce small run-to-run differences).
torch.manual_seed(42)

model = CNNLSTM().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

epochs = 10
# Report on every known class. Without an explicit `labels=` list,
# classification_report raises when a class is missing from both the
# validation labels and the predictions, because `target_names` then has
# more entries than there are observed classes.
class_ids = list(range(len(classes)))

for epoch in range(epochs):
    # ---- training pass ----
    model.train()
    total_loss = 0
    # Progress bar with the running batch loss instead of one printed line per step.
    progress = tqdm(train_loader, desc=f"Epoch [{epoch+1}/{epochs}]")
    for x, y in progress:
        x, y = x.to(device), y.to(device)
        optimizer.zero_grad()
        outputs = model(x)
        loss = criterion(outputs, y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        progress.set_postfix(loss=f"{loss.item():.4f}")
    print(f"Epoch [{epoch+1}/{epochs}], Avg Loss: {total_loss / len(train_loader):.4f}")

    # ---- validation pass ----
    model.eval()
    correct, total, val_loss = 0, 0, 0
    # Predictions / labels of the most recent epoch; a later cell plots them.
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for videos, labels in val_loader:
            videos, labels = videos.to(device), labels.to(device)
            outputs = model(videos)
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            loss = criterion(outputs, labels)
            val_loss += loss.item()

            all_preds.extend(predicted.cpu().tolist())
            all_labels.extend(labels.cpu().tolist())

    avg_val_loss = val_loss / len(val_loader)  # mean of the per-batch losses
    val_accuracy = 100 * correct / total

    target_names = classes
    print("Classification report:")
    print(classification_report(all_labels, all_preds, labels=class_ids, target_names=target_names, zero_division=0))
    print(f"✅ Val Loss: {avg_val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%\n")


In [None]:

from sklearn.metrics import confusion_matrix, classification_report
import matplotlib.pyplot as plt
import seaborn as sns

# Confusion matrix of the predictions currently held in all_labels / all_preds
# (at this point: the last validation epoch of the training cell).
# Fixing `labels` keeps the matrix len(classes) x len(classes) even when a class
# never occurs, so rows and columns always line up with the class names.
cm = confusion_matrix(all_labels, all_preds, labels=list(range(len(classes))))

plt.figure(figsize=(8,6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=classes, yticklabels=classes)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()


In [None]:
# Persist the trained model twice; both file names embed this run's hyperparameters
# and, being relative paths, are written to the current working directory.
# 1) Weights only (state_dict).
torch.save(model.state_dict(), f"cnn_lstm_weights_{hidden_dim}_{num_layers}_{dropout}_{learning_rate}_{batch_size}.pth")
# 2) The whole module object. NOTE(review): loading this file back is expected to
#    need the CNNLSTM class definition to be available; confirm before relying on it.
torch.save(model, f"cnn_lstm_model_full_{hidden_dim}_{num_layers}_{dropout}_{learning_rate}_{batch_size}.pth")

In [None]:
# Test split, evaluated one video per batch.
test_dataset = VideoDataset(video_files=x_test, labels=y_test)
test_loader = DataLoader(dataset=test_dataset, batch_size=1)

In [None]:
# Evaluate the trained model on the held-out test split.
model.eval()
correct, total = 0, 0
# Overwrites the validation predictions; the confusion-matrix cell below reads these.
all_preds = []
all_labels = []
with torch.no_grad():
    for videos, labels in test_loader:
        videos, labels = videos.to(device), labels.to(device)
        outputs = model(videos)
        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        all_preds.extend(predicted.cpu().tolist())
        all_labels.extend(labels.cpu().tolist())

test_accuracy = 100 * correct / total
# This is the test split, so label the number accordingly.
print(f"Test Accuracy: {test_accuracy:.2f}%")

# Classification report
# `labels=` pins the report to every known class; without it the call raises when
# a class is absent from both the test labels and the predictions.
target_names = classes
print("Classification report:")
print(classification_report(all_labels, all_preds, labels=list(range(len(classes))), target_names=target_names, zero_division=0))

In [None]:
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

# Confusion matrix of the predictions currently held in all_labels / all_preds
# (at this point: the test-set evaluation cell).
# Fixing `labels` keeps the matrix len(classes) x len(classes) even when a class
# never occurs, so rows and columns always line up with the class names.
cm = confusion_matrix(all_labels, all_preds, labels=list(range(len(classes))))

plt.figure(figsize=(8,6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=classes, yticklabels=classes)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()
