In [1]:
import os
import numpy as np
import cv2
import glob
from sklearn.model_selection import train_test_split


In [2]:
# Define directories
# The dataset root defaults to the original author's location but can be
# overridden with the NASA_DATABASE_DIR environment variable, so the notebook
# can run on another machine without editing this cell.
_nasa_database_root = os.environ.get(
    'NASA_DATABASE_DIR', '/home/maelh/Downloads/NASA_database'
).rstrip('/')

vv_dir = f'{_nasa_database_root}/vv/'                            # VV polarization images
vh_dir = f'{_nasa_database_root}/vh/'                            # VH polarization images
water_body_dir = f'{_nasa_database_root}/water_body_label/'      # water body masks
flood_label_dir = f'{_nasa_database_root}/flood_label/'          # flood masks
preprocessed_dir = '../preprocessed_data/'                       # relative to the notebook


In [3]:


def load_images_from_folder(folder, label_folder, size=(256, 256),
                            image_suffix='_vv.png', drop_blank=True):
    """Load grayscale PNG images and their binary label masks as aligned arrays.

    Every ``*.png`` file in ``folder`` is read as a single-channel image and
    resized to ``size``. The matching label file name is obtained by replacing
    ``image_suffix`` with ``'.png'`` in the image file name; it is read from
    ``label_folder``, resized, and binarised (any non-zero pixel becomes 1).

    A sample is kept only when BOTH the image and its label were read
    successfully, so an image is never paired with an empty placeholder mask.

    Arguments:
    folder -- Directory containing the input ``*.png`` images.
    label_folder -- Directory containing the label masks.
    size -- Target ``(width, height)`` passed to ``cv2.resize``.
    image_suffix -- Trailing part of the image file name that is replaced by
        ``'.png'`` to build the label file name.
    drop_blank -- When True (the default, matching the previous behaviour),
        images whose pixels are all zero after resizing are discarded.

    Returns:
    A tuple ``(images, labels)`` of ``uint8`` arrays, both of shape
    ``(n_kept, height, width)``. ``labels`` only contains the values 0 and 1.
    """
    width, height = size

    # glob returns files in arbitrary order; sorting makes the sample order,
    # and therefore any seeded train/validation split, reproducible.
    filenames = sorted(glob.glob(os.path.join(folder, '*.png')))

    # Pre-allocate for the worst case (every file valid); trimmed on return.
    images = np.zeros((len(filenames), height, width), dtype=np.uint8)
    labels = np.zeros((len(filenames), height, width), dtype=np.uint8)

    kept = 0
    for filename in filenames:
        # Load the image as single-channel uint8
        img = cv2.imread(filename, cv2.IMREAD_GRAYSCALE)
        if img is None:
            print(f"Erreur de chargement de l'image: {filename}")
            continue  # skip to the next image

        # Load the corresponding label BEFORE storing anything, so that a
        # missing label discards the whole sample.
        label_name = os.path.basename(filename).replace(image_suffix, '.png')
        label_path = os.path.join(label_folder, label_name)
        label_img = cv2.imread(label_path, cv2.IMREAD_GRAYSCALE)
        if label_img is None:
            print(f"Erreur de chargement du label: {label_path}")
            continue  # skip to the next image

        img = cv2.resize(img, (width, height))
        if drop_blank and not img.any():
            continue  # all-zero image carries no signal

        # Nearest-neighbour keeps the mask crisp; an interpolating resize
        # followed by "> 0" would grow every labelled region.
        label_img = cv2.resize(label_img, (width, height),
                               interpolation=cv2.INTER_NEAREST)

        images[kept] = img
        labels[kept] = (label_img > 0).astype(np.uint8)
        kept += 1

    return images[:kept], labels[:kept]

# Example usage: see the next cell



In [4]:
# Load the VV images together with their water body masks
images, labels = load_images_from_folder(folder=vv_dir, label_folder=water_body_dir)

# Hold out 20% of the samples for validation; the fixed seed keeps the split repeatable
X_train, X_val, y_train, y_val = train_test_split(
    images,
    labels,
    test_size=0.2,
    random_state=42,
)

In [5]:
import tensorflow as tf
from tensorflow.keras import layers, models

def unet_model_with_water_body(input_shape=(256, 256, 1)):
    """
    Build and compile a small two-level U-Net for binary segmentation.

    The encoder has two stages (64 then 128 filters), each made of two 3x3
    ReLU convolutions followed by 2x2 max pooling. The decoder mirrors it:
    each stage upsamples with a stride-2 transposed convolution, concatenates
    the encoder features of the same resolution, and applies two 3x3 ReLU
    convolutions. A final 1x1 sigmoid convolution yields one output channel.

    Arguments:
    input_shape -- (height, width, channels) of the input tensor. The default
        is a single-channel 256x256 image. Feeding stacked VV, VH and water
        body label channels requires passing (256, 256, 3) explicitly.
        NOTE(review): the two pool/upsample stages presumably need height and
        width divisible by 4 for the skip concatenations to line up.

    Returns:
    A Keras model compiled with the Adam optimizer, binary cross-entropy loss
    and the accuracy metric.
    """

    def conv_pair(tensor, filters):
        # Two stacked 3x3 same-padded ReLU convolutions
        for _ in range(2):
            tensor = layers.Conv2D(filters, (3, 3), activation='relu', padding='same')(tensor)
        return tensor

    def upsample(tensor, filters):
        # Double the spatial resolution with a learned transposed convolution
        return layers.Conv2DTranspose(filters, (2, 2), strides=(2, 2), padding='same')(tensor)

    model_input = layers.Input(shape=input_shape)

    # Encoder
    enc_64 = conv_pair(model_input, 64)
    enc_128 = conv_pair(layers.MaxPooling2D((2, 2))(enc_64), 128)
    bottom = layers.MaxPooling2D((2, 2))(enc_128)

    # Decoder with skip connections to the encoder stages
    dec_128 = conv_pair(layers.concatenate([upsample(bottom, 128), enc_128]), 128)
    dec_64 = conv_pair(layers.concatenate([upsample(dec_128, 64), enc_64]), 64)

    # Output layer: per-pixel probability
    mask = layers.Conv2D(1, (1, 1), activation='sigmoid')(dec_64)

    # Compile the model
    unet = models.Model(inputs=[model_input], outputs=[mask])
    unet.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    return unet

# Example usage:
# Instantiate the network with its default single-channel 256x256 input and
# print a layer-by-layer summary. A stacked (VV, VH, Water Body Label) input
# would need input_shape=(256, 256, 3) instead.
model = unet_model_with_water_body(input_shape=(256, 256, 1))
model.summary()


2024-10-09 10:16:00.987480: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-10-09 10:16:00.997461: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-10-09 10:16:01.096740: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-10-09 10:16:01.180743: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-10-09 10:16:01.261943: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been 

In [6]:
def iou_metric(y_true, y_pred, threshold=0.5):
    """
    Calculate Intersection over Union (IoU) between ground truth (y_true) and predicted (y_pred) masks.

    Arguments:
    y_true -- Ground truth mask (array-like); any non-zero value counts as foreground.
    y_pred -- Predicted probabilities (array-like), e.g. the output of the model.
    threshold -- Predictions strictly greater than this value count as foreground.

    Returns:
    IoU score as a Python float in [0, 1]. When both masks are empty (the union
    is empty) the masks agree perfectly and 1.0 is returned instead of
    dividing by zero.
    """

    # Accept lists and other array-likes, not only NumPy arrays
    truth = np.asarray(y_true).astype(bool)

    # Convert predicted probabilities to binary based on the threshold
    predicted = np.asarray(y_pred) > threshold

    # Model outputs usually carry a trailing singleton channel axis that the
    # labels lack (or vice versa); drop it so the masks compare element-wise
    # instead of failing to broadcast.
    if predicted.ndim == truth.ndim + 1 and predicted.shape[-1] == 1:
        predicted = predicted[..., 0]
    elif truth.ndim == predicted.ndim + 1 and truth.shape[-1] == 1:
        truth = truth[..., 0]

    # Compute intersection and union
    union = np.logical_or(truth, predicted).sum()
    if union == 0:
        return 1.0  # both masks empty: define IoU as a perfect match
    intersection = np.logical_and(truth, predicted).sum()

    # Calculate IoU
    return float(intersection / union)

In [7]:
# Train the U-Net model on the training split; the validation split is
# evaluated at the end of every epoch.
model.fit(
    x=X_train,
    y=y_train,
    batch_size=2,
    epochs=20,
    validation_data=(X_val, y_val),
)

Epoch 1/20




[1m222/570[0m [32m━━━━━━━[0m[37m━━━━━━━━━━━━━[0m [1m10:57[0m 2s/step - accuracy: 0.9555 - loss: 0.5605

KeyboardInterrupt: 