# Parallel inference with ipyparallel and a simple MNIST classifier
 - Model adapted from the Keras example: https://keras.io/examples/vision/mnist_convnet/

In [None]:
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers

In [None]:
# Model / data parameters
num_classes = 10
input_shape = (28, 28, 1)

# Load MNIST; it comes already split into train and test sets.
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

# For each split: divide by 255 as float32 to scale images to the [0, 1]
# range, then append a trailing channel axis so images are (28, 28, 1).
x_train, x_test = (
    np.expand_dims(images.astype("float32") / 255, -1)
    for images in (x_train, x_test)
)
print("x_train shape:", x_train.shape)
print(x_train.shape[0], "train samples")
print(x_test.shape[0], "test samples")


# Convert class vectors to binary (one-hot) class matrices.
y_train, y_test = (
    keras.utils.to_categorical(labels, num_classes)
    for labels in (y_train, y_test)
)

In [None]:
# Small convnet: two conv/pool stages followed by dropout and a softmax head.
model = keras.Sequential()
model.add(keras.Input(shape=input_shape))
# Feature extraction: conv -> max-pool, twice (32 then 64 filters).
model.add(layers.Conv2D(32, kernel_size=(3, 3), activation="relu"))
model.add(layers.MaxPooling2D(pool_size=(2, 2)))
model.add(layers.Conv2D(64, kernel_size=(3, 3), activation="relu"))
model.add(layers.MaxPooling2D(pool_size=(2, 2)))
# Classifier: flatten feature maps, regularize, one softmax unit per class.
model.add(layers.Flatten())
model.add(layers.Dropout(0.5))
model.add(layers.Dense(num_classes, activation="softmax"))

model.summary()

In [None]:
# Training hyper-parameters.
batch_size = 128
epochs = 15

# Categorical cross-entropy pairs with the one-hot labels built by to_categorical.
model.compile(
    optimizer="adam",
    loss="categorical_crossentropy",
    metrics=["accuracy"],
)

# Reserve 10% of the training data for validation while fitting.
model.fit(
    x_train,
    y_train,
    epochs=epochs,
    batch_size=batch_size,
    validation_split=0.1,
)

In [None]:
# Score the trained model on the held-out test split without a progress bar.
score = model.evaluate(x=x_test, y=y_test, verbose=0)
# score[0] is the loss; score[1] is the "accuracy" metric compiled above.
print("Test loss:", score[0])
print("Test accuracy:", score[1])

In [None]:
# Persist the trained model (.h5 file); the engines reload it from this path.
model.save(filepath='simple_mnist.h5')

# Set up ipyparallel engines and a view

In [None]:
import ipyparallel as ipp
import time

In [None]:
# IPython shell escape: start a local ipyparallel cluster with 4 engines in the
# background (--daemonize). 4 matches the number of chunks placed in ds_timing.
# NOTE(review): the command returns before the engines are necessarily ready.
! ipcluster start -n 4 --daemonize

In [None]:
client = ipp.Client()

# `ipcluster start --daemonize` returns immediately, so engines may still be
# registering when the client connects. Wait (with a timeout) until all of
# them are up so that `client[:]` captures every engine, not just the early ones.
n_engines = 4  # must match `-n 4` passed to ipcluster
deadline = time.monotonic() + 60  # seconds
while len(client.ids) < n_engines:  # client.ids refreshes registrations on access
    if time.monotonic() > deadline:
        raise TimeoutError(
            f"only {len(client.ids)} of {n_engines} engines registered within 60 s"
        )
    time.sleep(0.5)

In [None]:
# DirectView over every engine currently registered with the client.
view = client.direct_view(targets=client.ids)

## Set up prediction data from the test set

In [None]:
# Display the shape of the preprocessed test images.
np.shape(x_test)

In [None]:
# Holds (prediction inputs, labels) pairs used for the timing runs below.
ds_timing = list()

In [None]:
# Split the first 4000 test samples into four consecutive 1000-sample chunks,
# keeping images (pred*) and labels (lbl*) aligned chunk by chunk.
pred1, pred2, pred3, pred4 = (x_test[lo:lo + 1000] for lo in range(0, 4000, 1000))
lbl1, lbl2, lbl3, lbl4 = (y_test[lo:lo + 1000] for lo in range(0, 4000, 1000))

In [None]:
# Pair each input chunk with its labels. The list is rebuilt rather than
# appended to, so re-running this cell cannot duplicate entries (duplicates
# would add extra work to both the sequential loop and the parallel map).
ds_timing = [(pred1, lbl1), (pred2, lbl2), (pred3, lbl3), (pred4, lbl4)]

## Sequential prediction timing

In [None]:
# Run predict() on each chunk one after another in this process and time it.
# perf_counter is monotonic and high-resolution, unlike time.time().
# NOTE(review): the first predict() call may include one-time setup cost,
# inflating the first timing; confirm before comparing numbers.
seq_start = time.perf_counter()
for pred, lbl in ds_timing:
    start = time.perf_counter()
    predictions = model.predict(pred)
    print(f'time: {time.perf_counter() - start} seconds')
print(f'total sequential time: {time.perf_counter() - seq_start} seconds')

In [None]:
# Time predict() on the first chunk with a single perf_counter timer, printed
# in the same format as the sequential loop.
start = time.perf_counter()
predictions = model.predict(pred1)  # inspected by the argmax cells below
print(f'time: {time.perf_counter() - start} seconds')

In [None]:
# Predicted class index per sample: position of the highest score in each row.
np.argmax(predictions, 1)

In [None]:
# True class index per sample, recovered from the one-hot labels.
np.argmax(lbl1, 1)

## Parallel prediction timing

In [None]:
def p_prediction(ds, model_path='simple_mnist.h5'):
    """Load the saved model and time ``predict`` on one dataset.

    Intended to run on an ipyparallel engine via ``view.map``, which is why
    its imports live inside the function body.

    Args:
        ds: ``(inputs, labels)`` pair; only ``inputs`` is fed to ``predict``.
        model_path: Path of the saved Keras model to load. A relative path is
            resolved against the engine's working directory.
            NOTE(review): assumes engines run in the notebook's directory;
            pass an absolute path if they do not.

    Returns:
        ``(prediction_time, predictions)``: seconds spent inside ``predict``
        only (model loading is excluded), and the model outputs.
    """
    import time
    import tensorflow as tf

    # Load before starting the timer so only inference is measured.
    model = tf.keras.models.load_model(model_path)

    pred, _lbl = ds  # labels are not needed for timing
    start = time.perf_counter()  # monotonic, high-resolution interval timer
    predictions = model.predict(pred)
    prediction_time = time.perf_counter() - start

    return (prediction_time, predictions)

### Map the function above over the list of 4 datasets created earlier

In [None]:
# Distribute the chunks across the engines and block until all results return.
# The wall-clock time of the whole map (dispatch, per-engine model load,
# inference, and transfer of results) is what shows any parallel speedup over
# the sequential timings above.
par_start = time.perf_counter()
p_results = view.map(p_prediction, ds_timing).get()
print(f'total parallel time: {time.perf_counter() - par_start} seconds')

In [None]:
# Report each engine's predict() time (one entry per chunk, in ds_timing
# order) using the same format as the sequential run, for direct comparison.
for p_time, pre in p_results:
    print(f'time: {p_time} seconds')

## Shut down the cluster engines and client

In [None]:
! ipcluster stop

client.shutdown()
client.close()