<a href="https://colab.research.google.com/github/mmcastillo/al112248/blob/main/Unet_aneurismas_custom_datagen.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import os
from google.colab import drive
# Mount Google Drive in the Colab runtime; the dataset directories used later
# in this notebook are located under /content/drive/.
drive.mount('/content/drive/');
import tensorflow as tf
from tensorflow import keras
from keras.models import Model
from keras import layers
from keras.layers import Input, Conv3D, MaxPooling3D, UpSampling3D, concatenate, Conv3DTranspose, BatchNormalization, Dropout, Lambda
from keras.layers import Activation, MaxPool2D, Concatenate
from scipy import ndimage
from sklearn.model_selection import train_test_split

In [None]:
def load_img(img_dir, img_list):
    """Load the ``.npy`` arrays named in ``img_list`` from ``img_dir``.

    Entries whose name does not end in ``.npy`` are skipped, so the result
    can contain fewer items than ``img_list``.

    Args:
        img_dir: Directory that holds the files. A trailing path separator
            is accepted but not required.
        img_list: Sequence of file names relative to ``img_dir``.

    Returns:
        ``np.ndarray`` built from the loaded arrays, in the order given by
        ``img_list`` (an empty array when nothing was loaded).
    """
    images = []
    for image_name in img_list:
        # Test the real suffix: `split('.')[1]` raised IndexError for names
        # without a dot and rejected names with extra dots ("scan.v2.npy").
        if not image_name.endswith('.npy'):
            continue
        # os.path.join works whether or not img_dir ends with a separator.
        images.append(np.load(os.path.join(img_dir, image_name)))

    return np.array(images)


In [None]:
def imageLoader(img_dir, img_list, mask_dir, mask_list, batch_size):
    """Endlessly yield ``(X, Y)`` batches of images and masks.

    Images and masks are paired by position: the same index range is sliced
    from ``img_list`` and ``mask_list`` for every batch. Each pass over the
    lists yields ``ceil(len(img_list) / batch_size)`` batches; the last one
    is smaller when the list length is not a multiple of ``batch_size``.
    After a full pass the generator starts over from the first batch.

    Args:
        img_dir: Directory holding the image files.
        img_list: File names of the images inside ``img_dir``.
        mask_dir: Directory holding the mask files.
        mask_list: File names of the masks inside ``mask_dir``.
        batch_size: Maximum number of samples per yielded batch (>= 1).

    Yields:
        Tuple ``(X, Y)`` of the arrays returned by ``load_img`` for the
        current image slice and mask slice.

    Raises:
        ValueError: On the first ``next()`` call if ``img_list`` is empty,
            if the two lists differ in length, or if ``batch_size < 1``.
    """
    n_samples = len(img_list)

    # Without these guards an empty list made the outer loop spin forever
    # without yielding, and a non-positive batch size never finished a pass.
    if n_samples == 0:
        raise ValueError("img_list is empty: there is nothing to yield")
    if len(mask_list) != n_samples:
        raise ValueError(
            "img_list and mask_list must have the same length, got "
            f"{n_samples} and {len(mask_list)}"
        )
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    # Keras needs the generator to be infinite, so cycle over the data forever.
    while True:
        for batch_start in range(0, n_samples, batch_size):
            # Slicing clamps the upper bound to the list length, which
            # produces the shorter final batch.
            batch_end = batch_start + batch_size

            X = load_img(img_dir, img_list[batch_start:batch_end])
            Y = load_img(mask_dir, mask_list[batch_start:batch_end])

            yield (X, Y)  # a tuple with two numpy arrays of up to batch_size samples

In [None]:
# Locations of the training images and masks on the mounted Drive.
train_img_dir = '/content/drive/MyDrive/DOCTORADO/python/ADAM_10C_10A_npy/imgs/'
train_mask_dir = '/content/drive/MyDrive/DOCTORADO/python/ADAM_10C_10A_npy/masks/'
# os.listdir returns entries in arbitrary order, but images and masks are
# paired by list position downstream, so both listings are sorted.
# NOTE(review): this assumes each image and its mask have names that sort
# into the same position — confirm against the dataset naming.
train_img_list = sorted(os.listdir(train_img_dir))
train_mask_list = sorted(os.listdir(train_mask_dir))

# Locations of the validation images and masks, listed the same way.
val_img_dir = '/content/drive/MyDrive/DOCTORADO/python/val/imgs/'
val_mask_dir = '/content/drive/MyDrive/DOCTORADO/python/val/masks/'
val_img_list = sorted(os.listdir(val_img_dir))
val_mask_list = sorted(os.listdir(val_mask_dir))


In [None]:
# Number of samples requested per generator batch.
batch_size = 4

# Infinite batch generator over the training images and masks.
train_img_datagen = imageLoader(
    img_dir=train_img_dir,
    img_list=train_img_list,
    mask_dir=train_mask_dir,
    mask_list=train_mask_list,
    batch_size=batch_size,
)

# Infinite batch generator over the validation images and masks.
val_img_datagen = imageLoader(
    img_dir=val_img_dir,
    img_list=val_img_list,
    mask_dir=val_mask_dir,
    mask_list=val_mask_list,
    batch_size=batch_size,
)


In [None]:
# Sanity-check the shapes of one batch. A throwaway generator is used so that
# peeking does not consume a batch from train_img_datagen, and re-running this
# cell always shows the same (first) batch.
preview_datagen = imageLoader(train_img_dir, train_img_list,
                              train_mask_dir, train_mask_list, batch_size)
img, mask = next(preview_datagen)
print(img.shape)
print(mask.shape)

In [None]:
def conv_block(input, num_filters):
    """Apply two successive Conv3D -> BatchNormalization -> ReLU stages.

    Both convolutions use ``num_filters`` filters, a kernel size of 3 and
    "same" padding. Batch normalization is not part of the original network.

    Args:
        input: Tensor fed to the first convolution.
        num_filters: Number of filters for both convolutions.

    Returns:
        Output tensor of the second ReLU activation.
    """
    features = input
    for _ in range(2):
        features = Conv3D(filters=num_filters, kernel_size=3, padding="same")(features)
        features = BatchNormalization()(features)  # Not in the original network
        features = Activation("relu")(features)
    return features

#Encoder block: Conv block followed by maxpooling


def encoder_block(input, num_filters):
    """Run a conv block and then halve the result with 2x2x2 max pooling.

    Args:
        input: Tensor entering this encoder stage.
        num_filters: Number of filters passed to ``conv_block``.

    Returns:
        Tuple ``(skip, pooled)``: the conv-block output before pooling and
        the max-pooled tensor.
    """
    skip = conv_block(input, num_filters)
    pooled = MaxPooling3D(pool_size=(2, 2, 2))(skip)
    return skip, pooled

#Decoder block
#skip features gets input from encoder for concatenation

def decoder_block(input, skip_features, num_filters):
    """Upsample ``input``, concatenate the skip tensor, and apply a conv block.

    Args:
        input: Tensor coming from the previous (deeper) stage.
        skip_features: Encoder tensor concatenated after upsampling.
        num_filters: Number of filters for the transposed convolution and
            for the conv block that follows.

    Returns:
        Output tensor of the conv block.
    """
    upsampled = Conv3DTranspose(
        filters=num_filters,
        kernel_size=(2, 2, 2),
        strides=2,
        padding="same",
    )(input)
    merged = Concatenate()([upsampled, skip_features])
    return conv_block(merged, num_filters)

#Build Unet using the blocks
def build_unet(input_shape, n_classes=1):
    """Assemble a 3D U-Net from the encoder, bridge and decoder blocks.

    The network has four encoder stages (16, 32, 64, 128 filters), a
    256-filter bridge and four mirrored decoder stages, followed by a
    1x1x1 convolution that produces ``n_classes`` output channels.

    Args:
        input_shape: Shape of one sample without the batch axis, e.g.
            ``(64, 64, 64, 1)``. The input is pooled by 2 four times, so
            each spatial size should be divisible by 16 for the upsampled
            tensors to match the skip connections.
        n_classes: Number of output channels. Defaults to 1.

    Returns:
        The Keras ``Model`` named "U-Net" (not compiled).
    """
    inputs = Input(input_shape)

    s1, p1 = encoder_block(inputs, 16)
    s2, p2 = encoder_block(p1, 32)
    s3, p3 = encoder_block(p2, 64)
    s4, p4 = encoder_block(p3, 128)

    b1 = conv_block(p4, 256)  # Bridge

    d1 = decoder_block(b1, s4, 128)
    d2 = decoder_block(d1, s3, 64)
    d3 = decoder_block(d2, s2, 32)
    d4 = decoder_block(d3, s1, 16)

    # Softmax normalizes across the channel axis, so with a single output
    # channel it is constantly 1.0 and nothing can be learned. Use sigmoid
    # for one channel and softmax only when there are several classes.
    activation = 'sigmoid' if n_classes == 1 else 'softmax'

    outputs = Conv3D(n_classes, 1, padding="same", activation=activation)(d4)
    print(f'activation function: {activation}')

    model = Model(inputs, outputs, name="U-Net")
    return model

In [None]:
# Build the network for 64x64x64 single-channel inputs and report the input
# shape the resulting model expects.
model = build_unet(input_shape=(64, 64, 64, 1))
print(f'model input shape: {model.input_shape}')

In [None]:
!pip install segmentation-models-3D

In [None]:
import segmentation_models_3D as sm
from keras import backend as K

'''
# Segmentation models losses can be combined together by '+' and scaled by integer or float factor
# set class weights for dice_loss (car: 1.; pedestrian: 2.; background: 0.5;)
dice_loss = sm.losses.DiceLoss()
focal_loss = sm.losses.BinaryFocalLoss()
total_loss = dice_loss + (1 * focal_loss)

# actually, total_loss can be imported directly from the library; the example above just shows how to combine losses
# total_loss = sm.losses.binary_focal_dice_loss # or sm.losses.categorical_focal_dice_loss 
'''
# Metrics reported during training: IoU score and F-score, both created with
# threshold=0.5.
metrics = [
    sm.metrics.IOUScore(threshold=0.5),
    sm.metrics.FScore(threshold=0.5),
]
# Jaccard loss from segmentation_models_3D; passed to model.compile below.
jaccard_loss = sm.losses.JaccardLoss()


In [None]:
# Learning rate for the Adam optimizer.
LR = 0.0001
optim = keras.optimizers.Adam(learning_rate=LR)

model.compile(optimizer=optim, loss=jaccard_loss, metrics=metrics)
# summary() prints the table itself and returns None, so wrapping it in
# print() only appended a stray "None" to the output.
model.summary()

In [None]:
# When using the custom data generator.
# Ceiling division: imageLoader also yields the final, smaller batch of each
# pass, so this makes one epoch equal exactly one pass over the file list and
# never yields 0 steps when there are fewer files than batch_size.
steps_per_epoch = -(-len(train_img_list) // batch_size)
val_steps_per_epoch = -(-len(val_img_list) // batch_size)

history = model.fit(
    train_img_datagen,
    steps_per_epoch=steps_per_epoch,
    epochs=20,
    verbose=1,
    validation_data=val_img_datagen,
    validation_steps=val_steps_per_epoch,
)

# Persist the trained model to the working directory.
model.save('UNet_aneurismas_patches.hdf5')

In [None]:
# Run the trained model on the test volumes and inspect the raw output.
# NOTE(review): X_test is not defined in any cell visible in this notebook;
# on a fresh kernel this raises NameError — confirm where it should come from.
y_pred = model.predict(X_test)
# Optional binarization of the predictions at 0.5 (left disabled):
# y_pred = (y_pred >= 0.5).astype(int)
pred_shape = y_pred.shape
pred_values = np.unique(y_pred)
print(pred_shape)
print(pred_values)

In [None]:
# Distinct values found at index 26 of the fourth axis for sample 38 of y_test
# (presumably the ground-truth masks — verify).
# NOTE(review): y_test is not defined in any cell visible in this notebook.
slice_values = np.unique(y_test[38, :, :, 26, :])
print(slice_values)

In [None]:
# Side-by-side grayscale view of the same slice (sample 38, index 26 on the
# fourth axis, channel 0) taken from X_test, y_test and y_pred, in that order.
# NOTE(review): X_test and y_test are not defined in any visible cell.
for panel, volume in enumerate((X_test, y_test, y_pred), start=1):
    plt.subplot(1, 3, panel)
    plt.imshow(volume[38, :, :, 26, 0], cmap='gray');