# Simple U-Net Demo

### Configurations

In [None]:
# Number of samples per batch in the train, validation and test pipelines.
BATCH_SIZE = 1
# Side length (pixels) of the square crops / resized inputs used below.
IMAGE_SIZE = 640

### Import Dependencies & Initialization

In [None]:
import os
os.chdir('..')   # change the directory from 'examples' to root

import numpy as np
import tensorflow as tf
from IPython.display import clear_output

import pneumo

### Load Kaggle 'SIIM-ACR Pneumothorax Segmentation Challenge' dataset
https://www.kaggle.com/c/siim-acr-pneumothorax-segmentation

In [None]:
# Load the SIIM-ACR data from the local 'data/siim-acr' directory.
# NOTE(review): presumably a tf.data.Dataset of (image, mask) pairs -- it is
# iterated, split with take/skip and mapped below; confirm against pneumo.data.
ds = pneumo.data.load_siim('data/siim-acr')

### Train / validation / test split

In [None]:
# Count the samples by exhausting the dataset once (this is a full pass).
N_data = sum(1 for _ in ds)

# Hold out 10% each (rounded down) for test and validation; train gets the rest.
N_test = N_val = int(N_data * 0.1)
N_train = N_data - (N_val + N_test)

print(
    f'Total number of samples: {N_data}',
    f'  Train: {N_train}',
    f'  Validation: {N_val}',
    f'  Test: {N_test}',
    sep='\n',
)

In [None]:
# Carve the dataset into consecutive, non-overlapping slices:
#   [0, N_test)                -> test
#   [N_test, N_test + N_val)   -> validation
#   [N_test + N_val, end)      -> train
test_ds = ds.take(N_test)
val_ds = ds.skip(N_test).take(N_val)
train_ds = ds.skip(N_test + N_val)

### Data Augmentation and Preprocessing

In [None]:
def preprocess(image, label):
    """Apply MobileNetV2 input preprocessing to ``image``; pass ``label`` through.

    Returns an ``(image, label)`` tuple so the function can be used directly
    with ``Dataset.map``.
    """
    scaled = tf.keras.applications.mobilenet_v2.preprocess_input(image)
    return scaled, label

def unpreprocess(image, label=None):
    """Convert a preprocessed image back to 8-bit pixel values.

    Computes ``(0.5 * image + 0.5) * 255`` and casts the result to
    ``tf.uint8``.

    Returns the converted image alone when ``label`` is ``None``; otherwise an
    ``(image, label)`` tuple with ``label`` unchanged.
    """
    restored = tf.cast((0.5 * image + 0.5) * 255, tf.uint8)
    if label is not None:
        return restored, label
    return restored

In [None]:
# Transforms applied, in order, to every training sample: a random
# IMAGE_SIZE x IMAGE_SIZE crop, random intensity and noise perturbations, then
# the model's input preprocessing.
# NOTE(review): the meaning/units of the arguments 5 and 1 are defined in
# pneumo.augmentations -- confirm there.
train_transforms = [
    pneumo.augmentations.random_crop((IMAGE_SIZE,IMAGE_SIZE)),
    pneumo.augmentations.random_intensity(5),
    pneumo.augmentations.random_noise(1),
    preprocess
]
# Transforms applied to validation and test samples: crop + preprocessing only
# (no intensity/noise augmentation).
test_transforms = [
    pneumo.augmentations.random_crop((IMAGE_SIZE,IMAGE_SIZE)),
    preprocess
]

In [None]:
# Chain every transform onto its dataset, in list order.
for transform in train_transforms:
    train_ds = train_ds.map(transform)

# Validation and test share the same (non-augmenting) transform list.
for transform in test_transforms:
    val_ds = val_ds.map(transform)
    test_ds = test_ds.map(transform)

# Training stream: shuffled with a 512-element buffer, repeated indefinitely,
# batched, and prefetched one batch ahead.
train_ds = (
    train_ds
    .shuffle(512)
    .repeat()
    .batch(BATCH_SIZE)
    .prefetch(1)
)
# Evaluation streams are finite and only batched.
val_ds = val_ds.batch(BATCH_SIZE)
test_ds = test_ds.batch(BATCH_SIZE)

In [None]:
# Sanity check: show the first image/mask pair of one training batch.
for image, mask in train_ds.take(1):
    pneumo.display([image[0], mask[0]], ['image', 'mask'], ['gray', 'gray'])

### Build and compile U-Net

In [None]:
# Instantiate the U-Net with its default configuration.
model = pneumo.models.UNet()

# Adam at lr=1e-3, the project's combined loss, and Dice as the tracked metric.
# NOTE(review): the meaning of the (1, 4, 3) arguments is defined by
# pneumo.losses.combined_loss -- confirm there.
model.compile(
    loss=pneumo.losses.combined_loss(1, 4, 3),
    metrics=[pneumo.metrics.dice],
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
)

In [None]:
# code modified from https://www.tensorflow.org/tutorials/images/segmentation

def create_mask(pred_mask):
    """Turn raw model outputs into a binary mask.

    Applies a sigmoid to ``pred_mask``, thresholds at 0.5, and returns the
    result as an ``int32`` tensor of 0s and 1s.
    """
    return tf.cast(tf.nn.sigmoid(pred_mask) > 0.5, tf.int32)

def show_predictions(dataset=None, num=1):
    """Display model predictions for the first sample of ``num`` batches.

    For each batch, shows four panels: the input image, the ground-truth mask,
    the sigmoid heat map of the model output, and the thresholded mask from
    ``create_mask``.

    Args:
        dataset: Batched dataset of ``(image, mask)`` pairs to draw from.
            Falls back to the global ``val_ds`` when ``None``.
        num: Number of batches to take and display.
    """
    # Explicit None check: we mean "no dataset given", not "dataset is falsy".
    source = val_ds if dataset is None else dataset
    for image, mask in source.take(num):
        pred_mask = model.predict(image)
        pneumo.display(
            [image[0], mask[0], tf.nn.sigmoid(pred_mask[0]), create_mask(pred_mask[0])],
            ['Input Image', 'True Mask', 'Heat Map', 'Predicted Mask'],
            [None, 'gray', 'inferno', 'gray']
        )

class CustomCallback(tf.keras.callbacks.Callback):
    """Show a sample prediction and decay the learning rate after every epoch.

    Args:
        decay: Multiplicative factor applied to the optimizer's learning rate
            at the end of each epoch. Defaults to 0.95.
    """

    def __init__(self, decay=0.95):
        super().__init__()
        self.decay = decay

    def on_epoch_end(self, epoch, logs=None):
        """Refresh the notebook output, plot a prediction, and update the LR."""
        # wait=True defers clearing until new output arrives, avoiding flicker.
        clear_output(wait=True)
        show_predictions()

        # Read and write the same attribute: `learning_rate` is the canonical
        # name, whereas the `lr` alias is not available on all Keras versions.
        optimizer = self.model.optimizer
        lr = float(tf.keras.backend.get_value(optimizer.learning_rate))
        scheduled_lr = lr * self.decay
        tf.keras.backend.set_value(optimizer.learning_rate, scheduled_lr)

        print('\nSample Prediction after epoch {}'.format(epoch+1))
        print('Learning rate: {}\n'.format(scheduled_lr))

In [None]:
# Baseline: visualise a prediction from the model before any training.
show_predictions()

### Train Model

In [None]:
EPOCHS = 50
# train_ds repeats forever, so an "epoch" must be bounded explicitly: one pass
# over the N_train samples, counted in batches.
STEPS_PER_EPOCH = max(1, N_train // BATCH_SIZE)
# val_ds is finite and keeps its last partial batch, so it yields
# ceil(N_val / BATCH_SIZE) batches; asking for more would exhaust it.
VALIDATION_STEPS = -(-N_val // BATCH_SIZE)

model_history = model.fit(
    train_ds,
    epochs=EPOCHS,
    steps_per_epoch=STEPS_PER_EPOCH,
    callbacks=[CustomCallback()],
    validation_steps=VALIDATION_STEPS,
    validation_data=val_ds,
)

In [None]:
# Persist the trained model under the path 'mymodel' (reloaded in the next section).
model.save('mymodel')

### Model Load and Test on a Sample Query

In [None]:
# Drop the in-memory model so the prediction below really comes from disk.
del model

# The saved model references its custom metric and loss by name, so they must
# be supplied again at load time.
# NOTE(review): the 'loss' key assumes combined_loss returns a function named
# `loss` -- confirm against pneumo.losses.
model = tf.keras.models.load_model(
    "mymodel",
    custom_objects={
        'dice': pneumo.metrics.dice,
        'loss': pneumo.losses.combined_loss(1, 4, 3),
    },
)
show_predictions()

In [None]:
from pathlib import Path

# NOTE(review): `dcmread` is not imported anywhere in the visible notebook; it
# is presumably pydicom's reader -- make sure it is in scope before running.

# Sort so the "first" file is the same on every run (glob order is arbitrary).
uifiles = sorted(Path('data/ui').glob('*.dcm'))
if not uifiles:
    raise FileNotFoundError("No .dcm files found in 'data/ui'")

dcm = dcmread(str(uifiles[0]))
image = dcm.pixel_array/4095*255   # sample image has pixel values ranging between 0 and 4095
pneumo.display(image)

In [None]:
# Add a trailing channel axis so the 2-D image can be resized as an image tensor.
resized = np.expand_dims(image, -1)
resized = tf.image.resize(resized, (IMAGE_SIZE, IMAGE_SIZE))
# Replicate the single channel to three, then apply the same MobileNetV2
# preprocessing used for the training data.
resized = tf.keras.applications.mobilenet_v2.preprocess_input(
    tf.image.grayscale_to_rgb(resized)
)
resized.shape

In [None]:
# Add a leading batch axis (batch of one) and run the model on the sample image.
predicted = model.predict(tf.expand_dims(resized,0))

In [None]:
from matplotlib import pyplot as plt

# Base layer: the input image mapped back with x*0.5+0.5 (the same affine map
# `unpreprocess` applies before scaling to 0-255).
plt.imshow(resized * 0.5 + 0.5, cmap='gray')
# Overlay: semi-transparent sigmoid heat map of the model output.
plt.imshow(tf.sigmoid(predicted[0]), alpha=0.3, cmap='inferno')