<center><img src="picture.jpg" width="600" height="500" /></center>

In [1]:
import tensorflow as tf
import random
import numpy as np
from tensorflow.keras import layers
from tensorflow.keras.layers import concatenate, Input, Reshape
from tensorflow.keras.models import Model
from tensorflow.keras import regularizers
import tensorflow as tf
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import Conv2D  
from tensorflow import keras
from tensorflow.keras import backend as K 
import os
import shutil
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint


# Load data

In [2]:
# Sample geometry; used below as the default `dim` of the data generators
width, height, channel = 64, 64, 12

# Directory holding the training arrays, and a listing of everything in it
all_files_loc_train = 'E:/Deep Course/Weeks/W11/Data/Train(without Uncertainty)/'
all_files_train = os.listdir(all_files_loc_train)

# Directory holding the testing arrays, and a listing of everything in it
all_files_loc_test = 'E:/Deep Course/Weeks/W11/Data/Test(without Uncertainty)/'
all_files_test = os.listdir(all_files_loc_test)

# Training inputs are the directory entries whose name contains "input"
partition_train = [item for item in all_files_train if "input" in item]

# Map every training input to its label file. The label name is derived from
# the input name itself ("input_file_7.npy" -> "label_file_7.npy") instead of
# from the directory size, so a stray extra file or a gap in the numbering can
# no longer leave an input without a mapping.
image_label_map = {
    item: item.replace("input", "label", 1) for item in partition_train
}

# Testing inputs are selected the same way
partition_val = [item for item in all_files_test if "input" in item]

# Map every testing input to its label file (same naming rule as above)
image_label_map_val = {
    item: item.replace("input", "label", 1) for item in partition_val
}

# Print the number of directory entries and the number of input files per split
print('Val Len:', len(all_files_test))
print('Len Val:', len(partition_val))
print('Train Len:', len(all_files_train))
print('Len Train:', len(partition_train))

Val Len: 1370
Len Val: 685
Train Len: 19236
Len Train: 9618


In [3]:
# Batch generators that stream (input, label) .npy pairs from disk
class DataGenerator(tf.keras.utils.Sequence):
    """
    Keras Sequence that loads batches of .npy input/label pairs.

    By default files are read from ``all_files_loc_train`` and each input is
    paired with its label through ``image_label_map``; pass ``data_dir`` and
    ``label_map`` to read another split with the same class.

    :param list_examples: Input file names, relative to the data directory
    :param batch_size: Number of samples per batch
    :param dim: Shape of one input sample; its first two entries also size the label array
    :param shuffle: Shuffle the sample order at construction and after every epoch
    :param data_dir: Directory containing the arrays (default: ``all_files_loc_train``)
    :param label_map: Dict mapping input file name -> label file name (default: ``image_label_map``)
    """

    def __init__(self, list_examples, batch_size=4, dim=(width, height, channel), shuffle=True,
                 data_dir=None, label_map=None):
        # Newer Keras releases expect Sequence subclasses to initialise the base class.
        super().__init__()
        self.dim = dim
        self.batch_size = batch_size
        self.list_examples = list_examples
        self.shuffle = shuffle
        self.data_dir = all_files_loc_train if data_dir is None else data_dir
        self.label_map = image_label_map if label_map is None else label_map
        # Build the initial (optionally shuffled) index order.
        self.on_epoch_end()

    def __len__(self):
        """Number of full batches per epoch (a trailing partial batch is dropped)."""
        return len(self.list_examples) // self.batch_size

    def __getitem__(self, index):
        """Return the ``(X, y)`` arrays of batch number ``index``."""
        start = index * self.batch_size
        batch_indexes = self.indexes[start:start + self.batch_size]

        # Translate positions in the (shuffled) index array into file names
        batch_files = [self.list_examples[k] for k in batch_indexes]

        return self.__data_generation(batch_files)

    def on_epoch_end(self):
        """Reset the sample order; reshuffle it when ``shuffle`` is enabled."""
        self.indexes = np.arange(len(self.list_examples))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, list_IDs_temp):
        """
        Load the listed input files and their labels into two float32 arrays.

        :param list_IDs_temp: Input file names making up one batch
        :return: ``(X, y)`` with ``X`` of shape (n, dim[0], dim[1], dim[2]) and
                 ``y`` of shape (n, dim[0], dim[1], 1), n being the number of names
        :raises KeyError: If an input file has no entry in the label map
        """
        n_samples = len(list_IDs_temp)

        # Size both buffers from the names actually requested and from self.dim,
        # so no row is left uninitialised and a non-default `dim` is honoured.
        X = np.empty((n_samples, self.dim[0], self.dim[1], self.dim[2]), dtype=np.float32)
        y = np.empty((n_samples, self.dim[0], self.dim[1], 1), dtype=np.float32)

        for row, input_name in enumerate(list_IDs_temp):
            label_name = self.label_map.get(input_name)
            if label_name is None:
                # Fail with the offending file name rather than an opaque
                # TypeError from os.path.join(None).
                raise KeyError("no label file registered for input {!r}".format(input_name))

            # Load sample
            X[row, :, :, :] = np.load(os.path.join(self.data_dir, input_name))
            # Load labels
            y[row, :, :, :] = np.load(os.path.join(self.data_dir, label_name))

        return X, y


# Validation counterpart: identical batching logic, different directory and label map
class ValDataGenerator(DataGenerator):
    """
    Data generator for the validation split.

    Same behaviour as ``DataGenerator`` but reads from ``all_files_loc_test``
    and pairs inputs with labels through ``image_label_map_val``.

    :param list_examples: Input file names, relative to the test directory
    :param batch_size: Number of samples per batch
    :param dim: Shape of one input sample
    :param shuffle: Shuffle the sample order at construction and after every epoch
    """

    def __init__(self, list_examples, batch_size=4, dim=(width, height, channel), shuffle=True):
        super().__init__(list_examples, batch_size=batch_size, dim=dim, shuffle=shuffle,
                         data_dir=all_files_loc_test, label_map=image_label_map_val)

# Instantiate one batch generator per split, both with their default settings
training_generator = DataGenerator(list_examples=partition_train)
validation_generator = ValDataGenerator(list_examples=partition_val)

In [4]:
# Default Tversky weights: ALPHA scales the false-positive term, BETA the false-negative term
ALPHA, BETA = 0.7, 0.3

# Define TverskyLoss function
def TverskyLoss(targets, inputs, alpha=ALPHA, beta=BETA, smooth=1e-6):
    """
    Tversky loss (1 - Tversky index), computed over every element of the batch.
    :param targets: Ground truth labels
    :param inputs: Predicted labels
    :param alpha: Weight applied to the false-positive term
    :param beta: Weight applied to the false-negative term
    :param smooth: Constant added to numerator and denominator to prevent division by zero
    :return: Scalar Tversky loss value
    """
    # Collapse both tensors to 1-D so the sums below cover all elements
    y_pred = K.flatten(inputs)
    y_true = K.flatten(targets)

    # Soft counts of true positives, false positives and false negatives
    true_pos = K.sum(y_pred * y_true)
    false_pos = K.sum((1 - y_true) * y_pred)
    false_neg = K.sum(y_true * (1 - y_pred))

    # Tversky index: TP / (TP + alpha * FP + beta * FN), smoothed
    numerator = true_pos + smooth
    denominator = true_pos + alpha * false_pos + beta * false_neg + smooth

    return 1 - numerator / denominator

# Define Intersection over Union (IoU) function
def iou(y_true, y_pred, smooth=1):
    """
    Smoothed Intersection over Union, summed over all elements of both tensors.
    :param y_true: Ground truth labels
    :param y_pred: Predicted labels
    :param smooth: Constant added to numerator and denominator to prevent division by zero
    :return: IoU score
    """
    overlap = K.sum(y_true * y_pred)
    total = K.sum(y_true + y_pred)
    # Union = |A| + |B| - |A ∩ B|
    union = total - overlap
    return (overlap + smooth) / (union + smooth)

# Define Jaccard distance function
def jac_distance(y_true, y_pred):
    """
    Jaccard distance (1 - IoU) for semantic segmentation.
    :param y_true: Ground truth labels
    :param y_pred: Predicted labels
    :return: Jaccard distance
    """
    # iou() already sums over every element, so no flattening is needed here.
    # Return 1 - IoU as documented; the previous `-iou` differed from it by a
    # constant, so gradients are unchanged when this is used as a loss.
    return 1 - iou(y_true, y_pred)


In [5]:
# Define the channel attention block
def channel_attention(input_tensor, hidden_units=100, verbose=True):
    """
    Re-weight the channels of a feature map with learned per-channel gates.

    The spatial mean of every channel is passed through two Dense layers
    (ReLU, then sigmoid) and the resulting weights multiply the input.
    Only Keras layers are used, so the block does not rely on raw TensorFlow
    ops being applied to symbolic Keras tensors.

    :param input_tensor: Channels-last feature map, (batch, rows, cols, channels)
    :param hidden_units: Width of the intermediate Dense layer
    :param verbose: Print the shape of every intermediate tensor
    :return: Tensor with the same shape as ``input_tensor``, scaled per channel
    :raises ValueError: If the channel dimension of ``input_tensor`` is not static
    """
    # Get the number of channels in the input tensor
    channels = input_tensor.shape[-1]
    if channels is None:
        raise ValueError("channel_attention needs a statically known channel dimension")

    # Global average pooling over the two spatial axes; the Reshape restores
    # the (1, 1, channels) layout so the Dense layers act on the channel axis.
    pooled = layers.GlobalAveragePooling2D(data_format='channels_last')(input_tensor)
    gap = Reshape((1, 1, channels))(pooled)

    # Fully connected layers to compute channel-wise attention weights
    fc1 = layers.Dense(hidden_units, activation='relu')(gap)
    fc2 = layers.Dense(channels, activation='sigmoid')(fc1)

    # Shape the weights as (1, 1, channels) and broadcast them over the spatial axes
    attention = Reshape((1, 1, channels))(fc2)
    scaled_input = layers.Multiply()([input_tensor, attention])

    if verbose:
        print('input Tensor shape: ', input_tensor.shape)
        print('gap:', gap.shape)
        print('fc1:', fc1.shape)
        print('fc2:', fc2.shape)
        print('attention:', attention.shape)
        print('scaled_input:', scaled_input.shape)

    return scaled_input

# Define a custom convolutional neural network model
def Model_(input_shape):
    """
    Build the segmentation network: two stem convolutions, five parallel
    dilated convolutions, channel attention, three more convolutions, batch
    normalization and a single-channel sigmoid output.

    :param input_shape: Shape of one input sample (without the batch axis)
    :return: Uncompiled Keras Model
    """
    input_ = Input(input_shape)

    # Stem: two L2-regularized 3x3 convolutions (the second has no activation)
    stem = Conv2D(64, (3, 3), activation='relu', padding='same',
                  kernel_regularizer=regularizers.l2(.00001))(input_)
    x1 = Conv2D(128, (3, 3), kernel_regularizer=regularizers.l2(.00001), padding='same')(stem)
    print('x1 shape: ',x1.shape)

    # Parallel dilated 3x3 branches, all fed by x1, listed as (dilation_rate, filters)
    # in the order the layers are created.
    # NOTE(review): the rate-6 branch has 66 filters while the others have 64 —
    # possibly a typo; kept as-is so the architecture is unchanged.
    branch_specs = [(3, 64), (6, 66), (12, 64), (18, 64), (1, 64)]
    branches = {
        rate: Conv2D(filters, (3, 3), dilation_rate=rate, activation='relu', padding='same')(x1)
        for rate, filters in branch_specs
    }

    # Concatenate the branch outputs along the channel axis (order: 1, 18, 3, 6, 12)
    merged = concatenate([branches[rate] for rate in (1, 18, 3, 6, 12)])
    print('concatenate: ',merged.shape)

    # Re-weight the concatenated channels
    channel_out = channel_attention(merged)
    print('channel_out:', channel_out.shape)

    # Reduce the channel count with a 1x1 and two 3x3 convolutions (no activation)
    head = channel_out
    for filters, kernel_size in ((32, (1, 1)), (32, (3, 3)), (16, (3, 3))):
        head = Conv2D(filters, kernel_size, padding='same')(head)

    # Normalize, then project to one sigmoid-activated output channel
    head = BatchNormalization()(head)
    output = Conv2D(1, (1, 1), padding='same', activation='sigmoid')(head)

    return Model(inputs=input_, outputs=output)

# Build the network for inputs of shape (width, height, channel)
model = Model_(input_shape=(width, height, channel))


x1 shape:  (None, 64, 64, 128)
concatenate:  (None, 64, 64, 322)
input Tensor shape:  (None, 64, 64, 322)
gap: (None, 1, 1, 322)
fc1: (None, 1, 1, 100)
fc2: (None, 1, 1, 322)
attention: (None, 1, 1, 322)
scaled_input: (None, 64, 64, 322)
channel_out: (None, 64, 64, 322)


In [6]:
# Build a fresh model for inputs of shape (width, height, channel)
model = Model_(input_shape=(width, height, channel))

# RMSprop optimizer with an explicit learning rate
learning_rate = 0.0005
optimizer = tf.keras.optimizers.RMSprop(learning_rate=learning_rate)

# Train against the Tversky loss and report IoU as a metric
model.compile(optimizer=optimizer, loss=TverskyLoss, metrics=[iou])

# Stop training once val_loss has gone 60 epochs without improving
early_stopping = EarlyStopping(monitor="val_loss", patience=60)

# Callbacks: keep only the best checkpoint on disk, and stop early when training stalls
callbacks = [
    ModelCheckpoint(
        'E:/Deep Course/Weeks/W11/Models/channelAttention.h5',
        verbose=1,
        save_best_only=True,
    ),
    early_stopping,
]


x1 shape:  (None, 64, 64, 128)
concatenate:  (None, 64, 64, 322)
input Tensor shape:  (None, 64, 64, 322)
gap: (None, 1, 1, 322)
fc1: (None, 1, 1, 100)
fc2: (None, 1, 1, 322)
attention: (None, 1, 1, 322)
scaled_input: (None, 64, 64, 322)
channel_out: (None, 64, 64, 322)


In [7]:
# Train for up to 500 epochs, validating after each one; the callbacks may stop the run earlier
history = model.fit(
    training_generator,
    validation_data=validation_generator,
    epochs=500,
    callbacks=callbacks,
)

Epoch 1/500

Epoch 00001: val_loss improved from inf to 0.68100, saving model to E:/Deep Course/Weeks/W11/Models\channelAttention.h5
Epoch 2/500

Epoch 00002: val_loss improved from 0.68100 to 0.67305, saving model to E:/Deep Course/Weeks/W11/Models\channelAttention.h5
Epoch 3/500

Epoch 00003: val_loss improved from 0.67305 to 0.66643, saving model to E:/Deep Course/Weeks/W11/Models\channelAttention.h5
Epoch 4/500

Epoch 00004: val_loss did not improve from 0.66643
Epoch 5/500

KeyboardInterrupt: 