Hi, I saw that the code of `image_classifier` contains an attempt to parallelize image augmentation: https://github.com/paris-saclay-cds/ramp-workflow/blob/master/rampwf/workflows/image_classifier.py#L244

I tested this (on Python 3), and the pickling problem can be solved with a simple tweak: https://gist.github.com/vfdev-5/9ae8fe64cb6933f6c94614d335b67d3d#file-parallized_data_loading_augmentation-py
With this change, data loading and augmentation run a little faster.

In [2]:
import os 
import sys
import numpy as np

In [3]:
sys.path.append("..")
# sys.path.append("../submissions/keras_cnns_pretrained")

In [4]:
from rampwf.workflows.image_classifier import BatchGeneratorBuilder
from problem import get_cv, get_train_data
# from image_preprocessor import transform, transform_test

In [5]:
import imp
image_preprocessor = imp.load_source('image_preprocessor', '../submissions/keras_cnns_pretrained/image_preprocessor.py')
transform = image_preprocessor.transform
transform_test = image_preprocessor.transform_test

In [6]:
n_classes = 403
batch_size = 16

In [7]:
# Load the training data and materialize the cross-validation splits so they
# can be iterated (and inspected) as a plain list.
folder_X_train, y_train = get_train_data(path="..")
cv = list(get_cv(folder_X_train, y_train))

# Only the first fold is used: the loop body runs once and then breaks.
for fold_i, (train_is, valid_is) in enumerate(cv):

    # folder_X_train unpacks into the image folder and the array indexed by
    # the fold's training indices below.
    folder, X_train = folder_X_train

    # Build the batch generator on the training part of the fold. It starts
    # with n_jobs=1; the attribute is overridden later in the notebook.
    gen_builder = BatchGeneratorBuilder(X_array=X_train[train_is], y_array=y_train[train_is], 
                                transform_img=transform, transform_test_img=transform_test, 
                                folder=folder, 
                                chunk_size=batch_size*5, 
                                n_classes=n_classes, n_jobs=1)
    break

In [8]:
import types
from imblearn.over_sampling import RandomOverSampler

from joblib import delayed
from joblib import Parallel

from rampwf.workflows.image_classifier import _chunk_iterator, _to_categorical, get_nb_minibatches

In [13]:
# def local_get_generator(self, indices=None, batch_size=256):
#     if indices is None:
#         indices = np.arange(self.nb_examples)
#     # Infinite loop, as required by keras `fit_generator`.
#     # However, as we provide the number of examples per epoch
#     # and the user specifies the total number of epochs, it will
#     # be able to end.
#     while True:
#         if self.shuffle:
#             np.random.shuffle(indices)
#         it = _chunk_iterator(
#             X_array=self.X_array[indices], folder=self.folder,
#             y_array=self.y_array[indices], chunk_size=self.chunk_size,
#             n_jobs=self.n_jobs)
#         for X, y in it:

#             # 1) Preprocessing of X and y
#             X = Parallel(n_jobs=self.n_jobs, backend='multiprocessing')(
#                      delayed(self.transform_img)(x) for x in X)
#             # X = np.array([self.transform_img(x) for x in X])
#             # X is a list of numpy arrrays at this point, convert it to a
#             # single numpy array.
#             try:
#                 X = [x[np.newaxis, :, :, :] for x in X]
#             except IndexError:
#                 # single channel
#                 X = [x[np.newaxis, np.newaxis, :, :] for x in X]
#             X = np.concatenate(X, axis=0)
#             X = np.array(X, dtype='float32')
#             # Convert y to onehot representation
#             y = _to_categorical(y, num_classes=self.n_classes)

#             # 2) Yielding mini-batches
#             for i in range(0, len(X), batch_size):
#                 yield X[i:i + batch_size], y[i:i + batch_size]



def _chunk_iterator2(parallel, X_array, folder, y_array=None, chunk_size=1024):
    """Yield the images of `X_array` chunk by chunk, read through `parallel`.

    Parameters
    ----------
    parallel : callable
        Called with an iterable of `delayed(imread)(path)` tasks; its return
        value is yielded as the chunk of images. The caller in this file
        passes an already opened joblib `Parallel` instance.
    X_array : sequence
        Entries are formatted with `'{}'` and joined onto `folder` to form
        the paths handed to `imread`.
    folder : str
        Directory the entries of `X_array` are resolved against.
    y_array : sequence, optional
        If given, it is sliced with the same bounds as `X_array` and each
        yielded item is a pair `(images, labels)`.
    chunk_size : int
        Maximum number of entries per chunk.

    Yields
    ------
    `images` when `y_array` is None, otherwise `(images, labels)`.
    """
    # Imported lazily, on the first iteration of the generator.
    from skimage.io import imread

    with_labels = y_array is not None
    for start in range(0, len(X_array), chunk_size):
        stop = start + chunk_size
        paths = [
            os.path.join(folder, '{}'.format(name))
            for name in X_array[start:stop]
        ]
        images = parallel(delayed(imread)(path) for path in paths)
        if with_labels:
            yield images, y_array[start:stop]
        else:
            yield images


def local_get_generator2(self, indices=None, batch_size=256):
    """Endlessly yield `(X, y)` mini-batches using one shared thread pool.

    A single `Parallel(backend='threading')` pool is opened once and reused
    both for reading image files (through `_chunk_iterator2`) and for applying
    `self.transform_img` to every image.

    Parameters
    ----------
    indices : numpy array, optional
        Positions into `self.X_array` / `self.y_array` to draw from. Defaults
        to `np.arange(self.nb_examples)`. When `self.shuffle` is true the
        array is shuffled in place at the start of every pass, so an array
        supplied by the caller is modified.
    batch_size : int
        Maximum number of samples per yielded mini-batch.

    Yields
    ------
    (X, y) : X is a float32 array built by concatenating the transformed
        images along a new leading axis; y is the result of
        `_to_categorical` on the matching labels, sliced identically.
    """
    if indices is None:
        indices = np.arange(self.nb_examples)

    with Parallel(n_jobs=self.n_jobs, backend='threading') as parallel:
        # The loop never ends on its own, as keras `fit_generator` requires;
        # training stops because the caller fixes the number of steps per
        # epoch and the number of epochs.
        while True:
            if self.shuffle:
                np.random.shuffle(indices)

            chunks = _chunk_iterator2(
                parallel,
                X_array=self.X_array[indices],
                folder=self.folder,
                y_array=self.y_array[indices],
                chunk_size=self.chunk_size,
            )

            for images, labels in chunks:
                # Step 1: transform every image of the chunk in the pool.
                transformed = parallel(
                    delayed(self.transform_img)(img) for img in images
                )

                # `transformed` is a list of arrays; give each one a leading
                # sample axis so they can be concatenated into one array.
                try:
                    expanded = [img[np.newaxis, :, :, :] for img in transformed]
                except IndexError:
                    # Too few dimensions for the indexing above (single
                    # channel): add two leading axes instead.
                    expanded = [
                        img[np.newaxis, np.newaxis, :, :] for img in transformed
                    ]
                batch_X = np.concatenate(expanded, axis=0)
                batch_X = np.array(batch_X, dtype='float32')

                # One-hot encode the labels of the chunk.
                batch_y = _to_categorical(labels, num_classes=self.n_classes)

                # Step 2: cut the chunk into mini-batches.
                for start in range(0, len(batch_X), batch_size):
                    stop = start + batch_size
                    yield batch_X[start:stop], batch_y[start:stop]


In [15]:
# Replace `_get_generator` on this one instance with the threaded version
# defined above; types.MethodType binds it so `self` is `gen_builder`.
gen_builder._get_generator = types.MethodType(local_get_generator2, gen_builder)
# gen_builder._get_generator = types.MethodType(local_get_generator, gen_builder)
# Use 8 workers in the pool and shuffle the indices on every pass.
gen_builder.n_jobs = 8
gen_builder.shuffle = True


In [16]:
batch_size = 64
gen_builder.chunk_size = batch_size * 5
gen_train, gen_valid, nb_train, nb_valid = gen_builder.get_train_valid_generators(batch_size=batch_size, valid_ratio=0.3)


In [17]:
# Smoke test: draw 50 batches from the training generator, then stop.
# The generator is infinite, so the loop must be left with an explicit break.
max_counter = 50
for X, y in gen_train:
    max_counter -= 1
    if max_counter == 0:
        break
        
# X and y still hold the last batch drawn by the loop above.
print(type(X), type(y))

<class 'numpy.ndarray'> <class 'numpy.ndarray'>


In [18]:
X.shape, y.shape

((64, 224, 224, 3), (64, 403))

Check with Keras `Model.fit_generator`:

In [28]:
from keras.models import Model
from keras.layers import Input, Dense, Flatten, Activation, BatchNormalization
from keras.applications.vgg16 import VGG16
from keras.optimizers import Adam

Using TensorFlow backend.


In [29]:
def get_model():
    """Build and compile a classifier on top of a frozen VGG16 base.

    The VGG16 network is created without its top layers, with ImageNet
    weights, and all of its layers are marked non-trainable. Its output is
    flattened and passed through two Dense(4096) -> BatchNormalization ->
    ReLU stages, then a 403-way softmax layer.

    Returns
    -------
    The compiled keras `Model` (categorical cross-entropy loss, Adam with
    lr=0.001, accuracy metric).
    """
    base = VGG16(include_top=False, weights='imagenet')
    # Freeze the pretrained layers so only the new head is trained.
    for layer in base.layers:
        layer.trainable = False

    inputs = Input((224, 224, 3))
    features = base(inputs)
    hidden = Flatten(name='flatten')(features)

    # Two identical fully connected stages; the activation is applied after
    # batch normalization, hence the linear Dense layers.
    for fc_name in ('fc1', 'fc2'):
        hidden = Dense(4096, activation='linear', name=fc_name)(hidden)
        hidden = BatchNormalization()(hidden)
        hidden = Activation('relu')(hidden)

    outputs = Dense(403, activation='softmax', name='predictions')(hidden)

    model = Model(inputs, outputs)
    model.compile(
        loss='categorical_crossentropy',
        optimizer=Adam(lr=0.001),
        metrics=['accuracy'],
    )
    return model

In [30]:
# Train a freshly built model for one epoch from the patched generators.
# The step counts are derived from the train/validation sizes and batch_size.
get_model().fit_generator(
    gen_train,
    steps_per_epoch=get_nb_minibatches(nb_train, batch_size),
    epochs=1,
    # NOTE(review): the queue size is tied to batch_size here (default is 10
    # per the signature shown below) — presumably the queue holds whole
    # batches, so this may be larger than intended; confirm.
    max_queue_size=batch_size,    
    validation_data=gen_valid,
    validation_steps=get_nb_minibatches(nb_valid, batch_size),
    verbose=1)

Epoch 1/1
 24/511 [>.............................] - ETA: 219s - loss: 7.0643 - acc: 0.0970

KeyboardInterrupt: 

In [35]:
help(Model.fit_generator)

Help on function fit_generator in module keras.engine.training:

fit_generator(self, generator, steps_per_epoch, epochs=1, verbose=1, callbacks=None, validation_data=None, validation_steps=None, class_weight=None, max_queue_size=10, workers=1, use_multiprocessing=False, shuffle=True, initial_epoch=0)
    Fits the model on data yielded batch-by-batch by a Python generator.
    
    The generator is run in parallel to the model, for efficiency.
    For instance, this allows you to do real-time data augmentation
    on images on CPU in parallel to training your model on GPU.
    
    The use of `keras.utils.Sequence` guarantees the ordering
    and guarantees the single use of every input per epoch when
    using `use_multiprocessing=True`.
    
    # Arguments
        generator: A generator or an instance of Sequence (keras.utils.Sequence)
                object in order to avoid duplicate data
                when using multiprocessing.
            The output of the generator must be ei

In [34]:
Model.fit??