# Detecção de Casos de COVID-19 em Tomografias (versão PyTorch)
**Agenda**
- Carregar dataset (ImageFolder)
- Pré-processamento e augmentations
- Transfer Learning (ResNet18)
- Treino, validação, checkpoint, early stopping
- Avaliação final e plots


In [23]:
!pip install kaggle



In [13]:
from google.colab import files
files.upload()  # Envie o arquivo kaggle.json


Saving archive (3).zip to archive (3).zip


In [14]:
# Cell: Imports e configuração
import os
import random

from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from torchvision import transforms, datasets, models

from sklearn.metrics import classification_report, confusion_matrix

# Configurações
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", DEVICE)


Device: cpu


In [25]:
# Paths e hiper-parâmetros (ajuste aqui)
# DATA_DIR = "/kaggle/input"  # <-- ajuste para o caminho onde você extraiu o dataset
# The dataset was unzipped to /content/covid_dataset
DATA_DIR = "/content/covid_dataset"
# Espera-se uma estrutura: DATA_DIR/class1/*.png, DATA_DIR/class2/*.png (ImageFolder)
BATCH_SIZE = 16
IMG_SIZE = 224
NUM_EPOCHS = 25
LR = 2e-4
NUM_WORKERS = 4
PATIENCE = 6  # early stopping
MODEL_SAVE_PATH = "best_model_resnet18.pt"

In [26]:
import zipfile
import os

# Path to the uploaded zip file
zip_path = '/content/archive (3).zip'
# Directory where the dataset will be extracted
extract_base_path = '/content/covid_dataset'

# Create the extraction directory if it doesn't exist
if not os.path.exists(extract_base_path):
    os.makedirs(extract_base_path)

# Unzip the file
print(f"Unzipping {zip_path} to {extract_base_path}...")
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_base_path)

print("Unzipping complete.")

# Optional: List contents to verify structure and find the actual dataset root
print("Contents of extracted directory:")
for root, dirs, files in os.walk(extract_base_path):
    level = root.replace(extract_base_path, '').count(os.sep)
    indent = ' ' * 4 * (level)
    print(f'{indent}{os.path.basename(root)}/')
    subindent = ' ' * 4 * (level + 1)
    for f in files:
        if not f.startswith('.'): # Ignore hidden files
            print(f'{subindent}{f}')
    if level > 1: # Limit depth for listing to avoid long output
        break

# The actual DATA_DIR should be adjusted based on the unzipped structure.
# Assuming the zip extracts into a folder named 'CT_Scan_Dataset' like:
# /content/covid_dataset/CT_Scan_Dataset/COVID
# /content/covid_dataset/CT_Scan_Dataset/Non-COVID
# The DATA_DIR variable is updated in cell 'nPLXoMoN7YCI' to reflect this.

Unzipping /content/archive (3).zip to /content/covid_dataset...
Unzipping complete.
Contents of extracted directory:
covid_dataset/
    non-COVID/
        Non-Covid (1144).png
        Non-Covid (466).png
        Non-Covid (1106).png
        Non-Covid (297).png
        Non-Covid (751).png
        Non-Covid (1103).png
        Non-Covid (36).png
        Non-Covid (430).png
        Non-Covid (931).png
        Non-Covid (1224).png
        Non-Covid (996).png
        Non-Covid (840).png
        Non-Covid (1029).png
        Non-Covid (84).png
        Non-Covid (69).png
        Non-Covid (1070).png
        Non-Covid (322).png
        Non-Covid (613).png
        Non-Covid (44).png
        Non-Covid (118).png
        Non-Covid (806).png
        Non-Covid (73).png
        Non-Covid (1220).png
        Non-Covid (988).png
        Non-Covid (1054).png
        Non-Covid (275).png
        Non-Covid (22).png
        Non-Covid (1015).png
        Non-Covid (715).png
        Non-Covid (720).png
        No

In [28]:
# Augmentations e transforms
train_transform = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.1, contrast=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
])

val_transform = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225])
])

# Carregar todo dataset via ImageFolder
full_dataset = datasets.ImageFolder(root=DATA_DIR, transform=train_transform)
class_names = full_dataset.classes
num_classes = len(class_names)
print("Classes:", class_names, "Num classes:", num_classes)

# Split train / val (ex.: 80/20)
val_ratio = 0.2
n_total = len(full_dataset)
n_val = int(n_total * val_ratio)
n_train = n_total - n_val

train_dataset, val_dataset = random_split(
    full_dataset,
    [n_train, n_val],
    generator=torch.Generator().manual_seed(SEED)
)

# Aplica a transformação de validação ao conjunto de validação
# Isso é necessário porque `random_split` herda a transformação do `full_dataset`
# que é `train_transform`. Precisamos aplicar `val_transform` ao `val_dataset`.
val_dataset.dataset.transform = val_transform

print(f"Tamanho train: {len(train_dataset)}, val: {len(val_dataset)}")

train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=NUM_WORKERS,
    pin_memory=True
)

val_loader = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
    pin_memory=True
)

Classes: ['COVID', 'non-COVID'] Num classes: 2
Tamanho train: 1985, val: 496


In [31]:
# Model (Transfer Learning com ResNet18)
def build_model(num_classes):
    model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
    # Congelar todos os parâmetros do modelo pré-treinado
    for param in model.parameters():
        param.requires_grad = False

    # Substituir a camada final (fully connected) para nosso número de classes
    num_ftrs = model.fc.in_features
    model.fc = nn.Linear(num_ftrs, num_classes)
    model = model.to(DEVICE)
    return model

model = build_model(num_classes)
print(model)

# Loss e Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.fc.parameters(), lr=LR) # Apenas camada final treinável

# Scheduler para ajuste do Learning Rate
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.1,
    patience=PATIENCE // 2
)

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [32]:
# Funções de treino e validação
def train_epoch(model, dataloader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    correct_predictions = 0
    total_predictions = 0
    for inputs, labels in tqdm(dataloader, desc="Training"):
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * inputs.size(0)
        _, predicted = torch.max(outputs.data, 1)
        total_predictions += labels.size(0)
        correct_predictions += (predicted == labels).sum().item()

    epoch_loss = running_loss / len(dataloader.dataset)
    epoch_acc = correct_predictions / total_predictions
    return epoch_loss, epoch_acc

def validate_epoch(model, dataloader, criterion, device):
    model.eval()
    running_loss = 0.0
    correct_predictions = 0
    total_predictions = 0
    with torch.no_grad():
        for inputs, labels in tqdm(dataloader, desc="Validation"):
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            running_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs.data, 1)
            total_predictions += labels.size(0)
            correct_predictions += (predicted == labels).sum().item()

    epoch_loss = running_loss / len(dataloader.dataset)
    epoch_acc = correct_predictions / total_predictions
    return epoch_loss, epoch_acc


# Loop de Treino
best_val_loss = float('inf')
epochs_no_improve = 0

history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}

print("Starting training...")
for epoch in range(NUM_EPOCHS):
    print(f"Epoch {epoch+1}/{NUM_EPOCHS}")

    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, DEVICE)
    val_loss, val_acc = validate_epoch(model, val_loader, criterion, DEVICE)

    scheduler.step(val_loss) # Atualiza o learning rate com base na val_loss

    history['train_loss'].append(train_loss)
    history['train_acc'].append(train_acc)
    history['val_loss'].append(val_loss)
    history['val_acc'].append(val_acc)

    print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
    print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

    # Checkpoint e Early Stopping
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        epochs_no_improve = 0
        torch.save(model.state_dict(), MODEL_SAVE_PATH)
        print("Model saved! Validation loss improved.")
    else:
        epochs_no_improve += 1
        print(f"Validation loss did not improve for {epochs_no_improve} epoch(s).")
        if epochs_no_improve == PATIENCE:
            print(f"Early stopping triggered after {epoch+1} epochs.")
            break

print("Training complete.")

Starting training...
Epoch 1/25


Training: 100%|██████████| 125/125 [03:16<00:00,  1.57s/it]
Validation: 100%|██████████| 31/31 [00:40<00:00,  1.29s/it]


Train Loss: 0.6007, Train Acc: 0.6866
Val Loss: 0.5307, Val Acc: 0.7460
Model saved! Validation loss improved.
Epoch 2/25


Training: 100%|██████████| 125/125 [03:13<00:00,  1.55s/it]
Validation: 100%|██████████| 31/31 [00:41<00:00,  1.33s/it]


Train Loss: 0.4991, Train Acc: 0.8015
Val Loss: 0.4503, Val Acc: 0.8085
Model saved! Validation loss improved.
Epoch 3/25


Training: 100%|██████████| 125/125 [03:04<00:00,  1.47s/it]
Validation: 100%|██████████| 31/31 [00:39<00:00,  1.27s/it]


Train Loss: 0.4522, Train Acc: 0.8131
Val Loss: 0.4213, Val Acc: 0.8246
Model saved! Validation loss improved.
Epoch 4/25


Training: 100%|██████████| 125/125 [02:55<00:00,  1.41s/it]
Validation: 100%|██████████| 31/31 [00:40<00:00,  1.30s/it]


Train Loss: 0.4276, Train Acc: 0.8282
Val Loss: 0.4008, Val Acc: 0.8306
Model saved! Validation loss improved.
Epoch 5/25


Training: 100%|██████████| 125/125 [02:59<00:00,  1.44s/it]
Validation: 100%|██████████| 31/31 [00:42<00:00,  1.38s/it]


Train Loss: 0.4031, Train Acc: 0.8413
Val Loss: 0.3813, Val Acc: 0.8448
Model saved! Validation loss improved.
Epoch 6/25


Training: 100%|██████████| 125/125 [02:56<00:00,  1.41s/it]
Validation: 100%|██████████| 31/31 [00:41<00:00,  1.32s/it]


Train Loss: 0.4018, Train Acc: 0.8343
Val Loss: 0.3665, Val Acc: 0.8387
Model saved! Validation loss improved.
Epoch 7/25


Training: 100%|██████████| 125/125 [02:57<00:00,  1.42s/it]
Validation: 100%|██████████| 31/31 [00:40<00:00,  1.31s/it]


Train Loss: 0.3827, Train Acc: 0.8418
Val Loss: 0.3515, Val Acc: 0.8548
Model saved! Validation loss improved.
Epoch 8/25


Training:  18%|█▊        | 22/125 [00:32<02:40,  1.56s/it]Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x7e8317ff53a0>
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/torch/utils/data/dataloader.py", line 1654, in __del__
    self._shutdown_workers()
  File "/usr/local/lib/python3.12/dist-packages/torch/utils/data/dataloader.py", line 1618, in _shutdown_workers
    w.join(timeout=_utils.MP_STATUS_CHECK_INTERVAL)
  File "/usr/lib/python3.12/multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
          ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/multiprocessing/popen_fork.py", line 40, in wait
    if not wait([self.sentinel], timeout):
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/multiprocessing/connection.py", line 1136, in wait
    ready = selector.select(timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/selectors.py", line 415, in se

KeyboardInterrupt: 

In [33]:
# Carregar ResNet18 pré-treinado e adaptar
model = models.resnet18(pretrained=True)
# Substituir a última camada fully-connected
in_features = model.fc.in_features
model.fc = nn.Linear(in_features, num_classes)

model = model.to(DEVICE)

# Se quiser "congelar" as camadas iniciais, descomente:
# for param in model.parameters():
#     param.requires_grad = False
# for param in model.fc.parameters():
#     param.requires_grad = True

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LR)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2, verbose=True)




TypeError: ReduceLROnPlateau.__init__() got an unexpected keyword argument 'verbose'

In [None]:
def train_one_epoch(model, loader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    running_corrects = 0
    total = 0

    for inputs, labels in tqdm(loader, desc="Train", leave=False):
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * inputs.size(0)
        _, preds = torch.max(outputs, 1)
        running_corrects += torch.sum(preds == labels.data).item()
        total += inputs.size(0)

    epoch_loss = running_loss / total
    epoch_acc = running_corrects / total
    return epoch_loss, epoch_acc

def validate(model, loader, criterion, device):
    model.eval()
    running_loss = 0.0
    running_corrects = 0
    total = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in tqdm(loader, desc="Val", leave=False):
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            running_loss += loss.item() * inputs.size(0)
            _, preds = torch.max(outputs, 1)
            running_corrects += torch.sum(preds == labels.data).item()
            total += inputs.size(0)

            all_preds.extend(preds.cpu().numpy().tolist())
            all_labels.extend(labels.cpu().numpy().tolist())

    epoch_loss = running_loss / total
    epoch_acc = running_corrects / total
    return epoch_loss, epoch_acc, all_preds, all_labels


In [None]:
best_val_loss = float('inf')
best_val_acc = 0.0
patience_counter = 0

history = {
    'train_loss': [], 'train_acc': [],
    'val_loss': [], 'val_acc': []
}

for epoch in range(1, NUM_EPOCHS + 1):
    print(f"Epoch {epoch}/{NUM_EPOCHS}")
    train_loss, train_acc = train_one_epoch(model, train_loader, criterion, optimizer, DEVICE)
    val_loss, val_acc, _, _ = validate(model, val_loader, criterion, DEVICE)
    scheduler.step(val_loss)  # opcional

    history['train_loss'].append(train_loss)
    history['train_acc'].append(train_acc)
    history['val_loss'].append(val_loss)
    history['val_acc'].append(val_acc)

    print(f" train_loss: {train_loss:.4f} train_acc: {train_acc:.4f}")
    print(f" val_loss:   {val_loss:.4f} val_acc:   {val_acc:.4f}")

    # Checkpoint by val_loss (mudar se preferir val_acc)
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_val_acc = val_acc
        torch.save({
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'class_names': class_names
        }, MODEL_SAVE_PATH)
        print(" Saved best model.")
        patience_counter = 0
    else:
        patience_counter += 1
        print(f" Patience: {patience_counter}/{PATIENCE}")

    if patience_counter >= PATIENCE:
        print("Early stopping triggered.")
        break

print("Treino finalizado. Best val_loss:", best_val_loss, "best val_acc:", best_val_acc)


In [None]:
# Carregar melhor checkpoint
checkpoint = torch.load(MODEL_SAVE_PATH, map_location=DEVICE)
model.load_state_dict(checkpoint['model_state_dict'])
model.to(DEVICE)
model.eval()

# Rodar validação final para obter previsões
val_loss, val_acc, preds, labels = validate(model, val_loader, criterion, DEVICE)
print("Val loss:", val_loss, "Val acc:", val_acc)

# Relatório
print("\nClassification Report:")
print(classification_report(labels, preds, target_names=class_names))

# Matriz de confusão
cm = confusion_matrix(labels, preds)
print("Confusion Matrix:\n", cm)


In [None]:
# Plots simples
plt.figure(figsize=(12,5))
plt.subplot(1,2,1)
plt.plot(history['train_loss'], label='train_loss')
plt.plot(history['val_loss'], label='val_loss')
plt.legend()
plt.title('Loss')

plt.subplot(1,2,2)
plt.plot(history['train_acc'], label='train_acc')
plt.plot(history['val_acc'], label='val_acc')
plt.legend()
plt.title('Accuracy')

plt.show()


In [None]:
# Salvar classes em arquivo para referência futura
import json
with open("class_names.json", "w") as f:
    json.dump(class_names, f, ensure_ascii=False, indent=2)
print("Class names saved to class_names.json")

print("Pronto. Para testar previsões individuais, carregue imagem, aplique val_transform e execute model(img.unsqueeze(0)).")
