# Parallel prediction of trained keras models with dask

Pertinent links: * https://github.com/dask/dask-examples/issues/35, https://github.com/dask/distributed/issues/2333

In [1]:
#Load modules
from __future__ import print_function
import keras
import sys
from keras.datasets import mnist
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras import backend as K
import platform
import numpy as np
import glob
import dask
import distributed
import dask.array as da
import time

print("dask version is {}".format(dask.__version__))
print("distributed version is {}".format(distributed.__version__))
print("keras version is {}".format(keras.__version__))
print(sys.version)

#create client
client=distributed.Client()
client

Using TensorFlow backend.
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


dask version is 2.12.0
distributed version is 2.12.0
keras version is 2.3.1
3.7.6 | packaged by conda-forge | (default, Jan  7 2020, 22:05:27) 
[Clang 9.0.1 ]


Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.


0,1
Client  Scheduler: tcp://127.0.0.1:64670  Dashboard: http://127.0.0.1:64671/status,Cluster  Workers: 4  Cores: 4  Memory: 17.18 GB


In [2]:
# Define a trained and saved model
def train_model():
    batch_size = 128
    num_classes = 10
    epochs = 1
    
    # input image dimensions
    img_rows, img_cols = 28, 28
    
    # the data, split between train and test sets
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    
    if K.image_data_format() == 'channels_first':
        x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols)
        x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols)
        input_shape = (1, img_rows, img_cols)
    else:
        x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
        x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
        input_shape = (img_rows, img_cols, 1)
    
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255
    x_test /= 255
    
    # convert class vectors to binary class matrices
    y_train = keras.utils.to_categorical(y_train, num_classes)
    y_test = keras.utils.to_categorical(y_test, num_classes)
    
    model = Sequential()
    model.add(Conv2D(32, kernel_size=(3, 3),
                     activation='relu',
                     input_shape=input_shape))
    model.add(Conv2D(64, (3, 3), activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Flatten())
    model.add(Dense(128, activation='relu'))
    model.add(Dense(num_classes, activation='softmax'))
    
    model.compile(loss=keras.losses.categorical_crossentropy,
                  optimizer=keras.optimizers.Adadelta(),
                  metrics=['accuracy'])
    
    model.fit(x_train, y_train,
              batch_size=batch_size,
              epochs=epochs,
              verbose=1,
              validation_data=(x_test, y_test))
    score = model.evaluate(x_test, y_test, verbose=0)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])
    
    return model

In [3]:
def load_data():
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
    return x_test

In [4]:
#Train model - takes 1m on laptop
model = train_model()
model.save("MNIST.h5")



Train on 60000 samples, validate on 10000 samples
Epoch 1/1
Test loss: 0.06445419637872837
Test accuracy: 0.9782000184059143


For dask to be useful, predict function needs to be pretty slow/.

# Example 0: Without dask

In [5]:
%%timeit
model = keras.models.load_model("MNIST.h5")
x_test = load_data()

#Compute prediction in batch loop of size 100 (slightly contrived example)
batch_array = np.split(x_test,100)

results = []
for batch in batch_array:
    prediction = model.predict_on_batch(batch)
    time.sleep(0.25)
    results.append(prediction)
results[0].shape

29.6 s ± 266 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


# Example 1 LocalCluster - load data and model first (serializes model and data)

In [6]:
model = keras.models.load_model("MNIST.h5")
x_test = load_data()
results = []
for batch in batch_array:
    prediction = dask.delayed(model.predict_on_batch)(batch)
    results.append(prediction)
#Gather
results = dask.compute(*results)

NameError: name 'batch_array' is not defined

# Example 2 LocalCluster - load data and model delayed

In [10]:
%%timeit
#Example 2 LocalCluster - load data and model on each worker
model = dask.delayed(keras.models.load_model)("MNIST.h5")
x_test = load_data()
batch_array = np.split(x_test,100)

results = []
#get shape from reading file directly, see example 0.
for batch in batch_array:
    prediction = dask.delayed(model.predict_on_batch)(batch)
    time.sleep(0.25)
    results.append(prediction)

#Gather
results = dask.compute(*results)
results[0].shape

33 s ± 1.1 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
