# 🫁 Pipeline de Détection de Pneumonie

Ce notebook contient un pipeline complet pour entraîner un modèle de détection de pneumonie à partir de radiographies thoraciques.

## Part 1: Configuration & Préparation des Données

In [None]:
# Cell 1: Imports et Configuration
import os
import time
import copy
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import CosineAnnealingLR
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from PIL import Image, ImageFile, ImageDraw
import glob

# Select the compute device: CUDA when available, CPU otherwise.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"üöÄ Utilisation du device: {device}")
if torch.cuda.is_available():
    # Report the name and total memory of GPU 0.
    print(f"   GPU: {torch.cuda.get_device_name(0)}")
    print(f"   VRAM: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")

# Data configuration
ImageFile.LOAD_TRUNCATED_IMAGES = True  # let PIL load truncated image files
DATA_DIR = 'data/chest_xray'  # adjust to your local dataset path
IMG_SIZE = 224
BATCH_SIZE = 32
NUM_WORKERS = 4

# Hyperparameters: learning rate and epoch budget for each training phase
LR_PHASE1, EPOCHS_PHASE1 = 0.0005, 10
LR_PHASE2, EPOCHS_PHASE2 = 0.0001, 30

In [None]:
# Cell 2: Transforms & data loading
# 'train' applies random augmentation; 'val' and 'test' only resize, convert to
# a tensor and normalize, so evaluation stays deterministic.
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize(size=(IMG_SIZE, IMG_SIZE)),
        # Random brightness / contrast variation
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.RandomRotation(degrees=15),
        transforms.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.9, 1.1)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
    # Each evaluation split gets its own (identical) Compose instance.
    **{
        eval_split: transforms.Compose([
            transforms.Resize(size=(IMG_SIZE, IMG_SIZE)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])
        for eval_split in ('val', 'test')
    },
}

# Load one ImageFolder dataset per split from DATA_DIR/<split>, each with the
# matching transform pipeline.
image_datasets = {
    name: datasets.ImageFolder(
        root=os.path.join(DATA_DIR, name),
        transform=data_transforms[name],
    )
    for name in ('train', 'val', 'test')
}

# Only the training loader shuffles; val/test keep the dataset order.
dataloaders = {
    name: DataLoader(
        dataset,
        batch_size=BATCH_SIZE,
        shuffle=(name == 'train'),
        num_workers=NUM_WORKERS,
        pin_memory=True,
    )
    for name, dataset in image_datasets.items()
}

dataset_sizes = {name: len(dataset) for name, dataset in image_datasets.items()}
class_names = image_datasets['train'].classes

# Summary of what was loaded.
print("üìä Taille des datasets:")
for split_name, n_images in dataset_sizes.items():
    print(f"  {split_name}: {n_images} images")
print(f"Classes: {class_names}")

# Class balance of the training split.
# Counts are taken from the ImageFolder targets (the samples actually used for
# training) instead of os.listdir, which would also count stray non-image files
# such as .DS_Store or Thumbs.db sitting in the class folders.
_train_targets = image_datasets['train'].targets
_train_class_to_idx = image_datasets['train'].class_to_idx
n_normal = _train_targets.count(_train_class_to_idx['NORMAL'])
n_pneumonia = _train_targets.count(_train_class_to_idx['PNEUMONIA'])

# NOTE(review): pos_weight is fixed at 1.0 (no re-weighting of the positive
# class) even though the class counts are available. This looks like a
# deliberate override; n_normal / n_pneumonia would be the usual balancing
# value -- confirm before changing, since it shifts the precision/recall
# trade-off.
pos_weight_value = 1.0
pos_weight_tensor = torch.tensor([pos_weight_value], device=device)

print(f"\n‚öñÔ∏è Statistiques du training:")
print(f"  NORMAL: {n_normal}, PNEUMONIA: {n_pneumonia}")
print(f"  Poids pos_weight: {pos_weight_value:.4f}")

## Part 2: Construction du Modèle

In [None]:
# Cell 3: Build the EfficientNet-B3 model
def build_model():
    """Build an ImageNet-pretrained EfficientNet-B3 with a binary head.

    The stock torchvision classifier is replaced by a three-layer MLP
    (in_features -> 1024 -> 512 -> 1) with ReLU and dropout. The single output
    is a raw logit, intended for ``nn.BCEWithLogitsLoss``.

    Returns:
        The model, moved to the module-level ``device``.
    """
    model = models.efficientnet_b3(
        weights=models.EfficientNet_B3_Weights.IMAGENET1K_V1
    )

    # torchvision's EfficientNet classifier is Sequential(Dropout, Linear): the
    # Sequential itself has no `in_features`, so read it from the final Linear.
    num_ftrs = model.classifier[-1].in_features

    # Custom head for binary classification
    model.classifier = nn.Sequential(
        nn.Linear(num_ftrs, 1024),
        nn.ReLU(),
        nn.Dropout(0.4),
        nn.Linear(1024, 512),
        nn.ReLU(),
        nn.Dropout(0.3),
        nn.Linear(512, 1),  # one output logit for BCEWithLogitsLoss
    )

    return model.to(device)

model = build_model()
print("‚úÖ Mod√®le construit et charg√© sur", device)

# Count all parameters and the trainable subset in a single pass.
n_params_all = 0
n_params_trainable = 0
for weight in model.parameters():
    n_params_all += weight.numel()
    if weight.requires_grad:
        n_params_trainable += weight.numel()
print(f"Param√®tres totaux: {n_params_all:,}")
print(f"Param√®tres entra√Ænables: {n_params_trainable:,}")

## Part 3: Fonction d'Entraînement

In [None]:
# Cell 4: Training loop with mixed precision
def train_model(model, optimizer, scheduler, num_epochs=25, phase_name="Training"):
    """Train and validate ``model``; return it loaded with its best weights.

    Every epoch runs a 'train' pass followed by a 'val' pass over the
    module-level ``dataloaders``. The loss is ``nn.BCEWithLogitsLoss`` with the
    module-level ``pos_weight_tensor``; predictions threshold the sigmoid of
    the logits at 0.5. The weights of the epoch with the highest validation
    accuracy are restored before returning.

    Args:
        model: Network producing one logit per sample.
        optimizer: Optimizer stepped once per training batch.
        scheduler: LR scheduler stepped once per epoch after the train pass,
            or ``None`` to keep the learning rate untouched.
        num_epochs: Number of train+val epochs to run.
        phase_name: Label used in the completion message.

    Returns:
        ``(model, history)`` where ``history`` maps 'train_loss', 'train_acc',
        'val_loss' and 'val_acc' to lists holding one float per epoch.
    """
    since = time.time()
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    # Mixed precision is only enabled on CUDA. On CPU, autocast and the
    # GradScaler are constructed disabled, which makes them no-ops and avoids
    # the "CUDA is not available" warnings.
    use_amp = torch.device(device).type == 'cuda'
    scaler = torch.amp.GradScaler('cuda', enabled=use_amp)

    # Loop-invariant: build the loss module once instead of once per batch.
    criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight_tensor)

    # Per-epoch metrics, used for plotting afterwards
    history = {
        'train_loss': [], 'train_acc': [],
        'val_loss': [], 'val_acc': []
    }

    for epoch in range(num_epochs):
        print(f'\nüîÑ Epoch {epoch+1}/{num_epochs}')
        print('-' * 50)

        for phase in ['train', 'val']:
            is_train = phase == 'train'
            if is_train:
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            for batch_idx, (inputs, labels) in enumerate(dataloaders[phase]):
                inputs = inputs.to(device)
                # (B,) integer class indices -> (B, 1) floats to match the logits
                labels = labels.to(device).float().unsqueeze(1)

                # Gradients only exist during training; nothing to clear in val.
                if is_train:
                    optimizer.zero_grad()

                with torch.set_grad_enabled(is_train):
                    # Forward pass under autocast (active on CUDA only)
                    with torch.amp.autocast('cuda', enabled=use_amp):
                        outputs = model(inputs)
                        loss = criterion(outputs, labels)

                    preds = torch.sigmoid(outputs) > 0.5

                    # Backward pass and optimizer step only while training
                    if is_train:
                        scaler.scale(loss).backward()
                        scaler.step(optimizer)
                        scaler.update()

                # Weight the mean batch loss by batch size so the epoch average
                # stays correct when the last batch is smaller.
                running_loss += loss.item() * inputs.size(0)
                running_corrects += (preds == labels.bool()).sum().item()

                # Progress report every 10 batches
                if (batch_idx + 1) % 10 == 0:
                    print(f"  Batch {batch_idx+1}/{len(dataloaders[phase])} - "
                          f"Loss: {loss.item():.4f}")

            # Scheduler advances once per epoch, after the train pass only
            if is_train and scheduler is not None:
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects / dataset_sizes[phase]

            history[f'{phase}_loss'].append(epoch_loss)
            history[f'{phase}_acc'].append(epoch_acc)

            print(f'{phase.upper()} Loss: {epoch_loss:.4f} | Acc: {epoch_acc:.4f}')

            # Keep a copy of the weights with the best validation accuracy
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
                print(f"  ‚úÖ Nouveau meilleur mod√®le sauvegard√©! (Acc: {best_acc:.4f})")

    time_elapsed = time.time() - since
    print(f'\n{phase_name} completed in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'üèÜ Best Validation Acc: {best_acc:.4f}')

    # Restore the best weights seen during validation
    model.load_state_dict(best_model_wts)
    return model, history

## Part 4: Phase 1 - Warmup (Feature Extraction)

In [None]:
# Cell 5: PHASE 1 - WARMUP (freeze the feature extractor)
print(f"\n{'=' * 60}")
print("üîí PHASE 1: WARMUP - Transfer Learning")
print('=' * 60)
print("On g√®le les poids du backbone EfficientNet (pr√©-entra√Æn√© sur ImageNet)")
print("On n'entra√Æne que la t√™te de classification\n")

# Freeze every backbone parameter; only the classifier head keeps gradients.
for backbone_param in model.features.parameters():
    backbone_param.requires_grad = False

# Report how many parameters remain trainable.
total_params = 0
trainable_params = 0
for weight in model.parameters():
    total_params += weight.numel()
    if weight.requires_grad:
        trainable_params += weight.numel()
print(f"Param√®tres entra√Ænables: {trainable_params:,} / {total_params:,}")

# Phase 1 optimizer: updates the classifier head only.
optimizer_phase1 = optim.AdamW(
    params=model.classifier.parameters(),
    lr=LR_PHASE1,
    weight_decay=1e-4,
)

# No LR scheduler during warmup.
model, history_phase1 = train_model(
    model,
    optimizer_phase1,
    None,
    EPOCHS_PHASE1,
    "Phase 1 - Warmup",
)

# Phase 1 learning curves: loss on the left axis, accuracy on the right.
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
for ax, metric, title in zip(axes, ('loss', 'acc'),
                             ('Phase 1: Loss', 'Phase 1: Accuracy')):
    ax.plot(history_phase1[f'train_{metric}'], label='Train')
    ax.plot(history_phase1[f'val_{metric}'], label='Val')
    ax.set_title(title)
    ax.set_xlabel('Epoch')
    ax.legend()
plt.tight_layout()
plt.show()

## Part 5: Phase 2 - Fine-Tuning (Cosine Annealing)

In [None]:
# Cell 6: PHASE 2 - FINE TUNING (unfreeze everything + cosine annealing)
print(f"\n{'=' * 60}")
print("üîì PHASE 2: FINE TUNING - L'arme secr√®te pour les derniers %")
print('=' * 60)
print("On d√©g√®le TOUS les param√®tres du mod√®le")
print("On utilise CosineAnnealingLR pour avoir un apprentissage ultra-fin\n")

# Unfreeze every parameter of the model.
for weight in model.parameters():
    weight.requires_grad = True

# total_params is reused from the Phase 1 cell.
trainable_params = sum(
    weight.numel() for weight in model.parameters() if weight.requires_grad
)
print(f"Param√®tres entra√Ænables: {trainable_params:,} / {total_params:,}")

# Phase 2 optimizer over the whole network, with a cosine LR schedule that
# spans the full phase and bottoms out at eta_min.
optimizer_phase2 = optim.AdamW(
    params=model.parameters(),
    lr=LR_PHASE2,
    weight_decay=1e-4,
)
scheduler_phase2 = CosineAnnealingLR(
    optimizer=optimizer_phase2,
    T_max=EPOCHS_PHASE2,
    eta_min=1e-6,
)

model, history_phase2 = train_model(
    model,
    optimizer_phase2,
    scheduler_phase2,
    EPOCHS_PHASE2,
    "Phase 2 - Fine Tuning",
)

# Phase 2 learning curves: loss on the left axis, accuracy on the right.
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
for ax, metric, title in zip(axes, ('loss', 'acc'),
                             ('Phase 2: Loss (Cosine Annealing)', 'Phase 2: Accuracy')):
    ax.plot(history_phase2[f'train_{metric}'], label='Train', alpha=0.7)
    ax.plot(history_phase2[f'val_{metric}'], label='Val', alpha=0.7)
    ax.set_title(title)
    ax.set_xlabel('Epoch')
    ax.legend()
plt.tight_layout()
plt.show()

# Stitch both phases into a single timeline (Phase 1 epochs first).
all_train_loss = [*history_phase1['train_loss'], *history_phase2['train_loss']]
all_val_loss = [*history_phase1['val_loss'], *history_phase2['val_loss']]
all_train_acc = [*history_phase1['train_acc'], *history_phase2['train_acc']]
all_val_acc = [*history_phase1['val_acc'], *history_phase2['val_acc']]

fig, axes = plt.subplots(1, 2, figsize=(14, 4))
combined_panels = (
    (axes[0], all_train_loss, all_val_loss, 'Loss', 'Loss complet (Phase 1 + Phase 2)'),
    (axes[1], all_train_acc, all_val_acc, 'Acc', 'Accuracy complet'),
)
for ax, train_curve, val_curve, tag, title in combined_panels:
    ax.plot(train_curve, label=f'Train {tag}', alpha=0.7)
    ax.plot(val_curve, label=f'Val {tag}', alpha=0.7)
    # Dashed vertical line at the epoch index where Phase 2 begins.
    ax.axvline(x=EPOCHS_PHASE1, color='red', linestyle='--', label='Phase Transition')
    ax.set_title(title)
    ax.set_xlabel('Epoch')
    ax.legend()
plt.tight_layout()
plt.show()

## Part 6: Évaluation & Métriques

In [None]:
# Cell 7: Full evaluation on the test set
print(f"\n{'=' * 60}")
print("üìä √âVALUATION FINALE SUR TEST SET")
print('=' * 60)

model.eval()
y_true, y_probs, y_pred = [], [], []

# Collect labels, sigmoid probabilities and 0.5-thresholded predictions
# batch by batch, without tracking gradients.
with torch.no_grad():
    for batch_images, batch_labels in dataloaders['test']:
        logits = model(batch_images.to(device))
        batch_probs = torch.sigmoid(logits).cpu().numpy().flatten()

        y_true.extend(batch_labels.cpu().numpy())
        y_probs.extend(batch_probs)
        y_pred.extend((batch_probs > 0.5).astype(int))

y_true, y_probs, y_pred = (
    np.array(values) for values in (y_true, y_probs, y_pred)
)

report = classification_report(y_true, y_pred, target_names=['NORMAL', 'PNEUMONIA'])
print(report)

# Confusion matrix heatmap (rows: true class, columns: predicted class).
cm = confusion_matrix(y_true, y_pred)
predicted_ticks = [f'Pr√©dit {label}' for label in ('NORMAL', 'PNEUMONIA')]
actual_ticks = [f'Vrai {label}' for label in ('NORMAL', 'PNEUMONIA')]

plt.figure(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=predicted_ticks,
    yticklabels=actual_ticks,
    annot_kws={"size": 14},
)
plt.title('Matrice de Confusion')
plt.xlabel('Label Pr√©dit')
plt.ylabel('Vrai Label')
plt.tight_layout()
plt.show()

# ROC curve and its area, computed from the predicted probabilities.
fpr, tpr, _ = roc_curve(y_true, y_probs)
roc_auc = auc(fpr, tpr)

plt.figure(figsize=(8, 6))
# Model curve, then the diagonal of a random classifier for reference.
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'AUC = {roc_auc:.4f}')
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--', label='Random')
plt.title('Courbe ROC')
plt.xlabel('Taux de Faux Positifs')
plt.ylabel('Taux de Vrais Positifs')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.grid(alpha=0.3)
plt.legend(loc="lower right")
plt.tight_layout()
plt.show()

## Part 7: Sauvegarde du Modèle

In [None]:
# Cell 8: Save the trained model weights (state_dict only)
model_path = 'models/pneumonia_pro_final.pth'
os.makedirs('models', exist_ok=True)
torch.save(model.state_dict(), model_path)
print(f"‚úÖ Mod√®le sauvegard√©: {model_path}")

# Report the size of the saved file in megabytes.
model_file_mb = os.path.getsize(model_path) / 1e6
print(f"\nTaille du fichier: {model_file_mb:.2f} MB")

## Part 8: Prédiction sur une nouvelle image

In [None]:
# Cell 9: Prediction helper
def predict_image(image_path, model, device):
    """Run the model on a single image and return its sigmoid output.

    The image is converted to RGB, resized to ``IMG_SIZE`` x ``IMG_SIZE`` and
    normalized with the same statistics as the val/test transforms.

    Args:
        image_path: Path to an image file, or an already-loaded
            ``PIL.Image.Image`` (e.g. a synthetic image built in memory).
        model: Network producing one logit per image. It is switched to eval
            mode and left in that mode.
        device: Device the input tensor is moved to; the model is expected to
            already live there.

    Returns:
        ``(prob, img)``: the sigmoid of the model's logit as a Python float,
        and the RGB ``PIL.Image.Image`` that was fed to the model.
    """
    model.eval()

    # Use IMG_SIZE rather than a literal 224 so inference keeps matching the
    # training resolution if the constant is changed.
    transform = transforms.Compose([
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    if isinstance(image_path, Image.Image):
        img = image_path.convert('RGB')
    else:
        # convert() returns a fully-loaded image, so the file can be closed
        # as soon as the with-block exits.
        with Image.open(image_path) as source:
            img = source.convert('RGB')

    img_tensor = transform(img).unsqueeze(0).to(device)

    with torch.no_grad():
        output = model(img_tensor)
        prob = torch.sigmoid(output).item()

    return prob, img

# Exemple
# prob, img = predict_image('path/to/xray.jpg', model, device)
# plt.figure(figsize=(6, 6))
# plt.imshow(img, cmap='gray')
# plt.title(f"Prob Pneumonie: {prob:.2%}")
# plt.axis('off')
# plt.show()

## Part 9: Stress Test (Tests de robustesse)

In [None]:
# Cell 10: Stress test - run the model on degenerate inputs
print("\n" + "="*60)
print("üß™ STRESS TESTS")
print("="*60)


def _stress_predict(pil_img):
    """Return the model's sigmoid output for an in-memory PIL image."""
    batch = data_transforms['test'](pil_img).unsqueeze(0).to(device)
    with torch.no_grad():
        return torch.sigmoid(model(batch)).item()


model.eval()

# NOTE(review): the original cell was a placeholder that pointed to
# stress_test.py and never ran the model (prob_black was assigned
# predict_image.__code__.co_consts). If stress_test.py defines pass/fail
# thresholds, port them here -- TODO confirm.

# Test 1: completely black image
img_black = Image.new('RGB', (224, 224), color='black')
prob_black = _stress_predict(img_black)

# Test 2: completely white image
img_white = Image.new('RGB', (224, 224), color='white')
prob_white = _stress_predict(img_white)

# Test 3: uniform random noise
img_noise = Image.fromarray(np.uint8(np.random.rand(224, 224, 3) * 255))
prob_noise = _stress_predict(img_noise)

# None of these inputs is a radiograph; a confident output on any of them is a
# warning sign about what the model has learned.
for stress_name, stress_prob in (
    ('black', prob_black),
    ('white', prob_white),
    ('noise', prob_noise),
):
    print(f"  {stress_name:<5} -> P(PNEUMONIA) = {stress_prob:.4f}")

print("‚úÖ Tests de robustesse termin√©s")

---

## 📖 La Story du Cycle d'Entraînement

Voyons comment le modèle apprend étape par étape...