# Image classification with Convolutional Neural Networks
### Machine Learning in Industrial Environments

Let's classify Simpsons characters.


<center><img src="https://i.imgur.com/i8zIGqX.jpg" style="text-align: center" height="300px"></center>

The dataset of Simpsons characters has been extracted from the TV series. Its author is [Alexandre Attia](http://www.alexattia.fr/), and it is much more complex than Fashion-MNIST: it has 18 classes, and the characters appear in different poses.

The training dataset can be downloaded here:

[Training data](https://onedrive.live.com/download?cid=C506CF0A4F373B0F&resid=C506CF0A4F373B0F%219337&authkey=AMzI92bJPx8Sd60) (~500MB)

The test dataset can be downloaded here:

[Test data](https://onedrive.live.com/download?cid=C506CF0A4F373B0F&resid=C506CF0A4F373B0F%219341&authkey=ANnjK3Uq1FhuAe8) (~10MB)


## In this case, we are going to reduce the dataset to 10% of the original one. We will see how badly our CNN works now, and we will see that some extra magic can be done with transfer learning, using a good and well-proven CNN architecture.



## Loading the data

In [None]:
import cv2
import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
import matplotlib.pyplot as plt
import glob

from IPython.display import clear_output

In [None]:
# An interesting option to see the evolution of the training
# process while we are fitting the model, so we can stop
# the training if, e.g., we detect overfitting
class PlotLearning(keras.callbacks.Callback):
    """
    Callback to plot the learning curves of the model during training.

    After every epoch the cell output is cleared and one subplot per training
    metric found in ``logs`` is redrawn; when a matching ``val_<metric>``
    series has been recorded it is drawn on the same axes.
    """

    def on_train_begin(self, logs=None):
        """Start every training run with an empty history.

        Args:
            logs: dict of metric values, or None.
        """
        # None instead of a mutable `{}` default, which would be shared
        # between calls.
        logs = logs or {}
        self.metrics = {metric: [] for metric in logs}

    def on_epoch_end(self, epoch, logs=None):
        """Record the metrics of the finished epoch and redraw the curves.

        Args:
            epoch: index of the epoch that just ended (unused: the x axis is
                derived from the number of values recorded so far).
            logs: dict mapping metric names to their values for this epoch.
        """
        logs = logs or {}

        # Storing metrics
        for metric, value in logs.items():
            self.metrics.setdefault(metric, []).append(value)

        # Plotting: one panel per training metric.
        metrics = [name for name in logs if not name.startswith('val_')]
        if not metrics:
            # Nothing to draw; plt.subplots(1, 0) would raise.
            return

        # squeeze=False makes `axs` a 2-D array even for a single metric;
        # otherwise plt.subplots returns a bare Axes that cannot be indexed.
        f, axs = plt.subplots(1, len(metrics), figsize=(15, 5), squeeze=False)
        clear_output(wait=True)

        for ax, metric in zip(axs[0], metrics):
            values = self.metrics[metric]
            ax.plot(range(1, len(values) + 1), values, label=metric)

            # Validation series only exist when fit() received validation
            # data. Looking the series up (instead of indexing `logs`) avoids
            # a KeyError in that case and still draws values equal to 0.
            val_values = self.metrics.get('val_' + metric)
            if val_values:
                ax.plot(range(1, len(val_values) + 1),
                        val_values,
                        label='val_' + metric)

            ax.legend()
            ax.grid()

        plt.tight_layout()
        plt.show()

In [None]:
# Let's import the files
# Download the training and test archives into the working directory
# (wget flags: -c resumes a partial download, -q suppresses the output).
!wget -cq "https://www.dropbox.com/s/r4d90m3kafifx5g/simpsons_train.tar.gz"
!wget -cq "https://www.dropbox.com/s/8n895y90r5qe6gv/simpsons_test.tar.gz"

In [None]:
# Uncompress
!mkdir datasets
!tar -xzf simpsons_train.tar.gz -C ./datasets
!tar -xzf simpsons_test.tar.gz -C ./datasets

In [None]:
# The 18 characters we are going to work with, keyed by their integer label
# (the label is the position of the name in this sequence).
MAP_CHARACTERS = dict(enumerate((
    'abraham_grampa_simpson',
    'apu_nahasapeemapetilon',
    'bart_simpson',
    'charles_montgomery_burns',
    'chief_wiggum',
    'comic_book_guy',
    'edna_krabappel',
    'homer_simpson',
    'kent_brockman',
    'krusty_the_clown',
    'lisa_simpson',
    'marge_simpson',
    'milhouse_van_houten',
    'moe_szyslak',
    'ned_flanders',
    'nelson_muntz',
    'principal_skinner',
    'sideshow_bob',
)))

# Side length, in pixels, of the square images we will work with (64x64)
IMG_SIZE = 64

In [None]:
def load_train_set(dirname, map_characters, verbose=True):
    """This function loads the training data images.

    Every character has its own sub-directory ``dirname/<character>``. Each
    JPEG found there is read with OpenCV and, as the images have different
    sizes, resized to IMG_SIZE x IMG_SIZE. Files that OpenCV cannot decode
    are skipped instead of aborting the whole load.

    Args:
        dirname: complete path to the data
        map_characters: dict that maps each label to its character name
            (which is also the name of the character's sub-directory)
        verbose: if it is True, shows info about loaded and skipped images

    Returns:
        X, y: X is an array with all of the images with a size of IMG_SIZE x IMG_SIZE
              (channels in the order returned by cv2.imread, i.e. BGR)
              y is an array with the labels that correspond to each image

    Raises:
        OSError: if the sub-directory of a character cannot be listed.
    """
    X_train = []
    y_train = []
    for label, character in map_characters.items():
        character_dir = os.path.join(dirname, character)
        # Sorted so the loading order does not depend on the filesystem;
        # the extension check is case-insensitive so "*.JPG" is not dropped.
        images = sorted(name for name in os.listdir(character_dir)
                        if name.lower().endswith("jpg"))
        if verbose:
            print("Reading {} images found at {}".format(len(images), character))
        for image_name in images:
            image = cv2.imread(os.path.join(character_dir, image_name))
            if image is None:
                # cv2.imread does not raise on a corrupt/unreadable file, it
                # returns None, which would make cv2.resize fail below.
                if verbose:
                    print("Skipping unreadable image {}".format(
                        os.path.join(character, image_name)))
                continue
            X_train.append(cv2.resize(image, (IMG_SIZE, IMG_SIZE)))
            y_train.append(label)
    return np.array(X_train), np.array(y_train)

In [None]:
def load_test_set(dirname, map_characters, verbose=True):
    """Equivalent to the previous function but for the test images.

    The test images live in a single flat directory and the character is
    encoded in the file name: everything before the last underscore
    (``<character>_<suffix>.<ext>``). Files whose prefix is not one of the
    names in `map_characters`, or that OpenCV cannot decode, are ignored.

    Args:
        dirname: complete path to the directory with the test images
        map_characters: dict that maps each label to its character name
        verbose: if it is True, shows how many images were loaded

    Returns:
        X, y: X is an array with the images resized to IMG_SIZE x IMG_SIZE
              y is an array with the labels that correspond to each image
    """
    X_test = []
    y_test = []
    reverse_dict = {v: k for k, v in map_characters.items()}
    # Sorted so the loading order does not depend on the filesystem.
    for filename in sorted(glob.glob(os.path.join(dirname, '*.*'))):
        # basename copes with any path separator, unlike split('/').
        char_name = os.path.basename(filename).rpartition('_')[0]
        if char_name not in reverse_dict:
            continue
        image = cv2.imread(filename)
        if image is None:
            # cv2.imread returns None (it does not raise) for unreadable files.
            continue
        X_test.append(cv2.resize(image, (IMG_SIZE, IMG_SIZE)))
        y_test.append(reverse_dict[char_name])
    if verbose:
        print("Read {} images from test".format(len(X_test)))
    return np.array(X_test), np.array(y_test)


In [None]:
# We load the data. Specific for colab.
# Both paths are relative to the working directory; presumably these are the
# folders created when the archives are extracted into ./datasets -- verify
# against the archive contents.
DATASET_TRAIN_PATH_COLAB = "./datasets/simpsons"
DATASET_TEST_PATH_COLAB = "./datasets/simpsons_testset"

# X, y: training images and labels; X_t, y_t: test images and labels
X, y = load_train_set(DATASET_TRAIN_PATH_COLAB, MAP_CHARACTERS)
X_t, y_t = load_test_set(DATASET_TEST_PATH_COLAB, MAP_CHARACTERS)

In [None]:
# Shuffle the training set with a random permutation; images and labels are
# indexed with the same permutation, so they stay aligned.
# load_train_set appends the images character by character, so without this
# step any slice of the data would be dominated by a few classes.
# Only the training set is shuffled; the test set keeps its loading order.
perm = np.random.permutation(len(X))
X, y = X[perm], y[perm]

## We keep just 10% of the dataset

In [None]:
# What are the results if we use a small training dataset,
# for example just 10% of the original one?
# Each array keeps its first int(len * 0.1) elements.
X, y = (data[:int(len(data) * 0.1)] for data in (X, y))

In [None]:
def plot_acc(history, title="Model Accuracy"):
    """Plot the training and validation accuracy curves of one training run.

    Args:
        history: object whose ``history`` dict holds the 'accuracy' and
            'val_accuracy' series (e.g. the value returned by ``model.fit``).
        title: title of the figure.
    """
    curves = history.history
    for key in ('accuracy', 'val_accuracy'):
        plt.plot(curves[key])
    plt.title(title)
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend(['Train', 'Val'], loc='upper left')
    plt.show()

def plot_loss(history, title="Model Loss"):
    """Plot the training and validation loss curves of one training run.

    Args:
        history: object whose ``history`` dict holds the 'loss' and
            'val_loss' series (e.g. the value returned by ``model.fit``).
        title: title of the figure.
    """
    curves = history.history
    for key in ('loss', 'val_loss'):
        plt.plot(curves[key])
    plt.title(title)
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend(['Train', 'Val'], loc='upper right')
    plt.show()

def plot_compare_losses(history1, history2, name1="Red 1",
                        name2="Red 2", title="Graph title"):
    """Compare the loss curves of two training runs in one figure.

    The first run is drawn in green and the second in blue; training curves
    are solid and validation curves dashed.

    Args:
        history1: history of the first run (``history`` dict with 'loss'
            and 'val_loss').
        history2: history of the second run.
        name1: legend name of the first run.
        name2: legend name of the second run.
        title: title of the figure.
    """
    # Plot order must match the legend entries below.
    for history, color in ((history1, "green"), (history2, "blue")):
        plt.plot(history.history['loss'], color=color)
        # Only the line style goes in the format string: 'r--' also asked
        # for red, which contradicted the explicit `color` argument.
        plt.plot(history.history['val_loss'], '--', color=color)
    plt.title(title)
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend(['Train ' + name1, 'Val ' + name1,
                'Train ' + name2, 'Val ' + name2],
               loc='upper right')
    plt.show()

def plot_compare_accs(history1, history2, name1="Red 1",
                      name2="Red 2", title="Graph title"):
    """Compare the accuracy curves of two training runs in one figure.

    The first run is drawn in green and the second in blue; training curves
    are solid and validation curves dashed.

    Args:
        history1: history of the first run (``history`` dict with
            'accuracy' and 'val_accuracy').
        history2: history of the second run.
        name1: legend name of the first run.
        name2: legend name of the second run.
        title: title of the figure.
    """
    # Plot order must match the legend entries below.
    for history, color in ((history1, "green"), (history2, "blue")):
        plt.plot(history.history['accuracy'], color=color)
        # Only the line style goes in the format string: 'r--' also asked
        # for red, which contradicted the explicit `color` argument.
        plt.plot(history.history['val_accuracy'], '--', color=color)
    plt.title(title)
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.legend(['Train ' + name1, 'Val ' + name1,
                'Train ' + name2, 'Val ' + name2],
               loc='lower right')
    plt.show()



**Information about the data ...**

In [None]:
# Dimensions of the (sub-sampled) training image array
X.shape

So, 1899 images of 64x64 pixels with 3 colour channels (OpenCV loads them in BGR order)

In [None]:
# Dimensions of the label array (one label per training image)
y.shape

In [None]:
# Label of the first training image and the character name it maps to.
# A single format string avoids the doubled spaces that print() produced by
# inserting its own separator next to the padded literals.
print('The character {} is {}'.format(y[0], MAP_CHARACTERS[y[0]]))

In [None]:
def visualize_example(x):
    """Show a single image in a new figure, with a colorbar and no grid.

    Args:
        x: image array accepted by ``plt.imshow``.
    """
    # NOTE(review): the images of this notebook are read with cv2.imread,
    # which returns BGR channel order, while imshow interprets 3-channel
    # arrays as RGB -- the colours probably look swapped; confirm.
    plt.figure()
    plt.imshow(x)
    plt.colorbar()
    plt.grid(False)
    plt.show()

In [None]:
# First training image, before normalization
visualize_example(X[0])

**Normalization**

In [None]:
# Divide the pixel values by 255 (for 8-bit images this maps them to [0, 1]);
# the same scaling is applied to the training and the test images.
X = X / 255.0
X_t = X_t / 255.0

In [None]:
# Same image after normalization (note the new colorbar range)
visualize_example(X[0])

**Creating the model**

In [None]:
# Take Sequential from the same Keras the rest of the notebook uses
# (`from tensorflow import keras`), so that the model and its layers never
# come from two different Keras installations.
from tensorflow.keras import Sequential

In [None]:
# Variables with required data
batch_size = 128  # samples per gradient update, passed to model.fit
num_classes = 18  # must match the number of entries in MAP_CHARACTERS

In [None]:
# One-hot encoding of the integer labels (num_classes columns per label).
# NOTE: run this cell only once -- it overwrites y and y_t in place.
y = keras.utils.to_categorical(y, num_classes)
y_t = keras.utils.to_categorical(y_t, num_classes)

# Cast the image arrays to float32
X = X.astype('float32')
X_t = X_t.astype('float32')

In [None]:
#################################################
# A good try
#################################################

lr = 0.001
epochs = 100

# Three blocks of (Conv, Conv, MaxPooling, Dropout) with an increasing number
# of filters, followed by a dense classifier. The input size and the number
# of outputs come from IMG_SIZE and num_classes so that they cannot drift
# apart from the data loading and one-hot encoding code.
model = Sequential([
    # Block 1: 32 filters
    keras.layers.Conv2D(filters=32, kernel_size=(3, 3), padding='same', activation='relu',
                        input_shape=(IMG_SIZE, IMG_SIZE, 3), name='input'),
    keras.layers.Conv2D(filters=32, kernel_size=(3, 3), padding='same', activation='relu'),
    keras.layers.MaxPooling2D(pool_size=(2, 2)),
    keras.layers.Dropout(0.4),

    # Block 2: 64 filters
    keras.layers.Conv2D(filters=64, kernel_size=(3, 3), padding='same', activation='relu'),
    keras.layers.Conv2D(filters=64, kernel_size=(3, 3), padding='same', activation='relu'),
    keras.layers.MaxPooling2D(pool_size=(2, 2)),
    keras.layers.Dropout(0.5),

    # Block 3: 256 filters
    keras.layers.Conv2D(filters=256, kernel_size=(3, 3), padding='same', activation='relu'),
    keras.layers.Conv2D(filters=256, kernel_size=(3, 3), padding='same', activation='relu'),
    keras.layers.MaxPooling2D(pool_size=(2, 2)),
    keras.layers.Dropout(0.5),

    # Classifier
    keras.layers.Flatten(),
    keras.layers.Dense(512, activation='relu'),
    keras.layers.Dropout(0.5),
    keras.layers.Dense(128, activation='relu'),

    # One softmax probability per character class
    keras.layers.Dense(num_classes, activation='softmax', name="output"),
])

# Take a look at the model summary
model.summary()

In [None]:
from tensorflow.keras.optimizers import Adam

# `learning_rate` is the supported name of this argument; `lr` is a
# deprecated alias that recent Keras releases no longer accept.
adam = Adam(learning_rate=lr)

# Categorical cross-entropy matches the one-hot encoded labels.
model.compile(loss='categorical_crossentropy',
              optimizer=adam,
              metrics=['accuracy'])

In [None]:
# Callback for early stopping: stop when val_loss has not improved for
# 3 consecutive epochs
callback_early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3)

# Train for at most `epochs` epochs, holding out 20% of the training data
# for validation; PlotLearning redraws the learning curves after each epoch.
# NOTE(review): EarlyStopping is used without restore_best_weights, so the
# model presumably keeps the weights of the last epoch rather than the best
# one -- confirm this is intended.
history = model.fit(X, y,
                   batch_size = batch_size,
                   epochs=epochs,
                   verbose=1,
                   callbacks=[PlotLearning(), callback_early_stopping],
                   validation_split = 0.2)  # 20% validation

In [None]:
# Accuracy and loss curves of the training run above
plot_acc(history)
plot_loss(history)

**Evaluation with test data**

In [None]:
# Evaluate on the held-out test set. The model was compiled with
# metrics=['accuracy'], so score[0] is the loss and score[1] the accuracy.
score = model.evaluate(X_t, y_t, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

**Visualizing the model**

In [None]:
from tensorflow.keras.utils import plot_model
# Draw the architecture (with layer names and shapes) and save it as model.png
plot_model(model, to_file='model.png', show_shapes=True, show_layer_names=True)

In [None]:
# Save the trained CNN to disk before `model` is re-bound to the
# transfer-learning network further down
model.save('model.h5')

# Let's solve it with transfer learning

In [None]:
from tensorflow.keras.applications import MobileNet
from tensorflow.keras.applications.mobilenet import preprocess_input

In [None]:
import cv2

In [None]:
# Resize a numpy array of images to a new size
def resize_images(images, new_size):
    """Resize every image of a batch to the same size.

    Args:
        images: array of shape (n_images, height, width, channels).
        new_size: target size as (height, width). For square sizes such as
            (224, 224) the order is irrelevant.

    Returns:
        Array of shape (n_images, new_size[0], new_size[1], channels) with
        the same dtype as `images` (no silent upcast to float64, which would
        double the memory needed for float32 input).
    """
    new_height, new_width = new_size
    resized_images = np.zeros(
        (images.shape[0], new_height, new_width, images.shape[3]),
        dtype=images.dtype)
    for i, image in enumerate(images):
        # cv2.resize takes dsize as (width, height), the reverse of the
        # (rows, cols) order of NumPy shapes. The reshape restores the
        # channel axis that OpenCV drops for single-channel images.
        resized = cv2.resize(image, (new_width, new_height))
        resized_images[i] = resized.reshape(new_height, new_width, -1)
    return resized_images

In [None]:
# Shape of the training images before resizing
X.shape

In [None]:
# Upscale the training images to 224x224 for the pre-trained network
X = resize_images(X, (224, 224))

In [None]:
# Shape of the training images after resizing
X.shape

In [None]:
# Pre-trained MobileNet convolutional base (ImageNet weights, no classifier).
mobilenet_base = MobileNet(weights='imagenet',include_top=False)

# The images of this notebook were divided by 255, i.e. they are in [0, 1],
# but the ImageNet weights expect inputs in [-1, 1] -- the range that
# mobilenet.preprocess_input produces from [0, 255] pixels. A Rescaling
# layer in front of the base network applies the equivalent mapping
# (x * 2 - 1) to everything fed to the model, so training and test images
# are treated alike.
mobilenet_inputs = keras.Input(shape=(None, None, 3))
mobilenet_features = mobilenet_base(
    keras.layers.Rescaling(scale=2.0, offset=-1.0)(mobilenet_inputs))

# `transfer_model.input` / `transfer_model.output` keep working as before.
transfer_model = keras.Model(inputs=mobilenet_inputs, outputs=mobilenet_features)

In [None]:
# Freeze the pre-trained network so that only the layers added on top of it
# are trained. Setting `trainable` on the model applies to all of its layers,
# so no per-layer loop is needed.
transfer_model.trainable = False

In [None]:
# New classification head on top of the pre-trained features
x = transfer_model.output
x = keras.layers.GlobalAveragePooling2D()(x)

# Dense layers let the model learn more complex functions on top of the
# features; each one is preceded by a dropout layer with a decreasing rate.
for drop_rate, units in ((0.5, 1024), (0.4, 1024), (0.3, 512)):
    x = keras.layers.Dropout(drop_rate)(x)
    x = keras.layers.Dense(units, activation='relu')(x)

# One softmax probability per character class (num_classes instead of a
# hard-coded 18, to stay in sync with the one-hot encoding of the labels)
preds = keras.layers.Dense(num_classes, activation='softmax')(x)

In [None]:
# Full network: from the pre-trained model's input to the new softmax head.
# This re-binds `model`, replacing the CNN trained earlier in the notebook.
model = tf.keras.Model(inputs=transfer_model.input, outputs=preds)

In [None]:
# Adam is selected by name here, so it runs with its default settings
# (the `lr` variable defined for the first model is not used)
model.compile(optimizer='Adam',loss='categorical_crossentropy',metrics=['accuracy'])

In [None]:
# Take a look at the transfer-learning model
model.summary()

In [None]:
# Callback for early stopping: stop when val_loss has not improved for
# 3 consecutive epochs
callback_early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3)

# Same training setup as for the first model (batch_size and epochs are
# reused), holding out 20% of the training data for validation.
# NOTE: `history` is overwritten with the curves of this run.
history = model.fit(X, y,
                   batch_size = batch_size,
                   epochs=epochs,
                   verbose=1,
                   callbacks=[PlotLearning(), callback_early_stopping],
                   validation_split = 0.2)  # 20% validation

**Evaluation with test data**

In [None]:
# The test images must get the same 224x224 resizing as the training images
X_t = resize_images(X_t, (224, 224))

In [None]:
# Evaluate the transfer-learning model on the test set. It was compiled with
# metrics=['accuracy'], so score[0] is the loss and score[1] the accuracy.
score = model.evaluate(X_t, y_t, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])