In [0]:
# Installation of dependencies 
!pip install imageio
!pip install -U tensorboardcolab
!git clone https://github.com/mariomeissner/unet-segmentation.git ./unet

# Optional: only needed for graph plotting
#!apt-get install graphviz
#!pip install graphviz pydot


In [0]:
# Imports 
import numpy as np 
import os
import skimage.transform as trans
import matplotlib.pyplot as plt
import unet.model as unet_model
from keras.models import *
from keras.layers import *
from keras.optimizers import *
from keras.callbacks import ModelCheckpoint, LearningRateScheduler
from keras.preprocessing.image import ImageDataGenerator
from tensorboardcolab import TensorBoardColab, TensorBoardColabCallback
from keras import backend
from keras.utils import plot_model
from imageio import imread, imwrite
from skimage import transform

In [0]:
# Dataset location: Google Drive when running on Colab, a local directory otherwise.
# Later cells build file paths by plain string concatenation (folder + 'sub/dir/'),
# so `folder` is expected to end with a path separator.
USE_DRIVE = True
if USE_DRIVE:
    from google.colab import drive
    drive.mount('/content/gdrive')
    folder = '/content/gdrive/My Drive/Projects/datasets/steven2358-larynx_data/'
else:
    folder = 'your/local/dataset/folder/here'

# Dataset size: number of image/label pairs expected on disk
num_images = 154

In [0]:
# Function for plotting model history
def plot_history(history):
  """Plot the accuracy and loss curves recorded during training.

  Two figures are drawn, one after the other: categorical accuracy per epoch
  and loss per epoch, each with a training and a validation curve.

  Args:
    history: object exposing a ``history`` dict with the per-epoch series
      'categorical_accuracy', 'val_categorical_accuracy', 'loss' and
      'val_loss' (this notebook passes the value returned by
      ``model.fit_generator``).

  Raises:
    KeyError: if any of the four series is missing from ``history.history``.
  """
  # (history key, figure title, y-axis label); the validation key is 'val_' + key
  curves = (
      ('categorical_accuracy', 'model accuracy', 'accuracy'),
      ('loss', 'model loss', 'loss'),
  )
  for key, title, ylabel in curves:
    plt.plot(history.history[key])
    plt.plot(history.history['val_' + key])
    plt.title(title)
    plt.ylabel(ylabel)
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    # Show every figure explicitly so the loss plot does not depend on the
    # notebook flushing the last open figure at the end of the cell.
    plt.show()

In [0]:
def _load_rgb_stack(directory, count, height=160, width=240):
  """Load every file of ``directory`` into a single float32 array.

  Files are read in sorted filename order, so images and labels pair up as
  long as both directories use matching names.

  Args:
    directory: directory path ending with a separator (file names are appended).
    count: number of files expected in ``directory``.
    height, width: spatial size every file must already have.

  Returns:
    Array of shape (count, height, width, 3), pixel values divided by 255.

  Raises:
    ValueError: if ``directory`` does not contain exactly ``count`` entries;
      otherwise missing files would silently stay all-zero samples.
  """
  filenames = sorted(os.listdir(directory))
  if len(filenames) != count:
    raise ValueError(
        f"expected {count} files in {directory}, found {len(filenames)}")
  stack = np.zeros((count, height, width, 3), dtype=np.float32)
  for i, filename in enumerate(filenames):
    stack[i] = imread(directory + filename, pilmode='RGB')
  return stack / 255


# The dataset size comes from `num_images` instead of a repeated literal
images = _load_rgb_stack(folder + 'images_cropped/images/', num_images)
print("loaded images")
print(images.shape)

labels = _load_rgb_stack(folder + 'labels_cropped/labels/', num_images)
print("loaded labels")
print(labels.shape)

In [0]:
# Have a peek at the raw values: 20 neighbouring pixels from row 100 of sample 3
sample_idx, row, cols = 3, 100, slice(120, 140)
print(images[sample_idx, row, cols])
print(labels[sample_idx, row, cols])

In [0]:
# Shuffle images and labels together, then split into test / validation / train
num_test_images = 15
num_val_images = 10

# A single shared permutation keeps every image aligned with its label
permutation = np.random.permutation(len(images))
images = images[permutation]
labels = labels[permutation]

# Test set = first block, validation set = last block, training set = the rest.
# The validation slice must be [-n:] (the last n samples); [:-n] would select
# everything except the last n and overlap both the test and training data.
images_test = images[:num_test_images]
labels_test = labels[:num_test_images]
images_val = images[-num_val_images:]
labels_val = labels[-num_val_images:]
images = images[num_test_images:-num_val_images]
labels = labels[num_test_images:-num_val_images]

# Step counts are derived from the split each one belongs to
# (training steps from the training set, not from the test set).
num_train_steps = len(images)
num_val_steps = len(images_val)

In [0]:
# Data augmentation

# One shared configuration for images and labels: both generators must be
# set up identically for the augmented pairs to stay aligned.
data_gen_args = {
    'rotation_range': 5,
    'width_shift_range': 5,
    'height_shift_range': 5,
    'horizontal_flip': True,
    'zoom_range': 0.05,
    'data_format': 'channels_last',
}

image_datagen = ImageDataGenerator(**data_gen_args)
label_datagen = ImageDataGenerator(**data_gen_args)


In [0]:
batch_size = 8

# Image and label streams use the same seed and shuffle settings so that
# the i-th image batch is paired with the i-th label batch.
flow_args = dict(seed=1, batch_size=batch_size, shuffle=True)
image_gen = image_datagen.flow(images, **flow_args)
label_gen = label_datagen.flow(labels, **flow_args)
image_val = image_datagen.flow(images_val, **flow_args)
label_val = label_datagen.flow(labels_val, **flow_args)
train_gen = zip(image_gen, label_gen)
val_gen = zip(image_val, label_val)

# fit_generator counts *batches*, not samples: one pass over a split takes
# ceil(samples / batch_size) generator steps, so derive the step counts here
# where the batch size is known.
num_train_steps = int(np.ceil(len(images) / batch_size))
num_val_steps = int(np.ceil(len(images_val) / batch_size))

In [0]:
# Check that original images and labels match up:
# draw the same training index from both arrays, image first
position = 55
for stack in (images, labels):
  plt.imshow(stack[position])
  plt.show()

In [0]:
# Check that augmented images and labels match up
def _preview_first_pair(generator):
  """Draw one batch from ``generator`` and inspect its first image/label pair.

  Prints dtype and the value at pixel [100][100] for the image and then the
  label, plots both, and returns them as ``(image, label)``.
  """
  image_batch, label_batch = next(generator)
  image, label = image_batch[0], label_batch[0]
  print(image.dtype)
  print(image[100][100])
  print(label.dtype)
  print(label[100][100])
  plt.imshow(image)
  plt.show()
  plt.imshow(label)
  plt.show()
  return image, label


# Same inspection for the training stream and the validation stream
image_test, label_test = _preview_first_pair(train_gen)
image_test, label_test = _preview_first_pair(val_gen)

In [0]:
# Keep only the best model seen so far, judged on validation accuracy.
# The model is compiled with the 'categorical_accuracy' metric, so the value
# logged for the validation set is 'val_categorical_accuracy'; monitoring
# 'val_acc' would never find the quantity and no checkpoint would be written.
checkpoint = ModelCheckpoint(folder + 'unet_checkpoint.hdf5',
                             monitor='val_categorical_accuracy',
                             mode='max',  # higher accuracy is better
                             verbose=1,
                             save_best_only=True)

In [0]:
# Build the network for 160x240 three-channel inputs (same size as the loaded images)
input_shape = (160, 240, 3)
model = unet_model.unet(input_size=input_shape)
#plot_model(model, to_file="model.png")

In [0]:
# Experimental custom loss function for unbalanced segmentation
def dice_coef_loss(y_true, y_pred, smooth=1.0):
    """Dice loss: 1 - (2*|A.B| + smooth) / (|A| + |B| + smooth).

    Both inputs are flattened, so the overlap is computed over every element
    (all samples, pixels and channels) at once.

    Args:
        y_true: ground-truth values.
        y_pred: predicted values, same shape as ``y_true``.
        smooth: constant added to numerator and denominator; avoids a division
            by zero when both inputs sum to zero.

    Returns:
        Backend scalar: 0 when prediction and target coincide, growing
        towards 1 as their overlap shrinks.
    """
    # Uses the explicitly imported `backend` module rather than a `K` alias
    # that is only in scope as a side effect of the wildcard Keras imports.
    truth = backend.flatten(y_true)
    guess = backend.flatten(y_pred)
    overlap = backend.sum(truth * guess)
    total = backend.sum(truth) + backend.sum(guess)
    return 1 - (2. * overlap + smooth) / (total + smooth)
  
# Let's test it (uses the explicitly imported `backend` module to evaluate tensors)
# Take the first training label as reference
label = labels[0]

# given the label as prediction, loss should be 0
print(f"Should be almost 0: {backend.eval(dice_coef_loss(label, label))}")

# Doing a few random modifications should give a small loss
pred = label.copy()
pred[100,100] = [0.5, 0.5, 0.]
pred[105,105] = [0.5, 0., 0.5]
pred[110,110] = [0.4, 0.5, 0.1]
pred[115,115] = [0.5, 0.5, 0.]
pred[120,120] = [0.5, 0.5, 0.]
print(f"Should be close to 0: {backend.eval(dice_coef_loss(label, pred))}")

# taking a completely different label as prediction should give a high loss
label2 = labels[44]
print(f"Should be high: {backend.eval(dice_coef_loss(label, label2))}")

# taking a real prediction from the model.
# NOTE: in top-to-bottom order the model has only been constructed here, not
# fitted, so this value reflects an untrained network unless weights were
# loaded beforehand.
pred = model.predict(images[:1])[0]
print(backend.eval(dice_coef_loss(label, pred)))

In [0]:
# Compile using experimental custom loss 
#model.compile(optimizer = Adam(lr = 2e-4), loss = dice_coef_loss, metrics = ['accuracy'])

In [0]:
# Compile using crossentropy; the metric name chosen here determines the
# keys that appear in the training history ('categorical_accuracy', ...)
adam_optimizer = Adam(lr=2e-4)
model.compile(
    optimizer=adam_optimizer,
    loss="categorical_crossentropy",
    metrics=['categorical_accuracy'],
)

In [0]:
# Tensorboard callback 
#tbc = TensorBoardColab()

In [0]:
# Train on the augmented stream, validating after every epoch.

training_callbacks = [
    checkpoint,
    #TensorBoardColabCallback(tbc),
]
history = model.fit_generator(
    train_gen,
    steps_per_epoch=num_train_steps,
    epochs=5,
    validation_data=val_gen,
    validation_steps=num_val_steps,
    callbacks=training_callbacks,
)
# Pull one more augmented training batch and predict on it; keep the first sample
image_batch, label_batch = next(train_gen)
image = image_batch[0]
label = label_batch[0]
predicted = model.predict(image_batch)[0]

# Show real image and label, then predicted one
print(predicted[100][120:125])
for picture in (image, label, predicted):
  plt.imshow(picture)
  plt.show()

# Flatten prediction: replace each pixel's class scores by a one-hot vector
# marking its highest-scoring class. Indexing an identity matrix with the
# per-pixel argmax builds all one-hot rows at once, with no Python loop
# over pixels.
flat_predicted = np.eye(predicted.shape[-1])[np.argmax(predicted, axis=-1)]
plt.imshow(flat_predicted)
plt.show()

# Show history plots: accuracy and loss curves (train vs. validation)
# for the fit_generator run stored in `history`
plot_history(history)


In [0]:
# Store the model in its current state inside the dataset folder
model_path = folder + 'unet.hdf5'
model.save(model_path)

In [0]:
# Load model weights
# model.load_weights(folder + 'unet.hdf5')

In [0]:
# Score the model on the held-out test split
results = model.evaluate(x=images_test, y=labels_test)

In [0]:
# Metric names first, then the values returned by evaluate()
for report_line in (model.metrics_names, results):
  print(report_line)

In [0]:
# Predict the whole test split, but keep only the first sample's output
predicted = model.predict(images_test)[0]
# Show real image and label, then predicted one
for picture in (images_test[0], labels_test[0], predicted):
  plt.imshow(picture)
  plt.show()