Added some small augmentations (random width/height shifts). I want to change this to skewing instead of the shifts I have now.

In [None]:
import os

import numpy as np
import tensorflow as tf
from skimage.transform import resize
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator



# os.listdir() returns entries in arbitrary order, so the listing is sorted
# to make the seeded train/validation split below reproducible across runs.
img_files = sorted(
    f
    for f in os.listdir("/kaggle/input/kul-h02a5a-computer-vision-ga2-2024/train/img")
    if not f.startswith(".")  # skip hidden files
)

n = len(img_files)  # total number of usable image files

# Hold out 20% of the files for validation; the fixed seed keeps the split stable.
train_files, validation_files = train_test_split(img_files, test_size=0.2, random_state=42)


class SegmentationDataset(tf.keras.utils.Sequence):
    """Keras ``Sequence`` yielding batches of images and one-hot segmentation masks.

    Every file name in ``file_list`` is loaded with ``np.load`` from both
    ``img_folder`` (image) and ``seg_folder`` (mask). Each sample is resized to
    ``image_size`` and emitted twice: once unchanged and once with a random
    shift applied identically to image and mask. A batch therefore holds up to
    ``2 * batch_size`` samples, with each original directly followed by its
    augmented copy.

    Args:
        img_folder: Directory containing the image ``.npy`` files.
        seg_folder: Directory containing the mask files (same file names).
        file_list: File names that make up this dataset.
        batch_size: Number of *files* read per batch.
        image_size: Target ``(height, width)`` after resizing.
        num_classes: Number of one-hot channels produced for each mask.
    """

    def __init__(self, img_folder, seg_folder, file_list, batch_size=32, image_size=(256, 256), num_classes=21):
        # Let the Keras base class initialise its own state.
        super().__init__()
        self.img_folder = img_folder
        self.seg_folder = seg_folder
        self.batch_size = batch_size
        self.image_size = image_size
        self.num_classes = num_classes
        self.img_files = file_list
        self.num_samples = len(self.img_files)

        # Initialize the ImageDataGenerator for data augmentation
        self.datagen = ImageDataGenerator(
            width_shift_range=0.2,
            height_shift_range=0.2,
            fill_mode="wrap")

    def __len__(self):
        """Return the number of batches (the last one may be smaller)."""
        return int(np.ceil(self.num_samples / float(self.batch_size)))

    def __getitem__(self, idx):
        """Load, resize, one-hot encode and augment the files of batch ``idx``.

        Returns:
            Tuple ``(images, masks)`` of NumPy arrays; masks have
            ``num_classes`` channels on the last axis.
        """
        start = idx * self.batch_size
        batch_img_files = self.img_files[start:start + self.batch_size]
        batch_imgs = []
        batch_segs = []

        for img_file in batch_img_files:
            img_path = os.path.join(self.img_folder, img_file)
            seg_path = os.path.join(self.seg_folder, img_file)

            # NOTE(review): allow_pickle=True can execute arbitrary code when
            # loading untrusted files; switch it off if the data are plain arrays.
            img = np.load(img_path, allow_pickle=True)
            seg = np.load(seg_path, allow_pickle=True)

            img = resize(img, self.image_size)

            # Add the channel axis with NumPy so skimage receives a plain array.
            seg = np.expand_dims(np.asarray(seg), axis=-1)
            # order=0 (nearest neighbour) keeps the labels as integer class IDs.
            seg = resize(seg, self.image_size, order=0, mode='reflect', cval=0, clip=True, preserve_range=True, anti_aliasing=False, anti_aliasing_sigma=None)

            # Always one-hot encode so every mask in the batch has the same shape.
            # NOTE(review): labels outside [0, num_classes) are mapped to class 0
            # here -- presumably these are "ignore" markers; confirm against the data.
            seg = np.asarray(seg).astype(np.int64)
            seg[(seg < 0) | (seg >= self.num_classes)] = 0
            seg = tf.keras.utils.to_categorical(seg, num_classes=self.num_classes)

            # Apply data augmentation: the same random transform is used for the
            # image and its mask so that they stay aligned.
            params = self.datagen.get_random_transform(img.shape)
            img_aug = self.datagen.apply_transform(img, params)
            seg_aug = self.datagen.apply_transform(seg, params)

            batch_imgs.extend([img, img_aug])
            batch_segs.extend([seg, seg_aug])

        return np.array(batch_imgs), np.array(batch_segs)


    

# Locations of the training images and of their segmentation masks.
img_folder = "/kaggle/input/kul-h02a5a-computer-vision-ga2-2024/train/img/"
seg_folder = "/kaggle/input/kul-h02a5a-computer-vision-ga2-2024/train/seg"

# Settings shared by the training and validation datasets.
image_size = (256, 256)
batch_size = 10

# Both datasets read from the same folders; only the file lists differ.
train_dataset = SegmentationDataset(
    img_folder,
    seg_folder,
    file_list=train_files,
    batch_size=batch_size,
    image_size=image_size,
)
val_dataset = SegmentationDataset(
    img_folder,
    seg_folder,
    file_list=validation_files,
    batch_size=batch_size,
    image_size=image_size,
)

Checking the augmentations.

In [None]:
import matplotlib.pyplot as plt

# Get a batch of data. The dataset returns every original sample immediately
# followed by its augmented copy, so consecutive rows form (original, augmented) pairs.
images, segments = train_dataset.__getitem__(0)

# Never request more rows than the batch actually contains.
n_rows = min(batch_size, len(images))
# Highest class index; used to pin the colour scale so a class keeps the same
# colour in every row.
max_label = segments.shape[-1] - 1

# Plot the images and their corresponding segments.
# squeeze=False keeps `axs` two-dimensional even when there is a single row.
fig, axs = plt.subplots(n_rows, 2, figsize=(10, 50), squeeze=False)

for i in range(n_rows):
    axs[i, 0].imshow(images[i])
    axs[i, 0].title.set_text('Image')
    # Collapse the one-hot mask to a label map before displaying it.
    axs[i, 1].imshow(np.argmax(segments[i], axis=-1), vmin=0, vmax=max_label)
    axs[i, 1].title.set_text('Segment')

plt.show()


Changed the U-Net architecture.

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate, BatchNormalization, Dropout

def unet(input_size=(256,256,3), n_classes=21):
    """Build a small two-level U-Net for semantic segmentation.

    The encoder has two convolution stages (64 and 128 filters), each followed
    by 2x2 max pooling; the bottleneck uses 256 filters; the decoder upsamples
    twice and concatenates the matching encoder output as a skip connection.

    Args:
        input_size: Shape of one input sample, passed to ``Input``.
        n_classes: Number of channels of the final softmax layer.

    Returns:
        An uncompiled Keras ``Model`` mapping inputs to per-pixel class
        probabilities.
    """

    def conv_stage(tensor, filters):
        # 3x3 convolution -> batch normalisation -> light dropout.
        tensor = Conv2D(filters, 3, activation='relu', padding='same')(tensor)
        tensor = BatchNormalization()(tensor)
        return Dropout(0.1)(tensor)

    def upsample_and_merge(tensor, skip):
        # Double the spatial size, then append the skip connection channel-wise.
        return concatenate([UpSampling2D(size=(2, 2))(tensor), skip], axis=-1)

    inputs = Input(input_size)

    # Encoder
    enc1 = conv_stage(inputs, 64)
    down1 = MaxPooling2D(pool_size=(2, 2))(enc1)
    enc2 = conv_stage(down1, 128)
    down2 = MaxPooling2D(pool_size=(2, 2))(enc2)

    # Bottleneck
    bottleneck = conv_stage(down2, 256)

    # Decoder
    dec2 = conv_stage(upsample_and_merge(bottleneck, enc2), 128)
    dec1 = conv_stage(upsample_and_merge(dec2, enc1), 64)

    # Output layer: a 1x1 convolution producing one softmax channel per class.
    class_probs = Conv2D(n_classes, 1, activation='softmax')(dec1)

    return Model(inputs=inputs, outputs=class_probs)


Getting the class weights for the generalized Dice loss. This is identical to Perrine's version.

In [None]:
# Perrine: Compute class weights
# The per-class pixel frequencies are estimated from the first training batch only.
train_images, train_masks = train_dataset[0]

# Column 0 holds the class ID, column 1 accumulates that class's pixel share (in %).
class_weights = np.column_stack((np.arange(21, dtype=float), np.zeros(21)))
# For a variant without background, use 20 rows and drop label 0 from the counts.

n = 0  # number of masks processed

# For each mask in the batch
for mask in train_masks:
    # Collapse the one-hot mask to a map of class IDs.
    labels = np.argmax(mask, axis=-1)

    # bincount with minlength yields one count per class, including absent
    # classes, so row k of `class_weights` lines up with class k directly.
    counts = np.bincount(labels.ravel(), minlength=len(class_weights))

    # Add this mask's per-class pixel percentage (avoids shadowing builtin `sum`).
    class_weights[:, 1] += counts / counts.sum() * 100

    n += 1

# Average the percentages over all processed masks.
class_weights[:,1] = class_weights[:,1]/n

# Convert percentages to fractions in [0, 1].
final_weights = class_weights[:,1]/100

In [None]:
import tensorflow as tf
import keras.backend as K

def generalized_dice_loss(y_true, y_pred, init_weights=final_weights[1:]):
    """Weighted Dice loss plus an extra penalty on the non-background classes.

    Args:
        y_true: One-hot ground-truth masks; the last axis indexes the classes.
        y_pred: Predicted class probabilities with the same shape as ``y_true``.
        init_weights: Per-class weights for the foreground classes. Currently
            unused; kept for the optional weighted foreground penalty noted below.

    Returns:
        Scalar tensor: ``dice_loss + 0.5 * foreground_loss``.
    """
    # Make sure both tensors share a dtype before they are multiplied/added.
    y_true = tf.cast(y_true, y_pred.dtype)

    # Number of classes
    num_classes = tf.shape(y_pred)[-1]

    # Flatten predictions and labels to (pixels, classes)
    y_true_flat = tf.reshape(y_true, (-1, num_classes))
    y_pred_flat = tf.reshape(y_pred, (-1, num_classes))

    # Weight each class by the inverse square of its pixel count in this batch.
    # Classes absent from the batch would give 1/0 = inf; they get a tiny weight.
    class_weights = 1.0 / (tf.reduce_sum(y_true_flat, axis=0) ** 2)
    class_weights = tf.where(tf.math.is_finite(class_weights), class_weights, 1e-10)

    # Compute Dice score for each class (the epsilon guards against 0/0)
    numerator = 2.0 * tf.reduce_sum(y_true_flat * y_pred_flat, axis=0)
    denominator = tf.reduce_sum(y_true_flat + y_pred_flat, axis=0)
    class_dice_scores = (numerator + 1e-10) / (denominator + 1e-10)

    # Multiply Dice scores by class weights
    weighted_dice_scores = class_weights * class_dice_scores

    # Compute Generalized Dice Loss as one minus the weighted mean Dice score
    dice_loss = 1.0 - tf.reduce_sum(weighted_dice_scores) / (tf.reduce_sum(class_weights) + 1e-10)

    # Penalize incorrect predictions for foreground classes
    fg_penalty = 1.0 - class_dice_scores[1:]  # Exclude background class
    foreground_loss = tf.reduce_mean(fg_penalty)
    #foreground_loss = tf.reduce_mean(init_weights*fg_penalty)

    # Combine Generalized Dice Loss with foreground penalty
    total_loss = dice_loss + 0.5 * foreground_loss

    return total_loss


In [None]:
from keras import metrics
from tensorflow.keras import optimizers

# Adam optimizer with an explicit learning rate.
optimizer = optimizers.Adam(learning_rate=0.001)

# 256x256 RGB input and 21 output classes (the loss treats index 0 as background).
Unet = unet((256, 256, 3), 21)
# Pass the configured optimizer object; the string 'adam' would silently ignore it.
Unet.compile(optimizer=optimizer, loss=generalized_dice_loss, metrics=[metrics.Precision(), metrics.Recall(), metrics.AUC()])

Fitting the model. Pay attention to the early-stopping patience: I set it to 100 so that training doesn't stop after 4 epochs.

In [None]:
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.callbacks import ModelCheckpoint


# Early stopping on the validation loss. The patience (100) equals the number
# of epochs requested below, so it is not expected to cut training short.
early_stop = EarlyStopping(
    monitor='val_loss',
    patience=100,
)

# Keep only the weights with the lowest validation loss, in 'best_model.keras'.
model_checkpoint = ModelCheckpoint(
    'best_model.keras',
    monitor='val_loss',
    save_best_only=True,
)

# Train with both callbacks attached and keep the history for later inspection.
unet_history = Unet.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=100,
    callbacks=[early_stop, model_checkpoint],
)


In [None]:
# Select up to 5 distinct random batches from the training set
# (replace=False prevents the same batch from being drawn twice).
n_batches = min(5, len(train_dataset))
indices = np.random.choice(len(train_dataset), size=n_batches, replace=False)

for i in indices:
    # Get a batch of images and masks from the training set
    images, true_masks = train_dataset[i]

    # Select a random image and mask from the batch
    idx = np.random.choice(images.shape[0])
    image = images[idx]
    true_mask = true_masks[idx]

    # Use the model to predict the mask (a batch axis is added for predict)
    pred_mask = Unet.predict(image[np.newaxis, ...])[0]

    # Convert the per-class probabilities to a map of class IDs
    pred_mask = np.argmax(pred_mask, axis=-1)

    # Print the unique classes predicted by the model
    unique_classes = np.unique(pred_mask)
    print(f'Unique classes predicted: {unique_classes}')

    # Pin the colour scale to the full class range. Without vmin/vmax, imshow
    # rescales each panel to its own min/max, so the same class would get
    # different colours in the true and predicted masks.
    max_label = true_mask.shape[-1] - 1

    # Plot the image, the true mask, and the predicted mask
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    axes[0].imshow(image)
    axes[0].set_title('Image')
    axes[1].imshow(np.argmax(true_mask, axis=-1), cmap='nipy_spectral', vmin=0, vmax=max_label)
    axes[1].set_title('True Mask')
    axes[2].imshow(pred_mask, cmap='nipy_spectral', vmin=0, vmax=max_label)
    axes[2].set_title('Predicted Mask')
    plt.show()
