# MNIST Analysis with Distributed Keras

**Joeri Hermans** (Technical Student, IT-DB-SAS, CERN)             
*Department of Knowledge Engineering*         
*Maastricht University, The Netherlands*

In [1]:
!(date +%d\ %B\ %G)

11 May 2017


In this notebook we will show you how to process the [MNIST](http://yann.lecun.com/exdb/mnist/) dataset using Distributed Keras. As in the [workflow](https://github.com/JoeriHermans/dist-keras/blob/master/examples/workflow.ipynb) notebook, we will guide you through the complete machine learning pipeline.

## Preparation

To get started, we first load all the required imports. Please make sure you have installed `dist-keras` and `seaborn`. Furthermore, we assume that you have access to an installation which provides Apache Spark.

Before you start this notebook, please make sure you ran the "MNIST preprocessing" notebook first, since we will be evaluating a manually "enlarged dataset".

In [2]:
%matplotlib inline

import numpy as np

from keras.optimizers import *
from keras.models import Sequential
from keras.layers.core import *
from keras.layers.convolutional import *

from pyspark import SparkContext
from pyspark import SparkConf

from matplotlib import pyplot as plt

from pyspark import StorageLevel

from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from distkeras.trainers import *
from distkeras.predictors import *
from distkeras.transformers import *
from distkeras.evaluators import *
from distkeras.utils import *

Using TensorFlow backend.


In the following cell, adapt the parameters to fit your personal requirements.

In [3]:
# User-tunable settings for this notebook; adjust them to your environment.
application_name = "Distributed Keras MNIST Analysis"
using_spark_2 = False
local = False
path = "mnist.parquet"

# Each deployment mode fixes the Spark master together with the number of
# executors and the number of processes per executor.
if local:
    # Run everything on the local machine.
    master, num_executors, num_processes = "local[*]", 1, 3
else:
    # Submit to a YARN cluster in client mode.
    master, num_executors, num_processes = "yarn-client", 30, 1

In [4]:
# Total number of model trainers: executors times processes per executor.
num_workers = num_executors * num_processes

# str() replaces the Python-2-only backtick repr so this cell also parses on
# Python 3; for integers the printed text is identical.
print("Number of desired executors: " + str(num_executors))
print("Number of desired processes / executor: " + str(num_processes))
print("Total number of workers: " + str(num_workers))

Number of desired executors: 30
Number of desired processes / executor: 1
Total number of workers: 30


In [5]:
# Assemble the Spark configuration from the settings chosen in the configuration cell.
conf = SparkConf()
conf.set("spark.app.name", application_name)
conf.set("spark.master", master)
# str() replaces the Python-2-only backtick repr; the resulting strings are the same.
conf.set("spark.executor.cores", str(num_processes))
conf.set("spark.executor.instances", str(num_executors))
conf.set("spark.locality.wait", "0")
conf.set("spark.executor.memory", "5g")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

# Check if the user is running Spark 2.0 +
if using_spark_2:
    # SparkSession only exists in Spark 2.0+, so it is imported inside this
    # branch rather than at the top of the notebook.
    from pyspark.sql import SparkSession
    sc = SparkSession.builder.config(conf=conf) \
            .appName(application_name) \
            .getOrCreate()
else:
    # Reuse a SparkContext that the environment already started, or create one
    # from `conf` on a fresh kernel (previously `sc` had to exist beforehand).
    # NOTE: an already-running context keeps the configuration it was started with.
    sc = SparkContext.getOrCreate(conf=conf)
    # Kept from the original cell: record the requested configuration on the context object.
    sc.conf = conf
    # Add the missing imports
    from pyspark import SQLContext
    sqlContext = SQLContext(sc)

In [7]:
# With Spark 2.0 the session object exposes `.read`; otherwise the SQLContext does.
reader = sc if using_spark_2 else sqlContext

# Load the training and test sets, keeping the same three columns of each.
training_set = (
    reader.read
    .parquet('data/mnist_train.parquet')
    .select("features_normalized_dense", "label_encoded", "label")
)
test_set = (
    reader.read
    .parquet('data/mnist_test.parquet')
    .select("features_normalized_dense", "label_encoded", "label")
)

In [8]:
# Print the schema of the training set to check the selected columns and their types.
training_set.printSchema()

root
 |-- features_normalized_dense: vector (nullable = true)
 |-- label_encoded: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- label: long (nullable = true)



In [17]:
# Print the schema of the test set; it selects the same columns as the training set.
test_set.printSchema()

root
 |-- features_normalized_dense: vector (nullable = true)
 |-- label_encoded: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- label: long (nullable = true)



## Model Development

### Multilayer Perceptron

In [9]:
# Multilayer perceptron: 784 inputs -> 1000 -> 200 -> 10 softmax outputs, with a
# ReLU activation and a dropout rate of 0.2 after each of the two hidden layers.
mlp = Sequential([
    Dense(1000, input_shape=(784,)),
    Activation('relu'),
    Dropout(0.2),
    Dense(200),
    Activation('relu'),
    Dropout(0.2),
    Dense(10),
    Activation('softmax'),
])

In [10]:
# Show each layer with its output shape and parameter count.
mlp.summary()

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_1 (Dense)              (None, 1000)              785000    
_________________________________________________________________
activation_1 (Activation)    (None, 1000)              0         
_________________________________________________________________
dropout_1 (Dropout)          (None, 1000)              0         
_________________________________________________________________
dense_2 (Dense)              (None, 200)               200200    
_________________________________________________________________
activation_2 (Activation)    (None, 200)               0         
_________________________________________________________________
dropout_2 (Dropout)          (None, 200)               0         
_________________________________________________________________
dense_3 (Dense)              (None, 10)                2010      
__________

In [11]:
# Optimizer and loss for the MLP, given by name; the ADAG trainer receives them
# as its `worker_optimizer` and `loss` arguments.
optimizer_mlp = 'adam'
loss_mlp = 'categorical_crossentropy'

## Training

Prepare the training and test set for evaluation and training.

In [12]:
# Spread each data set over `num_workers` partitions and mark it for caching.
training_set = training_set.repartition(num_workers).cache()
test_set = test_set.repartition(num_workers).cache()

# Report the size of both data sets.
print("Number of training instances: " + str(training_set.count()))
print("Number of testing instances: " + str(test_set.count()))

Number of training instances: 32318
Number of testing instances: 1297


## Evaluation

We define a utility function which will compute the accuracy for us.

In [13]:
def evaluate_accuracy(model, test_set, features="features_normalized_dense",
                      label="label", output_dim=10):
    """Compute the accuracy score of `model` on `test_set`.

    The data set is reduced to the feature and label columns, run through a
    ModelPredictor, post-processed by a LabelIndexTransformer, and finally
    scored by an AccuracyEvaluator that compares the "prediction_index"
    column against the label column.

    Arguments:
        model: Keras model handed to the ModelPredictor.
        test_set: DataFrame containing at least the feature and label columns.
        features: name of the feature column.
        label: name of the label column (defaults to "label").
        output_dim: value passed to LabelIndexTransformer as `output_dim`
            (defaults to 10); presumably the number of classes -- confirm
            against the dist-keras documentation.

    Returns:
        The score returned by AccuracyEvaluator.evaluate.
    """
    predictor = ModelPredictor(keras_model=model, features_col=features)
    transformer = LabelIndexTransformer(output_dim=output_dim)
    evaluator = AccuracyEvaluator(prediction_col="prediction_index", label_col=label)

    # One name per pipeline stage instead of rebinding the `test_set` argument.
    selected = test_set.select(features, label)
    predicted = predictor.predict(selected)
    indexed = transformer.transform(predicted)

    return evaluator.evaluate(indexed)

### ADAG

In [14]:
# Set up the ADAG trainer for the MLP with `num_workers` workers, using the
# optimizer and loss selected for the MLP, reading features from
# "features_normalized_dense" and encoded labels from "label_encoded".
trainer = ADAG(keras_model=mlp, worker_optimizer=optimizer_mlp, loss=loss_mlp, num_workers=num_workers,
               batch_size=4, communication_window=5, num_epoch=1,
               features_col="features_normalized_dense", label_col="label_encoded")
# Run the training on the training set and keep the resulting model.
# NOTE(review): the previous comment here mentioned modifying the default
# parallelism factor, but nothing in this cell changes it.
trained_model = trainer.train(training_set)

In [15]:
# View the weights of the trained model.
# NOTE(review): this displays every weight array returned by get_weights(), so
# the output is long; consider showing only a summary (e.g. the shapes).
trained_model.get_weights()

[array([[ 0.02920564, -0.03437576,  0.00364145, ..., -0.00925472,
          0.02917045, -0.03222298],
        [-0.00770801, -0.00394229,  0.03034352, ...,  0.01717715,
          0.04464832, -0.03329791],
        [-0.03604931, -0.00323586, -0.01509048, ...,  0.02181683,
         -0.04173782, -0.00941854],
        ..., 
        [ 0.01147338, -0.00222839, -0.02019871, ..., -0.03482229,
         -0.01141656, -0.03909973],
        [ 0.00176425, -0.05549373,  0.04584375, ..., -0.01143986,
          0.05461692,  0.0124169 ],
        [ 0.01432303, -0.00769158,  0.00068999, ..., -0.01378234,
          0.05413814, -0.05738372]], dtype=float32),
 array([ -1.93773881e-02,  -4.69155796e-03,  -5.51657192e-03,
          5.54442231e-04,  -1.96620338e-02,   7.05803512e-03,
          4.97778179e-03,  -3.25760548e-03,  -3.94685334e-03,
          2.28617340e-04,  -2.82891989e-02,  -4.60033165e-03,
         -2.47907303e-02,  -9.06619825e-04,  -9.47530847e-03,
         -9.09528509e-03,   5.81687957e-04,  -4

In [18]:
# Number of rows in the test set.
test_set.count()

1297

In [19]:
# Toggle IPython's automatic post-mortem debugger.
# NOTE(review): debugging aid; running this cell a second time toggles it again.
%pdb

Automatic pdb calling has been turned ON


In [20]:
# Report how long the trainer says the training took.
training_time = trainer.get_training_time()
print("Training time: " + str(training_time))

# Score the trained model on the test set and report the accuracy.
accuracy = evaluate_accuracy(trained_model, test_set)
print("Accuracy: " + str(accuracy))

Training time: 168.668818951
Accuracy: 0.930609097918
