In [None]:
!pip install roboflow torch torchvision timm pyyaml scikit-learn matplotlib seaborn -q

In [None]:
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import torch
import torch.nn as nn
import timm
from torch.utils.data import Dataset
from pathlib import Path
import yaml
from torch.utils.data import DataLoader
from torchvision import transforms
from PIL import Image
import io
import uvicorn
import nest_asyncio
from datetime import datetime
import base64
import cv2
import numpy as np
from tqdm import tqdm
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, classification_report

In [None]:
import os
from getpass import getpass

from roboflow import Roboflow

# The Roboflow API key is a credential and must not live in the notebook.
# It is read from the ROBOFLOW_API_KEY environment variable; when that is not
# set, the user is prompted for it (the typed value is not echoed or stored).
# NOTE(review): the key that used to be hard-coded here should be rotated.
roboflow_api_key = os.environ.get("ROBOFLOW_API_KEY") or getpass("Roboflow API key: ")

# Download version 1 of the project in YOLOv8 layout into the working directory.
rf = Roboflow(api_key=roboflow_api_key)
project = rf.workspace("suelen").project("focus-of-attention-aujxc")
version = project.version(1)
dataset = version.download("yolov8")

In [None]:
class Config:
    """Tunable settings shared by the cells of this notebook.

    All values are plain class attributes. ``classes`` and ``num_classes``
    start empty and are filled in after the training dataset has been built.
    """

    # --- Dataset location ---
    # Either 'Focus-of-Attention-1' or './Focus-of-Attention-1'.
    dataset_path: str = 'Focus-of-Attention-1'

    # --- Data ---
    img_size: int = 112  # images are resized to img_size x img_size

    # --- Model ---
    backbone: str = 'mobilenetv3_small_100'  # timm model name
    hidden_lstm: int = 64                    # LSTM hidden size

    # --- Training ---
    epochs: int = 10
    batch_size: int = 64
    lr: float = 0.001

    # --- Hardware ---
    num_workers: int = 2
    device: str = 'cuda' if torch.cuda.is_available() else 'cpu'

    # --- Filled in later from the dataset ---
    classes: list = []
    num_classes: int = 0

# Single settings object used by the dataset, transform and model cells.
cfg = Config()

# Echo the settings that matter most for a run.
print("\n".join([
    f"  - Dataset: {cfg.dataset_path}",
    f"  - Device: {cfg.device}",
    f"  - Batch size: {cfg.batch_size}",
    f"  - Épocas: {cfg.epochs}",
]))

In [None]:
class RoboflowAttentionDataset(Dataset):
    """Two-class attention dataset read from a YOLO-format export on disk.

    Every original class declared under ``names`` in ``data.yaml`` is collapsed
    into one of two labels:

    * ``0`` ("Atento")    - the class name contains "frontal" (case-insensitive)
    * ``1`` ("Desatento") - every other class

    Each image gets the label derived from the class id on the FIRST line of
    its label file; images without a label file, or whose label file starts
    with an empty line, are ignored.

    Args:
        dataset_path: Root folder containing ``data.yaml`` and the split
            sub-folders (``<split>/images`` and ``<split>/labels``).
        split: Split folder name. ``'val'`` is accepted as an alias for
            ``'valid'``.
        transform: Optional callable applied to the RGB ``numpy`` image in
            ``__getitem__``.

    Attributes:
        samples: List of ``(image_path_str, label)`` tuples, sorted by image
            path so that the sample order is reproducible across runs.
        class_mapping: ``{original_class_id: 0 or 1}``.
    """

    # Lower-case file extensions accepted as images.
    IMAGE_SUFFIXES = ('.jpg', '.jpeg', '.png')

    def __init__(self, dataset_path, split='train', transform=None):
        self.dataset_path = Path(dataset_path)
        self.split = split
        self.transform = transform

        # The on-disk validation folder is named 'valid'.
        if split == 'val':
            split = 'valid'

        self.img_dir = self.dataset_path / split / 'images'
        self.label_dir = self.dataset_path / split / 'labels'

        # Read the class names from data.yaml (explicit encoding so that
        # non-ASCII class names do not depend on the platform default).
        yaml_path = self.dataset_path / 'data.yaml'
        with open(yaml_path, 'r', encoding='utf-8') as f:
            data_config = yaml.safe_load(f)

        self.original_classes = data_config['names']

        # Collapse the original classes into 0 = attentive, 1 = inattentive.
        self.class_mapping = self._build_class_mapping(self.original_classes)

        print("Mapeamento de classes:")
        for orig_idx, orig_name in self._iter_named_classes(self.original_classes):
            new_class = "ATENTO" if self.class_mapping[orig_idx] == 0 else "DESATENTO"
            print(f"    {orig_name} → {new_class}")

        self.samples = self._collect_samples()

        print(f"{split.upper()}: {len(self.samples)} imagens")

    @staticmethod
    def _iter_named_classes(names):
        """Return ``(class_id, class_name)`` pairs ordered by class id.

        ``names`` may be a list (position is the class id) or a mapping of
        ``{class_id: class_name}``; both layouts are used by YOLO data.yaml
        files.
        """
        if isinstance(names, dict):
            return sorted(((int(k), v) for k, v in names.items()), key=lambda kv: kv[0])
        return list(enumerate(names))

    @classmethod
    def _build_class_mapping(cls, names):
        """Map every original class id to 0 (name contains 'frontal') or 1."""
        return {
            class_id: 0 if 'frontal' in class_name.lower() else 1
            for class_id, class_name in cls._iter_named_classes(names)
        }

    def _collect_samples(self):
        """Scan the image folder and pair each labelled image with its binary label."""
        samples = []
        if not self.img_dir.exists():
            return samples

        # sorted(): glob order depends on the filesystem; sorting makes the
        # sample order (and therefore per-index results) reproducible.
        for img_path in sorted(self.img_dir.glob('*.*')):
            if img_path.suffix.lower() not in self.IMAGE_SUFFIXES:
                continue
            label_path = self.label_dir / f"{img_path.stem}.txt"
            if not label_path.exists():
                continue
            with open(label_path, 'r') as f:
                line = f.readline().strip()
            if not line:
                continue
            # First token of a YOLO label line is the class id.
            original_class_id = int(line.split()[0])
            samples.append((str(img_path), self.class_mapping[original_class_id]))
        return samples

    def __len__(self):
        """Number of labelled images found for this split."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        The image is loaded with OpenCV and converted from BGR to RGB; if a
        transform was given, its output is returned instead of the raw array.
        """
        img_path, label = self.samples[idx]

        img = cv2.imread(img_path)
        if img is None:
            # Best-effort fallback: an unreadable file becomes a black image
            # so that one corrupt file does not abort a whole epoch.
            img = np.zeros((224, 224, 3), dtype=np.uint8)
        else:
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        if self.transform:
            img = self.transform(img)

        return img, label

    def get_class_names(self):
        """Names of the two output classes, indexed by label (0, 1)."""
        return ['Atento', 'Desatento']

In [None]:
def get_transforms(train=True):
    """Build the torchvision preprocessing pipeline for one split.

    Both pipelines convert the input array to a PIL image, resize it to
    ``cfg.img_size`` x ``cfg.img_size``, convert it to a tensor and normalise
    it with the mean/std values below. When ``train`` is true, a random
    horizontal flip, brightness/contrast jitter and a random rotation of up to
    10 degrees are inserted between the resize and the tensor conversion.

    Args:
        train: Whether to include the random augmentations.

    Returns:
        A ``transforms.Compose`` instance.
    """
    augmentations = []
    if train:
        augmentations = [
            transforms.RandomHorizontalFlip(0.5),
            transforms.ColorJitter(brightness=0.2, contrast=0.2),
            transforms.RandomRotation(10),
        ]

    steps = [
        transforms.ToPILImage(),
        transforms.Resize((cfg.img_size, cfg.img_size)),
        *augmentations,
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]
    return transforms.Compose(steps)

In [None]:
# One dataset per split; only the training split gets random augmentation.
train_dataset = RoboflowAttentionDataset(
    cfg.dataset_path,
    split='train',
    transform=get_transforms(train=True)
)

# Validation
val_dataset = RoboflowAttentionDataset(
    cfg.dataset_path,
    split='valid',
    transform=get_transforms(train=False)
)

# Test
test_dataset = RoboflowAttentionDataset(
    cfg.dataset_path,
    split='test',
    transform=get_transforms(train=False)
)

# Fail early with a clear message: the dataset class yields an empty dataset
# when the folder is missing, and a shuffling DataLoader over an empty dataset
# only fails later with an obscure sampler error.
if len(train_dataset) == 0:
    raise RuntimeError(
        f"Nenhuma amostra de treino encontrada em '{cfg.dataset_path}'. "
        "Verifique cfg.dataset_path e se o download do dataset foi concluído."
    )

# Publish the class names on the shared config.
cfg.classes = train_dataset.get_class_names()
cfg.num_classes = len(cfg.classes)

print(f"Classes: {cfg.classes}")
print(f"Total de classes: {cfg.num_classes}")

# Keyword arguments common to all three loaders. Pinned host memory only
# helps host-to-GPU copies, and requesting it without CUDA just emits a
# warning, so it is enabled only when training on CUDA.
loader_kwargs = dict(
    batch_size=cfg.batch_size,
    num_workers=cfg.num_workers,
    pin_memory=(cfg.device == 'cuda'),
)

# Only the training loader shuffles; evaluation order stays fixed.
train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_kwargs)
test_loader = DataLoader(test_dataset, shuffle=False, **loader_kwargs)

print(f"Train batches: {len(train_loader)}")
print(f"Val batches: {len(val_loader)}")
print(f"Test batches: {len(test_loader)}")

In [None]:
from collections import Counter

# Count how many training samples fall into each of the two classes.
train_labels = [label for _, label in train_dataset.samples]
class_counts = Counter(train_labels)
total = len(train_labels)

# Iterate over every class id (not just the ones present in the split) so the
# report and the weight vector always have one entry per class.
print(f"Distribuição (Atento/Desatento):")
for class_id in range(cfg.num_classes):
    count = class_counts[class_id]
    pct = (count / total) * 100 if total else 0.0
    print(f"  {cfg.classes[class_id]}: {count} ({pct:.1f}%)")

# "Balanced" inverse-frequency weights: total / (n_classes * class_count).
# The vector is indexed by class id and always has cfg.num_classes entries, so
# it stays aligned with the model outputs even if a class has no training
# samples (such a class gets weight 0.0 instead of being dropped, which would
# shift the remaining weights onto the wrong class ids).
weights = [
    total / (cfg.num_classes * class_counts[class_id]) if class_counts[class_id] else 0.0
    for class_id in range(cfg.num_classes)
]
class_weights_tensor = torch.tensor(weights, dtype=torch.float32, device=cfg.device)

print(f"Pesos: {class_weights_tensor}")

In [None]:
class LightAttentionModel(nn.Module):
    """Hybrid CNN + LSTM classifier.

    A timm backbone (created without classifier head and without global
    pooling) extracts a feature map per frame; the map is average-pooled to a
    vector, the per-frame vectors are fed to a single-layer LSTM, and the
    LSTM output of the last time step goes through a small MLP classifier.

    Args:
        num_classes: Number of output logits.
        hidden_size: LSTM hidden size.
        backbone: timm model name; defaults to ``cfg.backbone``.
        img_size: Side length used for the shape-probing forward pass;
            defaults to ``cfg.img_size``.
        pretrained: Whether timm should load pretrained backbone weights.
    """

    def __init__(self, num_classes=2, hidden_size=64, backbone=None, img_size=None, pretrained=True):
        super().__init__()

        # Fall back to the notebook-wide configuration when not given.
        backbone = cfg.backbone if backbone is None else backbone
        img_size = cfg.img_size if img_size is None else img_size

        self.cnn = timm.create_model(
            backbone,
            pretrained=pretrained,
            num_classes=0,
            global_pool=''
        )

        self.feature_size = self._probe_feature_size(img_size)

        self.pool = nn.AdaptiveAvgPool2d(1)

        self.lstm = nn.LSTM(
            input_size=self.feature_size,
            hidden_size=hidden_size,
            num_layers=1,
            batch_first=True
        )

        self.classifier = nn.Sequential(
            nn.Linear(hidden_size, 32),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(32, num_classes)
        )

    def _probe_feature_size(self, img_size):
        """Return the channel count of the backbone output for one dummy image.

        The probe runs with the backbone in eval mode: in training mode a
        forward pass updates BatchNorm running statistics even under
        ``torch.no_grad()``, which would perturb any such statistics in the
        pretrained backbone with values computed from a meaningless input.
        The previous train/eval state is restored afterwards.
        """
        was_training = self.cnn.training
        self.cnn.eval()
        try:
            with torch.no_grad():
                features = self.cnn(torch.zeros(1, 3, img_size, img_size))
        finally:
            self.cnn.train(was_training)
        return features.shape[1]

    def forward(self, x):
        """Classify a batch of images or image sequences.

        Args:
            x: Either a 4-D tensor ``(batch, C, H, W)``, treated as sequences
                of length 1, or a 5-D tensor ``(batch, seq, C, H, W)``.

        Returns:
            Logits of shape ``(batch, num_classes)``.
        """
        if x.dim() == 4:
            x = x.unsqueeze(1)

        batch_size, seq_len = x.shape[0], x.shape[1]

        # Fold the sequence axis into the batch axis for the CNN. reshape()
        # (unlike view()) also accepts non-contiguous inputs.
        x = x.reshape(batch_size * seq_len, *x.shape[2:])
        features = self.cnn(x)
        features = self.pool(features).flatten(1)
        features = features.reshape(batch_size, seq_len, -1)

        lstm_out, _ = self.lstm(features)
        last_step = lstm_out[:, -1, :]  # output at the final time step

        return self.classifier(last_step)

# Build the network and move it to the configured device in one step
# (nn.Module.to returns the module itself).
model = LightAttentionModel(
    num_classes=cfg.num_classes,
    hidden_size=cfg.hidden_lstm,
).to(cfg.device)

# Parameter counts: all parameters vs. only those that receive gradients.
total_params = sum(param.numel() for param in model.parameters())
trainable_params = sum(
    param.numel()
    for param in filter(lambda param: param.requires_grad, model.parameters())
)

print(f"  - Total parâmetros: {total_params:,}")
print(f"  - Treináveis: {trainable_params:,}")

In [None]:
def evaluate_model(model, dataloader, split_name='Test'):
    """Avalia o modelo e retorna métricas detalhadas"""
    model.eval()

    all_preds = []
    all_labels = []
    all_probs = []

    with torch.no_grad():
        for images, labels in tqdm(dataloader, desc=f'Eval {split_name}'):
            images = images.to(cfg.device)
            labels = labels.to(cfg.device)

            outputs = model(images)
            probs = torch.softmax(outputs, dim=1)
            _, predicted = outputs.max(1)

            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            all_probs.extend(probs.cpu().numpy())

    # Métricas
    accuracy = accuracy_score(all_labels, all_preds)
    precision, recall, f1, _ = precision_recall_fscore_support(
        all_labels, all_preds, average='weighted'
    )

    print(f"Resultados {split_name.upper()}")
    print(f"  Acurácia:  {accuracy*100:.2f}%")
    print(f"  Precisão:  {precision*100:.2f}%")
    print(f"  Recall:    {recall*100:.2f}%")
    print(f"  F1-Score:  {f1*100:.2f}%")

    # Relatório por classe
    print(f"Relatório por Classe:")
    print(classification_report(
        all_labels, all_preds,
        target_names=cfg.classes,
        digits=4
    ))