# Chapter 14 Deep Computer Vision Using Convolutional Neural Networks

##### Setup

In [1]:
import os
import sys
from functools import partial

import numpy as np
import matplotlib.pyplot as plt

import tensorflow as tf 
from tensorflow import keras
from keras import (
    Sequential,
    applications,
    layers,
    optimizers,
    losses,
    callbacks,
    metrics
)
import tensorflow_datasets as tfds
import tensorflow_hub as hub


caused by: ["[Errno 2] The file to load file system plugin from does not exist.: '/Users/mmenendezg/Developer/Books/.venv/lib/python3.10/site-packages/tensorflow_io/python/ops/libtensorflow_io_plugins.so'"]
caused by: ["dlopen(/Users/mmenendezg/Developer/Books/.venv/lib/python3.10/site-packages/tensorflow_io/python/ops/libtensorflow_io.so, 0x0006): tried: '/Users/mmenendezg/Developer/Books/.venv/lib/python3.10/site-packages/tensorflow_io/python/ops/libtensorflow_io.so' (no such file), '/System/Volumes/Preboot/Cryptexes/OS/Users/mmenendezg/Developer/Books/.venv/lib/python3.10/site-packages/tensorflow_io/python/ops/libtensorflow_io.so' (no such file), '/Users/mmenendezg/Developer/Books/.venv/lib/python3.10/site-packages/tensorflow_io/python/ops/libtensorflow_io.so' (no such file)"]
  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Silence AutoGraph conversion messages.
tf.autograph.set_verbosity(0)

# Let tf.data choose parallelism and prefetch buffer sizes.
AUTOTUNE = tf.data.AUTOTUNE

# Datasets live wherever the DATA_PATH environment variable points (a
# KeyError is raised when it is unset); trained models are written two
# directories above the notebook, under models/chapter_14.
DATA_PATH = os.environ["DATA_PATH"]
MODEL_PATH = os.path.join("..", "..", "models", "chapter_14")


#### 9. Build your own CNN from scratch and try to achieve the highest possible accuracy on MNIST.

In [None]:
# Create the folder to store the images
mnist_folder = os.path.join(DATA_PATH, "MNIST")
os.makedirs(mnist_folder, exist_ok=True)

# Load MNIST: the first 90% of the official training split is used for
# training, the remaining 10% for validation, and the official test split
# for the final evaluation.
train_set, valid_set, test_set = tfds.load(
    "mnist",
    split=["train[:90%]", "train[90%:]", "test"],
    data_dir=mnist_folder,
    as_supervised=True,
)


def normalize_pixels(x, y):
    """Divide the image `x` by 255 and pass the label `y` through unchanged."""
    return x / 255, y


def _mnist_pipeline(dataset, shuffle_buffer=None):
    """Cache, optionally shuffle, normalise, batch and prefetch one MNIST split.

    Args:
        dataset: a dataset of (image, label) pairs.
        shuffle_buffer: shuffle buffer size; leave as None to keep the
            original element order (validation / test).

    Returns:
        The batched dataset (4096 elements per batch).
    """
    dataset = dataset.cache()
    if shuffle_buffer:
        # Shuffle after caching so every epoch sees a different order.
        dataset = dataset.shuffle(shuffle_buffer)
    return (
        dataset.map(normalize_pixels, num_parallel_calls=AUTOTUNE)
        .batch(4096)
        .prefetch(AUTOTUNE)
    )


# Only the training split is shuffled; the buffer covers the whole split.
train_set = _mnist_pipeline(train_set, shuffle_buffer=60_000)
valid_set = _mnist_pipeline(valid_set)
test_set = _mnist_pipeline(test_set)

The MNIST images have dimensions `[28, 28, 1]`

In [None]:
keras.backend.clear_session()
tf.random.set_seed(1992)

# Default separable convolution used for every block after the stem.
# SeparableConv2D has no `kernel_initializer` argument: its two kernels are
# configured through `depthwise_initializer` and `pointwise_initializer`.
conv_layer = partial(
    layers.SeparableConv2D,
    kernel_size=3,
    padding="same",
    activation="elu",
    depthwise_initializer="he_normal",
    pointwise_initializer="he_normal",
)

mnist_model = Sequential([
    # Stem: a regular convolution on the single-channel input. It gets the
    # same activation / initializer as the rest of the network so that it
    # is not a purely linear layer.
    layers.Conv2D(
        filters=32,
        kernel_size=7,
        activation="elu",
        kernel_initializer="he_normal",
        input_shape=[28, 28, 1],
    ),
    layers.MaxPool2D(),
    conv_layer(filters=64),
    conv_layer(filters=64),
    layers.MaxPool2D(),
    conv_layer(filters=128),
    conv_layer(filters=128),
    layers.GlobalAvgPool2D(),
    layers.Dropout(0.3),
    # One output unit per digit class.
    layers.Dense(10, activation="softmax")
])

In [None]:
mnist_optimizer = optimizers.Nadam()
mnist_loss = losses.sparse_categorical_crossentropy
mnist_metric = "accuracy"

mnist_model.compile(loss=mnist_loss, optimizer=mnist_optimizer, metrics=[mnist_metric])

# Callbacks that keep the model from being trained longer than necessary
# and that record the run.

local_logs_path = os.path.join("..", "..", "reports", "logs", "chapter_14", "mnist")
# NOTE(review): `help_functions` is not imported in any visible cell —
# confirm it is imported before this cell runs on a fresh kernel.
log_dir = help_functions.get_logdir(date_type="datetime", path_folder=local_logs_path)
model_path = os.path.join(MODEL_PATH, "mnist_model.h5")
tensorboard_cb = callbacks.TensorBoard(
    log_dir=log_dir, profile_batch=10, histogram_freq=1
)
# restore_best_weights=True: when training stops early, the in-memory model
# goes back to its best epoch instead of keeping the weights of the last
# (worse) epoch, so a later evaluate() measures the best model.
early_stopping_cb = callbacks.EarlyStopping(patience=5, restore_best_weights=True)
# Only the best model seen so far is kept on disk.
model_checkpoint_cb = callbacks.ModelCheckpoint(
    filepath=model_path, save_best_only=True
)
callbacks_list = [tensorboard_cb, early_stopping_cb, model_checkpoint_cb]


In [None]:
# Train for at most 100 epochs; the callbacks decide when to stop.
mnist_model.fit(
    train_set, validation_data=valid_set, epochs=100, callbacks=callbacks_list
)

# evaluate() returns the loss followed by the compiled metrics, so the
# accuracy sits at index 1.
evaluation = mnist_model.evaluate(test_set, verbose=0)
test_accuracy = evaluation[1]
print(f'The accuracy of the model is {test_accuracy * 100:.4f}')

#### 10. Use transfer learning for large image classification, going through these steps:

##### a. Create a training set containing at least 100 images per class.
##### b. Split it into a training set, a validation set, and a test set.
##### c. Build the input pipeline, apply the appropriate preprocessing operations, and optionally add data augmentation.
##### d. Fine-tune a pretrained model on this dataset.

*The dataset we'll use for this example is the [`colorectal_histology`](https://www.tensorflow.org/datasets/catalog/colorectal_histology) dataset from TensorFlow Datasets. It is composed of 5000 `150x150x3` images classified into 8 different classes. The dataset only provides a training split, so we have to split it ourselves, sampling the same number of images from each class. There are 625 images per class, and we will divide them into 70% for training, 15% for validation, and 15% for testing.*

In [None]:
# TF-Hub handle of the pretrained feature extractor used for fine-tuning.
MOBILENET_MODEL_PATH = "https://tfhub.dev/google/imagenet/mobilenet_v3_small_100_224/feature_vector/5"

# Human-readable class names; the plotting cell indexes this list with the
# integer label of each image.
HIST_CLASSES = [
    "Tumor", "Stroma", "Complex", "Lympho",
    "Debris", "Mucosa", "Adipose", "Empty",
]

In [None]:
# Folder where TFDS downloads / caches the dataset
hist_path = os.path.join(DATA_PATH, "colorectal_histology")
os.makedirs(hist_path, exist_ok=True)

# Only a "train" split is requested; (image, label) tuples plus the dataset
# metadata object are returned.
histology_load_options = dict(
    split="train",
    data_dir=hist_path,
    as_supervised=True,
    with_info=True,
)
histology_dataset, histology_info = tfds.load(
    "colorectal_histology", **histology_load_options
)


*When loading the data directly from TensorFlow Datasets, a plain split is not guaranteed to contain the same number of images per class. That is why we need to build the balanced splits ourselves:*

In [None]:
# Split the histology dataset into training, validation and test sets.
# This rebinds train_set / valid_set / test_set, which held the MNIST
# pipelines above.
# NOTE(review): `ml_functions` is not imported in any visible cell, and the
# element structure returned by balanced_split (batched or not) cannot be
# seen from here — confirm both.
train_set, valid_set, test_set = ml_functions.balanced_split(histology_dataset, verbose=True)

In [None]:
# Display the dataset object to inspect its element specification.
train_set

In [None]:
# Draw one shuffled element from the training set and show its first nine
# images in a 3x3 grid, titled with their class names.
# NOTE(review): indexing images[idx] / labels[idx] presumes each dataset
# element is a batch of at least nine images — confirm against balanced_split.
sample_batch = train_set.shuffle(5000).take(1)

plt.figure(figsize=(12, 12))
for images, labels in sample_batch:
    for cell in range(1, 10):
        plt.subplot(3, 3, cell)
        plt.imshow(images[cell - 1])
        plt.title(f"Class: {HIST_CLASSES[labels[cell - 1]]}")
        plt.axis("off")


In [None]:
def pipeline_dataset(dataset, split='train'):
    """Cache, optionally shuffle, batch and prefetch a dataset.

    Args:
        dataset: the dataset to prepare.
        split: name of the split; only 'train' is shuffled.

    Returns:
        The dataset batched in groups of 512 with prefetching enabled.
    """
    dataset = dataset.cache()
    if split == 'train':
        # Dataset transformations return a new dataset, so the result has to
        # be rebound; otherwise the shuffle is silently dropped.
        dataset = dataset.shuffle(5000)
    return dataset.batch(512).prefetch(AUTOTUNE)


In [None]:
def train_model(train_set, valid_set, test_set, epochs=100):
    """Fine-tune a TF-Hub MobileNetV3 feature extractor on the histology data.

    The three datasets go through `pipeline_dataset`, a classifier is built
    on top of the hub module at `MOBILENET_MODEL_PATH`, trained with Adam
    and a learning-rate schedule, and finally evaluated on the test set.

    Args:
        train_set: training dataset of (image, label) pairs; the model
            declares 150x150x3 inputs.
        valid_set: validation dataset with the same structure.
        test_set: test dataset with the same structure.
        epochs: maximum number of training epochs.

    Returns:
        The trained Keras model.
    """
    # Pipeline for the datasets: only the training split is shuffled.
    train_set = pipeline_dataset(train_set, split="train")
    valid_set = pipeline_dataset(valid_set, split="valid")
    test_set = pipeline_dataset(test_set, split="test")

    # Data augmentation. RandomBrightness clips to `value_range`, which has
    # to match the [0, 1] range produced by the Rescaling layer below.
    data_augmentation = Sequential(
        [
            layers.RandomBrightness(factor=0.3, value_range=(0.0, 1.0)),
            layers.RandomFlip(),
            layers.RandomContrast(factor=0.3),
        ]
    )

    # Pretrained feature extractor from TensorFlow Hub, fine-tuned end to end.
    feature_extractor = Sequential(
        [
            hub.KerasLayer(
                MOBILENET_MODEL_PATH,
                trainable=True,
            )
        ]
    )

    # Model
    model = Sequential(
        [
            # The hub module expects pixel values in [0, 1]. An un-adapted
            # Normalization layer leaves its input unchanged, so the pixels
            # are rescaled explicitly instead.
            # NOTE(review): assumes the incoming images are in [0, 255] —
            # confirm against ml_functions.balanced_split.
            layers.Rescaling(1.0 / 255, input_shape=[150, 150, 3]),
            # 150 + 2 * 37 = 224, the input size of the hub module.
            layers.ZeroPadding2D(padding=37),
            data_augmentation,
            feature_extractor,
            # One output unit per histology class.
            layers.Dense(8, activation="softmax"),
        ],
        name="colorectal_histology_model",
    )

    # Compile the model
    optimizer = optimizers.Adam(learning_rate=1e-4)
    model.compile(
        loss=losses.sparse_categorical_crossentropy,
        metrics=["accuracy"],
        optimizer=optimizer,
    )

    # Callbacks
    # NOTE(review): `learning_rate_functions` and `help_functions` are not
    # imported in any visible cell — confirm they are available.
    exponential_decay_fn = learning_rate_functions.exponential_decay_with_warmup(
        lr_start=1e-4
    )
    lr_scheduler_cb = callbacks.LearningRateScheduler(exponential_decay_fn)

    hist_logs_path = os.path.join(
        "..", "..", "reports", "logs", "chapter_14", "colorectal_histology"
    )
    log_dir = help_functions.get_logdir(path_folder=hist_logs_path)
    tensorboard_cb = callbacks.TensorBoard(log_dir=log_dir)

    model_path = os.path.join(MODEL_PATH, "colorectal_histology_model.h5")
    model_checkpoint_cb = callbacks.ModelCheckpoint(
        filepath=model_path, save_best_only=True
    )

    # Restore the best weights when stopping early so that the evaluation
    # below and the returned model use the best epoch, not the last one.
    early_stopping_cb = callbacks.EarlyStopping(patience=5, restore_best_weights=True)

    callbacks_list = [
        lr_scheduler_cb,
        tensorboard_cb,
        model_checkpoint_cb,
        early_stopping_cb
    ]

    # Training of the model
    model.fit(
        train_set,
        validation_data=valid_set,
        epochs=epochs,
        callbacks=callbacks_list,
    )

    # evaluate() returns the loss followed by the compiled metrics.
    evaluation = model.evaluate(test_set, verbose=0)

    print("The Training has finished. The result is the following:")
    print(f"\n\tModel Accuracy: {evaluation[1] * 100:.4f}")

    return model


In [None]:
# Start from a clean Keras state and seed both TensorFlow and NumPy.
keras.backend.clear_session()
for seed_fn in (tf.random.set_seed, np.random.seed):
    seed_fn(1992)

# Fine-tune for at most 10 epochs.
colorectal_hist_model = train_model(
    train_set,
    valid_set,
    test_set,
    epochs=10,
)

#### 11. Go through Tensorflow's [Style Transfer Tutorial](https://www.tensorflow.org/tutorials/generative/style_transfer).

##### Setup

In [None]:
# TF-Hub handle of the arbitrary image stylization model.
TF_MODEL = "https://tfhub.dev/google/magenta/arbitrary-image-stylization-v1-256/2"

# Set the TF-Hub model load format for this process.
os.environ["TFHUB_MODEL_LOAD_FORMAT"] = "COMPRESSED"


In [None]:
from IPython import display
import matplotlib as mpl
import PIL.Image
import time
import functools

# Default figure size and no grid for every plot in this section.
mpl.rcParams.update({"figure.figsize": (12, 12), "axes.grid": False})


In [None]:
def tensor_to_image(tensor):
    """Convert an image tensor to a PIL image.

    The values are multiplied by 255 and cast to uint8. An input with more
    than three dimensions must have a leading dimension of size 1, which
    is dropped.
    """
    pixels = np.array(tensor * 255, dtype=np.uint8)

    if np.ndim(pixels) > 3:
        # Only a batch holding a single image can become one picture.
        assert pixels.shape[0] == 1
        pixels = pixels[0]
    return PIL.Image.fromarray(pixels)


In [None]:
# Download the content and the style image (get_file returns the local path).
content_path = tf.keras.utils.get_file(
    fname="YellowLabradorLooking_new.jpg",
    origin="https://storage.googleapis.com/download.tensorflow.org/example_images/YellowLabradorLooking_new.jpg",
)
style_path = tf.keras.utils.get_file(
    fname="kandinsky5.jpg",
    origin="https://storage.googleapis.com/download.tensorflow.org/example_images/Vassily_Kandinsky%2C_1913_-_Composition_7.jpg",
)


##### Visualize the image

In [None]:
def load_img(path_to_img):
    """Read an image file and return it as a float32 batch of one image.

    The file is decoded with three channels, converted to float32 (integer
    images are scaled to [0, 1] by `convert_image_dtype`) and resized so
    that its longer side is 512 pixels, keeping the aspect ratio up to
    integer truncation.

    Args:
        path_to_img: path of the image file.

    Returns:
        A tensor of shape [1, height, width, 3].
    """
    max_dim = 512
    img = tf.io.read_file(path_to_img)
    # expand_animations=False keeps the decoded tensor 3-D even for GIF
    # files, which the shape arithmetic below relies on.
    img = tf.image.decode_image(img, channels=3, expand_animations=False)
    img = tf.image.convert_image_dtype(img, tf.float32)

    # Height and width only (the channel axis is dropped).
    shape = tf.cast(tf.shape(img)[:-1], tf.float32)
    # tf.reduce_max instead of Python's max(): iterating over a tensor only
    # works eagerly, reduce_max also works inside a traced function.
    long_dim = tf.reduce_max(shape)
    scale = max_dim / long_dim

    new_shape = tf.cast(shape * scale, tf.int32)

    img = tf.image.resize(img, new_shape)
    # Add the batch axis.
    return img[tf.newaxis, :]


def imshow(image, title=None):
    """Show an image with matplotlib, dropping a leading batch axis if present.

    Args:
        image: image tensor; when it has more than three dimensions the
            first axis is squeezed out.
        title: optional title for the plot.
    """
    has_batch_axis = len(image.shape) > 3
    plt.imshow(tf.squeeze(image, axis=0) if has_batch_axis else image)
    if title:
        plt.title(title)


In [None]:
content_image = load_img(content_path)
style_image = load_img(style_path)

# Content image on the left, style image on the right.
side_by_side = [(content_image, "Content Image"), (style_image, "Style Image")]
for position, (picture, caption) in enumerate(side_by_side, start=1):
    plt.subplot(1, 2, position)
    imshow(picture, caption)

##### Fast Style Transfer Using TF-Hub

In [None]:
# Load the pretrained stylization model and apply it once; the first
# element of its output is converted to an image for display.
transfer_model = hub.load(TF_MODEL)
stylized_outputs = transfer_model(
    tf.constant(content_image), tf.constant(style_image)
)
stylized_image = stylized_outputs[0]
tensor_to_image(stylized_image)

##### Define content and style Representations

In [None]:
# Full VGG19 classifier (with its top) to sanity-check it on the content image.
vgg19 = keras.applications.VGG19(include_top=True, weights="imagenet")

# The image is scaled back to 0-255 before VGG19's own preprocessing, then
# resized to 224x224.
pre_image = tf.image.resize(
    keras.applications.vgg19.preprocess_input(content_image * 255),
    (224, 224),
)
prediction_probs = vgg19(pre_image)
prediction_probs.shape

In [None]:
# Decode the predictions of the single image and keep (class name, probability).
decoded_batch = tf.keras.applications.vgg19.decode_predictions(prediction_probs.numpy())
top_5 = decoded_batch[0]
[(class_name, prob) for _, class_name, prob in top_5]

In [None]:
# VGG19 without its classification head; list the names of its layers.
vgg19 = applications.VGG19(include_top=False, weights="imagenet")

print()
print(*(layer.name for layer in vgg19.layers), sep="\n")

In [None]:
# Layer whose activations represent the content of an image.
content_layers = ["block5_conv2"]

# First convolution of each of the five VGG19 blocks, used for the style.
style_layers = [f"block{block}_conv1" for block in range(1, 6)]

num_content_layers, num_style_layers = len(content_layers), len(style_layers)

##### Build the Model

In [None]:
def vgg_layers(layer_names):
    """Build a model that returns the outputs of the named VGG19 layers.

    Args:
        layer_names: names of the VGG19 layers to expose.

    Returns:
        A Keras model mapping the VGG19 input to the list of the requested
        layer outputs. The underlying VGG19 is marked as not trainable.
    """
    backbone = tf.keras.applications.VGG19(include_top=False, weights="imagenet")
    backbone.trainable = False

    selected_outputs = []
    for layer_name in layer_names:
        selected_outputs.append(backbone.get_layer(layer_name).output)

    return tf.keras.Model([backbone.input], selected_outputs)

In [None]:
# Run the style image (scaled to 0-255) through the style layers and print
# basic statistics of every output.
style_extractor = vgg_layers(style_layers)
style_outputs = style_extractor(style_image * 255)

for name, output in zip(style_layers, style_outputs):
    activations = output.numpy()
    report = [
        name,
        f"\tshape: {activations.shape}",
        f"\tmin: {activations.min()}",
        f"\tmax: {activations.max()}",
        f"\tmean:{activations.mean()}\n",
    ]
    print("\n".join(report))

##### Calculate Style

In [None]:
def gram_matrix(input_tensor):
    """Return the Gram matrix of a 4-D feature tensor, per batch element.

    For every batch element, the products of all pairs of channels (last
    axis) are summed over axes 1 and 2 and divided by the number of
    positions, i.e. the product of the sizes of axes 1 and 2.
    """
    dims = tf.shape(input_tensor)
    num_locations = tf.cast(dims[1] * dims[2], tf.float32)
    channel_products = tf.linalg.einsum(
        "bijc,bijd->bcd", input_tensor, input_tensor
    )
    return channel_products / (num_locations)

##### Extract Style and Content

In [None]:
class StyleContentModel(tf.keras.models.Model):
    """Model returning style (Gram matrices) and content features of an image.

    The requested VGG19 layers are evaluated in one pass: the style layers
    first, followed by the content layers.
    """

    def __init__(self, style_layers, content_layers):
        """Store the layer names and build the frozen VGG19 feature model."""
        super().__init__()
        self.vgg = vgg_layers(style_layers + content_layers)
        self.vgg.trainable = False
        self.style_layers = style_layers
        self.content_layers = content_layers
        self.num_style_layers = len(style_layers)

    def call(self, inputs):
        """Expects float input in [0,1].

        Returns a dict with two entries, "content" and "style", each a dict
        keyed by layer name. Style values are Gram matrices.
        """
        scaled = inputs * 255.0
        features = self.vgg(applications.vgg19.preprocess_input(scaled))

        # The style layers were listed first when the feature model was built.
        boundary = self.num_style_layers
        style_grams = [gram_matrix(feature) for feature in features[:boundary]]
        content_features = features[boundary:]

        return {
            "content": dict(zip(self.content_layers, content_features)),
            "style": dict(zip(self.style_layers, style_grams)),
        }


In [None]:
extractor = StyleContentModel(style_layers, content_layers)

results = extractor(tf.constant(content_image))


def _print_stats(name, output):
    """Print the shape, min, max and mean of one named output tensor."""
    values = output.numpy()
    print("  ", name)
    print("    shape: ", values.shape)
    print("    min: ", values.min())
    print("    max: ", values.max())
    print("    mean: ", values.mean())


print("Styles:")
for name, output in sorted(results["style"].items()):
    _print_stats(name, output)
    print()

print("Contents:")
for name, output in sorted(results["content"].items()):
    _print_stats(name, output)


##### Run Gradient Descent

In [None]:
# Targets of the optimisation: the content features of the content image
# and the style (Gram matrices) of the style image.
content_targets = extractor(content_image)["content"]
style_targets = extractor(style_image)["style"]

In [None]:
# The image being optimised, initialised with the content image. It is a
# tf.Variable because train_step updates it in place.
image = tf.Variable(content_image)

In [None]:
def clip_0_1(image):
    """Clip every value of `image` to the range [0, 1]."""
    lower, upper = 0.0, 1.0
    return tf.clip_by_value(image, lower, upper)

In [None]:
# Weights of the two loss terms (read by style_content_loss).
style_weight = 1e-2
content_weight = 1e4

# Optimizer that updates the image.
opt = optimizers.Adam(
    learning_rate=0.02,
    beta_1=0.99,
    epsilon=1e-1,
)

In [None]:
def style_content_loss(outputs):
    """Weighted sum of the style and the content loss.

    Args:
        outputs: dict with "style" and "content" entries, each a dict keyed
            by layer name (the structure returned by the extractor).

    Returns:
        style_weight / num_style_layers times the summed per-layer mean
        squared difference to `style_targets`, plus
        content_weight / num_content_layers times the same quantity
        against `content_targets`.
    """

    def _summed_mse(produced, targets):
        # Mean squared difference for every layer, added over the layers.
        return tf.add_n(
            [
                tf.reduce_mean((produced[layer] - targets[layer]) ** 2)
                for layer in produced.keys()
            ]
        )

    style_loss = _summed_mse(outputs["style"], style_targets)
    style_loss = style_loss * (style_weight / num_style_layers)

    content_loss = _summed_mse(outputs["content"], content_targets)
    content_loss = content_loss * (content_weight / num_content_layers)

    return style_loss + content_loss


In [None]:
@tf.function()
def train_step(image):
    """Run one optimisation step on `image` (a tf.Variable), in place.

    The style/content loss is differentiated with respect to the image, the
    optimizer `opt` applies the gradient, and the image is clipped back to
    [0, 1].
    """
    with tf.GradientTape() as tape:
        loss = style_content_loss(extractor(image))

    gradient = tape.gradient(loss, image)
    opt.apply_gradients([(gradient, image)])
    image.assign(clip_0_1(image))
        

In [None]:
# A handful of steps to check that the optimisation works.
for warmup_step in range(5):
    train_step(image)

tensor_to_image(image)

In [None]:
start = time.time()

epochs = 15
steps_per_epoch = 100

# Longer optimisation: a dot is printed per step and the current image is
# displayed after every epoch.
step = 0
for epoch_idx in range(epochs):
    for batch_idx in range(steps_per_epoch):
        train_step(image)
        step += 1
        print(".", end="", flush=True)
    display.clear_output(wait=True)
    display.display(tensor_to_image(image))
    print(f"Train step : {step}")

end = time.time()
print(f"Total time: {end - start}")

In [None]:
def high_pass_x_y(image):
    """Return the differences between neighbouring entries of a 4-D image.

    Returns:
        A tuple (x_var, y_var): x_var holds the differences along axis 2,
        y_var the differences along axis 1.
    """
    y_var = image[:, 1:, :, :] - image[:, :-1, :, :]
    x_var = image[:, :, 1:, :] - image[:, :, :-1, :]
    return x_var, y_var


In [None]:
# One row per image: original content image on top, stylised image below.
panels = [
    (content_image, "Horizontal Deltas: Original", "Vertical Deltas: Original"),
    (image, "Horizontal Deltas: Styled", "Vertical Deltas: Styled"),
]

plt.figure(figsize=(14, 10))
for row, (picture, y_title, x_title) in enumerate(panels):
    x_deltas, y_deltas = high_pass_x_y(picture)

    # Deltas are doubled and shifted by 0.5 so they fit the display range.
    plt.subplot(2, 2, 2 * row + 1)
    imshow(clip_0_1(2*y_deltas+0.5), y_title)

    plt.subplot(2, 2, 2 * row + 2)
    imshow(clip_0_1(2*x_deltas+0.5), x_title)

In [None]:
# Sobel edge maps of the content image; the last axis of the result selects
# the edge direction.
sobel = tf.image.sobel_edges(content_image)

plt.figure(figsize=(14, 10))
sobel_titles = ("Horizontal Sobel-edges", "Vertical Sobel-edges")
for position, sobel_title in enumerate(sobel_titles, start=1):
    plt.subplot(1, 2, position)
    imshow(clip_0_1(sobel[..., position - 1]/4+0.5), sobel_title)

In [None]:
# Total variation of the current stylised image, shown as a NumPy value.
tf.image.total_variation(image).numpy()

##### Re-run the optimization

In [None]:
# Weight of the total-variation term added to the loss.
total_variation_weight = 30


@tf.function()
def train_step(image):
    """Run one optimisation step including the total-variation term.

    This definition replaces the earlier `train_step`: the loss is the
    style/content loss plus `total_variation_weight` times the total
    variation of the image. The image (a tf.Variable) is updated in place
    and clipped back to [0, 1].
    """
    with tf.GradientTape() as tape:
        style_content_term = style_content_loss(extractor(image))
        variation_term = total_variation_weight * tf.image.total_variation(image)
        loss = style_content_term + variation_term

    gradient = tape.gradient(loss, image)
    opt.apply_gradients([(gradient, image)])
    image.assign(clip_0_1(image))

In [None]:
# Start over: the image goes back to the content image and a fresh
# optimizer is created.
image = tf.Variable(content_image)
opt = optimizers.Adam(
    learning_rate=2e-2,
    beta_1=0.99,
    epsilon=1e-1,
)

In [None]:
start = time.time()

# Restart the counter: without this, it would continue from the value left
# by the previous optimisation run and the reported step would be wrong.
step = 0
# `epochs` and `steps_per_epoch` are the values defined for the first run.
for n in range(epochs):
    for m in range(steps_per_epoch):
        step += 1
        train_step(image)
        print(".", end="", flush=True)
    # Replace the progress dots with the current image after every epoch.
    display.clear_output(wait=True)
    display.display(tensor_to_image(image))
    print(f"Train step: {step}")

end = time.time()
print(f"Total Time: {end-start:.2f}")


In [None]:
file_name = '../../reports/figures/chapter_14/stylized-image.png'
# PIL does not create missing folders, so make sure the target directory
# exists before saving.
os.makedirs(os.path.dirname(file_name), exist_ok=True)
tensor_to_image(image).save(file_name)