In [None]:
# ===============================
# 1. Imports & device
import os
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split, WeightedRandomSampler
import numpy as np

# All tensors and the model in this notebook live on the CPU.
device = torch.device("cpu")
print(f"Using device: {device}")

# ===============================
# 2. Transformations
IMG_SIZE = 224
target_size = (IMG_SIZE, IMG_SIZE)

# Training pipeline: resize, random augmentations, then a 1-channel tensor.
# Normalisation is appended later, once mean/std have been computed.
train_steps = [transforms.Resize(target_size)]
train_steps += [
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=15),  # rotation within ±15°
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.9, 1.1)),  # shift & zoom
    transforms.ColorJitter(brightness=0.2, contrast=0.2),  # mild photometric variation
]
train_steps += [
    transforms.Grayscale(num_output_channels=1),
    transforms.ToTensor(),
]
train_transform = transforms.Compose(train_steps)

# Validation/test pipeline: deterministic, no augmentation.
# Normalisation is appended later, once mean/std have been computed.
eval_steps = [
    transforms.Resize(target_size),
    transforms.Grayscale(num_output_channels=1),
    transforms.ToTensor(),
]
val_test_transform = transforms.Compose(eval_steps)

# ===============================
# 3. Dataset & split
data_dir = "/home/chaouchix/data/chest_xray/train"
full_dataset = datasets.ImageFolder(data_dir, transform=train_transform)

# Per-channel mean and std used for normalisation.
# The statistics are measured through the deterministic pipeline
# (val_test_transform, which has no Normalize step yet at this point):
# going through train_transform would fold random rotations/translations
# (black borders) and colour jitter into the numbers and make them change
# from one run to the next.
stats_dataset = datasets.ImageFolder(data_dir, transform=val_test_transform)
loader = DataLoader(stats_dataset, batch_size=64, shuffle=False, num_workers=0)

# Accumulate sum and sum of squares over every pixel so that std is the true
# dataset-wide standard deviation, not the average of per-image stds.
pixel_sum = 0.
pixel_sq_sum = 0.
nb_samples = 0
pixels_per_image = 0
for data, _ in loader:
    batch_samples = data.size(0)
    # (batch, channels, H*W); float64 limits cancellation in E[x^2] - E[x]^2
    data = data.view(batch_samples, data.size(1), -1).double()
    pixel_sum += data.sum(dim=(0, 2))
    pixel_sq_sum += (data ** 2).sum(dim=(0, 2))
    pixels_per_image = data.size(2)  # constant: every image is resized to IMG_SIZE
    nb_samples += batch_samples

n_pixels = nb_samples * pixels_per_image
mean = pixel_sum / n_pixels
variance = (pixel_sq_sum / n_pixels - mean ** 2).clamp_min(0)
# A zero std would make Normalize reject the value, hence the small floor.
std = variance.sqrt().clamp_min(1e-8)
mean, std = mean.float(), std.float()
print("Computed mean:", mean)
print("Computed std:", std)

# Append normalisation to both pipelines
train_transform.transforms.append(transforms.Normalize(mean.tolist(), std.tolist()))
val_test_transform.transforms.append(transforms.Normalize(mean.tolist(), std.tolist()))

# Train/val split (90% / 10%).
# A fixed seed keeps the split identical across kernel restarts, so validation
# images never drift into the training set between runs.
val_size = int(0.1 * len(full_dataset))
train_size = len(full_dataset) - val_size
train_dataset, val_split = random_split(
    full_dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(42),
)

# Both subsets returned by random_split wrap the SAME ImageFolder object, so
# assigning `val_dataset.dataset.transform` would also replace the training
# transform and silently switch off augmentation. The validation subset is
# therefore rebuilt on its own ImageFolder (same root, hence same file order
# and same indices) that carries the deterministic pipeline.
val_source = datasets.ImageFolder(full_dataset.root, transform=val_test_transform)
val_dataset = torch.utils.data.Subset(val_source, val_split.indices)

# Held-out test set, read through the deterministic (non-augmented) pipeline
test_dir = "/home/chaouchix/data/chest_xray/test"
test_dataset = datasets.ImageFolder(root=test_dir, transform=val_test_transform)

# ===============================
# 4. WeightedRandomSampler for class balancing
# Labels are read from the ImageFolder index through the subset's indices.
# Iterating `train_dataset` itself would decode and augment every image just
# to discard it and keep the label.
targets = [train_dataset.dataset.targets[i] for i in train_dataset.indices]
class_sample_count = np.bincount(targets)
weights = 1. / class_sample_count  # inverse class frequency
samples_weights = weights[targets]  # one weight per training sample
samples_weights = torch.from_numpy(samples_weights).double()
# Draws len(train) samples per epoch, with replacement (sampler default)
sampler = WeightedRandomSampler(samples_weights, len(samples_weights))

# ===============================
# 5. DataLoaders
# Settings shared by the three loaders
shared_loader_kwargs = dict(batch_size=32, num_workers=0)

dataloaders = dict(
    # the training order is driven by the class-balancing sampler
    train=DataLoader(train_dataset, sampler=sampler, **shared_loader_kwargs),
    val=DataLoader(val_dataset, shuffle=False, **shared_loader_kwargs),
    test=DataLoader(test_dataset, shuffle=False, **shared_loader_kwargs),
)

# ===============================
# 6. Info
# Class names as discovered by ImageFolder, and number of samples per split
class_names = full_dataset.classes
dataset_sizes = {
    split: len(split_dataset)
    for split, split_dataset in (
        ('train', train_dataset),
        ('val', val_dataset),
        ('test', test_dataset),
    )
}

print(f"Classes: {class_names}")
print(f"Dataset sizes: {dataset_sizes}")

# ===============================
# 7. Check a batch
# Pull a single training batch as a sanity check of shapes and labels
train_batches = iter(dataloaders['train'])
x, y = next(train_batches)
print(f"Batch images shape: {x.shape}")
print(f"Batch labels shape: {y.shape}")
print(f"Sample labels: {y[:10]}")


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class PneumoCNN(nn.Module):
    """
    CNN for Pneumonia vs Normal classification.

    - 5 convolutional stages, each made of two 3x3 conv + BatchNorm + ReLU
      units followed by a 2x2 max-pool and channel dropout
    - global average pooling (GAP) to keep the fully-connected head small
    - dropout followed by a single linear layer producing the class logits

    Args:
        num_classes: number of output logits (default 2).
    """

    def __init__(self, num_classes=2):
        super().__init__()

        # (input channels, output channels, dropout probability) per stage;
        # each stage halves the spatial size: 224 -> 112 -> 56 -> 28 -> 14 -> 7
        stage_specs = (
            (1, 32, 0.2),
            (32, 64, 0.3),
            (64, 128, 0.3),
            (128, 256, 0.4),
            (256, 512, 0.4),
        )
        self.features = nn.Sequential(
            *(self._make_stage(c_in, c_out, drop) for c_in, c_out, drop in stage_specs)
        )

        self.gap = nn.AdaptiveAvgPool2d(1)
        self.dropout = nn.Dropout(0.5)
        self.classifier = nn.Linear(512, num_classes)

    @staticmethod
    def _make_stage(c_in, c_out, drop):
        """Build one stage: two conv/BN/ReLU units, max-pool, then Dropout2d."""
        layers = []
        # first unit maps c_in -> c_out, second unit keeps c_out channels
        for unit_in in (c_in, c_out):
            layers.append(nn.Conv2d(unit_in, c_out, kernel_size=3, padding=1, bias=False))
            layers.append(nn.BatchNorm2d(c_out))
            layers.append(nn.ReLU(inplace=True))
        layers.append(nn.MaxPool2d(2))
        layers.append(nn.Dropout2d(drop))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return the class logits, shape (batch, num_classes), for input batch x."""
        pooled = self.gap(self.features(x))
        flat = torch.flatten(pooled, 1)
        return self.classifier(self.dropout(flat))

# Example instantiation on CPU
device = torch.device("cpu")
model = PneumoCNN()
model = model.to(device)
# Report the parameter count in millions
param_count = sum(tensor.numel() for tensor in model.parameters())
print(f"Params: {param_count/1e6:.2f} M")


In [None]:
from collections import Counter

# Labels of the training subset, read from the ImageFolder index through the
# subset's indices. Iterating `train_dataset` would decode and augment every
# image only to throw it away.
train_targets = [train_dataset.dataset.targets[i] for i in train_dataset.indices]
counts = Counter(train_targets)

# Inverse-frequency class weights
total = sum(counts.values())
absent_classes = [name for idx, name in enumerate(class_names) if counts[idx] == 0]
if absent_classes:
    # Counter yields 0 for unseen labels; fail with a clear message instead of
    # a bare ZeroDivisionError.
    raise ValueError(f"No training sample for class(es): {absent_classes}")
class_weights = torch.tensor([total/counts[i] for i in range(len(class_names))], dtype=torch.float)

# NOTE(review): the 'train' DataLoader already rebalances classes with a
# WeightedRandomSampler; weighting the loss as well compensates for the
# imbalance twice. Consider keeping only one of the two mechanisms.
criterion = nn.CrossEntropyLoss(weight=class_weights.to(device))
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)


In [None]:
num_epochs = 5  # adjust as needed

for epoch in range(num_epochs):
    # ---- Training pass ----
    model.train()
    running_loss = 0.0
    running_corrects = 0  # plain Python int, so it stays valid even with an empty loader
    train_seen = 0

    for i, (inputs, labels) in enumerate(dataloaders['train']):
        # Keep the batch on the same device as the model
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        preds = outputs.argmax(dim=1)
        batch_size = inputs.size(0)
        # criterion returns a batch mean; scale by batch size to sum over samples
        running_loss += loss.item() * batch_size
        running_corrects += (preds == labels).sum().item()
        train_seen += batch_size

        # Progress line every 10 batches
        if i % 10 == 0:
            print(f"Epoch {epoch+1}, Batch {i}/{len(dataloaders['train'])} - Loss: {loss.item():.4f}", flush=True)

    # Average over the samples actually processed (guarded against an empty loader)
    epoch_loss = running_loss / max(train_seen, 1)
    epoch_acc = running_corrects / max(train_seen, 1)

    # ---- Validation pass (no gradients, dropout/BatchNorm in eval mode) ----
    model.eval()
    val_loss = 0.0
    val_corrects = 0
    val_seen = 0
    with torch.no_grad():
        for inputs, labels in dataloaders['val']:
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)
            preds = outputs.argmax(dim=1)
            loss = criterion(outputs, labels)
            val_loss += loss.item() * inputs.size(0)
            val_corrects += (preds == labels).sum().item()
            val_seen += inputs.size(0)
    val_loss /= max(val_seen, 1)
    val_acc = val_corrects / max(val_seen, 1)

    print(f"*** Epoch {epoch+1} complete - "
          f"Train Loss: {epoch_loss:.4f}, Acc: {epoch_acc:.4f} | "
          f"Val Loss: {val_loss:.4f}, Acc: {val_acc:.4f} ***")


In [None]:
# Final evaluation on the held-out test set
model.eval()
test_correct, test_total = 0, 0  # plain Python ints
with torch.no_grad():
    for inputs, labels in dataloaders['test']:
        # Keep the batch on the same device as the model
        inputs = inputs.to(device)
        labels = labels.to(device)
        preds = model(inputs).argmax(dim=1)
        test_correct += (preds == labels).sum().item()
        test_total += labels.size(0)

# NaN rather than a crash when the test loader yields nothing
test_acc = test_correct / test_total if test_total else float("nan")
print("Final Test Accuracy:", test_acc)
