**Connect Google Drive**

In [None]:
# Check if NVIDIA GPU is enabled: runs the nvidia-smi shell command and shows its output in the cell
!nvidia-smi

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the dataset is read from it and the
# trained models are written to it by the cells below.
drive.mount('/content/drive')

**1) Install and import libraries**

In [None]:
#Install libraries
# NOTE(review): only tensorflow and opencv-python-headless are pinned; the other
# packages resolve to whatever version is current, which may not match TF 2.3.0 - confirm.
# NOTE(review): natsort is imported in the next cell but not installed here -
# presumably it is preinstalled in the Colab runtime; verify.
!pip install segmentation_models
!pip install -U albumentations
!pip install pyyaml h5py
!pip install tensorflow==2.3.0
!pip install opencv-python-headless==4.5.2.52
!pip install wandb

In [None]:
#Import libraries
%env SM_FRAMEWORK = tf.keras
import numpy as np
from glob import glob 
import cv2 
import segmentation_models as sm
import matplotlib.pyplot as plt
import random
import albumentations as A
from tensorflow.keras.callbacks import ReduceLROnPlateau, ModelCheckpoint
from tensorflow.keras.models import load_model
import tensorflow as tf
from tensorflow.keras.optimizers import Adam, Adadelta, Adagrad, SGD, Nadam, schedules
from  segmentation_models.metrics import iou_score
from google.colab.patches import cv2_imshow
from natsort import natsorted
from IPython.display import clear_output
from tensorflow import keras

# Never truncate printed arrays with "...", so results printed by later cells are shown in full
np.set_printoptions(threshold=np.inf)

In [None]:
#Number of samples the data generator puts into one batch
batch_size = 16

#Encoder architecture (backbone) of the segmentation model and the
#input preprocessing function that segmentation_models provides for it
BACKBONE = 'seresnet34'
preprocess_input = sm.get_preprocessing(BACKBONE)

**2) Load and preprocess dataset**

In [None]:
#Load images
def load_data(pi, pl):
  """Load all images and their segmentation masks from two folders.

  Files are paired by position: both folders are listed in natural sort order,
  so the n-th image belongs to the n-th mask.

  Args:
    pi: path of the folder with the input images.
    pl: path of the folder with the mask images.

  Returns:
    Tuple (images, masks) of NumPy arrays. Images are read as 3-channel BGR
    (OpenCV's channel order), masks as single-channel grayscale.

  Raises:
    ValueError: if the two folders contain a different number of files, or a
      file cannot be decoded as an image.
  """
  image_files = natsorted(glob(pi+'/*.*'))
  mask_files = natsorted(glob(pl+'/*.*'))

  # Images and masks are consumed by shared index later on, so a count mismatch
  # would silently pair images with the wrong masks.
  if len(image_files) != len(mask_files):
    raise ValueError('Found %d images in %s but %d masks in %s'
                     % (len(image_files), pi, len(mask_files), pl))

  def _read(path, flag):
    # cv2.imread returns None instead of raising when a file cannot be decoded.
    data = cv2.imread(path, flag)
    if data is None:
      raise ValueError('Cannot read image file: ' + path)
    return data

  # IMREAD_COLOR always yields 3 BGR channels. The previous flag,
  # cv2.COLOR_BGR2RGB, is a cvtColor code (value 4) that imread interprets as
  # IMREAD_ANYCOLOR: it never converted to RGB and could return a single channel
  # for grayscale files. BGR order is kept on purpose because the rest of the
  # notebook displays these arrays with cv2_imshow / converts BGR->RGB itself.
  obj_im = [_read(file, cv2.IMREAD_COLOR) for file in image_files]
  obj_la = [_read(file, cv2.IMREAD_GRAYSCALE) for file in mask_files]

  return np.asarray(obj_im), np.asarray(obj_la)
#Training split: images from "train", masks from "train_masks" (both inside the "dataset" folder on Google Drive)
x_train, y_train = load_data(pi='drive/MyDrive/dataset/train', pl='drive/MyDrive/dataset/train_masks')
x_train = preprocess_input(x_train)

#Validation split: images from "valid", masks from "valid_masks" (both inside the "dataset" folder on Google Drive)
x_valid, y_valid = load_data(pi='drive/MyDrive/dataset/valid', pl='drive/MyDrive/dataset/valid_masks')
x_valid = preprocess_input(x_valid)

# NOTE(review): from here on x_train / x_valid hold the *preprocessed* images; later
# cells display and augment these arrays - confirm that is intended for the chosen backbone.

#Check if the dataset is loaded correctly by printing the array shapes
print('x_train', x_train.shape)
print('y_train', y_train.shape)
print()
print('x_valid', x_valid.shape)
print('y_valid', y_valid.shape)

In [None]:
#Load testing images from "test" folder in "dataset" folder on Google drive
def load_test(pi):
  """Load all test images from a folder in natural sort order.

  Args:
    pi: path of the folder with the test images.

  Returns:
    NumPy array of the images, each read as 3-channel BGR (OpenCV's channel order).

  Raises:
    ValueError: if a file cannot be decoded as an image.
  """
  obj_im = []
  for file in natsorted(glob(pi+'/*.*')):
      # IMREAD_COLOR always yields 3 BGR channels. The previous flag,
      # cv2.COLOR_BGR2RGB, is a cvtColor code (value 4) that imread interprets
      # as IMREAD_ANYCOLOR, so it never produced RGB data.
      img = cv2.imread(file, cv2.IMREAD_COLOR)
      if img is None:
          # imread signals an undecodable file with None instead of raising
          raise ValueError('Cannot read image file: ' + file)
      obj_im.append(img)
  return np.asarray(obj_im)

# Test images come without masks and, unlike the train/valid arrays, are kept
# unpreprocessed here; the prediction cells preprocess them right before predicting.
x_test = load_test('drive/MyDrive/dataset/test')
print('x_test', x_test.shape)

In [None]:
#Show if images and masks are loaded correctly
# NOTE(review): x_train was already passed through preprocess_input when it was loaded,
# so the picture shown here is the preprocessed array, not the raw file - confirm this is intended.
# The sample index below requires at least 16 training images.
index = 15
cv2_imshow(x_train[index])
#plt.imshow(x_train[index].astype('int'))
#plt.show()

cv2_imshow(y_train[index])
print(np.unique(y_train[index],return_counts=True)) #Mask should contain only binary pixels [0,255] 
#plt.imshow(y_train[index].astype('int'))
#plt.show()

**3) Augmentation**

In [None]:
# Generated from: https://albumentations-demo.herokuapp.com/

def augment_albu(image, mask):
    """Run one image/mask pair through a random albumentations pipeline.

    The pipeline is rebuilt on every call and passed the image and the mask
    together. Each step fires with its own probability; exactly one of the
    three colour transforms in the final OneOf group is always applied.
    When the random resized crop fires, its output is 320x480 (height x width).

    Args:
        image: input image array.
        mask: mask array belonging to ``image``.

    Returns:
        Tuple (augmented_image, augmented_mask).
    """
    # MEDIUM (original Czech label of this preset: "STŘEDNÍ")
    colour_variants = A.OneOf(
        [
            A.HueSaturationValue(always_apply=True, hue_shift_limit=(-20, 20), sat_shift_limit=(-30, 30), val_shift_limit=(-20, 20)),
            A.RandomContrast(always_apply=True, limit=(-0.5, 0.2)),
            A.RGBShift(always_apply=True, r_shift_limit=(-20, 20), g_shift_limit=(-20, 20), b_shift_limit=(-20, 20)),
        ],
        p=1,
    )

    steps = [
        A.Blur(always_apply=False, p=0.5, blur_limit=(3, 6)),
        A.Posterize(always_apply=False, p=0.4, num_bits=[(0, 7), (0, 7), (0, 7)]),
        A.ToSepia(always_apply=False, p=0.4),
        A.HorizontalFlip(always_apply=False, p=0.5),
        A.RandomResizedCrop(always_apply=False, p=0.5, height=320, width=480, scale=(0.7, 0.8), ratio=(0.75, 1.3), interpolation=0),
        colour_variants,
    ]
    pipeline = A.Compose(steps)

    result = pipeline(image=image, mask=mask)
    return result['image'], result['mask']

**4) Data generator**

In [None]:
def data_generator(augment):
    """Endlessly yield (images, one-hot masks) batches of ``batch_size`` samples.

    Samples are drawn at random with replacement. With ``augment`` true they
    come from x_train / y_train and go through augment_albu; otherwise they come
    unchanged from x_valid / y_valid.

    Args:
        augment: truthy to produce augmented training batches, falsy to
            produce plain validation batches.

    Yields:
        Tuple (images, labels), both float32 arrays. ``labels`` gets a trailing
        axis of size 2: channel 0 marks mask pixels equal to 0, channel 1 marks
        pixels equal to 255.
    """
    class_values = [0, 255]

    while True:
        batch_images, batch_labels = [], []

        for _ in range(batch_size):
            if augment:
                pick = random.randrange(0, len(x_train))
                sample, sample_mask = augment_albu(x_train[pick], y_train[pick])
            else:
                pick = random.randrange(0, len(x_valid))
                sample, sample_mask = x_valid[pick], y_valid[pick]

            # One boolean plane per class value, stacked along a new last axis
            one_hot = np.stack([(sample_mask == value) for value in class_values], axis=-1)

            batch_images.append(sample)
            batch_labels.append(one_hot)

        yield np.asarray(batch_images).astype('float32'), np.asarray(batch_labels).astype('float32')

def gen_train():
    """Return the endless batch generator for training (augmentation enabled)."""
    return data_generator(augment=True)
    
def gen_val():
    """Return the endless batch generator for validation (no augmentation)."""
    return data_generator(augment=False)

**5) Weights and Biases**

In [None]:
#Import, login and init W&B platform for train tracking and displaying graphs during training
import os
# Set before wandb is imported/initialised.
# NOTE(review): presumably selects wandb's thread-based start method for the notebook runtime - confirm.
os.environ["WANDB_START_METHOD"] = "thread"
import wandb
from wandb.keras import WandbCallback
wandb.login()
# NOTE(review): "your_entity" looks like a placeholder - replace with your own W&B user/team name.
wandb.init(project="Final", entity="your_entity")
# Display name of this run; the trained model is later saved under the matching file name model1.h5
wandb.run.name = "model1"

**6) Define callbacks**

In [None]:
class DisplayCallback(tf.keras.callbacks.Callback):
  """Keras callback that shows a sample prediction after every epoch.

  The drawing itself is delegated to show_predictions_callback(), which is
  looked up when an epoch ends, so it may be defined in a later cell.
  """

  def on_epoch_end(self, epoch, logs=None):
    """Display the current prediction preview; ``epoch`` and ``logs`` are not used."""
    # clear_output(wait=True) could be called first to keep only the newest preview
    show_predictions_callback()

#Halve the optimizer learning rate when the training loss stops improving (patience of 2 epochs)
reduce_lr = ReduceLROnPlateau(
    monitor='loss',
    factor=0.5,
    patience=2,
    verbose=1,
)

#Save the best model seen during training (full model, lowest validation loss) to Google Drive
checkpoint_filepath = "/content/drive/MyDrive/best_model.h5"
model_checkpoint_callback = ModelCheckpoint(
    filepath=checkpoint_filepath,
    monitor='val_loss',
    mode='min',
    save_best_only=True,
    save_weights_only=False,
    verbose=1,
)

In [None]:
def show_predictions_callback():
  """Show test image number 1 and the mask the current ``model`` predicts for it.

  Used by DisplayCallback to give a visual progress check during training.
  Prints the distinct values of the predicted mask with their pixel counts.
  """
  sample = x_test[1]

  #Show image
  cv2_imshow(sample)

  #Show predicted mask: preprocess, add the batch axis, predict, then take the
  #highest-scoring class per pixel and scale the class index to 0/255
  batch = np.expand_dims(preprocess_input(sample), axis = 0)
  scores = model.predict(batch)[0]
  predicted_mask = np.argmax(scores, axis = -1).astype(np.float32)*255
  print(np.unique(predicted_mask, return_counts=True))
  cv2_imshow(predicted_mask)

**6) Define model and start training**

In [None]:
#Build model: U-Net with the BACKBONE encoder initialised from ImageNet weights,
#a fixed 320x480 3-channel input and two sigmoid-activated output channels
model = sm.Unet(
    backbone_name=BACKBONE,
    input_shape=(320, 480, 3),
    classes=2,
    activation='sigmoid',
    encoder_weights='imagenet',
)

In [None]:
#Compile model: Adam optimizer with default settings, binary cross-entropy loss, IoU score as the tracked metric
model.compile(optimizer=Adam(), loss='binary_crossentropy', metrics=[iou_score])
#Show model
#model.summary()

In [None]:
#Start training
# gen_train()/gen_val() already yield complete batches of `batch_size` samples, so
# `batch_size` is deliberately NOT passed to fit(): Keras documents that it must not be
# specified for generator inputs. Because the generators are endless, the epoch length
# is defined by steps_per_epoch / validation_steps.
model_history = model.fit(gen_train(),
                          validation_data=gen_val(),
                          steps_per_epoch=200,
                          validation_steps=100,
                          epochs=10,
                          callbacks=[reduce_lr, wandb.keras.WandbCallback(), model_checkpoint_callback, DisplayCallback()])
#Save model after run is done (final epoch state; the best checkpoint is saved separately by the checkpoint callback)
model.save("/content/drive/MyDrive/model1.h5")

In [None]:
#Finish transfer to W&B after run is done (closes the run opened by wandb.init)
wandb.finish() 

**7) Evaluate trained models**

In [None]:
#Load Models:
# compile=False loads the networks for inference only (no optimizer/loss/metric objects are restored).
# NOTE(review): this notebook itself only writes model1.h5; model2.h5 and model3.h5 must already
# exist on Google Drive - presumably from earlier runs saved under those names; verify.
model1 = tf.keras.models.load_model('/content/drive/MyDrive/model1.h5', compile=False)
model2 = tf.keras.models.load_model('/content/drive/MyDrive/model2.h5', compile=False)
model3 = tf.keras.models.load_model('/content/drive/MyDrive/model3.h5', compile=False)

In [None]:
#Members of the model ensemble, in order
models = [model1, model2, model3]

In [None]:
#Models evaluating on valid images and comparing with ground truth masks
def show_predictions(index):
  """Compare model1, model2, model3 and their ensemble on one validation image.

  Displays the validation image and its ground-truth mask, then the mask
  predicted by each loaded model and by the equally weighted ensemble of the
  three. For every mask the distinct pixel values and their counts are printed.

  Args:
    index: position of the sample in x_valid / y_valid.
  """
  def _scores_to_mask(scores):
    # Highest-scoring class per pixel (0 or 1), scaled to a 0/255 uint8 mask
    return (np.argmax(scores, axis = -1)*255).astype(np.uint8)

  def _show_mask(mask):
    # Masks are single-channel, so they need GRAY2RGB; BGR2RGB only accepts
    # 3- or 4-channel input and fails on them.
    plt.imshow(cv2.cvtColor(mask, cv2.COLOR_GRAY2RGB))
    plt.show()

  #Show image
  # NOTE(review): x_valid was already passed through preprocess_input when it was
  # loaded, so this shows the preprocessed array - confirm that is acceptable.
  print("Valid image:" ,index)
  img = cv2.cvtColor(x_valid[index], cv2.COLOR_BGR2RGB)
  plt.imshow(img.astype('int'))
  plt.show()

  #Show mask
  print("Ground truth mask:")
  mask = (y_valid[index]).astype(np.uint8)
  print(np.unique(mask,return_counts=True))
  _show_mask(mask)

  #Prepare for prediction: x_valid is already preprocessed (done once at load
  #time), so only the batch axis is added; preprocessing again would feed the
  #models differently scaled data than they were validated on during training.
  image = np.expand_dims(x_valid[index], axis = 0)

  #Individual model predictions (the models loaded from Google Drive)
  raw_preds = []
  for label, net in (("model 1:", model1), ("model 2:", model2), ("model 3:", model3)):
    print(label)
    scores = net.predict(image)[0]
    raw_preds.append(scores)
    pred = _scores_to_mask(scores)
    print(np.unique(pred,return_counts=True))
    _show_mask(pred)

  #Ensemble models prediction
  preds = np.array(raw_preds)
  weights = [1,1,1]
  #Use tensordot to sum the per-model scores, each multiplied by its weight, over the model axis.
  weighted_preds = np.tensordot(preds, weights, axes=((0),(0)))
  ensemble_pred = _scores_to_mask(weighted_preds)
  print("[INFO] Predicted ensemble Mask:",np.unique(ensemble_pred,return_counts=True))
  _show_mask(ensemble_pred)

# Validation sample to inspect (rebinds the notebook-level `index` variable)
index = 1
show_predictions(index)

In [None]:
#Models evaluating on test images
def show_predictions(index):
  """Compare model1, model2, model3 and their ensemble on one test image.

  Displays the test image, then the mask predicted by each loaded model and by
  the equally weighted ensemble of the three. For every mask the distinct pixel
  values and their counts are printed.

  NOTE: this function shares its name with the validation-set function defined
  in the previous cell and replaces it once this cell has run.

  Args:
    index: position of the image in x_test.
  """
  def _scores_to_mask(scores):
    # Highest-scoring class per pixel (0 or 1), scaled to a 0/255 uint8 mask
    return (np.argmax(scores, axis = -1)*255).astype(np.uint8)

  def _show_mask(mask):
    # Masks are single-channel, so they need GRAY2RGB; BGR2RGB only accepts
    # 3- or 4-channel input and fails on them.
    plt.imshow(cv2.cvtColor(mask, cv2.COLOR_GRAY2RGB))
    plt.show()

  #Show image:
  print("Image:" ,index)
  img = cv2.cvtColor(x_test[index], cv2.COLOR_BGR2RGB)
  plt.imshow(img.astype('int'))
  plt.show()

  #Preprocess for prediction (x_test is stored unpreprocessed) and add the batch axis
  image = x_test[index]
  image = preprocess_input(image)
  image = np.expand_dims(image, axis = 0)

  #Individual model predictions (the models loaded from Google Drive)
  raw_preds = []
  for label, net in (("model 1:", model1), ("model 2:", model2), ("model 3:", model3)):
    print(label)
    scores = net.predict(image)[0]
    raw_preds.append(scores)
    pred = _scores_to_mask(scores)
    print(np.unique(pred,return_counts=True))
    _show_mask(pred)

  #Ensemble models prediction
  preds = np.array(raw_preds)
  weights = [1,1,1]
  #Use tensordot to sum the per-model scores, each multiplied by its weight, over the model axis.
  weighted_preds = np.tensordot(preds, weights, axes=((0),(0)))
  ensemble_pred = _scores_to_mask(weighted_preds)
  print("[INFO] Predicted ensemble Mask:",np.unique(ensemble_pred,return_counts=True))
  _show_mask(ensemble_pred)

# Test image to inspect (rebinds the notebook-level `index` variable)
index = 1
show_predictions(index)