# **CONVOLUTIONAL NEURAL NETWORK (CNN)**

## Problem

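Train a CNN to classify images from the [Cats-vs-Dogs dataset](https://download.microsoft.com/download/3/E/1/3E1C3F21-ECDB-4869-8368-6DEBA77B919F/kagglecatsanddogs_3367a.zip) (direct ZIP download) as cat or dog.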

Note: The dataset is not divided into train-validation-test subsets, so it needs a fair amount of pre-processing.

## Initialize

In [10]:
import os
import shutil
import urllib.request
import zipfile

import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_datasets as tfds

from tensorflow.keras.layers import Input, Dense, Flatten, Conv2D, MaxPool2D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator

from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score
from matplotlib import pyplot as plt

In [11]:
# set random seeds
np.random.seed(0)
tf.random.set_seed(0)

# show figures inline
%matplotlib inline

## Dataset

Download the dataset.

In [3]:
# where the archive comes from and where it is stored/extracted locally
data_url = ('https://download.microsoft.com/download/3/E/1/'
            '3E1C3F21-ECDB-4869-8368-6DEBA77B919F/kagglecatsanddogs_3367a.zip')
save_dir = '../.tmp'
name_zip = 'cats-and-dogs.zip'
root_dir = f'{save_dir}/cats-and-dogs'

# if not already there, download and extract the files
if not os.path.exists(root_dir):
    os.makedirs(save_dir, exist_ok=True)
    zip_path = f'{save_dir}/{name_zip}'

    # download the dataset with the standard library instead of shelling out
    # to wget: certificate verification stays enabled, no external binary is
    # required, and a failed download raises instead of being silently ignored
    urllib.request.urlretrieve(data_url, zip_path)

    # unzip the file; the context manager closes the archive even if
    # extraction fails part-way through
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(root_dir)

# print the extracted folder tree, listing at most three files per folder
for current_dir, _sub_dirs, file_names in os.walk(root_dir):
    # nesting depth relative to 'root_dir' decides the indentation
    depth = current_dir.replace(root_dir, '').count(os.sep)
    print(f"{' ' * 4 * depth}{os.path.basename(current_dir)}/")

    file_indent = ' ' * 4 * (depth + 1)
    for file_name in file_names[:3]:
        print(f'{file_indent}{file_name}')
    # an ellipsis marks folders whose listing was cut off after three files
    if len(file_names) >= 3:
        print(f'{file_indent}...')

cats-and-dogs/
    MSR-LA - 3467.docx
    readme[1].txt
    PetImages/
        Cat/
            0.jpg
            1.jpg
            10.jpg
            ...
        Dog/
            0.jpg
            1.jpg
            10.jpg
            ...


## Data Preprocessing

In [4]:
# report how many directory entries each class folder contains
for class_name in ('cat', 'dog'):
    class_dir = f'{root_dir}/PetImages/{class_name.capitalize()}/'
    print(f'# {class_name} images:', len(os.listdir(class_dir)))

# cat images: 12501
# dog images: 12501


**Split the dataset**


In [7]:
# make one folder per (subset, class) pair: train/cat, train/dog, test/cat, test/dog
for subset in ('train', 'test'):
    for class_name in ('cat', 'dog'):
        target_dir = f'{root_dir}/{subset}/{class_name}'
        os.makedirs(target_dir, exist_ok=True)
        print(f"directory '{target_dir}' is created")

directory '../.tmp/cats-and-dogs/train/cat' is created
directory '../.tmp/cats-and-dogs/train/dog' is created
directory '../.tmp/cats-and-dogs/test/cat' is created
directory '../.tmp/cats-and-dogs/test/dog' is created


In [12]:
def split_dataset(dir_source, dir_train, dir_test, split_ratio):
    """
    Randomly splits the files of 'dir_source' into a training and a testing
    subset and copies them into 'dir_train' and 'dir_test', respectively.

    Parameters
    ----------
    dir_source : str
        Folder holding the files to split. Only regular files are considered;
        sub-directories are skipped and zero-byte files are ignored
        (a message is printed for each ignored empty file).
    dir_train : str
        Destination folder of the training files (created if missing).
    dir_test : str
        Destination folder of the testing files (created if missing).
    split_ratio : float
        Fraction of the usable files that goes to the training subset,
        between 0 and 1. The training count is rounded down.

    Raises
    ------
    ValueError
        If 'split_ratio' is outside the range [0, 1].

    Notes
    -----
    Shuffling uses NumPy's global random state, so call 'np.random.seed'
    beforehand to get a repeatable split.
    """

    if not 0 <= split_ratio <= 1:
        raise ValueError(f"'split_ratio' must be within [0, 1], got {split_ratio}")

    # get all the usable filenames in source folder
    filenames = []

    # os.listdir returns entries in arbitrary order; sorting first makes the
    # seeded shuffle below produce the same split on every machine
    for fn in sorted(os.listdir(dir_source)):
        filepath = os.path.join(dir_source, fn)
        # sub-directories cannot be copied with shutil.copy, so skip them
        if not os.path.isfile(filepath):
            continue
        # if file's size >0, append to the list
        if os.path.getsize(filepath) > 0:
            filenames.append(fn)
        else:
            print(f"Size of '{fn}' is zero. So, ignored.")

    # determine the size of training subset
    num_train = int(len(filenames) * split_ratio)

    # randomly shuffle the file names (in place)
    np.random.shuffle(filenames)

    # training and testing subsets
    train_files = filenames[:num_train]
    test_files = filenames[num_train:]

    # function for copying all files
    def copy_all_files(src, dst, file_names):
        # make sure the destination exists so the function works on its own
        os.makedirs(dst, exist_ok=True)
        for file in file_names:
            shutil.copy(os.path.join(src, file), os.path.join(dst, file))

    # copy training files
    copy_all_files(dir_source, dir_train, train_files)
    # copy test files
    copy_all_files(dir_source, dir_test, test_files)

Size of '666.jpg' is zero. So, ignored.
Size of '11702.jpg' is zero. So, ignored.


In [None]:
# fraction of each class that goes into the training subset
SPLIT_RATIO = 0.9

# copy the images of every class into its train/test folders
for class_name in ('cat', 'dog'):
    split_dataset(
        # the raw folders are capitalized ('Cat', 'Dog')
        dir_source=f'{root_dir}/PetImages/{class_name.capitalize()}',
        dir_train=f'{root_dir}/train/{class_name}',
        dir_test=f'{root_dir}/test/{class_name}',
        split_ratio=SPLIT_RATIO,
    )

## Data Augmentation

In [None]:
# normalizes the input feature: every pixel value is multiplied by 1/255
# (no other transformation is configured)
train_datagen = ImageDataGenerator(
    rescale=1 / 255.,
)

# train data generator
# 'directory' must be the folder whose sub-folders are the classes, i.e.
# '<root_dir>/train' (cat/, dog/). Pointing it at 'root_dir' itself would
# make 'PetImages', 'train' and 'test' the inferred classes.
train_generator = train_datagen.flow_from_directory(
    directory=f'{root_dir}/train',
    target_size=(150, 150),
    batch_size=64,
    class_mode='binary',
    shuffle=True
)

# test data generator without shuffling, reading the held-out test split
# (the same rescale-only generator is reused, so both subsets are
# preprocessed identically; keeping the order fixed lets predictions be
# compared against 'test_generator.labels')
test_generator = train_datagen.flow_from_directory(
    directory=f'{root_dir}/test',
    target_size=(150, 150),
    batch_size=64,
    class_mode='binary',
    shuffle=False
)

In [None]:
# capitalized class names, in the order the generator registered them
labels = [name.capitalize() for name in test_generator.class_indices.keys()]
# integer index assigned to each class
indices = list(test_generator.class_indices.values())

In [None]:
# pull a single batch from the generator
batch_images, batch_labels = next(iter(test_generator))

# first sample of the batch and its class name
image = batch_images[0]
label = labels[int(batch_labels[0])]

# show the image with its shape and label as the title
fig, ax = plt.subplots()
ax.set_title(f"Size: {image.shape}\nLabel: {label}")
ax.axis('off')
ax.imshow(image.squeeze())
plt.show()

# rewind the generator so later cells start from the first batch again
test_generator.reset()

## Model

**Note**
- The problem is binary classification.
- Use `'sigmoid'` activation function in the output layer.
- Use `'binary_crossentropy'` loss.

In [None]:
# network input: 150x150 images with 3 channels
input_tensor = Input(shape=[150, 150, 3])

# three convolution + max-pooling stages with 64, 32 and 32 filters (3x3 kernels)
x = input_tensor
for num_filters in (64, 32, 32):
    x = Conv2D(num_filters, 3, activation='relu')(x)
    x = MaxPool2D()(x)

# flatten, then two fully-connected ReLU layers
x = Flatten()(x)
for num_units in (512, 128):
    x = Dense(num_units, activation='relu')(x)

# single sigmoid unit for the binary output
output_tensor = Dense(1, activation='sigmoid')(x)

# assemble the functional model
model = tf.keras.Model(input_tensor, output_tensor)

# binary cross-entropy loss; accuracy is logged under the key 'acc'
model.compile(
    loss='binary_crossentropy',
    optimizer=Adam(),
    metrics=['acc'],
)

# print the layer-by-layer summary
model.summary()

## Callback

- You can use [pre-defined callbacks](https://www.tensorflow.org/api_docs/python/tf/keras/callbacks).
- Or, you can define a custom callback to have more control over what happens during training or prediction.
[See more](https://www.tensorflow.org/api_docs/python/tf/keras/callbacks/Callback).

**Note**:
- Use `logs.get()` to read the metric you want to monitor.
- The key passed to `logs.get()` must match the metric name defined in `model.compile()` (here, `'acc'`).

In [None]:
class MonitorAccuracy(tf.keras.callbacks.Callback):
    """
    A custom callback that prints loss and accuracy at the end of each epoch
    and stops training once the accuracy exceeds a given threshold.

    Parameters
    ----------
    stop_accuracy : float
        Training is stopped as soon as the logged 'acc' value is strictly
        greater than this threshold.
    """

    def __init__(self, stop_accuracy=0.99):
        # initiate
        super().__init__()
        self.stop_accuracy = stop_accuracy

    def on_epoch_end(self, epoch, logs=None):
        """
        Prints the epoch's loss/accuracy and requests early termination
        when the accuracy is above 'stop_accuracy'.

        'logs' is expected to hold the keys 'loss' and 'acc'; 'acc' is only
        present when the model was compiled with metrics=['acc'].
        """
        # 'logs' defaults to None, so fall back to an empty dict
        logs = logs or {}
        loss = logs.get('loss')
        acc = logs.get('acc')

        # without both values there is nothing to report or to compare;
        # say so explicitly instead of failing inside the format string
        if loss is None or acc is None:
            print(f"Epoch {epoch+1} - 'loss'/'acc' not found in logs "
                  f"(available keys: {sorted(logs)})")
            return

        # at the end of the epoch, print loss and accuracy
        print(f"Epoch {epoch+1} - loss: {loss:.4f} - acc: {acc:.4f}")

        # if accuracy is greater than the given 'stop_accuracy':
        if acc > self.stop_accuracy:
            # print the termination message
            print(f"\nAccuracy reached to {self.stop_accuracy}. So, cancelling training...")
            # stop training
            self.model.stop_training = True


monitor_acc = MonitorAccuracy(0.999)

## Training

In [None]:
# fit for at most 100 epochs; Keras' own progress output is silenced
# (verbose=0) and 'monitor_acc' is passed as the only callback.
# The returned history object is kept for plotting.
hist = model.fit(
    train_generator,
    callbacks=[monitor_acc],
    verbose=0,
    epochs=100,
)

In [None]:
# accuracy and loss per epoch on one figure with two y-axes
fig, ax_acc = plt.subplots()
ax_loss = ax_acc.twinx()  # second y-axis sharing the epoch axis

# accuracy in red on the left axis
ax_acc.plot(hist.history['acc'], label='Accuracy', color='r')
ax_acc.set_xlabel('Epoch')
ax_acc.set_ylabel('Accuracy', color='r')

# loss in blue on the right axis
ax_loss.plot(hist.history['loss'], label='Loss', color='b')
ax_loss.set_ylabel('Loss', color='b')

plt.show()

## Evaluation

In [None]:
# predict on 'test_generator' (created with shuffle=False, so the order of
# the predictions matches 'test_generator.labels') and threshold the
# sigmoid outputs at 0.5 to obtain boolean class predictions
y_pred = model.predict(test_generator) > 0.5

**Confusion matrix**

In [None]:
# ground-truth class indices, in the generator's file order
y_test = test_generator.labels

# confusion matrix with normalize='true' (normalized over the true labels)
conf_mat = confusion_matrix(
    y_true=y_test,
    y_pred=y_pred,
    normalize='true',
)

In [None]:
# square canvas for the confusion matrix
fig, ax = plt.subplots(figsize=(6, 6))

# draw the matrix as a heat map
cax = ax.matshow(conf_mat, cmap='Blues')

# write each value into its cell, skipping those below 0.005;
# diagonal cells get white text, the others black
for (row, col), value in np.ndenumerate(conf_mat):
    if not value < 0.005:
        ax.text(col, row, f'{value:0.2f}', ha='center', va='center',
                color='w' if row == col else 'k')

# title and axis labels
ax.set_title('Confusion matrix')
ax.set_xlabel('Predicted')
ax.set_ylabel('True')

# class names on both axes, without tick marks
ax.set_xticks(indices)
ax.set_xticklabels(labels)
ax.set_yticks(indices)
ax.set_yticklabels(labels)
ax.tick_params(axis='both', which='both', length=0)

# minor ticks on the cell borders carry the grid lines
cell_edges = np.arange(-.5, 2, 1)
ax.set_xticks(cell_edges, minor=True)
ax.set_yticks(cell_edges, minor=True)
ax.grid(which='minor', color='k', linestyle='-', linewidth=1)

plt.show()

**Other metrics**

In [None]:
# per-class precision, recall and F1 (average=None returns one score per class)
precision, recall, f1 = (
    score_fn(y_test, y_pred, average=None)
    for score_fn in (precision_score, recall_score, f1_score)
)

In [None]:
# one row per class, one column per metric
metric_columns = {
    'Precision': precision,
    'Recall': recall,
    'F1-score': f1,
}
df = pd.DataFrame(metric_columns, index=labels)

# extra row holding the column-wise mean of the per-class scores
df.loc['(Average)'] = df.mean()

# display
df