# Minimal working example of training a Keras model with a generator
Stijn Decubber

References:

* https://stanford.edu/~shervine/blog/keras-generator-multiprocessing.html
* https://stanford.edu/~shervine/blog/keras-how-to-generate-data-on-the-fly.html
* https://keras.io/models/sequential/


In [1]:
import os
import cv2

import tensorflow as tf
import keras.backend as K

import pandas as pd
import numpy as np

Using TensorFlow backend.


The raw images sit in the folder ```./im```. The labels are stored in CSV files, which map image names to labels:

In [2]:
train = pd.read_csv('./TRAIN.csv')
test = pd.read_csv('./TEST.csv')
train.head()

Unnamed: 0,image_name,label
0,img_9847.jpg,1
1,img_33541.jpg,1
2,img_20295.jpg,1
3,img_35648.jpg,0
4,img_24267.jpg,0


In [3]:
test.head()

Unnamed: 0,image_name,label
0,img_38134.jpg,0
1,img_8597.jpg,0
2,img_29235.jpg,1
3,img_2761.jpg,1
4,img_10784.jpg,1
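Since ```cv2.imread``` returns ```None``` for a file it cannot read instead of raising an error, it can be worth checking up front that every image name in the CSV actually exists in the image folder. The snippet below is a minimal sketch of such a check; the helper name ```missing_images``` and the tiny dataframe are made up for illustration and are not part of the original notebook.

```python
import os
import tempfile

import pandas as pd


def missing_images(df, image_dir):
    """Return the image names listed in df that are not present in image_dir."""
    present = set(os.listdir(image_dir))
    return [name for name in df['image_name'] if name not in present]


# Illustration with a temporary folder that holds only one of the two listed images
with tempfile.TemporaryDirectory() as image_dir:
    open(os.path.join(image_dir, 'img_1.jpg'), 'wb').close()
    example_df = pd.DataFrame({'image_name': ['img_1.jpg', 'img_2.jpg'],
                               'label': [1, 0]})
    missing = missing_images(example_df, image_dir)
    print(missing)  # ['img_2.jpg']
```

On the real data this would be called as ```missing_images(train, './im')``` and should return an empty list.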


We will try to classify whether an MNIST image contains a 0 or an 8 (a 0 is labelled as 0, an 8 as 1 in the dataframes):
<img src="./mnist_tight.png" alt="data" style="width: 350px;"/> 

When training a model on a large dataset that does not fit into memory, you need a custom generator that passes the data to the GPU in batches. Once you have the generator, fitting the model is as simple as calling ```fit_generator()``` instead of ```fit()``` on the Keras model. You can include data preprocessing or data augmentation in the generator, as long as its overhead does not bottleneck the GPU.

In [4]:
# Some simple augmentation functions
def randomHorizontalFlip(image, p=0.5):
    """Do a random horizontal flip with probability p"""
    if np.random.random() < p:
        image = np.fliplr(image)
    return image

def randomVerticalFlip(image, p=0.5):
    """Do a random vertical flip with probability p"""
    if np.random.random() < p:
        image = np.flipud(image)
    return image
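To see what the two flips do, here is a small sketch on a toy array rather than a real image. It calls ```np.fliplr``` and ```np.flipud``` directly, which is what the functions above do when the random draw falls below ```p```; the array itself is only an illustration.

```python
import numpy as np

# Toy 2x3 "image": rows are the height axis, columns the width axis
image = np.arange(6).reshape(2, 3)

flipped_lr = np.fliplr(image)  # reverses the column order (horizontal flip)
flipped_ud = np.flipud(image)  # reverses the row order (vertical flip)

print(flipped_lr)  # [[2 1 0]
                   #  [5 4 3]]
print(flipped_ud)  # [[3 4 5]
                   #  [0 1 2]]
```

For a colour image of shape (height, width, 3), both functions leave the channel axis untouched.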

For this example, the generator could look as follows:

In [5]:
datapath = './im/'

class DataGenerator(object):
    """Custom generator to train a keras model
    
    df: pandas DataFrame that maps 'image_name' to 'label' (these should be columns in the df)
    im_size (int): desired image size
    batch_size (int): batch_size for training
    shuffle (bool): shuffle the data at the start of each epoch
    mode ['train', 'test']: in test mode, the labels are returned as None
    augmentation (bool): on the fly augmentation/preprocessing
    
    Call .generate() to get the actual generator
    """

    def __init__(self, df, im_size, batch_size, shuffle=True, mode='train', augmentation=False):
        self.df = df  
        self.im_size = im_size
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.mode = mode
        self.augmentation = augmentation

    def _get_instance_indexes(self):
        """Fetch the indexes from the pandas df"""
        indexes = list(self.df.index)
        if self.shuffle:
            np.random.shuffle(indexes)
        return indexes

    def _get_batch_images(self, indexes):
        """Return the images that correspond to the current batch"""
        batch_images = np.zeros((len(indexes), self.im_size, self.im_size, 3))

        # Fill up container
        for i, ix in enumerate(indexes):
            im = cv2.imread(os.path.join(datapath, self.df['image_name'][ix]))
            im = cv2.resize(im, (self.im_size, self.im_size))
            if self.augmentation:
                # Add augmentation or preprocessing here
                im = randomHorizontalFlip(im)
                im = randomVerticalFlip(im)                
                # im = my_preprocessing_function(im)
                # im = my_augmentation_function_1(im)
                # im = my_augmentation_function_2(im)

            batch_images[i] = im

        return batch_images

    def _get_batch_labels(self, indexes):
        """Return the labels that correspond to the indices of the current batch"""
        if self.mode == 'test':
            return None
        else:
            # Select by index label and return a plain numpy array instead of a pandas Series
            return self.df['label'].loc[indexes].values

    def generate(self):
        """The actual generator"""
        while True:
            indexes = self._get_instance_indexes()
            num_batches = int(np.ceil(len(self.df) / self.batch_size))
            for i in range(num_batches):
                # Slicing past the end of the list is safe, so the final batch is simply smaller
                batch_indexes = indexes[i * self.batch_size:(i + 1) * self.batch_size]

                X = self._get_batch_images(batch_indexes)
                y = self._get_batch_labels(batch_indexes)
                yield (X, y)


In [6]:
im_size = 28
batch_size = 64
train_generator_instance = DataGenerator(train, im_size=im_size, batch_size=batch_size)
train_generator_instance

<__main__.DataGenerator at 0x7fd2a6b38b38>

In [7]:
# Call .generate() for the actual generator
train_generator = train_generator_instance.generate()
train_generator

<generator object DataGenerator.generate at 0x7fd2a6c07f68>
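One way to check whether the generator could become a bottleneck is to time how long it takes to produce a batch. The sketch below does this with a hypothetical ```dummy_generator``` that yields random arrays, so that it runs without the image files; ```seconds_per_batch``` is likewise a made-up helper name. In the notebook you would pass ```train_generator``` instead.

```python
import time

import numpy as np


def seconds_per_batch(generator, n_batches=20):
    """Average wall-clock time needed to draw one batch from the generator."""
    start = time.perf_counter()
    for _ in range(n_batches):
        next(generator)
    return (time.perf_counter() - start) / n_batches


def dummy_generator(batch_size=64, im_size=28):
    """Hypothetical stand-in for DataGenerator.generate(): yields random batches forever."""
    while True:
        X = np.random.rand(batch_size, im_size, im_size, 3)
        y = np.random.randint(0, 2, size=batch_size)
        yield X, y


t = seconds_per_batch(dummy_generator(), n_batches=5)
print('{:.6f} seconds per batch'.format(t))
```

If this number is well above the time the model needs for one training step, the preprocessing is likely the limiting factor.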

Let's construct a simple Keras model:

In [8]:
from keras.layers import Input, Conv2D, Dense, Dropout, MaxPool2D, Flatten
from keras.models import Model

x = Input(shape=(im_size,im_size, 3))
conv_1 = MaxPool2D()(Conv2D(32, (3,3), activation='relu')(x))
conv_2 = MaxPool2D()(Conv2D(32, (3,3), activation='relu')(conv_1))
conv_3 = MaxPool2D()(Conv2D(32, (3,3), activation='relu')(conv_2))
flat = Flatten()(conv_3)
dense_1 = Dropout(0.2)(Dense(32, activation='relu')(flat))
output = Dense(1, activation='sigmoid')(dense_1)

model = Model(inputs=x, outputs=output)
model.summary()  # summary() prints the table itself and returns None

from keras.optimizers import Adam
optimizer = Adam()
model.compile(optimizer=optimizer, loss='binary_crossentropy')

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         (None, 28, 28, 3)         0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 26, 26, 32)        896       
_________________________________________________________________
max_pooling2d_1 (MaxPooling2 (None, 13, 13, 32)        0         
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 11, 11, 32)        9248      
_________________________________________________________________
max_pooling2d_2 (MaxPooling2 (None, 5, 5, 32)          0         
_________________________________________________________________
conv2d_3 (Conv2D)            (None, 3, 3, 32)          9248      
_________________________________________________________________
max_pooling2d_3 (MaxPooling2 (None, 1, 1, 32)          0         
_________________________________________________________________
[...]

Finally, train the model with the generator:

In [9]:
steps_per_epoch = int(np.ceil(len(train) / batch_size))  # number of batches in one pass over the data
model.fit_generator(train_generator, steps_per_epoch=steps_per_epoch, epochs=5, verbose=1)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.callbacks.History at 0x7fd2a6b38710>
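The value of ```steps_per_epoch``` is chosen so that one epoch corresponds to exactly one pass of the generator over the data, including the smaller final batch. The sketch below illustrates the arithmetic with made-up numbers (10 samples, batch size 4); the helper names are hypothetical.

```python
import numpy as np


def count_steps(n_samples, batch_size):
    """Number of batches needed to cover n_samples once."""
    return int(np.ceil(n_samples / batch_size))


def batch_sizes(n_samples, batch_size):
    """Sizes of the successive batches in one pass, using the same slicing as the generator."""
    indexes = list(range(n_samples))
    steps = count_steps(n_samples, batch_size)
    return [len(indexes[i * batch_size:(i + 1) * batch_size]) for i in range(steps)]


steps = count_steps(10, 4)
sizes = batch_sizes(10, 4)
print(steps)  # 3
print(sizes)  # [4, 4, 2]
```

With a smaller ```steps_per_epoch```, Keras would stop each epoch before the generator has finished its pass, so epochs and passes over the data would drift apart.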

At test time, use a separate generator with shuffling disabled, so that the predictions come back in the same order as the rows of the test dataframe:

In [10]:
test_generator = DataGenerator(df=test, im_size=im_size, batch_size=batch_size, shuffle=False, mode='test').generate()

test_steps_per_epoch = int(np.ceil(len(test) / batch_size))
predictions = model.predict_generator(test_generator, test_steps_per_epoch, verbose=1)



In [11]:
from sklearn.metrics import accuracy_score
accuracy = accuracy_score(test['label'].values, (predictions > 0.5).astype(int).ravel())
print('Accuracy on the test data: {:.3f}'.format(accuracy))

Accuracy on the test data: 0.994
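The thresholding step can be sketched with plain numpy. The probabilities and labels below are made up for illustration; the point is that a sigmoid output has shape (N, 1) and is best flattened before comparing it with the 1-D label array, because comparing (N, 1) with (N,) directly would broadcast to an (N, N) matrix.

```python
import numpy as np

# Hypothetical model outputs with shape (N, 1) and the matching true labels
example_predictions = np.array([[0.91], [0.08], [0.55], [0.40]])
example_labels = np.array([1, 0, 0, 0])

# Flatten to shape (N,) and threshold at 0.5 to get class labels
predicted_classes = (example_predictions.ravel() > 0.5).astype(int)
example_accuracy = np.mean(predicted_classes == example_labels)

print(predicted_classes)  # [1 0 1 0]
print(example_accuracy)   # 0.75
```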


### Final note

The data generator above requires the labels for the entire dataset in a pandas dataframe, which means that all labels are loaded into memory. If this is not possible, you could for example store the labels in an HDF5 file and yield them from there.
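A minimal sketch of that idea, assuming the ```h5py``` package is available (it is not used elsewhere in this notebook). The dataset name ```'labels'``` and both helper functions are hypothetical. Note that h5py requires index lists to be in increasing order, so the shuffled batch indexes are sorted for the read and the result is put back into batch order afterwards.

```python
import os
import tempfile

import h5py
import numpy as np


def write_labels(path, labels):
    """Store all labels in a single HDF5 dataset called 'labels' (hypothetical name)."""
    with h5py.File(path, 'w') as f:
        f.create_dataset('labels', data=np.asarray(labels, dtype=np.uint8))


def read_batch_labels(path, batch_indexes):
    """Read only the labels of one batch from disk, in the order of batch_indexes."""
    batch_indexes = np.asarray(batch_indexes)
    order = np.argsort(batch_indexes)  # h5py needs increasing indexes
    with h5py.File(path, 'r') as f:
        sorted_labels = f['labels'][batch_indexes[order]]
    labels = np.empty_like(sorted_labels)
    labels[order] = sorted_labels  # undo the sort to restore batch order
    return labels


# Illustration with a handful of made-up labels
with tempfile.TemporaryDirectory() as tmp_dir:
    label_path = os.path.join(tmp_dir, 'labels.h5')
    write_labels(label_path, [1, 1, 1, 0, 0, 1, 0])
    batch_labels = read_batch_labels(label_path, [5, 0, 3])
    print(batch_labels)  # [1 1 0]
```

Opening the file for every batch keeps the sketch simple; in a real generator you would probably open it once and keep the handle around.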