<a href="https://colab.research.google.com/github/nicop1709/projet8-cityscapes/blob/main/P8_cityscapes_segmentation_pipeline_no_tfa.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# Projet 8 — Pipeline complet Cityscapes (8 classes) : Entraînement, Évaluation et Prédiction

Version sans `tensorflow-addons` (compat. Colab standard).

- Chargement Cityscapes (chemins Drive)
- Remap → 8 classes
- `tf.data` + augmentations
- Modèles : **U-Net** et **U-Net+MobileNetV2**
- mIoU (callback maison), export **SavedModel**
- Artefacts pour API (config, palette, LUT, metrics)
- Prédiction + visualisation


## 0. Environnement & dépendances

In [None]:

# Pas de tensorflow-addons requis
import os, sys, json, math, time, itertools, random
from pathlib import Path
from pprint import pprint

import numpy as np
import tensorflow as tf
import pandas as pd
print("TensorFlow:", tf.__version__)
print("GPU available:", tf.config.list_physical_devices('GPU'))


TensorFlow: 2.19.0
GPU available: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


## 1. Chemins & configuration

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:

PATH_PROJECT = '/content/drive/MyDrive/AI Engineer/Projets/Projet 8 - Segmentation images voitures autonomes'

FOLDER_TRAINING_IMAGES = PATH_PROJECT + "/P8_Cityscapes_leftImg8bit_trainvaltest/leftImg8bit/train"
FOLDER_TRAINING_MASK   = PATH_PROJECT + "/P8_Cityscapes_gtFine_trainvaltest/gtFine/train"
FOLDER_VALIDATION_IMAGES = PATH_PROJECT + "/P8_Cityscapes_leftImg8bit_trainvaltest/leftImg8bit/val"
FOLDER_VALIDATION_MASK   = PATH_PROJECT + "/P8_Cityscapes_gtFine_trainvaltest/gtFine/val"
FOLDER_TEST_IMAGES = PATH_PROJECT + "/P8_Cityscapes_leftImg8bit_trainvaltest/leftImg8bit/test"
FOLDER_TEST_MASK   = PATH_PROJECT + "/P8_Cityscapes_gtFine_trainvaltest/gtFine/test"

IMG_HEIGHT, IMG_WIDTH = 512, 1024
N_CLASSES = 8
BATCH_SIZE = 4
EPOCHS = 25
SEED = 42

OUT_DIR = Path(PATH_PROJECT) / "artifacts_training"
OUT_DIR.mkdir(parents=True, exist_ok=True)

tf.random.set_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)

print("OUT_DIR:", OUT_DIR)


OUT_DIR: /content/drive/MyDrive/AI Engineer/Projets/Projet 8 - Segmentation images voitures autonomes/artifacts_training


In [None]:
for path in [FOLDER_TRAINING_IMAGES,
             FOLDER_TRAINING_MASK,
             FOLDER_VALIDATION_IMAGES,
             FOLDER_VALIDATION_MASK,
             FOLDER_TEST_IMAGES,
             FOLDER_TEST_MASK
             ]:
  print(os.listdir(path))

['zurich', 'strasbourg', 'weimar', 'tubingen', 'jena', 'aachen', 'bochum', 'dusseldorf', 'darmstadt', 'cologne', 'hamburg', 'krefeld', 'monchengladbach', 'hanover', 'stuttgart', 'ulm', 'erfurt', 'bremen']
['weimar', 'tubingen', 'zurich', 'aachen', 'strasbourg', 'bochum', 'jena', 'dusseldorf', 'ulm', 'darmstadt', 'hamburg', 'cologne', 'monchengladbach', 'stuttgart', 'erfurt', 'hanover', 'krefeld', 'bremen']
['munster', 'frankfurt', 'lindau']
['lindau', 'munster', 'frankfurt']
['bielefeld', 'bonn', 'leverkusen', 'berlin', 'mainz', 'munich']
['leverkusen', 'bielefeld', 'bonn', 'berlin', 'mainz', 'munich']


In [None]:
image_counts = []

# Define the image folders to process
image_folders = {
    "train": FOLDER_TRAINING_IMAGES,
    "val": FOLDER_VALIDATION_IMAGES,
    "test": FOLDER_TEST_IMAGES
}

# Iterate through each dataset split (train, val, test)
for split_name, folder_path in image_folders.items():
    print(f"[INFO]: Processing {split_name} images in {folder_path}")
    # Iterate through each city subfolder in the current split
    for city_folder in os.listdir(folder_path):
        city_folder_full_path = os.path.join(folder_path, city_folder)
        if os.path.isdir(city_folder_full_path):
            # Count the number of image files (assuming .png) in the city folder
            num_images = len([f for f in os.listdir(city_folder_full_path) if f.endswith('.png')])
            image_counts.append({
                "split": split_name,
                "city": city_folder,
                "image_count": num_images
            })

# Create a DataFrame from the collected data
images_per_city_df = pd.DataFrame(image_counts)

print("\nDataFrame of image counts per city and split:")
images_per_city_df

[INFO]: Processing train images in /content/drive/MyDrive/AI Engineer/Projets/Projet 8 - Segmentation images voitures autonomes/P8_Cityscapes_leftImg8bit_trainvaltest/leftImg8bit/train
[INFO]: Processing val images in /content/drive/MyDrive/AI Engineer/Projets/Projet 8 - Segmentation images voitures autonomes/P8_Cityscapes_leftImg8bit_trainvaltest/leftImg8bit/val
[INFO]: Processing test images in /content/drive/MyDrive/AI Engineer/Projets/Projet 8 - Segmentation images voitures autonomes/P8_Cityscapes_leftImg8bit_trainvaltest/leftImg8bit/test

DataFrame of image counts per city and split:


Unnamed: 0,split,city,image_count
0,train,zurich,122
1,train,strasbourg,365
2,train,weimar,142
3,train,tubingen,144
4,train,jena,119
5,train,aachen,174
6,train,bochum,96
7,train,dusseldorf,221
8,train,darmstadt,85
9,train,cologne,154


In [None]:
# Nb of images by split
images_per_city_df.groupby(["split"])['image_count'].sum()


Unnamed: 0_level_0,image_count
split,Unnamed: 1_level_1
test,1525
train,2975
val,500


In [None]:
# Define a dataframe with Pandas
dataset_df = pd.DataFrame()

if os.path.exists(PATH_PROJECT+'/train.csv'):
  dataset_df = pd.read_csv(PATH_PROJECT+"/train.csv",
                         converters={"labels": json.loads, "labels_main": json.loads},index_col="image_id")
  dataset_df.index.name = 'index'
else:
  # Loop over city folder
  for city_folder in os.listdir(FOLDER_TRAINING_MASK):
      print("[INFO]: Processing city folder ", city_folder)

      city_folder_full = os.path.join(FOLDER_TRAINING_MASK, city_folder)

      # Loop in each city folder
      for i, file in enumerate(os.listdir(city_folder_full)):

          # Parse only json file
          if file.endswith("json"):
              print(f"[INFO]: Processing file {i+1} / {len(os.listdir(city_folder_full))}: {file}")
              filename = os.path.join(city_folder_full, file)

              # In each json file, pop labels
              with open(filename) as json_file:
                  data = json.load(json_file)
                  labels = list()
                  labels_main = list()

                  for elt in data["objects"]:
                      for k, v in elt.items():
                          if k == "label":
                              labels.append(v)
                              try:
                                  labels_main.append(name2label[v][3])
                              except:
                                  pass

                  data["labels"] = labels
                  data["labels_count"] = len(labels)

                  data["labels_main"] = labels_main
                  data["labels_main_count"] = len(labels_main)

                  data["city"] = city_folder
                  del data["objects"]

              basename = "_".join(file.split(".")[0].split("_")[:3])

              df_tmp = pd.DataFrame.from_dict(data, orient='index', columns=[basename]).T
              dataset_df = pd.concat([dataset_df, df_tmp])
              (dataset_df
    .assign(labels=lambda d: d["labels"].apply(json.dumps),
            labels_main=lambda d: d["labels_main"].apply(json.dumps))
    .to_csv(PATH_PROJECT+"/train.csv", index=True,index_label="image_id"))

In [None]:
dataset_df

Unnamed: 0_level_0,imgHeight,imgWidth,labels,labels_count,labels_main,labels_main_count,city
index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
weimar_000012_000019,1024,2048,"[road, sidewalk, sidewalk, vegetation, sky, fe...",69,"[flat, flat, flat, nature, sky, construction, ...",69,weimar
weimar_000052_000019,1024,2048,"[sky, road, sidewalk, sidewalk, building, buil...",43,"[sky, flat, flat, flat, construction, construc...",43,weimar
weimar_000044_000019,1024,2048,"[sky, road, building, vegetation, static, side...",43,"[sky, flat, construction, nature, void, flat, ...",42,weimar
weimar_000009_000019,1024,2048,"[road, sky, sidewalk, sidewalk, building, vege...",47,"[flat, sky, flat, flat, construction, nature, ...",46,weimar
weimar_000040_000019,1024,2048,"[sky, road, vegetation, building, pole, static...",48,"[sky, flat, nature, construction, object, void...",47,weimar
...,...,...,...,...,...,...,...
bremen_000080_000019,1024,2048,"[sky, road, static, static, static, static, st...",85,"[sky, flat, void, void, void, void, void, natu...",84,bremen
bremen_000076_000019,1024,2048,"[sky, sidewalk, road, building, sidewalk, buil...",76,"[sky, flat, flat, construction, flat, construc...",74,bremen
bremen_000062_000019,1024,2048,"[static, road, ground, pole, parking, car, per...",52,"[void, flat, void, object, flat, vehicle, huma...",51,bremen
bremen_000077_000019,1024,2048,"[sky, road, vegetation, building, vegetation, ...",89,"[sky, flat, nature, construction, nature, void...",87,bremen


## 2. Remapping Cityscapes → 8 classes

In [None]:
import numpy as np
import cv2
# ------------------------------------------------------------------
# Groupes d'IDs Cityscapes (id dans les *_gtFine_labelIds.png)
# regroupés par grande catégorie (8 classes)
# ------------------------------------------------------------------
void = [
    0,  # unlabeled
    1,  # ego vehicle
    2,  # rectification border
    3,  # out of roi
    4,  # static
    5,  # dynamic
    6,  # ground
]

flat = [
    7,  # road
    8,  # sidewalk
    9,  # parking
    10, # rail track
]

construction = [
    11, # building
    12, # wall
    13, # fence
    14, # guard rail
    15, # bridge
    16, # tunnel
]

object = [
    17, # pole
    18, # polegroup
    19, # traffic light
    20, # traffic sign
]

nature = [
    21, # vegetation
    22, # terrain
]

sky = [
    23, # sky
]

human = [
    24, # person
    25, # rider
]

vehicle = [
    26, # car
    27, # truck
    28, # bus
    29, # caravan
    30, # trailer
    31, # train
    32, # motorcycle
    33, # bicycle
    # id = -1 (license plate) n'est pas inclus
]

# ------------------------------------------------------------------
# REMAPPING vers 8 classes :
# 0: void, 1: flat, 2: construction, 3: object,
# 4: nature, 5: sky, 6: human, 7: vehicle
# ------------------------------------------------------------------
REMAPPING = {
    0: void,
    1: flat,
    2: construction,
    3: object,
    4: nature,
    5: sky,
    6: human,
    7: vehicle,
}

# LUT 256 -> 8 classes
lut = np.zeros(256, dtype=np.uint8)
for new_id, src_ids in REMAPPING.items():
    for sid in src_ids:
        lut[sid] = new_id

PALETTE = np.array([
    [  0,   0,   0],   # 0 void
    [128,  64, 128],   # 1 flat        (road)
    [ 70,  70,  70],   # 2 construction (building)
    [220, 220,   0],   # 3 object      (traffic sign style)
    [107, 142,  35],   # 4 nature      (vegetation)
    [ 70, 130, 180],   # 5 sky
    [220,  20,  60],   # 6 human       (person)
    [  0,   0, 142],   # 7 vehicle     (car)
], dtype=np.uint8)

# Noms des 8 classes dans l’ordre de ton mapping
LABELS = [
    "void",
    "flat",
    "construction",
    "object",
    "nature",
    "sky",
    "human",
    "vehicle",
]

# Dossier de sortie pour les fichiers de mapping
OUT_DIR = Path("mappings")  # ou Path(PATH_PROJECT) / "mappings" si tu préfères
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Sauvegarde des métadonnées de classes
with open(OUT_DIR / "labels.json", "w") as f:
    json.dump({"labels": LABELS}, f, indent=2, ensure_ascii=False)

# Sauvegarde de la LUT (256 -> 8) et de la palette (8 x 3)
np.save(OUT_DIR / "lut_256_to_8.npy", lut)
np.save(OUT_DIR / "palette_8x3.npy", PALETTE)

print("Mapping & palette sauvegardés dans", OUT_DIR)

Mapping & palette sauvegardés dans mappings


## 3. Indexation des fichiers & correspondance image/mask

In [None]:
def gather_pairs(images_root, masks_root):
    """
    Associe:
      .../leftImg8bit/<split>/<city>/<frame>_leftImg8bit.png
    avec:
      .../gtFine/<split>/<city>/<frame>_gtFine_labelIds.png
    """
    images_root = Path(images_root)
    masks_root  = Path(masks_root)
    img_paths, mask_paths = [], []

    # parcourt toutes les images *_leftImg8bit.png (toutes villes)
    for img_path in images_root.rglob("*_leftImg8bit.png"):
        if not img_path.is_file():
            continue
        city = img_path.parent.name  # dossier ville
        base = img_path.name[:-len("_leftImg8bit.png")]  # <frame>
        mask_path = masks_root / city / f"{base}_gtFine_labelIds.png"
        if mask_path.exists():
            img_paths.append(str(img_path))
            mask_paths.append(str(mask_path))
        else:
            # fallback (au cas où)
            cand = list((masks_root / city).glob(f"{base}_gtFine_labelIds.png"))
            if cand:
                img_paths.append(str(img_path))
                mask_paths.append(str(cand[0]))

    return img_paths, mask_paths


train_imgs, train_msks = gather_pairs(FOLDER_TRAINING_IMAGES, FOLDER_TRAINING_MASK)
val_imgs,   val_msks   = gather_pairs(FOLDER_VALIDATION_IMAGES, FOLDER_VALIDATION_MASK)
test_imgs,  test_msks  = gather_pairs(FOLDER_TEST_IMAGES,      FOLDER_TEST_MASK)

print("Train pairs:", len(train_imgs))
print("Val   pairs:", len(val_imgs))
print("Test  pairs:", len(test_imgs))

with open(OUT_DIR / "index_train.json", "w") as f:
    json.dump({"images": train_imgs, "masks": train_msks}, f, indent=2)
with open(OUT_DIR / "index_val.json", "w") as f:
    json.dump({"images": val_imgs, "masks": val_msks}, f, indent=2)
with open(OUT_DIR / "index_test.json", "w") as f:
    json.dump({"images": test_imgs, "masks": test_msks}, f, indent=2)


Train pairs: 2975
Val   pairs: 500
Test  pairs: 1525


## 4. Pipeline tf.data (+ augmentations)

In [None]:

AUTOTUNE = tf.data.AUTOTUNE
lut_tf = tf.constant(np.load(str(OUT_DIR / "lut_256_to_8.npy")), dtype=tf.uint8)

def _load_image(image_path):
    img = tf.io.read_file(image_path)
    img = tf.image.decode_png(img, channels=3)
    img = tf.image.convert_image_dtype(img, tf.float32)
    return img

def _load_mask(mask_path):
    m = tf.io.read_file(mask_path)
    m = tf.image.decode_png(m, channels=1)
    m = tf.squeeze(m, axis=-1)
    m = tf.gather(lut_tf, tf.cast(m, tf.int32))
    m = tf.expand_dims(m, axis=-1)
    return m

def _resize(img, msk):
    img = tf.image.resize(img, (IMG_HEIGHT, IMG_WIDTH), method="bilinear")
    msk = tf.image.resize(msk, (IMG_HEIGHT, IMG_WIDTH), method="nearest")
    return img, msk

def _augment(img, msk):
    if tf.random.uniform(()) > 0.5:
        img = tf.image.flip_left_right(img)
        msk = tf.image.flip_left_right(msk)
    img = tf.image.random_brightness(img, max_delta=0.05)
    img = tf.image.random_contrast(img, 0.9, 1.1)
    img = tf.clip_by_value(img, 0.0, 1.0)
    return img, msk

def _prep_train(img_path, msk_path):
    img = _load_image(img_path)
    msk = _load_mask(msk_path)
    img, msk = _resize(img, msk)
    img, msk = _augment(img, msk)
    return img, tf.cast(msk, tf.int32)

def _prep_eval(img_path, msk_path):
    img = _load_image(img_path)
    msk = _load_mask(msk_path)
    img, msk = _resize(img, msk)
    return img, tf.cast(msk, tf.int32)

def make_ds(imgs, msks, training=True, batch_size=BATCH_SIZE):
    # forcer le type string propre
    imgs = [str(p) for p in imgs]
    msks = [str(p) for p in msks]

    imgs = tf.constant(imgs, dtype=tf.string)
    msks = tf.constant(msks, dtype=tf.string)

    ds = tf.data.Dataset.from_tensor_slices((imgs, msks))
    if training:
        buf = int(min(1000, imgs.shape[0]))
        if buf > 0:
            ds = ds.shuffle(buf, seed=SEED, reshuffle_each_iteration=True)
    ds = ds.map(_prep_train if training else _prep_eval, num_parallel_calls=AUTOTUNE)
    ds = ds.batch(batch_size).prefetch(AUTOTUNE)
    return ds

train_ds = make_ds(train_imgs, train_msks, training=True)
val_ds   = make_ds(val_imgs,   val_msks,   training=False)
test_ds  = make_ds(test_imgs,  test_msks,  training=False)

for batch in train_ds.take(1):
    print("Batch images:", batch[0].shape, "Batch masks:", batch[1].shape)


Batch images: (4, 512, 1024, 3) Batch masks: (4, 512, 1024, 1)


## 5. Pertes & métriques (mIoU callback)

In [None]:

from tensorflow.keras import backend as K

CE_LOSS = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)

class MeanIoUCallback(tf.keras.callbacks.Callback):
    def __init__(self, val_dataset, name="val_mIoU"):
        super().__init__()
        self.val_dataset = val_dataset
        self.name = name

    def on_epoch_end(self, epoch, logs=None):
        miou = tf.keras.metrics.MeanIoU(num_classes=N_CLASSES)
        for x, y in self.val_dataset:
            pred = self.model.predict(x, verbose=0)
            pred_labels = tf.argmax(pred, axis=-1)
            pred_labels = tf.expand_dims(pred_labels, axis=-1)
            miou.update_state(y, pred_labels)
        logs = logs or {}
        logs[self.name] = miou.result().numpy()
        self.model.history.history.setdefault(self.name, []).append(logs[self.name])
        print(f" — {self.name}: {logs[self.name]:.4f}")


## 6. Modèles : U-Net et U-Net-MobileNetV2

In [None]:

from tensorflow.keras import layers, models

def conv_block(x, filters, k=3):
    x = layers.Conv2D(filters, k, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    x = layers.Conv2D(filters, k, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    return x

def encoder_block(x, filters):
    c = conv_block(x, filters)
    p = layers.MaxPooling2D((2,2))(c)
    return c, p

def decoder_block(x, skip, filters):
    x = layers.Conv2DTranspose(filters, (2,2), strides=(2,2), padding="same")(x)
    x = layers.Concatenate()([x, skip])
    x = conv_block(x, filters)
    return x

def build_unet(input_shape=(IMG_HEIGHT, IMG_WIDTH, 3), num_classes=N_CLASSES, base_filters=64):
    inputs = layers.Input(shape=input_shape)
    c1, p1 = encoder_block(inputs, base_filters)
    c2, p2 = encoder_block(p1, base_filters*2)
    c3, p3 = encoder_block(p2, base_filters*4)
    c4, p4 = encoder_block(p3, base_filters*8)

    bn = conv_block(p4, base_filters*16)

    d1 = decoder_block(bn, c4, base_filters*8)
    d2 = decoder_block(d1, c3, base_filters*4)
    d3 = decoder_block(d2, c2, base_filters*2)
    d4 = decoder_block(d3, c1, base_filters)

    outputs = layers.Conv2D(num_classes, 1, activation="softmax")(d4)
    return models.Model(inputs, outputs, name="UNet")

def build_unet_mobilenetv2(input_shape=(IMG_HEIGHT, IMG_WIDTH, 3), num_classes=N_CLASSES):
    base = tf.keras.applications.MobileNetV2(
        input_shape=input_shape,
        include_top=False,
        weights="imagenet"
    )

    # Couches de skip-connection
    skips = [
        base.get_layer("block_1_expand_relu").output,   # ~ 1/2
        base.get_layer("block_3_expand_relu").output,   # ~ 1/4
        base.get_layer("block_6_expand_relu").output,   # ~ 1/8
        base.get_layer("block_13_expand_relu").output,  # ~ 1/16
    ]
    encoder_output = base.get_layer("block_16_project").output  # ~ 1/32

    x = encoder_output

    # Decoder : 1/32 → 1/16 → 1/8 → 1/4 → 1/2
    filters = [256, 128, 64, 32]
    for i, f in enumerate(filters):
        x = layers.Conv2DTranspose(f, 3, strides=2, padding="same")(x)
        x = layers.Concatenate()([x, skips[-(i+1)]])
        x = layers.Conv2D(f, 3, padding="same", activation="relu")(x)
        x = layers.Conv2D(f, 3, padding="same", activation="relu")(x)

    # Dernier upsampling 1/2 → 1 (512 x 1024)
    x = layers.Conv2DTranspose(16, 3, strides=2, padding="same")(x)
    x = layers.Conv2D(16, 3, padding="same", activation="relu")(x)

    outputs = layers.Conv2D(num_classes, 1, activation="softmax")(x)
    model = models.Model(inputs=base.input, outputs=outputs, name="UNet_MobileNetV2")
    return model

unet = build_unet()
unet_mb = build_unet_mobilenetv2()
unet.summary()
unet_mb.summary()

config = {
    "img_height": IMG_HEIGHT,
    "img_width": IMG_WIDTH,
    "n_classes": N_CLASSES,
    "labels": LABELS,
    "preprocess": "rescale_0_1",
    "color_palette": "palette_8x3.npy"
}
with open(OUT_DIR / "inference_config.json", "w") as f:
    json.dump(config, f, indent=2, ensure_ascii=False)
print("Config d'inférence sauvegardée.")


  base = tf.keras.applications.MobileNetV2(


Config d'inférence sauvegardée.


## 7. Entraînement & suivi mIoU

In [None]:

def compile_model(model, lr=1e-3):
    opt = tf.keras.optimizers.Adam(learning_rate=lr)
    model.compile(optimizer=opt,
                  loss=CE_LOSS,
                  metrics=[tf.keras.metrics.SparseCategoricalAccuracy(name="acc")])
    return model

callbacks_common = [
    tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=5, restore_best_weights=True),
    tf.keras.callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=2),
    tf.keras.callbacks.CSVLogger(str(OUT_DIR / "training_log.csv"), append=True)
]
print("Train Unet...")
unet = compile_model(unet, lr=1e-3)
miou_cb_unet = MeanIoUCallback(val_ds, name="val_mIoU")
ckpt_unet = tf.keras.callbacks.ModelCheckpoint(
    filepath=str(OUT_DIR / "best_unet.keras"),
    monitor="val_loss",
    save_best_only=True
)
history_unet = unet.fit(
    train_ds, validation_data=val_ds,
    epochs=EPOCHS, callbacks=callbacks_common + [miou_cb_unet, ckpt_unet],
    verbose=1
)

print("Train Unet-MobileNetV2...")
unet_mb = compile_model(unet_mb, lr=1e-3)
miou_cb_unetmb = MeanIoUCallback(val_ds, name="val_mIoU")
ckpt_unetmb = tf.keras.callbacks.ModelCheckpoint(
    filepath=str(OUT_DIR / "best_unet_mobilenetv2.keras"),
    monitor="val_loss",
    save_best_only=True
)
history_unet_mb = unet_mb.fit(
    train_ds, validation_data=val_ds,
    epochs=EPOCHS, callbacks=callbacks_common + [miou_cb_unetmb, ckpt_unetmb],
    verbose=1
)

import pandas as pd
pd.DataFrame(history_unet.history).to_csv(OUT_DIR / "history_unet.csv", index=False)
pd.DataFrame(history_unet_mb.history).to_csv(OUT_DIR / "history_unet_mobilenetv2.csv", index=False)
print("Historiques sauvegardés.")


Epoch 1/25
[1m744/744[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 612ms/step - acc: 0.6969 - loss: 0.9086 — val_mIoU: 0.3640
[1m744/744[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m643s[0m 760ms/step - acc: 0.6970 - loss: 0.9083 - val_acc: 0.7368 - val_loss: 1.0759 - learning_rate: 0.0010 - val_mIoU: 0.3640
Epoch 2/25
[1m744/744[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 171ms/step - acc: 0.8391 - loss: 0.5063 — val_mIoU: 0.4937
[1m744/744[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m171s[0m 229ms/step - acc: 0.8391 - loss: 0.5062 - val_acc: 0.8030 - val_loss: 0.6403 - learning_rate: 0.0010 - val_mIoU: 0.4937
Epoch 3/25
[1m744/744[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 171ms/step - acc: 0.8664 - loss: 0.4263 — val_mIoU: 0.5673
[1m744/744[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m172s[0m 230ms/step - acc: 0.8664 - loss: 0.4263 - val_acc: 0.8324 - val_loss: 0.5326 - learning_rate: 0.0010 - val_mIoU: 0.5673
Epoch 4/25
[1m744/744[0m 

ValueError: Arguments `target` and `output` must have the same shape up until the last dimension: target.shape=(None, 512, 1024), output.shape=(None, 256, 512, 8)

In [None]:
# 2) Compiler SEULEMENT le modèle MobileNet UNet
unet_mb = compile_model(unet_mb, lr=1e-3)

miou_cb_unetmb = MeanIoUCallback(val_ds, name="val_mIoU")
ckpt_unetmb = tf.keras.callbacks.ModelCheckpoint(
    filepath=str(OUT_DIR / "best_unet_mobilenetv2.keras"),
    monitor="val_loss",
    save_best_only=True
)

# 3) Entraîner SEULEMENT unet_mb
history_unet_mb = unet_mb.fit(
    train_ds,
    validation_data=val_ds,
    epochs=EPOCHS,
    callbacks=callbacks_common + [miou_cb_unetmb, ckpt_unetmb],
    verbose=1
)

Epoch 1/25
[1m744/744[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 162ms/step - acc: 0.7277 - loss: 0.8209 — val_mIoU: 0.1468
[1m744/744[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m285s[0m 240ms/step - acc: 0.7279 - loss: 0.8205 - val_acc: 0.3323 - val_loss: 2.1684 - learning_rate: 0.0010 - val_mIoU: 0.1468
Epoch 2/25
[1m743/744[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 66ms/step - acc: 0.8997 - loss: 0.3447 — val_mIoU: 0.2813
[1m744/744[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m88s[0m 117ms/step - acc: 0.8997 - loss: 0.3447 - val_acc: 0.4119 - val_loss: 2.5521 - learning_rate: 0.0010 - val_mIoU: 0.2813
Epoch 3/25
[1m743/744[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 65ms/step - acc: 0.9132 - loss: 0.2925 — val_mIoU: 0.0632
[1m744/744[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m87s[0m 117ms/step - acc: 0.9131 - loss: 0.2928 - val_acc: 0.1326 - val_loss: 4.6108 - learning_rate: 0.0010 - val_mIoU: 0.0632
Epoch 4/25
[1m743/744[0m [32m

## 8. Sélection du meilleur modèle & export

In [None]:

import pandas as pd

def best_miou(csv_path, key="val_mIoU"):
    p = Path(csv_path)
    if not p.exists():
        return -1.0
    df = pd.read_csv(p)
    return float(df[key].max()) if key in df.columns else -1.0

miou_unet = best_miou(OUT_DIR / "history_unet.csv")
miou_unet_mb = best_miou(OUT_DIR / "history_unet_mobilenetv2.csv")

print("Best val mIoU — UNet:", miou_unet)
print("Best val mIoU — UNet-MobileNetV2:", miou_unet_mb)

if miou_unet_mb > miou_unet:
    best_name = "unet_mobilenetv2"
    best_ckpt = OUT_DIR / "best_unet_mobilenetv2.keras"
    model = tf.keras.models.load_model(best_ckpt, compile=False)
else:
    best_name = "unet"
    best_ckpt = OUT_DIR / "best_unet.keras"
    model = tf.keras.models.load_model(best_ckpt, compile=False)

BEST_EXPORT_DIR = OUT_DIR / f"savedmodel_{best_name}"
tf.keras.models.save_model(model, BEST_EXPORT_DIR, include_optimizer=False, save_format="tf")
print("Export:", BEST_EXPORT_DIR)

with open(OUT_DIR / "best_model.json", "w") as f:
    json.dump({
        "best_model": best_name,
        "checkpoint": str(best_ckpt),
        "export_dir": str(BEST_EXPORT_DIR),
        "val_mIoU_unet": miou_unet,
        "val_mIoU_unet_mobilenetv2": miou_unet_mb
    }, f, indent=2)


## 9. Évaluation test

In [None]:

with open(OUT_DIR / "best_model.json") as f:
    meta = json.load(f)
best_dir = meta["export_dir"]
best_model = tf.keras.models.load_model(best_dir, compile=False)

miou_test = tf.keras.metrics.MeanIoU(num_classes=N_CLASSES)
acc_test = tf.keras.metrics.SparseCategoricalAccuracy()

for x, y in test_ds:
    pred = best_model.predict(x, verbose=0)
    acc_test.update_state(y, pred)
    pred_labels = tf.argmax(pred, axis=-1)
    pred_labels = tf.expand_dims(pred_labels, axis=-1)
    miou_test.update_state(y, pred_labels)

print("Test mIoU:", miou_test.result().numpy())
print("Test Accuracy:", acc_test.result().numpy())

with open(OUT_DIR / "metrics_test.json", "w") as f:
    json.dump({
        "test_mIoU": float(miou_test.result().numpy()),
        "test_acc": float(acc_test.result().numpy())
    }, f, indent=2)
print("Métriques test sauvegardées.")


## 10. Prédiction & visualisation

In [None]:

import matplotlib.pyplot as plt

palette = np.load(str(OUT_DIR / "palette_8x3.npy"))

def colorize_mask(mask_np):
    colored = palette[mask_np]
    return colored.astype(np.uint8)

def predict_image(img_path, model, save_dir=OUT_DIR/"predictions"):
    save_dir = Path(save_dir); save_dir.mkdir(parents=True, exist_ok=True)
    img_raw = tf.io.read_file(img_path)
    img = tf.image.decode_png(img_raw, channels=3)
    img = tf.image.convert_image_dtype(img, tf.float32)
    img_resized = tf.image.resize(img, (IMG_HEIGHT, IMG_WIDTH))
    pred = model.predict(tf.expand_dims(img_resized, 0), verbose=0)[0]
    pred_lbl = tf.argmax(pred, axis=-1).numpy().astype(np.uint8)
    color = colorize_mask(pred_lbl)

    stem = Path(img_path).stem.replace("_leftImg8bit", "")
    out_mask = save_dir / f"{stem}_pred_mask.png"
    out_visu = save_dir / f"{stem}_visu.png"

    tf.keras.utils.save_img(str(out_mask), np.expand_dims(pred_lbl, -1), scale=False)
    tf.keras.utils.save_img(str(out_visu), color, scale=False)

    plt.figure(figsize=(16,6))
    plt.subplot(1,3,1); plt.title("Image"); plt.imshow(img_resized.numpy()); plt.axis("off")
    plt.subplot(1,3,2); plt.title("Mask (labels)"); plt.imshow(pred_lbl, vmin=0, vmax=N_CLASSES-1); plt.axis("off")
    plt.subplot(1,3,3); plt.title("Mask colorisé"); plt.imshow(color); plt.axis("off")
    plt.show()

if len(test_imgs) > 0:
    predict_image(test_imgs[0], best_model)
else:
    print("Pas d'images test indexées.")


## 11. Exports pour l'API

In [None]:

print("Contenu de", OUT_DIR)
for p in sorted(OUT_DIR.iterdir()):
    print(" -", p.name)
