In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import keras

from keras.preprocessing.image import ImageDataGenerator
from keras.callbacks import ModelCheckpoint, EarlyStopping
from keras.applications.inception_v3 import InceptionV3
from keras.preprocessing import image
from keras.models import Model
from keras.layers import Conv2D, Dense, GlobalAveragePooling2D, Dropout, Activation, MaxPooling2D, Flatten
from keras import backend as K
from keras.optimizers import SGD
from keras import Sequential

# Load Data and Data Augmentation

### Example


```python
# Data augmentation example
datagen = ImageDataGenerator(
        rotation_range=40,
        width_shift_range=0.2,
        height_shift_range=0.2,
        shear_range=0.2,
        zoom_range=0.2,
        horizontal_flip=True,
        fill_mode='nearest')

img = load_img('resources/data/small-food-101/data/train/cheesecake/72295.jpg')  # this is a PIL image
x = img_to_array(img)  # this is a Numpy array with shape (3, 150, 150)
x = x.reshape((1,) + x.shape)  # this is a Numpy array with shape (1, 3, 150, 150)

# the .flow() command below generates batches of randomly transformed images
# and saves the results to the `resources/gen-data/` directory
i = 0
for batch in datagen.flow(x, batch_size=1,
                          save_to_dir='resources/gen-data', 
                          save_prefix='cheesecake', 
                          save_format='jpeg'):
    i += 1
    if i > 20:
        break  # otherwise the generator would loop indefinitely
```

In [None]:
# Dataset location, relative to this notebook.
base_path = "../../../data/generated"
dataset_name = "food-004-0.1"

# Each split has its own directory. The validation and test paths used to be
# crossed, which made training validate on the test split and the final
# evaluation run on the validation split.
train_path = f"{base_path}/{dataset_name}/data/train"
validation_path = f"{base_path}/{dataset_name}/data/validation"
test_path = f"{base_path}/{dataset_name}/data/test"

batch_size = 16
height = 150
width = 150

# Image tensors are laid out as (rows, cols) = (height, width), matching the
# `target_size=(height, width)` passed to the generators below.
input_shape = (3, height, width) if K.image_data_format() == 'channels_first' else (height, width, 3)

# Options shared by the three directory iterators.
flow_options = dict(
        target_size=(height, width),
        color_mode='rgb',
        batch_size=batch_size,
        class_mode='categorical')  # because categorical_crossentropy loss

# Only the training images are augmented; validation and test images are just rescaled.
train_datagen = ImageDataGenerator(
        rescale=1./255,
        shear_range=0.2,
        zoom_range=0.2,
        width_shift_range=0.2,
        height_shift_range=0.2,
        rotation_range=40,
        horizontal_flip=True,
        vertical_flip=True,
        fill_mode='nearest')
train_generator = train_datagen.flow_from_directory(train_path, **flow_options)

val_datagen = ImageDataGenerator(rescale=1./255)
validation_generator = val_datagen.flow_from_directory(validation_path, **flow_options)

test_datagen = ImageDataGenerator(rescale=1./255)
test_generator = test_datagen.flow_from_directory(test_path, **flow_options)

# Callbacks configuration

In [None]:
class plot_train_progress(keras.callbacks.Callback):
    """Keras callback that records training metrics and periodically plots them.

    Per-epoch loss/accuracy for the training and validation data are kept in
    `train_losses`, `val_losses`, `train_acc` and `val_acc`. Per-batch training
    values are kept separately in `batch_losses` and `batch_acc`, so the plotted
    per-epoch curves are not mixed with per-batch values.

    Args:
        plot_every_n_epochs: a plot is drawn when `epoch % plot_every_n_epochs == 0`.
        plot_fraction: fraction of the recorded epochs to plot (the most recent ones).
    """

    def __init__(self, plot_every_n_epochs = 5, plot_fraction = 0.9):
        super().__init__()
        self.plot_frequency = plot_every_n_epochs
        self.plot_fraction = plot_fraction #Plot only the most recent values

    def on_train_begin(self, logs=None):
        """Reset every recorded history at the start of a training run."""
        self.train_losses = []
        self.val_losses = []
        self.train_acc = []
        self.val_acc = []
        self.logs = []
        self.batch_losses = []
        self.batch_acc = []

    @staticmethod
    def _accuracy(logs, prefix=''):
        """Read the accuracy metric, whether it is reported as 'acc' or 'accuracy'."""
        value = logs.get(prefix + 'acc')
        return logs.get(prefix + 'accuracy') if value is None else value

    def on_epoch_end(self, epoch, logs=None):
        """Record the epoch metrics and plot them every `plot_frequency` epochs."""
        logs = logs or {}
        self.logs.append(logs)
        self.train_losses.append(logs.get('loss'))
        self.train_acc.append(self._accuracy(logs))
        self.val_losses.append(logs.get('val_loss'))
        self.val_acc.append(self._accuracy(logs, 'val_'))

        if epoch % self.plot_frequency != 0:
            return

        nr_values_to_plot = int(self.plot_fraction * len(self.train_losses))
        if nr_values_to_plot > 1:
            self._plot_recent(epoch, nr_values_to_plot)

    def _plot_recent(self, epoch, nr_values_to_plot):
        """Plot the last `nr_values_to_plot` per-epoch values of every metric."""
        # You can choose the style of your preference
        # print(plt.style.available) to see the available options.
        # The seaborn style goes by a different name in newer matplotlib
        # releases, so use whichever of the two is installed.
        for style_name in ("seaborn", "seaborn-v0_8"):
            if style_name in plt.style.available:
                plt.style.use(style_name)
                break

        n_recorded = len(self.train_losses)
        t = list(range(n_recorded - nr_values_to_plot, n_recorded))
        series = (
            ("train_loss", self.train_losses),
            ("val_loss", self.val_losses),
            ("train_acc", self.train_acc),
            ("val_acc", self.val_acc),
        )
        for label, values in series:
            recent = values[-nr_values_to_plot:]
            # A metric missing from `logs` is stored as None; skip such a series.
            if any(value is None for value in recent):
                continue
            plt.plot(t, recent, label = label)
        plt.title("Training Loss and Accuracy [Epoch {}]".format(epoch))
        plt.xlabel('# Epochs')
        plt.ylabel('Loss/Accuracy')
        plt.legend()
        #plt.savefig('train_progress.jpg')
        plt.show()

    def on_batch_end(self, batch, logs=None):
        """Record the per-batch training metrics in their own lists."""
        logs = logs or {}
        self.batch_losses.append(logs.get('loss'))
        self.batch_acc.append(self._accuracy(logs))

# TODO:
# TensorBoard callback
# Email notification callback

# Checkpoint written to disk during training (save_best_only keeps a single file).
checkpointer = ModelCheckpoint(
    filepath='model.weights.best.hdf5',
    save_best_only=True,
    verbose=1,
)
# Early stopping watches the validation loss with a patience of 10 epochs.
earlyStopping = EarlyStopping(
    monitor='val_loss',
    mode='min',
    patience=10,
    verbose=1,
)
progress_plotter = plot_train_progress(plot_every_n_epochs=5, plot_fraction=.9)
callbacks_list = [checkpointer, progress_plotter, earlyStopping]

## Model definition

In [None]:
# Configuration
n_channels = 3 # RGB

# Dataset-dependent values are read from the generators instead of being
# hard-coded, so they always agree with the directories that were loaded.
n_classes = len(train_generator.class_indices)
n_train_samples = train_generator.samples
n_validation_samples = validation_generator.samples
n_test_samples = test_generator.samples
n_epochs=10
optimizer='rmsprop'

# 'inception', 'custom-inception', or any other value for the small CNN below.
model_to_use='simple'


def _pretrained_inception_base():
    """Build InceptionV3 with ImageNet weights and without its classification top."""
    if K.image_data_format() == 'channels_first':
        inception_input_shape = (n_channels, height, width)
    else:
        inception_input_shape = (height, width, n_channels)
    return InceptionV3(
        include_top=False,
        weights='imagenet',
        input_shape=inception_input_shape) #input_tensor=None


if model_to_use == 'inception':
    # Model definition
    base_model = _pretrained_inception_base()
    x = base_model.output
    x = GlobalAveragePooling2D()(x) # or: pooling='avg'
    x = Dense(1024, activation='relu')(x)
    predictions = Dense(n_classes, activation='softmax')(x)

    model = Model(inputs=base_model.input, outputs=predictions)

    # first: freeze all convolutional InceptionV3 layers
    for layer in base_model.layers:
        layer.trainable = False

elif model_to_use == 'custom-inception':
    base_model = _pretrained_inception_base()

    x = base_model.output
    x = GlobalAveragePooling2D()(x) # or: pooling='avg'
    x = Dense(256, activation='relu')(x)
    x = Dropout(0.3)(x)
    x = Dense(1024, activation='relu')(x)
    predictions = Dense(n_classes, activation='softmax')(x)

    model = Model(inputs=base_model.input, outputs=predictions)

    # train the top 2 inception blocks: the first 249 layers stay frozen,
    # everything after them is trainable
    for layer in model.layers[:249]:
        layer.trainable = False
    for layer in model.layers[249:]:
        layer.trainable = True

    # This variant uses SGD and writes its checkpoint to a separate file.
    optimizer=SGD(lr=0.0001, momentum=0.9)
    checkpointer = ModelCheckpoint(filepath='model.weights.best.hdf5_tunning', verbose = 1, save_best_only=True)
    callbacks_list = [checkpointer, plot_train_progress(5, .9), earlyStopping]
else:
    # Small CNN trained from scratch: four conv/ReLU/max-pool stages
    # followed by a dense classifier.
    model = Sequential()
    model.add(Conv2D(32, (3, 3), input_shape=input_shape, padding='same'))
    model.add(Activation('relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))

    for n_filters in (64, 32, 64):
        model.add(Conv2D(n_filters, (3, 3)))
        model.add(Activation('relu'))
        model.add(MaxPooling2D(pool_size=(2, 2)))

    model.add(Flatten())
    model.add(Dense(64))
    model.add(Activation('relu'))
    model.add(Dropout(0.3))

    model.add(Dense(n_classes))
    model.add(Activation('softmax'))

model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])


In [None]:
# Take a look at the model summary to check the architecture before training
model.summary()

## Train

In [None]:
# Train from the directory generators.
# len(generator) is the number of batches needed for one full pass, including
# the final partial batch. Floor division by the batch size would drop that
# batch, and gives 0 steps when a split has fewer samples than one batch.
history = model.fit_generator(
        train_generator,
        epochs=n_epochs,
        steps_per_epoch=len(train_generator),
        validation_data=validation_generator,
        validation_steps=len(validation_generator),
        callbacks=callbacks_list)

# Evaluation

## Model evaluation - test set

In [None]:
# Restore the weights saved by the checkpoint callback of this run. The path
# is read from the callback because the 'custom-inception' variant writes to
# a different file than the default one.
model.load_weights(checkpointer.filepath)

# One full pass over the test set: len(test_generator) batches.
test_score = model.evaluate_generator(test_generator, steps=len(test_generator), workers=4)

print(f"Loss: {test_score[0]} ; Acc: {test_score[1]}")

## Test set prediction

In [None]:
# Run the model over the test generator and keep its raw outputs
test_prediction = model.predict_generator(generator=test_generator, workers=4)

In [None]:
# To avoid relying on the dictionary key order (e.g. dataset_labels = test_generator.class_indices.keys()),
# first invert the class_indices dictionary and then generate the ordered list of labels
#
dataset_labels_inverted = {v: k for k, v in test_generator.class_indices.items()}
dataset_labels = [dataset_labels_inverted[k] for k in sorted(dataset_labels_inverted.keys())]

# `test_generator` was created without `shuffle=False`, so its batches cannot be
# matched to ground-truth labels. Read the same directory again in a fixed order,
# so that row i of the predictions corresponds to entry i of `.classes`.
ordered_test_generator = test_datagen.flow_from_directory(
        test_path,
        target_size=(height, width),
        color_mode='rgb',
        batch_size=batch_size,
        class_mode='categorical',
        shuffle=False)

y_pred = model.predict_generator(ordered_test_generator, steps=len(ordered_test_generator))
y_pred_integers = np.argmax(y_pred, axis = 1)

# True class index of every test image, in the same order as the predictions
true_labels = ordered_test_generator.classes




## Utility and visualization functions

In [None]:
def visualize_model_predictions(model, classes, test_set, y_test, y_hat, title_string):
    """Plot a random sample of test images with their predicted and true labels.

    At most 16 images are shown, because the figure is a fixed 4 x 4 grid.
    Each title reads "predicted (true)" and is green when both agree, red otherwise.

    Args:
        model: unused; kept so existing calls keep working.
        classes: sequence of class names, indexed by class index.
        test_set: array of images, indexed along its first axis.
        y_test: true labels, one per image, either class indices or
            one-hot / score vectors.
        y_hat: predicted labels, one per image, in either of the same two forms.
        title_string: prefix used in the figure title.
    """
    def _class_index(label):
        # Accept an integer class index as well as a one-hot / score vector.
        label = np.asarray(label)
        return int(label) if label.ndim == 0 else int(np.argmax(label))

    max_panels = 16  # 4 x 4 grid
    figure = plt.figure(figsize=(40, 40))
    n_instances = test_set.shape[0]
    sample = np.random.choice(n_instances, size=min(n_instances, max_panels), replace=False)
    for i, index in enumerate(sample):
        ax = figure.add_subplot(4, 4, i + 1, xticks=[], yticks=[])
        # Display each image
        ax.imshow(np.squeeze(test_set[index]), cmap = 'gray')
        predict_index = _class_index(y_hat[index])
        true_index = _class_index(y_test[index])
        # Set the title for each image
        ax.set_title("{} ({})".format(classes[predict_index],
                                      classes[true_index]),
                                      color=("green" if predict_index == true_index else "red"),
                                      fontsize=45)
    figure.suptitle("%s results:" %title_string, fontsize=25)
    
import itertools
import numpy as np
import matplotlib.pyplot as plt

from sklearn import svm, datasets
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix

def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.

    Args:
        cm: 2-D confusion matrix; rows are true labels, columns are predicted labels.
        classes: class names used as tick labels on both axes.
        normalize: when True, every row is divided by its sum. A row that sums
            to zero is left as zeros instead of producing NaN.
        title: title of the plot.
        cmap: matplotlib colormap used for the cells.
    """
    cm = np.asarray(cm)
    if normalize:
        row_sums = cm.sum(axis=1, keepdims=True)
        # Divide only where the row sum is non-zero; other rows keep the zeros of `out`.
        cm = np.divide(cm.astype('float'), row_sums,
                       out=np.zeros(cm.shape, dtype=float),
                       where=row_sums != 0)
        print("Building normalized confusion matrix")
    else:
        print('Building confusion matrix, without normalization')
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    # Cells above half of the maximum get white text so the value stays readable.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()


## Compute confusion matrix

In [None]:

# Compute confusion matrix.
# Passing `labels` keeps one row/column per class even when a class appears in
# neither the true nor the predicted labels, so the matrix always lines up
# with the tick labels taken from `dataset_labels`.
cnf_matrix = confusion_matrix(true_labels, y_pred_integers,
                              labels=list(range(len(dataset_labels))))

np.set_printoptions(precision=2)
# Plot non-normalized confusion matrix
plt.figure()
plot_confusion_matrix(cnf_matrix, classes=dataset_labels,
                      title='Confusion matrix, without normalization')

# Plot normalized confusion matrix
plt.figure()
plot_confusion_matrix(cnf_matrix, classes=dataset_labels, normalize=True,
                      title='Normalized confusion matrix')

plt.show()

## Visualize classification for a batch of test data

In [None]:


# Take one batch of images and their one-hot labels from the test generator
test_set, y = next(test_generator)

# Per-class scores for the batch, and the index of the highest score per image
y_prob = model.predict(test_set)
y_hat = y_prob.argmax(axis=-1)

labels = [dataset_labels[c.argmax(axis=-1)] for c in y]
predictions = [dataset_labels[c] for c in y_hat]

print("Test set true labels:")
print(labels)

print("\n\nTest set predictions:")
print(predictions)

# The per-class scores are passed on because the plotting function takes the argmax itself
visualize_model_predictions(model, dataset_labels, test_set, y, y_prob, 'Test')