In [None]:
from IPython.core.interactiveshell import InteractiveShell
# Display the value of every top-level expression in a cell, not only the last one
InteractiveShell.ast_node_interactivity = "all"

In [None]:
import os
import tensorflow as tf
import numpy as np

# Seed used for TensorFlow and passed to the directory iterators created later on
SEED = 1234
tf.random.set_seed(SEED)  # set TensorFlow's global random seed

# Get current working directory (the dataset folder is looked up relative to it)
cwd = os.getcwd()

In [None]:
# Run this cell only if you are using Colab with Drive.
# Mounts Google Drive under /content/drive; the dataset archive and the
# experiment directories used later live below '/content/drive/My Drive'.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Switch to True to randomly transform the training images
apply_data_augmentation = False

# All generators multiply pixel values by 1/255
train_gen_kwargs = dict(rescale=1./255)

if apply_data_augmentation:
    # Random geometric transformations, used for the training split only
    train_gen_kwargs.update(rotation_range=10,
                            width_shift_range=10,
                            height_shift_range=10,
                            zoom_range=0.3,
                            horizontal_flip=True,
                            vertical_flip=True,
                            fill_mode='constant',
                            cval=0)

# Training ImageDataGenerator object
train_data_gen = ImageDataGenerator(**train_gen_kwargs)

# Validation and test ImageDataGenerator objects: rescaling only, never augmented
valid_data_gen = ImageDataGenerator(rescale=1./255)
test_data_gen = ImageDataGenerator(rescale=1./255)

In [None]:
# Run this cell only if you are using Colab with Drive.
# Extracts the dataset archive stored on the mounted Drive.
!unzip '/content/drive/My Drive/MaskDataset.zip'

In [None]:
# List /content (e.g. to check that the dataset folder is there)
!ls /content

In [None]:
# Create generators to read images from dataset directory
# -------------------------------------------------------
dataset_dir = os.path.join(cwd, 'MaskDataset')

bs = 32        # batch size
img_h = 256    # height the images are resized to
img_w = 256    # width the images are resized to

num_classes=2

# When True the label order is fixed explicitly (list position = class index);
# when False `classes` is None and the assignment is left to Keras.
decide_class_indices = True
classes = ['without_mask', 'with_mask'] if decide_class_indices else None


def _directory_iterator(data_gen, directory, shuffle):
    """Build a batched iterator over the images found in `directory`.

    Images are resized to (img_h, img_w) and, because of
    class_mode='categorical', targets are directly converted into
    one-hot vectors.
    """
    return data_gen.flow_from_directory(directory,
                                        batch_size=bs,
                                        target_size=[img_h, img_w],
                                        classes=classes,
                                        class_mode='categorical',
                                        shuffle=shuffle,
                                        seed=SEED)


# One sub-folder per split
training_dir = os.path.join(dataset_dir, 'train')
validation_dir = os.path.join(dataset_dir, 'val')
test_dir = os.path.join(dataset_dir, 'test')

# Only the training split is shuffled
train_gen = _directory_iterator(train_data_gen, training_dir, shuffle=True)
valid_gen = _directory_iterator(valid_data_gen, validation_dir, shuffle=False)
test_gen = _directory_iterator(test_data_gen, test_dir, shuffle=False)

In [None]:
# Check how keras assigned the labels (class name -> class index)
train_gen.class_indices

In [None]:
# Create Dataset objects
# ----------------------

def _repeating_dataset(keras_iterator):
    """Wrap a Keras directory iterator into a repeating tf.data.Dataset.

    Each element is a pair (images, one-hot targets) with shapes
    [batch, img_h, img_w, 3] and [batch, num_classes]; the batch dimension
    is left unspecified (None).
    """
    dataset = tf.data.Dataset.from_generator(lambda: keras_iterator,
                                             output_types=(tf.float32, tf.float32),
                                             output_shapes=([None, img_h, img_w, 3], [None, num_classes]))
    # Repeat so the dataset can be consumed for several epochs
    return dataset.repeat()


# Training
train_dataset = _repeating_dataset(train_gen)

# Validation
valid_dataset = _repeating_dataset(valid_gen)

# Test
# NOTE: must be built from test_gen; the previous code repeated valid_dataset
# here, so "test" images were actually validation images.
test_dataset = _repeating_dataset(test_gen)

In [None]:
# Architecture: Features extraction -> Classifier

start_f = 8   # filters of the first conv layer, doubled after every block
depth = 5     # number of conv blocks in the encoder

model = tf.keras.Sequential()

encoder = tf.keras.Sequential()

# Features extraction
for i in range(depth):

    # Only the first layer has to declare the input shape; every following
    # layer takes it from the output of the previous one.
    first_layer_kwargs = {'input_shape': [img_h, img_w, 3]} if i == 0 else {}

    # Conv block: Conv2D -> Activation -> Pooling
    encoder.add(tf.keras.layers.Conv2D(filters=start_f,
                                       kernel_size=(3, 3),
                                       strides=(1, 1),
                                       padding='same',
                                       **first_layer_kwargs))
    encoder.add(tf.keras.layers.ReLU())

    # No pooling after the last block
    if i < depth - 1:
        encoder.add(tf.keras.layers.MaxPool2D(pool_size=(2, 2)))

    start_f *= 2

model.add(encoder)

# GAP: averages every feature map of the encoder over its spatial dimensions
model.add(tf.keras.layers.GlobalAveragePooling2D())

# Classifier: a single softmax layer on top of the pooled features
model.add(tf.keras.layers.Dense(units=num_classes, activation='softmax'))

In [None]:
# Visualize created model as a table
# (the encoder is a nested Sequential model, so it is listed as a single entry)
model.summary()

# Visualize initialized weights
# model.weights

In [None]:
# Shape of the first weight tensor of the final Dense (classifier) layer
model.layers[-1].weights[0].shape

In [None]:
# Training configuration
# ----------------------

# Learning rate used by the optimizer
lr = 1e-3

# Targets are one-hot vectors, hence the categorical cross-entropy
loss = tf.keras.losses.CategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)

# Metrics reported next to the loss
metrics = ['accuracy']

# Compile Model
model.compile(loss=loss, optimizer=optimizer, metrics=metrics)

In [None]:
# Load the TensorBoard notebook extension and start it on the experiments directory
%load_ext tensorboard
%tensorboard --logdir /content/drive/My\ Drive/Keras4/segmentation_experiments_GAP/#

In [None]:
import os
from datetime import datetime


cwd = os.getcwd()

# Root directory collecting all the runs of this experiment
exps_dir = os.path.join('/content/drive/My Drive/Keras4/', 'segmentation_experiments_GAP')
os.makedirs(exps_dir, exist_ok=True)

# Timestamp that makes the directory of every run unique
now = datetime.now().strftime('%b%d_%H-%M-%S')

model_name = 'CNN_GAP'

# Directory of the current run: <exps_dir>/<model_name>_<timestamp>
exp_dir = os.path.join(exps_dir, model_name + '_' + str(now))
os.makedirs(exp_dir, exist_ok=True)

callbacks = []

# Model checkpoint
# ----------------
ckpt_dir = os.path.join(exp_dir, 'ckpts')
os.makedirs(ckpt_dir, exist_ok=True)

# The epoch number is part of the file name
ckpt_callback = tf.keras.callbacks.ModelCheckpoint(filepath=os.path.join(ckpt_dir, 'cp_{epoch:02d}.ckpt'),
                                                   save_weights_only=True)  # False to save the model directly
callbacks.append(ckpt_callback)

# Visualize Learning on Tensorboard
# ---------------------------------
tb_dir = os.path.join(exp_dir, 'tb_logs')
os.makedirs(tb_dir, exist_ok=True)

# By default shows losses and metrics for both training and validation
tb_callback = tf.keras.callbacks.TensorBoard(log_dir=tb_dir,
                                             profile_batch=0,
                                             histogram_freq=1)  # if 1 shows weights histograms
callbacks.append(tb_callback)

# Early Stopping
# --------------
early_stop = True
if early_stop:
    # Monitors the validation loss with a patience of 10 epochs
    es_callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10)
    callbacks.append(es_callback)

In [None]:
# Both datasets are repeated, so the number of batches per epoch and per
# validation pass is given explicitly: the length of the underlying iterator.
train_steps = len(train_gen)
valid_steps = len(valid_gen)

model.fit(x=train_dataset,
          validation_data=valid_dataset,
          steps_per_epoch=train_steps,
          validation_steps=valid_steps,
          epochs=100,
          callbacks=callbacks)

# How to visualize Tensorboard

# 1. tensorboard --logdir EXPERIMENTS_DIR --port PORT     <- from terminal
# 2. localhost:PORT   <- in your browser

In [None]:
# Let's visualize the activations of our network
from PIL import Image

test_iter = iter(test_dataset)

# Take the first batch of the test dataset and keep only its first image
# (its label can be checked from `target`)
image_batch, target = next(test_iter)
test_img = image_batch[0]

# Visualize the image: undo the 1/255 rescaling and cast to 8-bit integers
Image.fromarray((np.array(test_img) * 255.).astype(np.uint8))

In [None]:
# Class index of every image in the batch (argmax over the one-hot targets)
np.argmax(target, 1)

In [None]:
# Model with two outputs: the softmax prediction of `model` and the output of
# its first layer (the encoder), i.e. the feature maps that are fed to the GAP layer
out_model = tf.keras.Model(inputs=model.input, outputs=[model.output, model.layers[0].get_output_at(-1)])

In [None]:
# Layer that bilinearly upsamples the encoder feature maps to the input image
# size, so that the activation map can be overlaid on the image
resize_feature = tf.keras.layers.experimental.preprocessing.Resizing(
    img_h, img_w, interpolation="bilinear")

In [None]:
# Add the batch dimension expected by the model: (H, W, C) -> (1, H, W, C)
single_image_batch = tf.expand_dims(test_img, axis=0)
softmax_out, last_enc_feature = out_model(single_image_batch)

# Index of the predicted class
pred = tf.argmax(softmax_out, axis=1)

# Feature maps of the encoder passed through the resize layer
resized_feature = resize_feature(last_enc_feature)

In [None]:
# Get the weights in input to the output neuron corresponding to the 'with_mask' class
mask_weights = model.layers[-1].weights[0][:, 1]

# Flatten the spatial dimensions: one row of channel activations per pixel
resized_feature = tf.reshape(resized_feature, shape=[img_h * img_w, resized_feature.shape[-1]])
# Class activation map: for every pixel, the channels weighted by the class weights
cam = tf.linalg.matmul(resized_feature, tf.expand_dims(mask_weights, -1))
# Back to an image-shaped map
cam = tf.reshape(cam, shape=[img_h, img_w])

In [None]:
import matplotlib.pyplot as plt

# Left: the input image. Right: the image with the activation map blended on top.
fig, ax = plt.subplots(1, 2, sharex=True, sharey=True, figsize=plt.figaspect(0.5))
image_ax, overlay_ax = ax
image_ax.imshow(test_img)
overlay_ax.imshow(test_img, alpha=0.5)
overlay_ax.imshow(cam, cmap='jet', alpha=0.5)
plt.show()

In [None]:
# We can obtain a coarse segmentation of the mask by thresholding the activation map.
# For example we can use thr = 0.5 * max(cam)

cam_values = np.array(cam)
# Binary map: 1 where the activation exceeds half of its maximum, 0 elsewhere
coarse_mask = (cam_values > 0.5 * cam_values.max()).astype(np.float32)

fig, ax = plt.subplots(nrows=1, ncols=2, sharey=True, sharex=True, figsize=plt.figaspect(0.5))
ax[0].imshow(test_img)
ax[1].imshow(test_img, alpha=0.5)
# Show the thresholded map; the threshold value itself is a scalar, not an image
ax[1].imshow(coarse_mask, cmap='gray', alpha=0.5)
plt.show()