Import des bibliothèques nécessaires

In [None]:
# Importation des bibliothèques nécessaires
import os
import sys
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2
from tqdm import tqdm
import time
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers, callbacks, applications, metrics
from tensorflow.keras.utils import Sequence
from sklearn.model_selection import train_test_split
import albumentations as A
from albumentations.core.composition import OneOf
import seaborn as sns
import glob
import shutil
from datetime import datetime

# Configuration pour la reproductibilité
SEED = 42
os.environ['PYTHONHASHSEED'] = str(SEED)
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Vérifier la disponibilité du GPU
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))
print("TensorFlow version:", tf.__version__)

Préparation et exploration des données

In [None]:
# Paramètres globaux
IMAGE_SIZE = (256, 256)  # Taille réduite pour l'entraînement rapide
BATCH_SIZE = 8
NUM_EPOCHS = 30
NUM_CLASSES = 8  # 8 classes pour la segmentation urbaine

# Mapping des classes originales (34 classes) vers 8 classes
# Selon la documentation de Cityscapes
CLASS_MAPPING = {
    0: 7,   # unlabeled -> background
    1: 0,   # ego vehicle -> vehicle
    2: 0,   # rectification border -> vehicle
    3: 7,   # out of roi -> background
    4: 7,   # static -> background
    5: 7,   # dynamic -> background
    6: 7,   # ground -> background
    7: 1,   # road -> road
    8: 2,   # sidewalk -> sidewalk
    9: 7,   # parking -> background
    10: 7,  # rail track -> background
    11: 3,  # building -> building
    12: 7,  # wall -> background
    13: 7,  # fence -> background
    14: 7,  # guard rail -> background
    15: 7,  # bridge -> background
    16: 7,  # tunnel -> background
    17: 4,  # pole -> pole
    18: 7,  # polegroup -> background
    19: 5,  # traffic light -> traffic light
    20: 6,  # traffic sign -> traffic sign
    21: 3,  # vegetation -> building
    22: 7,  # terrain -> background
    23: 7,  # sky -> background
    24: 0,  # person -> vehicle
    25: 0,  # rider -> vehicle
    26: 0,  # car -> vehicle
    27: 0,  # truck -> vehicle
    28: 0,  # bus -> vehicle
    29: 7,  # caravan -> background
    30: 7,  # trailer -> background
    31: 0,  # train -> vehicle
    32: 0,  # motorcycle -> vehicle
    33: 0   # bicycle -> vehicle
}

# Classes finales pour la visualisation
CLASSES = {
    0: "vehicle", 
    1: "road",
    2: "sidewalk",
    3: "building",
    4: "pole",
    5: "traffic light",
    6: "traffic sign",
    7: "background"
}

# Couleurs pour chaque classe
COLORS = {
    0: [0, 0, 142],     # vehicle - bleu foncé
    1: [128, 64, 128],  # road - violet
    2: [244, 35, 232],  # sidewalk - rose
    3: [70, 70, 70],    # building - gris
    4: [153, 153, 153], # pole - gris clair
    5: [250, 170, 30],  # traffic light - orange
    6: [220, 220, 0],   # traffic sign - jaune
    7: [0, 0, 0]        # background - noir
}

# Fonction pour visualiser les images et masques
def visualize_sample(image, mask, title="Sample"):
    """
    Visualise une image et son masque de segmentation
    """
    # Créer une palette de couleurs pour les masques
    colors = np.array([list(COLORS.values())])
    
    # Convertir le masque en image RGB
    mask_rgb = np.zeros((mask.shape[0], mask.shape[1], 3), dtype=np.uint8)
    for class_idx, color in COLORS.items():
        mask_rgb[mask == class_idx] = color
    
    plt.figure(figsize=(12, 6))
    plt.subplot(1, 2, 1)
    plt.imshow(image)
    plt.title(f"{title} - Image")
    plt.axis('off')
    
    plt.subplot(1, 2, 2)
    plt.imshow(mask_rgb)
    plt.title(f"{title} - Mask")
    plt.axis('off')
    
    plt.tight_layout()
    plt.show()

# Fonction pour réorganiser le dataset en respectant l'équilibre des classes
def reorganize_dataset(source_dir, target_dir, test_size=0.2, val_size=0.15):
    """
    Réorganise le dataset en regroupant toutes les images dans un seul dossier
    et en les répartissant en train, val et test en respectant l'équilibre des classes
    """
    # Créer les répertoires cibles s'ils n'existent pas
    os.makedirs(os.path.join(target_dir, 'train', 'images'), exist_ok=True)
    os.makedirs(os.path.join(target_dir, 'train', 'masks'), exist_ok=True)
    os.makedirs(os.path.join(target_dir, 'val', 'images'), exist_ok=True)
    os.makedirs(os.path.join(target_dir, 'val', 'masks'), exist_ok=True)
    os.makedirs(os.path.join(target_dir, 'test', 'images'), exist_ok=True)
    os.makedirs(os.path.join(target_dir, 'test', 'masks'), exist_ok=True)
    
    # Trouver tous les fichiers de masque gt_labelIds
    mask_files = []
    for root, _, files in os.walk(source_dir):
        for file in files:
            if file.endswith('labelIds.png'):
                mask_files.append(os.path.join(root, file))
    
    # Analyse des masques pour déterminer la distribution des classes
    print(f"Total mask files found: {len(mask_files)}")
    
    # Calculer la distribution des classes dans chaque masque
    class_distribution = {}
    for mask_file in tqdm(mask_files, desc="Analyzing masks"):
        mask = cv2.imread(mask_file, cv2.IMREAD_GRAYSCALE)
        
        # Remapper les classes selon notre mapping
        remapped_mask = np.zeros_like(mask, dtype=np.uint8)
        for original, target in CLASS_MAPPING.items():
            remapped_mask[mask == original] = target
            
        # Compter les occurrences de chaque classe
        unique_classes = np.unique(remapped_mask)
        
        # Créer une clé basée sur les classes présentes
        class_key = '_'.join(map(str, sorted(unique_classes)))
        
        if class_key not in class_distribution:
            class_distribution[class_key] = []
        
        class_distribution[class_key].append(mask_file)
    
    # Afficher la distribution des combinaisons de classes
    print("\nDistribution of class combinations:")
    for class_key, files in sorted(class_distribution.items(), key=lambda x: len(x[1]), reverse=True):
        print(f"Classes {class_key}: {len(files)} images")
    
    # Diviser chaque groupe en train, val et test
    train_masks = []
    val_masks = []
    test_masks = []
    
    for class_key, files in class_distribution.items():
        # Premier split: train+val vs test
        train_val, test_split = train_test_split(files, test_size=test_size, random_state=SEED)
        
        # Deuxième split: train vs val
        train_split, val_split = train_test_split(train_val, test_size=val_size/(1-test_size), random_state=SEED)
        
        train_masks.extend(train_split)
        val_masks.extend(val_split)
        test_masks.extend(test_split)
    
    print(f"\nSplit results:")
    print(f"Train: {len(train_masks)} images")
    print(f"Validation: {len(val_masks)} images")
    print(f"Test: {len(test_masks)} images")
    
    # Copier les fichiers vers les répertoires cibles
    def copy_files_to_target(mask_files, target_subdir):
        for mask_file in tqdm(mask_files, desc=f"Copying {target_subdir} files"):
            # Obtenir le nom de fichier sans extension
            filename = os.path.basename(mask_file).replace('_gtFine_labelIds.png', '')
            
            # Trouver l'image correspondante
            img_dir = os.path.dirname(mask_file).replace('gtFine', 'leftImg8bit')
            img_file = os.path.join(img_dir, f"{filename}_leftImg8bit.png")
            
            if os.path.exists(img_file):
                # Copier l'image et le masque
                shutil.copy2(img_file, os.path.join(target_dir, target_subdir, 'images', f"{filename}.png"))
                shutil.copy2(mask_file, os.path.join(target_dir, target_subdir, 'masks', f"{filename}.png"))
            else:
                print(f"Warning: Could not find image for {mask_file}")
    
    copy_files_to_target(train_masks, 'train')
    copy_files_to_target(val_masks, 'val')
    copy_files_to_target(test_masks, 'test')
    
    return len(train_masks), len(val_masks), len(test_masks)

# Exécuter la réorganisation du dataset si nécessaire
# Pour ce notebook, nous supposons que le dataset est déjà réorganisé
# reorganize_dataset('path/to/cityscapes', 'path/to/processed')

Préparation des augmentations de données

In [None]:
# Définir différentes pipelines d'augmentation de données
def get_basic_augmentation():
    """Augmentations de base: flip horizontal et vertical"""
    return A.Compose([
        A.HorizontalFlip(p=0.5),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ])

def get_medium_augmentation():
    """Augmentations moyennes: flips, rotation, brightness/contrast"""
    return A.Compose([
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.2),
        A.RandomRotate90(p=0.2),
        A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.5),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ])

def get_advanced_augmentation():
    """Augmentations avancées: ajout de transformations géométriques et d'intensité"""
    return A.Compose([
        A.HorizontalFlip(p=0.5),
        A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.05, rotate_limit=15, p=0.5),
        A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.5),
        A.OneOf([
            A.GridDistortion(p=1),
            A.ElasticTransform(p=1),
            A.OpticalDistortion(p=1)
        ], p=0.3),
        A.OneOf([
            A.GaussNoise(p=1),
            A.GaussianBlur(p=1),
            A.MotionBlur(p=1),
        ], p=0.2),
        A.CoarseDropout(max_holes=8, max_height=32, max_width=32, p=0.2),
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ])

# Visualiser les effets des augmentations
def visualize_augmentations(image, mask, augmentation, n_examples=3):
    """
    Visualise les effets des augmentations sur une image et son masque
    """
    plt.figure(figsize=(15, n_examples*5))
    
    # Image et masque originaux
    plt.subplot(n_examples+1, 2, 1)
    plt.imshow(image)
    plt.title('Original Image')
    plt.axis('off')
    
    # Convertir le masque en image RGB pour la visualisation
    mask_rgb = np.zeros((mask.shape[0], mask.shape[1], 3), dtype=np.uint8)
    for class_idx, color in COLORS.items():
        mask_rgb[mask == class_idx] = color
        
    plt.subplot(n_examples+1, 2, 2)
    plt.imshow(mask_rgb)
    plt.title('Original Mask')
    plt.axis('off')
    
    # Appliquer les augmentations
    for i in range(n_examples):
        # Appliquer l'augmentation
        augmented = augmentation(image=image, mask=mask)
        aug_image, aug_mask = augmented['image'], augmented['mask']
        
        # Convertir l'image augmentée pour l'affichage (dénormaliser)
        if aug_image.dtype == np.float32:
            aug_image_display = (aug_image * 255).astype(np.uint8)
        else:
            aug_image_display = aug_image
        
        # Convertir le masque augmenté en RGB
        aug_mask_rgb = np.zeros((aug_mask.shape[0], aug_mask.shape[1], 3), dtype=np.uint8)
        for class_idx, color in COLORS.items():
            aug_mask_rgb[aug_mask == class_idx] = color
        
        # Afficher l'image augmentée
        plt.subplot(n_examples+1, 2, 2*i+3)
        plt.imshow(aug_image_display)
        plt.title(f'Augmented Image {i+1}')
        plt.axis('off')
        
        # Afficher le masque augmenté
        plt.subplot(n_examples+1, 2, 2*i+4)
        plt.imshow(aug_mask_rgb)
        plt.title(f'Augmented Mask {i+1}')
        plt.axis('off')
    
    plt.tight_layout()
    plt.show()

# Classe pour le générateur de données avec augmentation
class SegmentationDataGenerator(tf.keras.utils.Sequence):
    """Générateur de données pour la segmentation avec support d'augmentation"""
    
    def __init__(self, image_dir, mask_dir, batch_size=8, img_size=(256, 256), 
                 augmentation=None, shuffle=True, remap_classes=True):
        self.image_dir = image_dir
        self.mask_dir = mask_dir
        self.batch_size = batch_size
        self.img_size = img_size
        self.augmentation = augmentation
        self.shuffle = shuffle
        self.remap_classes = remap_classes
        
        # Lister tous les fichiers image
        self.image_files = sorted(glob.glob(os.path.join(image_dir, '*.png')))
        
        # Vérifier qu'il y a des images
        if len(self.image_files) == 0:
            raise ValueError(f"No image files found in {image_dir}")
            
        print(f"Found {len(self.image_files)} images in {image_dir}")
        self.on_epoch_end()
        
    def __len__(self):
        """Renvoie le nombre de batchs par epoch"""
        return int(np.ceil(len(self.image_files) / self.batch_size))
    
    def __getitem__(self, idx):
        """Génère un batch de données"""
        # Sélectionner les fichiers pour ce batch
        batch_image_files = self.image_files[idx * self.batch_size:(idx + 1) * self.batch_size]
        
        # Initialiser les tableaux pour les images et masques
        batch_images = np.zeros((len(batch_image_files), *self.img_size, 3), dtype=np.float32)
        batch_masks = np.zeros((len(batch_image_files), *self.img_size), dtype=np.uint8)
        
        for i, image_file in enumerate(batch_image_files):
            # Charger l'image
            image = cv2.imread(image_file)
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            
            # Trouver et charger le masque correspondant
            filename = os.path.basename(image_file)
            mask_file = os.path.join(self.mask_dir, filename)
            mask = cv2.imread(mask_file, cv2.IMREAD_GRAYSCALE)
            
            # Remapper les classes si nécessaire
            if self.remap_classes:
                remapped_mask = np.zeros_like(mask, dtype=np.uint8)
                for original, target in CLASS_MAPPING.items():
                    remapped_mask[mask == original] = target
                mask = remapped_mask
            
            # Redimensionner l'image et le masque
            image = cv2.resize(image, self.img_size[::-1])  # cv2 utilise (width, height)
            mask = cv2.resize(mask, self.img_size[::-1], interpolation=cv2.INTER_NEAREST)
            
            # Appliquer l'augmentation si disponible
            if self.augmentation:
                augmented = self.augmentation(image=image, mask=mask)
                image, mask = augmented['image'], augmented['mask']
            else:
                # Normaliser l'image même sans augmentation
                image = image / 255.0
            
            # Stocker dans les tableaux
            batch_images[i] = image
            batch_masks[i] = mask
        
        # Convertir les masques en one-hot encoding
        batch_masks_one_hot = tf.keras.utils.to_categorical(batch_masks, num_classes=NUM_CLASSES)
        
        return batch_images, batch_masks_one_hot
    
    def on_epoch_end(self):
        """Mélanger les indices à la fin de chaque epoch"""
        if self.shuffle:
            np.random.shuffle(self.image_files)

Définition des modèles

In [None]:
# 1. Modèle UNet Mini
def build_unet_mini(input_shape=(256, 256, 3), num_classes=8):
    """Construit un modèle UNet léger"""
    inputs = tf.keras.layers.Input(shape=input_shape)
    
    # Encoder
    conv1 = tf.keras.layers.Conv2D(32, 3, activation='relu', padding='same')(inputs)
    conv1 = tf.keras.layers.Conv2D(32, 3, activation='relu', padding='same')(conv1)
    pool1 = tf.keras.layers.MaxPooling2D(pool_size=(2, 2))(conv1)
    
    conv2 = tf.keras.layers.Conv2D(64, 3, activation='relu', padding='same')(pool1)
    conv2 = tf.keras.layers.Conv2D(64, 3, activation='relu', padding='same')(conv2)
    pool2 = tf.keras.layers.MaxPooling2D(pool_size=(2, 2))(conv2)
    
    conv3 = tf.keras.layers.Conv2D(128, 3, activation='relu', padding='same')(pool2)
    conv3 = tf.keras.layers.Conv2D(128, 3, activation='relu', padding='same')(conv3)
    pool3 = tf.keras.layers.MaxPooling2D(pool_size=(2, 2))(conv3)
    
    # Bridge
    conv4 = tf.keras.layers.Conv2D(256, 3, activation='relu', padding='same')(pool3)
    conv4 = tf.keras.layers.Conv2D(256, 3, activation='relu', padding='same')(conv4)
    
    # Decoder
    up5 = tf.keras.layers.UpSampling2D(size=(2, 2))(conv4)
    concat5 = tf.keras.layers.Concatenate()([up5, conv3])
    conv5 = tf.keras.layers.Conv2D(128, 3, activation='relu', padding='same')(concat5)
    conv5 = tf.keras.layers.Conv2D(128, 3, activation='relu', padding='same')(conv5)
    
    up6 = tf.keras.layers.UpSampling2D(size=(2, 2))(conv5)
    concat6 = tf.keras.layers.Concatenate()([up6, conv2])
    conv6 = tf.keras.layers.Conv2D(64, 3, activation='relu', padding='same')(concat6)
    conv6 = tf.keras.layers.Conv2D(64, 3, activation='relu', padding='same')(conv6)
    
    up7 = tf.keras.layers.UpSampling2D(size=(2, 2))(conv6)
    concat7 = tf.keras.layers.Concatenate()([up7, conv1])
    conv7 = tf.keras.layers.Conv2D(32, 3, activation='relu', padding='same')(concat7)
    conv7 = tf.keras.layers.Conv2D(32, 3, activation='relu', padding='same')(conv7)
    
    # Output
    outputs = tf.keras.layers.Conv2D(num_classes, 1, activation='softmax')(conv7)
    
    model = tf.keras.models.Model(inputs, outputs)
    return model

# 2. Modèle VGG16-UNet avec encoder pré-entraîné
def build_vgg16_unet(input_shape=(256, 256, 3), num_classes=8, trainable_encoder=False):
    """Construit un modèle UNet avec un encoder VGG16 pré-entraîné"""
    # Charger le modèle VGG16 pré-entraîné
    base_model = tf.keras.applications.VGG16(
        include_top=False, 
        weights='imagenet',
        input_shape=input_shape
    )
    
    # Figer les poids du modèle de base
    base_model.trainable = trainable_encoder
    
    # Extraire les sorties des couches pour les skip connections
    layer_names = [
        'block1_conv2',   # 64 filtres
        'block2_conv2',   # 128 filtres
        'block3_conv3',   # 256 filtres
        'block4_conv3',   # 512 filtres
        'block5_conv3'    # 512 filtres
    ]
    
    layers = [base_model.get_layer(name).output for name in layer_names]
    
    # Créer le modèle d'extraction de features
    encoder = tf.keras.models.Model(inputs=base_model.input, outputs=layers)
    
    # Entrée
    inputs = tf.keras.layers.Input(shape=input_shape)
    
    # Appliquer l'encoder
    skips = encoder(inputs)
    
    # Bridge est la sortie de l'encoder
    x = skips[-1]
    
    # Decoder avec skip connections
    for i in range(len(skips)-2, -1, -1):
        x = tf.keras.layers.UpSampling2D(2)(x)
        concat = tf.keras.layers.Concatenate()([x, skips[i]])
        
        x = tf.keras.layers.Conv2D(512 // 2**(len(skips)-i-1), 3, activation='relu', padding='same')(concat)
        x = tf.keras.layers.Conv2D(512 // 2**(len(skips)-i-1), 3, activation='relu', padding='same')(x)
    
    # Couche de sortie
    outputs = tf.keras.layers.Conv2D(num_classes, 1, activation='softmax')(x)
    
    model = tf.keras.models.Model(inputs, outputs)
    return model

# 3. Modèle ResNet50-UNet avec encoder pré-entraîné
def build_resnet50_unet(input_shape=(256, 256, 3), num_classes=8, trainable_encoder=False):
    """Construit un modèle UNet avec un encoder ResNet50 pré-entraîné"""
    # Charger le modèle ResNet50 pré-entraîné
    base_model = tf.keras.applications.ResNet50(
        include_top=False,
        weights='imagenet',
        input_shape=input_shape
    )
    
    # Figer les poids du modèle de base
    base_model.trainable = trainable_encoder
    
    # Extraire les sorties des couches pour les skip connections
    layer_names = [
        'conv1_relu',      # 64 filtres
        'conv2_block3_out', # 256 filtres
        'conv3_block4_out', # 512 filtres
        'conv4_block6_out', # 1024 filtres
        'conv5_block3_out'  # 2048 filtres
    ]
    
    layers = [base_model.get_layer(name).output for name in layer_names]
    
    # Créer le modèle d'extraction de features
    encoder = tf.keras.models.Model(inputs=base_model.input, outputs=layers)
    
    # Entrée
    inputs = tf.keras.layers.Input(shape=input_shape)
    
    # Appliquer l'encoder
    skips = encoder(inputs)
    
    # Bridge est la sortie de l'encoder
    x = skips[-1]
    
    # Decoder avec skip connections
    decoder_filters = [1024, 512, 256, 64, 32]  # Réduire progressivement le nombre de filtres
    
    for i in range(len(skips)-2, -1, -1):
        x = tf.keras.layers.UpSampling2D(2)(x)
        x = tf.keras.layers.Conv2D(decoder_filters[len(skips)-i-2], 3, activation='relu', padding='same')(x)
        
        # Skip connection
        concat = tf.keras.layers.Concatenate()([x, skips[i]])
        
        # Double convolution
        x = tf.keras.layers.Conv2D(decoder_filters[len(skips)-i-2], 3, activation='relu', padding='same')(concat)
        x = tf.keras.layers.Conv2D(decoder_filters[len(skips)-i-2], 3, activation='relu', padding='same')(x)
    
    # Dernière couche de upsampling et convolution
    x = tf.keras.layers.UpSampling2D(2)(x)
    x = tf.keras.layers.Conv2D(32, 3, activation='relu', padding='same')(x)
    
    # Couche de sortie
    outputs = tf.keras.layers.Conv2D(num_classes, 1, activation='softmax')(x)
    
    model = tf.keras.models.Model(inputs, outputs)
    return model

Métriques et fonctions de perte

In [None]:
# Définition des métriques et fonctions de perte pour la segmentation

# Fonction IoU (Intersection over Union) / Indice de Jaccard
def jaccard_index(y_true, y_pred, smooth=1e-5):
    """
    Calcule l'indice de Jaccard (IoU) entre les prédictions et les vérités terrain
    """
    y_true_f = tf.reshape(y_true, [-1])
    y_pred_f = tf.reshape(y_pred, [-1])
    intersection = tf.reduce_sum(y_true_f * y_pred_f)
    union = tf.reduce_sum(y_true_f) + tf.reduce_sum(y_pred_f) - intersection
    return (intersection + smooth) / (union + smooth)

# Coefficient Dice (F1-score)
def dice_coefficient(y_true, y_pred, smooth=1e-5):
    """
    Calcule le coefficient Dice entre les prédictions et les vérités terrain
    """
    y_true_f = tf.reshape(y_true, [-1])
    y_pred_f = tf.reshape(y_pred, [-1])
    intersection = tf.reduce_sum(y_true_f * y_pred_f)
    return (2. * intersection + smooth) / (tf.reduce_sum(y_true_f) + tf.reduce_sum(y_pred_f) + smooth)

# Dice Loss
def dice_loss(y_true, y_pred, smooth=1e-5):
    """
    Calcule la perte Dice
    """
    return 1 - dice_coefficient(y_true, y_pred, smooth)

# Combinaison de BCE et Dice Loss
def bce_dice_loss(y_true, y_pred):
    """
    Combine la perte BCE (Binary Cross Entropy) et la perte Dice
    """
    bce = tf.keras.losses.binary_crossentropy(y_true, y_pred)
    dice = dice_loss(y_true, y_pred)
    return bce + dice

# Fonction de perte pondérée pour gérer le déséquilibre des classes
def weighted_categorical_crossentropy(weights):
    """
    Génère une fonction de perte d'entropie croisée catégorielle pondérée
    pour gérer le déséquilibre des classes
    """
    weights = tf.keras.backend.variable(weights)
    
    def loss(y_true, y_pred):
        # Scale predictions so that the class probabilities of each sample sum to 1
        y_pred /= tf.keras.backend.sum(y_pred, axis=-1, keepdims=True)
        
        # Clip to prevent NaN's and Inf's
        y_pred = tf.keras.backend.clip(y_pred, tf.keras.backend.epsilon(), 1 - tf.keras.backend.epsilon())
        
        # Calculate the weighted cross-entropy
        loss = y_true * tf.keras.backend.log(y_pred) * weights
        loss = -tf.keras.backend.sum(loss, -1)
        return loss
    
    return loss

# Focal Loss pour gérer les classes déséquilibrées
def categorical_focal_loss(gamma=2.0, alpha=None):
    """
    Implémentation de la Focal Loss pour la segmentation multi-classes
    """
    def focal_loss(y_true, y_pred):
        # Scale predictions so that the class probabilities of each sample sum to 1
        y_pred /= tf.keras.backend.sum(y_pred, axis=-1, keepdims=True)
        
        # Clip to prevent NaN's and Inf's
        epsilon = tf.keras.backend.epsilon()
        y_pred = tf.keras.backend.clip(y_pred, epsilon, 1. - epsilon)
        
        # Calculate focal loss
        loss = -y_true * tf.keras.backend.pow(1 - y_pred, gamma) * tf.keras.backend.log(y_pred)
        
        # Apply class weights if provided
        if alpha is not None:
            loss = alpha * loss
            
        return tf.keras.backend.sum(loss, axis=-1)
    
    return focal_loss

# Classe pour le Mean IoU en tant que métrique Keras
class MeanIoU(tf.keras.metrics.Metric):
    def __init__(self, num_classes, name='mean_iou', **kwargs):
        super(MeanIoU, self).__init__(name=name, **kwargs)
        self.num_classes = num_classes
        self.total_iou = self.add_weight('total_iou', initializer='zeros')
        self.count = self.add_weight('count', initializer='zeros')
        
    def update_state(self, y_true, y_pred, sample_weight=None):
        y_true = tf.argmax(y_true, axis=-1)
        y_pred = tf.argmax(y_pred, axis=-1)
        
        current_iou = 0
        
        # Calculer IoU pour chaque classe et faire la moyenne
        for i in range(self.num_classes):
            y_true_class = tf.cast(tf.equal(y_true, i), tf.float32)
            y_pred_class = tf.cast(tf.equal(y_pred, i), tf.float32)
            
            intersection = tf.reduce_sum(y_true_class * y_pred_class)
            union = tf.reduce_sum(y_true_class) + tf.reduce_sum(y_pred_class) - intersection
            
            # Éviter la division par zéro
            iou = tf.cond(
                tf.equal(union, 0),
                lambda: tf.constant(1.0),  # Si union=0, IoU=1 (parfait)
                lambda: intersection / union
            )
            
            current_iou += iou
        
        current_iou /= tf.cast(self.num_classes, tf.float32)
        
        self.total_iou.assign_add(current_iou)
        self.count.assign_add(1)
        
    def result(self):
        return self.total_iou / self.count
    
    def reset_state(self):
        self.total_iou.assign(0)
        self.count.assign(0)

Entraînement et évaluation des modèles

In [None]:
# Fonction pour entraîner et évaluer un modèle
def train_and_evaluate(model, model_name, train_gen, val_gen, test_gen, 
                       loss_function, epochs=30, learning_rate=1e-4,
                       metrics=['accuracy', 'MeanIoU'], early_stopping_patience=10):
    """
    Entraîne et évalue un modèle de segmentation
    
    Args:
        model: Modèle Keras à entraîner
        model_name: Nom du modèle pour sauvegarder les checkpoints
        train_gen: Générateur d'entraînement
        val_gen: Générateur de validation
        test_gen: Générateur de test
        loss_function: Fonction de perte à utiliser
        epochs: Nombre maximum d'epochs
        learning_rate: Taux d'apprentissage
        metrics: Liste des métriques à suivre
        early_stopping_patience: Patience pour l'early stopping
    
    Returns:
        history: Historique d'entraînement
        evaluation: Résultats d'évaluation sur l'ensemble test
        training_time: Temps d'entraînement en secondes
    """
    # Configurer les métriques
    metrics_list = []
    if 'accuracy' in metrics:
        metrics_list.append('accuracy')
    if 'MeanIoU' in metrics:
        metrics_list.append(MeanIoU(num_classes=NUM_CLASSES))
    
    # Compiler le modèle
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss=loss_function,
        metrics=metrics_list
    )
    
    # Callbacks
    checkpoint_path = f"checkpoints/{model_name}_best.h5"
    os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)
    
    callbacks = [
        # Sauvegarder le meilleur modèle
        tf.keras.callbacks.ModelCheckpoint(
            checkpoint_path,
            monitor='val_loss',
            save_best_only=True,
            save_weights_only=False,
            mode='min',
            verbose=1
        ),
        # Réduire le taux d'apprentissage quand la validation loss stagne
        tf.keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=5,
            min_lr=1e-6,
            verbose=1
        ),
        # Arrêter l'entraînement si la validation loss ne s'améliore pas
        tf.keras.callbacks.EarlyStopping(
            monitor='val_loss',
            patience=early_stopping_patience,
            restore_best_weights=True,
            verbose=1
        ),
        # Logging TensorBoard
        tf.keras.callbacks.TensorBoard(
            log_dir=f'logs/{model_name}_{datetime.now().strftime("%Y%m%d-%H%M%S")}',
            histogram_freq=1
        )
    ]
    
    # Entraîner le modèle
    print(f"\nEntraînement du modèle {model_name}...")
    start_time = time.time()
    
    history = model.fit(
        train_gen,
        validation_data=val_gen,
        epochs=epochs,
        callbacks=callbacks,
        verbose=1
    )
    
    training_time = time.time() - start_time
    print(f"\nTemps d'entraînement: {training_time:.2f} secondes")
    
    # Charger le meilleur modèle
    if os.path.exists(checkpoint_path):
        print(f"Chargement du meilleur modèle depuis {checkpoint_path}")
        model = tf.keras.models.load_model(
            checkpoint_path,
            custom_objects={
                'MeanIoU': MeanIoU,
                'dice_coefficient': dice_coefficient,
                'jaccard_index': jaccard_index,
                'dice_loss': dice_loss,
                'bce_dice_loss': bce_dice_loss
            }
        )
    
    # Évaluer le modèle sur l'ensemble de test
    print(f"\nÉvaluation du modèle {model_name} sur l'ensemble de test...")
    evaluation = model.evaluate(test_gen, verbose=1)
    
    eval_result = {}
    for i, metric in enumerate(model.metrics_names):
        eval_result[metric] = evaluation[i]
        print(f"{metric}: {evaluation[i]:.4f}")
    
    return history, eval_result, training_time, model

# Fonction pour visualiser les prédictions du modèle
def visualize_predictions(model, test_gen, num_samples=3):
    """
    Visualise les prédictions du modèle sur quelques échantillons de test
    """
    # Obtenir quelques échantillons de test
    for i in range(min(num_samples, len(test_gen))):
        images, masks_true = test_gen.__getitem__(i)
        
        # Ne prendre que la première image du batch
        image = images[0]
        mask_true = masks_true[0]
        
        # Faire une prédiction
        mask_pred = model.predict(np.expand_dims(image, axis=0))[0]
        
        # Convertir les masques one-hot en masques de classe
        mask_true_class = np.argmax(mask_true, axis=-1)
        mask_pred_class = np.argmax(mask_pred, axis=-1)
        
        # Créer des masques RGB pour la visualisation
        mask_true_rgb = np.zeros((*mask_true_class.shape, 3), dtype=np.uint8)
        mask_pred_rgb = np.zeros((*mask_pred_class.shape, 3), dtype=np.uint8)
        
        for class_idx, color in COLORS.items():
            mask_true_rgb[mask_true_class == class_idx] = color
            mask_pred_rgb[mask_pred_class == class_idx] = color
        
        # Créer une figure pour afficher l'image, la vérité terrain et la prédiction
        plt.figure(figsize=(15, 5))
        
        plt.subplot(1, 3, 1)
        # L'image a été normalisée, la renormaliser pour l'affichage
        plt.imshow(image)
        plt.title('Image')
        plt.axis('off')
        
        plt.subplot(1, 3, 2)
        plt.imshow(mask_true_rgb)
        plt.title('Vérité terrain')
        plt.axis('off')
        
        plt.subplot(1, 3, 3)
        plt.imshow(mask_pred_rgb)
        plt.title('Prédiction')
        plt.axis('off')
        
        plt.tight_layout()
        plt.show()
        
# Fonction pour visualiser l'historique d'entraînement
def plot_training_history(history, model_name):
    """
    Visualise l'historique d'entraînement d'un modèle
    """
    plt.figure(figsize=(15, 5))
    
    # Loss
    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title(f'{model_name} - Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.7)
    
    # Metrics (accuracy or IoU)
    plt.subplot(1, 2, 2)
    
    if 'accuracy' in history.history:
        plt.plot(history.history['accuracy'], label='Training Accuracy')
        plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
        plt.title(f'{model_name} - Accuracy')
        plt.ylabel('Accuracy')
    
    if 'mean_iou' in history.history:
        plt.plot(history.history['mean_iou'], label='Training IoU')
        plt.plot(history.history['val_mean_iou'], label='Validation IoU')
        plt.title(f'{model_name} - Mean IoU')
        plt.ylabel('IoU')
    
    plt.xlabel('Epoch')
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.7)
    
    plt.tight_layout()
    plt.savefig(f'results/{model_name}_history.png', dpi=300)
    plt.show()

# Fonction pour comparer les performances des différents modèles
def compare_models(results):
    """
    Compare les performances de différents modèles
    
    Args:
        results: Liste de dictionnaires contenant les résultats pour chaque modèle
    """
    # Extraire les noms des modèles et les métriques
    model_names = [result['model_name'] for result in results]
    loss_values = [result['evaluation']['loss'] for result in results]
    
    # Chercher les métriques communes
    metrics = set()
    for result in results:
        metrics.update(result['evaluation'].keys())
    
    metrics = sorted(list(metrics - {'loss'}))
    
    # Créer un DataFrame pour les résultats
    df_results = pd.DataFrame({
        'Model': model_names,
        'Loss': loss_values,
        'Training Time (s)': [result['training_time'] for result in results]
    })
    
    # Ajouter les métriques
    for metric in metrics:
        df_results[metric] = [result['evaluation'].get(metric, float('nan')) for result in results]
    
    # Afficher le tableau des résultats
    print("\n=== Comparaison des modèles ===")
    print(df_results.to_string(index=False))
    
    # Sauvegarder le tableau des résultats
    os.makedirs('results', exist_ok=True)
    df_results.to_csv('results/model_comparison.csv', index=False)
    
    # Créer un graphique comparatif
    plt.figure(figsize=(15, 10))
    
    # Comparer les métriques
    n_metrics = len(metrics) + 1  # +1 pour la loss
    n_cols = 2
    n_rows = (n_metrics + 1) // n_cols
    
    # Loss
    plt.subplot(n_rows, n_cols, 1)
    sns.barplot(x='Model', y='Loss', data=df_results)
    plt.title('Loss')
    plt.xticks(rotation=45)
    plt.grid(True, linestyle='--', alpha=0.7)
    
    # Autres métriques
    for i, metric in enumerate(metrics):
        plt.subplot(n_rows, n_cols, i+2)
        sns.barplot(x='Model', y=metric, data=df_results)
        plt.title(metric)
        plt.xticks(rotation=45)
        plt.grid(True, linestyle='--', alpha=0.7)
    
    # Temps d'entraînement
    plt.subplot(n_rows, n_cols, n_metrics+1)
    sns.barplot(x='Model', y='Training Time (s)', data=df_results)
    plt.title('Training Time (s)')
    plt.xticks(rotation=45)
    plt.grid(True, linestyle='--', alpha=0.7)
    
    plt.tight_layout()
    plt.savefig('results/model_comparison.png', dpi=300)
    plt.show()
    
    # Retourner le meilleur modèle selon la métrique IoU
    if 'mean_iou' in metrics:
        best_idx = df_results['mean_iou'].idxmax()
    elif 'accuracy' in metrics:
        best_idx = df_results['accuracy'].idxmax()
    else:
        best_idx = df_results['Loss'].idxmin()
    
    best_model = {
        'name': model_names[best_idx],
        'model': results[best_idx]['model']
    }
    
    return df_results, best_model

Expérimentations et comparaison des résultats

In [None]:
# Chemin vers le dataset organisé
dataset_path = 'processed_dataset'  # Adaptez selon votre structure de fichiers
img_size = (256, 256)
batch_size = 8

# Créer les générateurs de données avec différentes augmentations
train_gen_basic = SegmentationDataGenerator(
    os.path.join(dataset_path, 'train', 'images'),
    os.path.join(dataset_path, 'train', 'masks'),
    batch_size=batch_size,
    img_size=img_size,
    augmentation=get_basic_augmentation()
)

train_gen_medium = SegmentationDataGenerator(
    os.path.join(dataset_path, 'train', 'images'),
    os.path.join(dataset_path, 'train', 'masks'),
    batch_size=batch_size,
    img_size=img_size,
    augmentation=get_medium_augmentation()
)

train_gen_advanced = SegmentationDataGenerator(
    os.path.join(dataset_path, 'train', 'images'),
    os.path.join(dataset_path, 'train', 'masks'),
    batch_size=batch_size,
    img_size=img_size,
    augmentation=get_advanced_augmentation()
)

val_gen = SegmentationDataGenerator(
    os.path.join(dataset_path, 'val', 'images'),
    os.path.join(dataset_path, 'val', 'masks'),
    batch_size=batch_size,
    img_size=img_size,
    augmentation=None,  # Pas d'augmentation pour la validation
    shuffle=False
)

test_gen = SegmentationDataGenerator(
    os.path.join(dataset_path, 'test', 'images'),
    os.path.join(dataset_path, 'test', 'masks'),
    batch_size=batch_size,
    img_size=img_size,
    augmentation=None,  # Pas d'augmentation pour le test
    shuffle=False
)

# Visualiser les exemples d'augmentation
try:
    # Obtenir un exemple d'image et de masque
    images, masks = train_gen_basic.__getitem__(0)
    image, mask = images[0], np.argmax(masks[0], axis=-1)
    
    # Visualiser les effets des différentes augmentations
    print("Augmentation basique:")
    visualize_augmentations(image, mask, get_basic_augmentation(), n_examples=2)
    
    print("Augmentation moyenne:")
    visualize_augmentations(image, mask, get_medium_augmentation(), n_examples=2)
    
    print("Augmentation avancée:")
    visualize_augmentations(image, mask, get_advanced_augmentation(), n_examples=2)
except Exception as e:
    print(f"Erreur lors de la visualisation des augmentations: {e}")

# Liste pour stocker les résultats
results = []

# Expérience 1: UNet Mini avec augmentation basique et Dice Loss
print("\n=== Expérience 1: UNet Mini avec augmentation basique et Dice Loss ===")
model1 = build_unet_mini(input_shape=(*img_size, 3), num_classes=NUM_CLASSES)
model1.summary()

history1, eval1, time1, model1 = train_and_evaluate(
    model=model1,
    model_name='unet_mini_basic_dice',
    train_gen=train_gen_basic,
    val_gen=val_gen,
    test_gen=test_gen,
    loss_function=dice_loss,
    epochs=30,
    learning_rate=1e-4
)

plot_training_history(history1, 'UNet Mini (Basic Aug, Dice Loss)')
visualize_predictions(model1, test_gen, num_samples=2)

results.append({
    'model_name': 'UNet Mini (Basic Aug, Dice Loss)',
    'evaluation': eval1,
    'training_time': time1,
    'model': model1
})

# Expérience 2: UNet Mini avec augmentation avancée et BCE-Dice Loss
print("\n=== Expérience 2: UNet Mini avec augmentation avancée et BCE-Dice Loss ===")
model2 = build_unet_mini(input_shape=(*img_size, 3), num_classes=NUM_CLASSES)

history2, eval2, time2, model2 = train_and_evaluate(
    model=model2,
    model_name='unet_mini_advanced_bce_dice',
    train_gen=train_gen_advanced,
    val_gen=val_gen,
    test_gen=test_gen,
    loss_function=bce_dice_loss,
    epochs=30,
    learning_rate=1e-4
)

plot_training_history(history2, 'UNet Mini (Advanced Aug, BCE-Dice Loss)')
visualize_predictions(model2, test_gen, num_samples=2)

results.append({
    'model_name': 'UNet Mini (Advanced Aug, BCE-Dice Loss)',
    'evaluation': eval2,
    'training_time': time2,
    'model': model2
})

# Expérience 3: VGG16-UNet avec augmentation moyenne et Focal Loss
print("\n=== Expérience 3: VGG16-UNet avec augmentation moyenne et Focal Loss ===")
model3 = build_vgg16_unet(input_shape=(*img_size, 3), num_classes=NUM_CLASSES)
model3.summary()

# Calculer les poids de classe pour le Focal Loss (optionnel)
class_weights = None  # Vous pouvez calculer des poids basés sur la distribution des classes

history3, eval3, time3, model3 = train_and_evaluate(
    model=model3,
    model_name='vgg16_unet_medium_focal',
    train_gen=train_gen_medium,
    val_gen=val_gen,
    test_gen=test_gen,
    loss_function=categorical_focal_loss(gamma=2.0, alpha=class_weights),
    epochs=30,
    learning_rate=5e-5  # Plus faible pour le modèle pré-entraîné
)

plot_training_history(history3, 'VGG16-UNet (Medium Aug, Focal Loss)')
visualize_predictions(model3, test_gen, num_samples=2)

results.append({
    'model_name': 'VGG16-UNet (Medium Aug, Focal Loss)',
    'evaluation': eval3,
    'training_time': time3,
    'model': model3
})

# Expérience 4: ResNet50-UNet avec augmentation avancée et BCE-Dice Loss
print("\n=== Expérience 4: ResNet50-UNet avec augmentation avancée et BCE-Dice Loss ===")
model4 = build_resnet50_unet(input_shape=(*img_size, 3), num_classes=NUM_CLASSES)
model4.summary()

history4, eval4, time4, model4 = train_and_evaluate(
    model=model4,
    model_name='resnet50_unet_advanced_bce_dice',
    train_gen=train_gen_advanced,
    val_gen=val_gen,
    test_gen=test_gen,
    loss_function=bce_dice_loss,
    epochs=30,
    learning_rate=5e-5  # Plus faible pour le modèle pré-entraîné
)

plot_training_history(history4, 'ResNet50-UNet (Advanced Aug, BCE-Dice Loss)')
visualize_predictions(model4, test_gen, num_samples=2)

results.append({
    'model_name': 'ResNet50-UNet (Advanced Aug, BCE-Dice Loss)',
    'evaluation': eval4,
    'training_time': time4,
    'model': model4
})

# Comparer les modèles
df_comparison, best_model = compare_models(results)

print(f"\nMeilleur modèle: {best_model['name']}")

# Sauvegarder le meilleur modèle
best_model['model'].save(f'best_model_{best_model["name"]}.h5')
print(f"Meilleur modèle sauvegardé sous 'best_model_{best_model['name']}.h5'")

Segmentation optimisée sur de nouvelles images

In [None]:
# Fonction pour segmenter une nouvelle image avec le meilleur modèle
def segment_image(image_path, model, img_size=(256, 256)):
    """
    Segmente une image avec le modèle fourni
    
    Args:
        image_path: Chemin vers l'image à segmenter
        model: Modèle à utiliser pour la segmentation
        img_size: Taille de l'image pour le modèle
    
    Returns:
        Image originale et masque de segmentation
    """
    # Charger l'image
    image = cv2.imread(image_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    
    # Redimensionner l'image
    image_resized = cv2.resize(image, img_size[::-1])
    
    # Normaliser l'image
    image_norm = image_resized / 255.0
    
    # Faire la prédiction
    mask_pred = model.predict(np.expand_dims(image_norm, axis=0))[0]
    mask_pred_class = np.argmax(mask_pred, axis=-1)
    
    # Créer un masque RGB pour la visualisation
    mask_pred_rgb = np.zeros((*mask_pred_class.shape, 3), dtype=np.uint8)
    
    for class_idx, color in COLORS.items():
        mask_pred_rgb[mask_pred_class == class_idx] = color
    
    # Afficher l'image et la segmentation
    plt.figure(figsize=(12, 6))
    
    plt.subplot(1, 2, 1)
    plt.imshow(image_resized)
    plt.title('Image originale')
    plt.axis('off')
    
    plt.subplot(1, 2, 2)
    plt.imshow(mask_pred_rgb)
    plt.title('Segmentation')
    plt.axis('off')
    
    plt.tight_layout()
    plt.show()
    
    return image_resized, mask_pred_rgb

# Exemple d'utilisation sur de nouvelles images
test_images = [
    'test_images/image1.jpg',
    'test_images/image2.jpg',
    'test_images/image3.jpg'
]

# Commenter ce bloc si les images de test n'existent pas
for image_path in test_images:
    try:
        print(f"Segmentation de {image_path}:")
        segment_image(image_path, best_model['model'], img_size)
    except Exception as e:
        print(f"Erreur lors de la segmentation de {image_path}: {e}")

Conclusion et perspectives

In [None]:
# Afficher les résultats finaux et les conclusions
print("\n=== Résultats finaux ===")
print(df_comparison)

# Réfléchir aux améliorations possibles
improvements = [
    "Utiliser une résolution d'image plus élevée (512x512 ou plus)",
    "Tester d'autres architectures comme DeepLabV3+, PSPNet, ou HRNet",
    "Implémenter des techniques d'apprentissage par transfert plus avancées",
    "Utiliser des techniques d'augmentation de données plus spécifiques au domaine urbain",
    "Explorer l'optimisation des hyperparamètres avec Optuna ou Ray Tune",
    "Combiner différentes fonctions de perte pour mieux gérer les classes minoritaires",
    "Utiliser l'ensemble learning en combinant plusieurs modèles"
]

print("\n=== Perspectives d'amélioration ===")
for i, improvement in enumerate(improvements, 1):
    print(f"{i}. {improvement}")