In [16]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torchvision import models
from PIL import Image
import pandas as pd
from pathlib import Path
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cpu


In [17]:
LABEL_PATH = Path("../data/labels/labels_task2.csv")
FRAME_DIR = Path("../data/frames")

df = pd.read_csv(LABEL_PATH)
available_videos = {p.name for p in FRAME_DIR.iterdir() if p.is_dir() and any(p.glob("*.jpg"))}
df = df[df["VIDEO"].isin(available_videos)].reset_index(drop=True)
print(f"Number of videos with frames: {len(df)}")

Number of videos with frames: 30


In [18]:
class OSATSDataset(Dataset):
    def __init__(self, dataframe, frame_dir, transform=None, sequence_length=16):
        self.data = dataframe.copy()
        self.frame_dir = frame_dir
        self.transform = transform
        self.sequence_length = sequence_length
        self.osats_cols = [col for col in dataframe.columns if col.startswith("OSATS_")]

        for col in self.osats_cols:
            self.data[col] = self.data[col].clip(0, 4).astype(np.int64)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        video_id = row["VIDEO"]
        y = row[self.osats_cols].values.astype(np.int64)
        path = self.frame_dir / video_id

        frames = sorted(path.glob("*.jpg"))
        selected = frames[:self.sequence_length]
        if len(selected) == 0:
            raise IndexError(f"No frames for video {video_id}")
        while len(selected) < self.sequence_length:
            selected.append(selected[-1])

        images = [self.transform(Image.open(f).convert("RGB")) for f in selected]
        return torch.stack(images), torch.tensor(y)

In [19]:
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

dataset = OSATSDataset(df, FRAME_DIR, transform)
loader = DataLoader(dataset, batch_size=4, shuffle=True)

In [20]:
from torch.nn.init import kaiming_uniform_, xavier_uniform_

class CNNModel_1(nn.Module):
    def __init__(self, num_classes=40, sequence_length=16, input_shape=(3,224,224)):
        super(CNNModel_1, self).__init__()
        self.sequence_length = sequence_length

        self.layer1 = nn.Sequential(
            nn.Conv2d(3, 32, 3),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        self.layer2 = nn.Sequential(
            nn.Conv2d(32, 32, 3),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )

        conv_out_size = self._get_conv_output((sequence_length, *input_shape))
        self.fc1 = nn.Linear(conv_out_size, 100)
        kaiming_uniform_(self.fc1.weight, nonlinearity='relu')
        self.act1 = nn.ReLU()
        self.fc2 = nn.Linear(100, num_classes)
        xavier_uniform_(self.fc2.weight)
        self.act2 = nn.Softmax(dim=1)

    def _get_conv_output(self, shape):
        bs = 1
        input = torch.rand(bs, *shape)
        B, T, C, H, W = input.shape
        input = input.view(B * T, C, H, W)
        output_feat = self.layer1(input)
        output_feat = self.layer2(output_feat)
        output_feat = output_feat.view(output_feat.size(0), -1)
        output_feat = output_feat.view(bs, T, -1).mean(dim=1)
        return int(np.prod(output_feat.size()[1:]))

    def forward(self, x):
        B, T, C, H, W = x.shape
        x = x.view(B * T, C, H, W)
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.view(out.size(0), -1)
        out = out.view(B, T, -1).mean(dim=1)
        out = self.fc1(out)
        out = self.act1(out)
        out = self.fc2(out)
        out = self.act2(out)
        return out


In [21]:
class CNNModel_2(nn.Module):
    def __init__(self, num_classes=40, sequence_length=16, input_shape=(3,224,224)):
        super(CNNModel_2, self).__init__()
        self.sequence_length = sequence_length

        self.layer1 = nn.Sequential(
            nn.Conv2d(3, 16, 3),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, 3),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )

        conv_out_size = self._get_conv_output((sequence_length, *input_shape))
        self.fc1 = nn.Linear(conv_out_size, num_classes)

    def _get_conv_output(self, shape):
        bs = 1
        input = torch.rand(bs, *shape)
        B, T, C, H, W = input.shape
        input = input.view(B * T, C, H, W)
        output_feat = self.layer1(input)
        output_feat = self.layer2(output_feat)
        output_feat = output_feat.view(output_feat.size(0), -1)
        output_feat = output_feat.view(bs, T, -1).mean(dim=1)
        return int(np.prod(output_feat.size()[1:]))

    def forward(self, x):
        B, T, C, H, W = x.shape
        x = x.view(B * T, C, H, W)
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.view(out.size(0), -1)
        out = out.view(B, T, -1).mean(dim=1)
        out = self.fc1(out)
        return out


In [22]:
from torch.nn import BatchNorm2d, Dropout
import torch.nn.functional as F

class CNNModel_3(nn.Module):
    def __init__(self, num_classes=40, sequence_length=16, input_shape=(3,224,224)):
        super(CNNModel_3, self).__init__()
        self.sequence_length = sequence_length

        self.layer1 = nn.Sequential(
            nn.Conv2d(3, 32, 3, padding=1),
            BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        self.layer2 = nn.Sequential(
            nn.Conv2d(32, 64, 3),
            BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )

        conv_out_size = self._get_conv_output((sequence_length, *input_shape))
        self.fc1 = nn.Linear(conv_out_size, 600)
        self.drop = Dropout(0.25)
        self.fc2 = nn.Linear(600, 120)
        self.fc3 = nn.Linear(120, num_classes)

    def _get_conv_output(self, shape):
        bs = 1
        input = torch.rand(bs, *shape)
        B, T, C, H, W = input.shape
        input = input.view(B * T, C, H, W)
        output_feat = self.layer1(input)
        output_feat = self.layer2(output_feat)
        output_feat = output_feat.view(output_feat.size(0), -1)
        output_feat = output_feat.view(bs, T, -1).mean(dim=1)
        return int(np.prod(output_feat.size()[1:]))

    def forward(self, x):
        B, T, C, H, W = x.shape
        x = x.view(B * T, C, H, W)
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.view(out.size(0), -1)
        out = out.view(B, T, -1).mean(dim=1)
        out = F.relu(self.fc1(out))
        out = self.drop(out)
        out = F.relu(self.fc2(out))
        out = self.fc3(out)
        return out


In [23]:
from torch.nn import Dropout2d
import torch.nn.functional as F

class CNNModel_4(nn.Module):
    def __init__(self, num_classes=40, sequence_length=16, input_shape=(3,224,224)):
        super(CNNModel_4, self).__init__()
        self.sequence_length = sequence_length

        self.layer1 = nn.Sequential(
            nn.Conv2d(3, 32, 5),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2),
            Dropout2d(0.2)
        )

        conv_out_size = self._get_conv_output((sequence_length, *input_shape))
        self.fc1 = nn.Linear(conv_out_size, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def _get_conv_output(self, shape):
        bs = 1
        input = torch.rand(bs, *shape)
        B, T, C, H, W = input.shape
        input = input.view(B * T, C, H, W)
        output_feat = self.layer1(input)
        output_feat = output_feat.view(output_feat.size(0), -1)
        output_feat = output_feat.view(bs, T, -1).mean(dim=1)
        return int(np.prod(output_feat.size()[1:]))

    def forward(self, x):
        B, T, C, H, W = x.shape
        x = x.view(B * T, C, H, W)
        out = self.layer1(x)
        out = out.view(out.size(0), -1)
        out = out.view(B, T, -1).mean(dim=1)
        out = F.relu(self.fc1(out))
        out = self.fc2(out)
        return out


In [24]:
def train_model(model, dataloader, epochs):
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

    model.train()
    for epoch in range(epochs):
        running_loss = 0.0
        for inputs, targets in dataloader:
            inputs = inputs.to(device)
            targets = targets.to(device)

            outputs = model(inputs)  # [batch_size, num_classes]

            if targets.dim() > 1:
                targets = targets[:, 0]  # assume mesma label para toda a sequência

            if outputs.shape[0] != targets.shape[0]:
                raise ValueError(f"Shape mismatch: outputs {outputs.shape} vs targets {targets.shape}")

            loss = criterion(outputs, targets)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        print(f"Epoch [{epoch+1}/{epochs}], Loss: {running_loss/len(dataloader):.4f}")

    return model

In [None]:
model1 = CNNModel_1(num_classes=40, sequence_length=16, input_shape=(3,224,224))
hist1 = train_model(model1, loader, 100)

model2 = CNNModel_2(num_classes=40, sequence_length=16, input_shape=(3,224,224))
hist2 = train_model(model2, loader, 100)

model3 = CNNModel_3(num_classes=40, sequence_length=16, input_shape=(3,224,224))
hist3 = train_model(model3, loader, 100)

model4 = CNNModel_4(num_classes=40, sequence_length=16, input_shape=(3,224,224))
hist4 = train_model(model4, loader, 100)

Epoch [1/100], Loss: 3.5715
Epoch [2/100], Loss: 3.4184
Epoch [3/100], Loss: 3.4497
Epoch [4/100], Loss: 3.4184
Epoch [5/100], Loss: 3.4184
Epoch [6/100], Loss: 3.4497
Epoch [7/100], Loss: 3.4184
Epoch [8/100], Loss: 3.4184
Epoch [9/100], Loss: 3.4497
Epoch [10/100], Loss: 3.3872
Epoch [11/100], Loss: 3.4184
Epoch [12/100], Loss: 3.4497
Epoch [13/100], Loss: 3.4497
Epoch [14/100], Loss: 3.4184
Epoch [15/100], Loss: 3.3872
Epoch [16/100], Loss: 3.4184
Epoch [17/100], Loss: 3.3872
Epoch [18/100], Loss: 3.3872
Epoch [19/100], Loss: 3.4497
Epoch [20/100], Loss: 3.4497
Epoch [21/100], Loss: 3.3872
Epoch [22/100], Loss: 3.4497
Epoch [23/100], Loss: 3.4184
Epoch [24/100], Loss: 3.4184
Epoch [25/100], Loss: 3.3872
Epoch [26/100], Loss: 3.4497
Epoch [27/100], Loss: 3.4497
Epoch [28/100], Loss: 3.4184
Epoch [29/100], Loss: 3.4497
Epoch [30/100], Loss: 3.4497
Epoch [31/100], Loss: 3.4497
Epoch [32/100], Loss: 3.4497
Epoch [33/100], Loss: 3.4497
Epoch [34/100], Loss: 3.4497
Epoch [35/100], Loss: 3

In [None]:
plt.plot(hist1, label='CNN Model 1')
plt.plot(hist2, label='CNN Model 2')
plt.plot(hist3, label='CNN Model 3')
plt.plot(hist4, label='CNN Model 4')
plt.title('Loss durante treino')
plt.xlabel('Época')
plt.ylabel('Loss')
plt.legend()
plt.grid()
plt.show()

In [None]:
torch.save(model1.state_dict(), "../outputs/models/osats_cnn_model1.pt")
print("Modelo 1 salvo com sucesso!")

## ResNet18 + MLP

In [None]:
frame_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

def load_video_frames(video_path, max_frames=16):
    frames = sorted(video_path.glob("*.jpg"))[:max_frames]
    video_tensor = torch.stack([frame_transform(Image.open(f).convert("RGB")) for f in frames])
    if len(frames) < max_frames:
        padding = torch.zeros((max_frames - len(frames), 3, 224, 224))
        video_tensor = torch.cat([video_tensor, padding], dim=0)
    return video_tensor

X = []
y = []

for _, row in df.iterrows():
    video_id = row["VIDEO"]
    video_dir = FRAME_DIR / video_id
    if not video_dir.exists():
        continue
    video_tensor = load_video_frames(video_dir, max_frames=16)
    labels = torch.tensor([
        row["OSATS_RESPECT"], row["OSATS_MOTION"], row["OSATS_INSTRUMENT"],
        row["OSATS_SUTURE"], row["OSATS_FLOW"], row["OSATS_KNOWLEDGE"],
        row["OSATS_PERFORMANCE"], row["OSATS_FINAL_QUALITY"]
    ], dtype=torch.long)
    X.append(video_tensor)
    y.append(labels)

X = torch.stack(X)
y = torch.stack(y)

# Corrigir labels se estiverem na escala 1–5 (passar para 0–4)
if torch.any(y > 4):
    print("Corrigindo escala de labels de 1–5 para 0–4...")
    y = y - 1

# Verificação de segurança
assert torch.all((y >= 0) & (y <= 4)), "Erro: targets fora do intervalo 0–4"


Corrigindo escala de labels de 1–5 para 0–4...


In [None]:
from torch.utils.data import random_split

class OSATSDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

dataset = OSATSDataset(X, y)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)


In [None]:
class OSATSResNet(nn.Module):
    def __init__(self):
        super().__init__()
        resnet = models.resnet18(pretrained=True)
        resnet.fc = nn.Identity()
        self.backbone = resnet
        self.fc_shared = nn.Linear(512, 128)
        self.heads = nn.ModuleList([nn.Linear(128, 5) for _ in range(8)])

    def forward(self, x):
        B, T, C, H, W = x.shape
        x = x.view(B * T, C, H, W)
        feat = self.backbone(x).view(B, T, -1)
        feat = feat.mean(dim=1)
        shared = F.relu(self.fc_shared(feat))
        return torch.stack([head(shared) for head in self.heads], dim=1)


In [None]:
loss_fn = nn.CrossEntropyLoss()

def compute_loss(preds, targets):
    loss = 0
    for i in range(8):
        loss += loss_fn(preds[:, i], targets[:, i])
    return loss / 8


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = OSATSResNet().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.0005)
num_epochs = 100

for epoch in range(num_epochs):
    model.train()
    train_loss = 0
    all_preds, all_targets = [], []

    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = compute_loss(outputs, targets)
        loss.backward()
        optimizer.step()

        train_loss += loss.item() * inputs.size(0)
        preds = torch.argmax(outputs, dim=2)
        all_preds.append(preds.cpu())
        all_targets.append(targets.cpu())

    avg_loss = train_loss / len(train_loader.dataset)
    print(f"Época {epoch+1}/{num_epochs} - Loss treino: {avg_loss:.4f}")




Época 1/20 - Loss treino: 1.6470
Época 2/20 - Loss treino: 1.3331
Época 3/20 - Loss treino: 1.1281
Época 4/20 - Loss treino: 0.9567
Época 5/20 - Loss treino: 0.8140
Época 6/20 - Loss treino: 0.6887
Época 7/20 - Loss treino: 0.5815
Época 8/20 - Loss treino: 0.4888
Época 9/20 - Loss treino: 0.4093
Época 10/20 - Loss treino: 0.3413
Época 11/20 - Loss treino: 0.2832
Época 12/20 - Loss treino: 0.2338
Época 13/20 - Loss treino: 0.1925
Época 14/20 - Loss treino: 0.1584
Época 15/20 - Loss treino: 0.1303
Época 16/20 - Loss treino: 0.1075
Época 17/20 - Loss treino: 0.0892
Época 18/20 - Loss treino: 0.0744
Época 19/20 - Loss treino: 0.0624
Época 20/20 - Loss treino: 0.0527


In [None]:
all_preds = torch.cat(all_preds)
all_targets = torch.cat(all_targets)

for i in range(8):
    print(f"\n--- Critério OSATS {i+1} ---")
    print(classification_report(all_targets[:, i], all_preds[:, i], digits=3))
