In [None]:
#1️⃣ Instalar as bibliotecas
!pip install torch torchvision torchaudio optuna

In [None]:
# 2️⃣ Check whether PyTorch can see a GPU.
import torch

print("Versão do PyTorch:", torch.__version__)

# Query CUDA once and reuse the answer for both the report and the branch.
has_cuda = torch.cuda.is_available()
print("CUDA disponível:", has_cuda)
if has_cuda:
    print("Nome da GPU:", torch.cuda.get_device_name(0))

In [None]:
#Estrutura do projeto

#O notebook final será dividido em:

#Instalação de bibliotecas e imports

#Upload e configuração do dataset (EyePACS ou similar)

#Definição do modelo (AnyNet + CORAL + OLR)

#Funções de treino, validação e métricas

#Configuração do Optuna (TPE)

#Treinamento e salvamento do melhor modelo

In [None]:
# ============================================
# 🔧 1. Instalação e imports
# ============================================

!pip install optuna albumentations timm --quiet

import os
import cv2
import math
import random
import numpy as np
import pandas as pd
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from sklearn.metrics import cohen_kappa_score

import timm
import optuna

# Device used by the training/validation helpers: the GPU when CUDA is
# available, otherwise the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Usando dispositivo:", DEVICE)


[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/400.9 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━[0m [32m225.3/400.9 kB[0m [31m7.1 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m400.9/400.9 kB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[?25hUsando dispositivo: cpu


In [None]:
# ============================================
# 📂 2. Dataset and DataLoader
# ============================================

# Expected layout:
# /content/eyepacs/
# ├── train_images/
# │      ├── 0001.png
# │      ├── 0002.png
# │      └── ...
# ├── labels.csv  (columns: image_id,label)

# NOTE(review): the layout above is rooted at /content/eyepacs, but DATA_DIR
# points at /content/sample_data — confirm which location is intended.
DATA_DIR = "/content/sample_data"
CSV_PATH = os.path.join(DATA_DIR, "labels.csv")

# If the folder does not exist yet, create it and upload the files:
# !mkdir -p /content/eyepacs/train_images
# then use the Colab side panel → "Files" → "Upload"

# Load the label table and show its size and first rows as a sanity check.
df = pd.read_csv(CSV_PATH)
print("Amostras:", len(df))
print(df.head())

# Train/validation split: 80/20, stratified on the label column, with a fixed
# random_state so the split is the same on every run.
from sklearn.model_selection import train_test_split
train_df, val_df = train_test_split(df, test_size=0.2, stratify=df['label'], random_state=42)

class EyeDataset(Dataset):
    """Image dataset driven by a labels DataFrame.

    Every row of ``df`` must provide an ``image_id`` column (the file name
    without the ``.png`` extension, looked up inside ``img_dir``) and a
    ``label`` column, which is returned as a long tensor.

    Args:
        df: DataFrame with ``image_id`` and ``label`` columns. Its index is
            reset so samples are addressed by position.
        img_dir: Directory that contains the ``<image_id>.png`` files.
        transform: Optional callable invoked as ``transform(image=img)`` that
            must return a mapping with an ``"image"`` entry.
    """

    def __init__(self, df, img_dir, transform=None):
        self.df = df.reset_index(drop=True)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        """Return the number of rows (samples) in the label table."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load the sample at position ``idx``.

        Returns:
            Tuple ``(img, label)``: the RGB image (after ``transform`` when one
            was given) and the label as a ``torch.long`` tensor.

        Raises:
            FileNotFoundError: If the image file is missing or cannot be decoded.
        """
        row = self.df.iloc[idx]
        path = os.path.join(self.img_dir, str(row['image_id']) + ".png")
        img = cv2.imread(path)
        if img is None:
            # cv2.imread reports failure by returning None instead of raising;
            # fail here with the offending path rather than inside cvtColor.
            raise FileNotFoundError(f"Could not read image file: {path}")
        # OpenCV loads images as BGR; convert to RGB before transforming.
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        if self.transform:
            img = self.transform(image=img)["image"]
        label = torch.tensor(row['label']).long()
        return img, label

import albumentations as A
from albumentations.pytorch import ToTensorV2

def get_transforms(train=True):
    """Build the Albumentations pipeline for one data split.

    Both splits resize to 320x320, normalize with a fixed per-channel
    mean/std and convert to a tensor. The training split additionally applies
    a horizontal flip, a rotation (limit 25) and a random brightness/contrast
    change between the resize and the normalization.

    Args:
        train: ``True`` for the augmented training pipeline, ``False`` for the
            deterministic evaluation pipeline.

    Returns:
        An ``A.Compose`` object to be called as ``pipeline(image=img)``.
    """
    # Augmentations are only used while training.
    augmentations = []
    if train:
        augmentations = [
            A.HorizontalFlip(),
            A.Rotate(limit=25),
            A.RandomBrightnessContrast(),
        ]

    steps = [
        A.Resize(320, 320),
        *augmentations,
        A.Normalize(mean=(0.485, 0.456, 0.406),
                    std=(0.229, 0.224, 0.225)),
        ToTensorV2(),
    ]
    return A.Compose(steps)


In [None]:
# ============================================
# 🧩 3. Modelo AnyNet + CORAL + OLR
# ============================================

class SEBlock(nn.Module):
    """Squeeze-and-Excitation channel gate.

    Global-average-pools each channel, passes the result through a two-layer
    bottleneck MLP (ReLU, then sigmoid) and rescales the input channels by the
    resulting weights.

    Args:
        c: Number of input (and output) channels.
        reduction: Bottleneck ratio; the hidden width is ``c // reduction``,
            clamped to at least 1.
    """

    def __init__(self, c, reduction=16):
        super().__init__()
        # For c < reduction the integer division yields 0, which would create
        # empty Linear layers and a gate that ignores the input entirely.
        hidden = max(1, c // reduction)
        self.fc1 = nn.Linear(c, hidden)
        self.fc2 = nn.Linear(hidden, c)

    def forward(self, x):
        """Return ``x`` scaled per channel; ``x`` must be a 4-D tensor."""
        b, c, _, _ = x.size()
        # Squeeze: one scalar per channel.
        y = F.adaptive_avg_pool2d(x, 1).view(b, c)
        # Excite: bottleneck MLP producing weights in (0, 1).
        y = F.relu(self.fc1(y))
        y = torch.sigmoid(self.fc2(y)).view(b, c, 1, 1)
        return x * y

class ResNeXtBlock(nn.Module):
    """Grouped-convolution bottleneck block with a residual connection.

    Main path: 1x1 conv -> ReLU -> grouped 3x3 conv (carries the stride) ->
    ReLU -> 1x1 conv -> optional SE gate. The shortcut is added to that result
    and a single BatchNorm followed by ReLU is applied to the sum.

    Args:
        in_c: Input channels.
        out_c: Output channels; the bottleneck width is ``out_c // 2``.
        stride: Stride of the 3x3 convolution and of the projection shortcut.
        cardinality: Number of groups of the 3x3 convolution; it has to divide
            the bottleneck width.
        use_se: Whether to apply an ``SEBlock`` before the residual addition.
    """

    def __init__(self, in_c, out_c, stride=1, cardinality=8, use_se=True):
        super().__init__()
        bottleneck_c = out_c // 2

        self.conv1 = nn.Conv2d(in_c, bottleneck_c, kernel_size=1, bias=False)
        self.conv2 = nn.Conv2d(
            bottleneck_c,
            bottleneck_c,
            kernel_size=3,
            stride=stride,
            padding=1,
            groups=cardinality,
            bias=False,
        )
        self.conv3 = nn.Conv2d(bottleneck_c, out_c, kernel_size=1, bias=False)
        self.bn = nn.BatchNorm2d(out_c)

        # A projection is only required when the shortcut has to change the
        # spatial size or the channel count; otherwise it is the identity.
        needs_projection = stride != 1 or in_c != out_c
        if needs_projection:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_c, out_c, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_c),
            )
        else:
            self.shortcut = nn.Sequential()

        self.use_se = use_se
        if use_se:
            self.se = SEBlock(out_c)

    def forward(self, x):
        """Apply the block to a 4-D feature map and return the activated sum."""
        features = self.conv1(x)
        features = self.conv2(F.relu(features))
        features = self.conv3(F.relu(features))
        if self.use_se:
            features = self.se(features)
        merged = features + self.shortcut(x)
        return F.relu(self.bn(merged))

# ---- HEADS ---- #

class CoralHead(nn.Module):
    """Head for CORAL ordinal classification.

    A single linear unit produces one score per sample; ``num_classes - 1``
    learnable thresholds (``bias``, initialised to zero) are subtracted from
    that score and a sigmoid turns each difference into a probability.

    Args:
        in_features: Size of the incoming feature vector.
        num_classes: Number of ordinal classes.
    """

    def __init__(self, in_features, num_classes):
        super().__init__()
        self.num_classes = num_classes
        self.fc = nn.Linear(in_features, 1)
        # Assigning an nn.Parameter registers it under the attribute name.
        self.bias = nn.Parameter(torch.zeros(num_classes - 1))

    def forward(self, x):
        """Return probabilities of shape ``(batch, num_classes - 1)``."""
        score = self.fc(x).squeeze(-1)
        # Broadcast the per-sample score against the row of thresholds.
        return torch.sigmoid(score[:, None] - self.bias[None, :])

def coral_loss(probs, targets):
    """Binary cross-entropy between threshold probabilities and cumulative targets.

    Args:
        probs: Tensor ``(batch, K - 1)`` of probabilities, one per threshold.
        targets: Integer class labels of shape ``(batch,)``.

    Returns:
        Scalar tensor: the mean BCE over all samples and thresholds.
    """
    num_thresholds = probs.size(1)
    thresholds = torch.arange(num_thresholds, device=targets.device)
    # Entry (i, k) is 1 when label i lies above threshold k, i.e. targets[i] > k.
    cum_targets = (targets[:, None] > thresholds[None, :]).float()
    return F.binary_cross_entropy(probs, cum_targets, reduction='mean')

class OrdinalLogisticHead(nn.Module):
    """Ordinal Logistic Regression (OLR) head.

    A single linear unit produces the latent score ``eta``; the learnable
    cut-points ``theta`` (initialised to ``0, 1, ..., num_classes - 2``) are
    subtracted from it and passed through a sigmoid.

    Args:
        in_features: Size of the incoming feature vector.
        num_classes: Number of ordinal classes.
    """

    def __init__(self, in_features, num_classes):
        super().__init__()
        self.num_classes = num_classes
        self.fc = nn.Linear(in_features, 1)
        # Thresholds (cut-points) separating consecutive classes.
        initial_cuts = torch.arange(num_classes - 1).float()
        self.theta = nn.Parameter(initial_cuts)

    def forward(self, x):
        """Return the cumulative probabilities P(y > k), one column per cut-point."""
        # fc(x) has a trailing dimension of 1 and broadcasts against theta.
        return torch.sigmoid(self.fc(x) - self.theta)

def olr_loss(probs, targets):
    """Mean binary cross-entropy against the cumulative ``y > k`` indicators.

    Args:
        probs: Tensor ``(batch, K - 1)`` of cumulative probabilities.
        targets: Integer class labels of shape ``(batch,)``.

    Returns:
        Scalar loss tensor.
    """
    cut_ids = torch.arange(probs.size(1)).to(targets.device)
    # Cumulative encoding: column k is set when k < label, i.e. label > k.
    exceeds = cut_ids.unsqueeze(0).lt(targets.unsqueeze(1))
    return F.binary_cross_entropy(probs, exceeds.float(), reduction='mean')

# ---- Modelo AnyNet ---- #

class AnyNet(nn.Module):
    """Small ResNeXt-style backbone with an ordinal classification head.

    Architecture: a stride-2 convolutional stem (conv -> LayerNorm -> ReLU),
    four ``ResNeXtBlock`` stages (32->64->128->256->512 channels, the last
    three with stride 2), global average pooling and an ordinal head.

    Args:
        num_classes: Number of ordinal classes.
        use_se: Whether the stages use squeeze-and-excitation gates.
        head_type: ``"coral"`` selects ``CoralHead``; any other value selects
            ``OrdinalLogisticHead``.
        img_size: Spatial size of the input images, either an int (square
            input) or an ``(height, width)`` pair. The stem LayerNorm is sized
            from it, so inputs must match this size. The default of 320
            reproduces the previously hard-coded ``[32, 160, 160]`` shape.
    """

    def __init__(self, num_classes=5, use_se=True, head_type="coral", img_size=320):
        super().__init__()
        self.head_type = head_type

        if isinstance(img_size, int):
            img_size = (img_size, img_size)
        # Output side of the stem conv (kernel 3, stride 2, padding 1):
        # floor((n + 2*1 - 3) / 2) + 1 == (n - 1) // 2 + 1.
        stem_h, stem_w = ((side - 1) // 2 + 1 for side in img_size)

        self.stem = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, stride=2, padding=1, bias=False),
            nn.LayerNorm([32, stem_h, stem_w]),
            nn.ReLU(inplace=True)
        )
        self.stage1 = ResNeXtBlock(32, 64, stride=1, use_se=use_se)
        self.stage2 = ResNeXtBlock(64, 128, stride=2, use_se=use_se)
        self.stage3 = ResNeXtBlock(128, 256, stride=2, use_se=use_se)
        self.stage4 = ResNeXtBlock(256, 512, stride=2, use_se=use_se)
        self.pool = nn.AdaptiveAvgPool2d(1)
        if head_type == "coral":
            self.head = CoralHead(512, num_classes)
        else:
            self.head = OrdinalLogisticHead(512, num_classes)

    def forward(self, x):
        """Map an image batch to ``(batch, num_classes - 1)`` probabilities."""
        x = self.stem(x)
        x = self.stage1(x)
        x = self.stage2(x)
        x = self.stage3(x)
        x = self.stage4(x)
        # Global pooling, then flatten to (batch, 512) for the head.
        x = self.pool(x).flatten(1)
        return self.head(x)


In [None]:
# ============================================
# ⚙️ 4. Funções de treino e validação
# ============================================

def ordinal_predict(probs):
    """Turn threshold probabilities into class indices.

    The predicted class of each row is the number of entries strictly above 0.5.

    Args:
        probs: Tensor ``(batch, K - 1)`` of probabilities.

    Returns:
        Integer tensor ``(batch,)`` with values in ``0 .. K - 1``.
    """
    passed = probs > 0.5
    return passed.sum(dim=1)

def train_one_epoch(model, loader, optimizer, criterion):
    """Run one optimisation pass over ``loader``.

    Args:
        model: Network to train; it is switched to training mode.
        loader: Iterable of ``(images, labels)`` batches exposing ``.dataset``.
        optimizer: Optimizer that updates ``model``'s parameters.
        criterion: Callable ``criterion(outputs, labels)`` returning a scalar loss.

    Returns:
        The sample-weighted mean loss over ``loader.dataset``.
    """
    model.train()
    running_loss = 0.0
    for batch_imgs, batch_labels in loader:
        batch_imgs = batch_imgs.to(DEVICE)
        batch_labels = batch_labels.to(DEVICE)

        optimizer.zero_grad()
        batch_loss = criterion(model(batch_imgs), batch_labels)
        batch_loss.backward()
        optimizer.step()

        # Weight by batch size so a smaller final batch does not skew the mean.
        running_loss += batch_loss.item() * batch_imgs.size(0)
    return running_loss / len(loader.dataset)

def validate(model, loader, criterion):
    """Evaluate ``model`` on ``loader`` without tracking gradients.

    Args:
        model: Network to evaluate; it is switched to eval mode.
        loader: Iterable of ``(images, labels)`` batches exposing ``.dataset``.
        criterion: Callable ``criterion(outputs, labels)`` returning a scalar loss.

    Returns:
        Tuple ``(mean_loss, qwk)``: the sample-weighted mean loss and the
        quadratic weighted Cohen's kappa between labels and predictions.
    """
    model.eval()
    all_preds = []
    all_trues = []
    loss_sum = 0.0
    with torch.no_grad():
        for batch_imgs, batch_labels in loader:
            batch_imgs = batch_imgs.to(DEVICE)
            batch_labels = batch_labels.to(DEVICE)

            probs = model(batch_imgs)
            batch_loss = criterion(probs, batch_labels)
            loss_sum += batch_loss.item() * batch_imgs.size(0)

            # Collect predictions and ground truth on the CPU for the metric.
            all_preds.extend(ordinal_predict(probs).cpu().numpy())
            all_trues.extend(batch_labels.cpu().numpy())
    qwk = cohen_kappa_score(all_trues, all_preds, weights='quadratic')
    return loss_sum / len(loader.dataset), qwk


In [None]:
# ============================================
# 🧪 5. Otimização com Optuna (TPE)
# ============================================

def objective(trial):
    """Optuna objective: train one AnyNet configuration and score it by QWK.

    Samples the learning rate, weight decay, SE usage, batch size and head
    type from ``trial``, trains for at most 15 epochs with early stopping
    (patience of 5 epochs without improvement) and reports the validation QWK
    after every epoch so the study's pruner can stop unpromising trials.

    Args:
        trial: The ``optuna`` trial used to sample hyper-parameters and to
            report intermediate values.

    Returns:
        The best validation quadratic weighted kappa seen during the trial.

    Raises:
        optuna.TrialPruned: When the pruner decides to stop the trial.
    """
    # suggest_float(..., log=True) replaces the deprecated suggest_loguniform
    # and samples from the same log-uniform range under the same name.
    lr = trial.suggest_float("lr", 1e-5, 1e-3, log=True)
    wd = trial.suggest_float("wd", 1e-6, 1e-3, log=True)
    use_se = trial.suggest_categorical("use_se", [True, False])
    batch_size = trial.suggest_categorical("batch_size", [8, 16, 32])
    head_type = trial.suggest_categorical("head_type", ["coral", "olr"])

    model = AnyNet(num_classes=5, use_se=use_se, head_type=head_type).to(DEVICE)
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=wd)
    criterion = coral_loss if head_type == "coral" else olr_loss

    # Both splits read from the same image directory; only the rows differ.
    img_dir = os.path.join(DATA_DIR, "train_images")
    train_set = EyeDataset(train_df, img_dir, transform=get_transforms(True))
    val_set = EyeDataset(val_df, img_dir, transform=get_transforms(False))
    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=2)
    val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False, num_workers=2)

    best_qwk = -1
    patience, bad_epochs = 5, 0
    for epoch in range(15):
        train_one_epoch(model, train_loader, optimizer, criterion)
        _, qwk = validate(model, val_loader, criterion)

        # Report the intermediate score so the pruner can act on it.
        trial.report(qwk, epoch)
        if trial.should_prune():
            raise optuna.TrialPruned()

        if qwk > best_qwk:
            best_qwk = qwk
            bad_epochs = 0
        else:
            bad_epochs += 1
            if bad_epochs >= patience:
                # Early stopping: no improvement for `patience` epochs in a row.
                break
    return best_qwk


In [None]:
# ============================================
# 🎯 6. Run Optuna and train the best model
# ============================================

# Hyper-parameter search maximising the validation QWK returned by `objective`.
# NOTE(review): no seed is passed to TPESampler, so the search is presumably
# not reproducible from run to run — confirm whether that is acceptable.
study = optuna.create_study(direction="maximize", sampler=optuna.samplers.TPESampler())
study.optimize(objective, n_trials=10)  # raise to 50–200 if a GPU is available

print("Melhores parâmetros:", study.best_params)
print("Melhor QWK:", study.best_value)

# Rebuild model, optimizer and loss from the best trial's hyper-parameters;
# the final model is trained from scratch rather than reusing trial weights.
best_params = study.best_params
model = AnyNet(num_classes=5, use_se=best_params['use_se'], head_type=best_params['head_type']).to(DEVICE)
optimizer = torch.optim.AdamW(model.parameters(), lr=best_params['lr'], weight_decay=best_params['wd'])
criterion = coral_loss if best_params['head_type'] == "coral" else olr_loss

# Same train/validation split and transforms as used inside `objective`.
train_set = EyeDataset(train_df, os.path.join(DATA_DIR, "train_images"), transform=get_transforms(True))
val_set = EyeDataset(val_df, os.path.join(DATA_DIR, "train_images"), transform=get_transforms(False))
train_loader = DataLoader(train_set, batch_size=best_params['batch_size'], shuffle=True, num_workers=2)
val_loader = DataLoader(val_set, batch_size=best_params['batch_size'], shuffle=False, num_workers=2)

# Train for a fixed 25 epochs (no early stopping here) and checkpoint the
# weights whenever the validation QWK improves.
best_qwk = -1
for epoch in range(25):
    tr_loss = train_one_epoch(model, train_loader, optimizer, criterion)
    val_loss, qwk = validate(model, val_loader, criterion)
    print(f"Epoch {epoch+1}: Train {tr_loss:.4f} | Val {val_loss:.4f} | QWK {qwk:.4f}")
    if qwk > best_qwk:
        best_qwk = qwk
        torch.save(model.state_dict(), "best_model.pth")

# NOTE(review): the message below is printed unconditionally, even if no epoch
# ever improved on the initial best_qwk and nothing was saved.
print("Treinamento finalizado. Melhor QWK:", best_qwk)
print("Modelo salvo como best_model.pth")
