In [1]:
import numpy as np

import time

import requests

from keras.optimizers import *
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation

from pyspark import SparkContext
from pyspark import SparkConf

from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics

from distkeras.trainers import *
from distkeras.predictors import *
from distkeras.transformers import *
from distkeras.evaluators import *
from distkeras.utils import *

Using TensorFlow backend.


In [2]:
# Modify these variables according to your needs.
application_name = "Distributed Keras Notebook"
using_spark_2 = False
local = False
if local:
    # Tell master to use local resources.
    master = "local[*]"
    num_cores = 3
    num_executors = 1
else:
    # Tell master to use YARN.
    master = "yarn-client"
    num_executors = 6
    num_cores = 2

In [3]:


# This variable is derived from the number of cores and executors, and will be used to assign the number of model trainers.
num_workers = num_executors * num_cores

print("Number of desired executors: " + `num_executors`)
print("Number of desired cores / executor: " + `num_cores`)
print("Total number of workers: " + `num_workers`)



Number of desired executors: 6
Number of desired cores / executor: 2
Total number of workers: 12


In [4]:
import os

# Use the DataBricks CSV reader, this has some nice functionality regarding invalid values.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.4.0 pyspark-shell'

In [5]:


conf = SparkConf()
conf.set("spark.app.name", application_name)
conf.set("spark.master", master)
conf.set("spark.executor.cores", `num_cores`)
conf.set("spark.executor.instances", `num_executors`)
conf.set("spark.locality.wait", "0")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

# Check if the user is running Spark 2.0 +
if using_spark_2:
    sc = SparkSession.builder.config(conf=conf) \
            .appName(application_name) \
            .getOrCreate()
else:
    # Create the Spark context.
    sc = SparkContext(conf=conf)
    # Add the missing imports
    from pyspark import SQLContext
    sqlContext = SQLContext(sc)



In [6]:
# Check if we are using Spark 2.0
if using_spark_2:
    reader = sc
else:
    reader = sqlContext
# Read the dataset.
raw_dataset = reader.read.format('com.databricks.spark.csv') \
                    .options(header='true', inferSchema='true').load("data/atlas_higgs.csv")

In [7]:


# Double-check the inferred schema, and get fetch a row to show how the dataset looks like.
raw_dataset.printSchema()



root
 |-- EventId: integer (nullable = true)
 |-- DER_mass_MMC: double (nullable = true)
 |-- DER_mass_transverse_met_lep: double (nullable = true)
 |-- DER_mass_vis: double (nullable = true)
 |-- DER_pt_h: double (nullable = true)
 |-- DER_deltaeta_jet_jet: double (nullable = true)
 |-- DER_mass_jet_jet: double (nullable = true)
 |-- DER_prodeta_jet_jet: double (nullable = true)
 |-- DER_deltar_tau_lep: double (nullable = true)
 |-- DER_pt_tot: double (nullable = true)
 |-- DER_sum_pt: double (nullable = true)
 |-- DER_pt_ratio_lep_tau: double (nullable = true)
 |-- DER_met_phi_centrality: double (nullable = true)
 |-- DER_lep_eta_centrality: double (nullable = true)
 |-- PRI_tau_pt: double (nullable = true)
 |-- PRI_tau_eta: double (nullable = true)
 |-- PRI_tau_phi: double (nullable = true)
 |-- PRI_lep_pt: double (nullable = true)
 |-- PRI_lep_eta: double (nullable = true)
 |-- PRI_lep_phi: double (nullable = true)
 |-- PRI_met: double (nullable = true)
 |-- PRI_met_phi: double (nu

In [8]:
# First, we would like to extract the desired features from the raw dataset.
# We do this by constructing a list with all desired columns.
features = raw_dataset.columns
features.remove('EventId')
features.remove('Weight')
features.remove('Label')
# Next, we use Spark's VectorAssembler to "assemble" (create) a vector of all desired features.
# http://spark.apache.org/docs/latest/ml-features.html#vectorassembler
vector_assembler = VectorAssembler(inputCols=features, outputCol="features")
# This transformer will take all columns specified in features, and create an additional column "features" which will contain all the desired features aggregated into a single vector.
dataset = vector_assembler.transform(raw_dataset)

# Show what happened after applying the vector assembler.
# Note: "features" column got appended to the end.
dataset.select("features").take(1)


[Row(features=DenseVector([138.47, 51.655, 97.827, 27.98, 0.91, 124.711, 2.666, 3.064, 41.928, 197.76, 1.582, 1.396, 0.2, 32.638, 1.017, 0.381, 51.626, 2.273, -2.414, 16.824, -0.277, 258.733, 2.0, 67.435, 2.15, 0.444, 46.062, 1.24, -2.475, 113.497]))]

In [9]:
# Apply feature normalization with standard scaling. This will transform a feature to have mean 0, and std 1.
# http://spark.apache.org/docs/latest/ml-features.html#standardscaler
standard_scaler = StandardScaler(inputCol="features", outputCol="features_normalized", withStd=True, withMean=True)
standard_scaler_model = standard_scaler.fit(dataset)
dataset = standard_scaler_model.transform(dataset)

In [10]:
# If we look at the dataset, the Label column consists of 2 entries, i.e., b (background), and s (signal).
# Our neural network will not be able to handle these characters, so instead, we convert it to an index so we can indicate that output neuron with index 0 is background, and 1 is signal.
# http://spark.apache.org/docs/latest/ml-features.html#stringindexer
label_indexer = StringIndexer(inputCol="Label", outputCol="label_index").fit(dataset)
dataset = label_indexer.transform(dataset)

# Show the result of the label transformation.
dataset.select("Label", "label_index").take(5)

[Row(Label=u's', label_index=1.0),
 Row(Label=u'b', label_index=0.0),
 Row(Label=u'b', label_index=0.0),
 Row(Label=u'b', label_index=0.0),
 Row(Label=u'b', label_index=0.0)]

In [11]:
# Define some properties of the neural network for later use.
nb_classes = 2 # Number of output classes (signal and background)
nb_features = len(features)

In [15]:
# We observe that Keras is not able to work with these indexes.
# What it actually expects is a vector with an identical size to the output layer.
# Our framework provides functionality to do this with ease.
# What it basically does, given an expected vector dimension, 
# it prepares zero vector with the specified dimensionality, and will set the neuron
# with a specific label index to one. (One-Hot encoding)

# For example:
# 1. Assume we have a label index: 3
# 2. Output dimensionality: 5
# With these parameters, we obtain the following vector in the DataFrame column: [0,0,0,1,0]

transformer = OneHotTransformer(output_dim=nb_classes, input_col="label_index", output_col="label_output")
dataset = transformer.transform(dataset)
# Only select the columns we need (less data shuffling) while training.
dataset = dataset.select("features_normalized", "label_index", "label_output")

# Show the expected output vectors of the neural network.
dataset.select("label_index", "label_output").take(1)

[Row(label_index=1.0, label_output=[0.0, 1.0])]

In [16]:
# Shuffle the dataset.
dataset = shuffle(dataset)

# Note: we also support shuffling in the trainers by default.
# However, since this would require a shuffle for every training we will only do it once here.
# If you want, you can enable the training shuffling by specifying shuffle=True in the train() function.

In [17]:
# Finally, we create a trainingset and a testset.
(training_set, test_set) = dataset.randomSplit([0.6, 0.4])
training_set.cache()
test_set.cache()



DataFrame[features_normalized: vector, label_index: double, label_output: array<double>]

In [18]:
model = Sequential()
model.add(Dense(500, input_shape=(nb_features,)))
model.add(Activation('relu'))
model.add(Dropout(0.4))
model.add(Dense(500))
model.add(Activation('relu'))
model.add(Dense(nb_classes))
model.add(Activation('softmax'))

model.summary()

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_1 (Dense)              (None, 500)               15500     
_________________________________________________________________
activation_1 (Activation)    (None, 500)               0         
_________________________________________________________________
dropout_1 (Dropout)          (None, 500)               0         
_________________________________________________________________
dense_2 (Dense)              (None, 500)               250500    
_________________________________________________________________
activation_2 (Activation)    (None, 500)               0         
_________________________________________________________________
dense_3 (Dense)              (None, 2)                 1002      
_________________________________________________________________
activation_3 (Activation)    (None, 2)                 0         
Total para

In [19]:
optimizer = 'adagrad'
loss = 'categorical_crossentropy'


In [28]:
def evaluate_accuracy(model):
    global test_set
    
    # Allocate a Distributed Keras Accuracy evaluator.
    evaluator = AccuracyEvaluator(prediction_col="prediction_index", label_col="label_index")
    # Clear the prediction column from the testset.
    test_set = test_set.select("features_normalized", "label_index", "label_output")
    # Apply a prediction from a trained model.
    predictor = ModelPredictor(keras_model=trained_model, features_col="features_normalized")
    test_set = predictor.predict(test_set)
    # Allocate an index transformer.
    index_transformer = LabelIndexTransformer(output_dim=nb_classes)
    # Transform the prediction vector to an indexed label.
    test_set = index_transformer.transform(test_set)
    # Fetch the score.
    score = evaluator.evaluate(test_set)
    
    return score

In [29]:


def add_result(trainer, accuracy, dt):
    global results;
    
    # Store the metrics.
    results[trainer] = {}
    results[trainer]['accuracy'] = accuracy;
    results[trainer]['time_spent'] = dt
    # Display the metrics.
    print("Trainer: " + str(trainer))
    print(" - Accuracy: " + str(accuracy))
    print(" - Training time: " + str(dt))



In [30]:


results = {}



In [31]:
trainer = SingleTrainer(keras_model=model, worker_optimizer=optimizer,
                        loss=loss, features_col="features_normalized",
                        label_col="label", num_epoch=1, batch_size=32)
trained_model = trainer.train(training_set)

In [32]:
# Fetch the evaluation metrics.
accuracy = evaluate_accuracy(trained_model)
dt = trainer.get_training_time()
# Add the metrics to the results.
add_result('aeasgd', accuracy, dt)


Trainer: aeasgd
 - Accuracy: 0.403062805506
 - Training time: 12.4134700298


In [33]:
#Asynchronous EAMSGD

In [35]:
trainer = EAMSGD(keras_model=model, worker_optimizer=optimizer, loss=loss, num_workers=num_workers,
                 batch_size=32, features_col="features_normalized", label_col="label", num_epoch=1,
                 communication_window=32, rho=5.0, learning_rate=0.1, momentum=0.6)
trainer.set_parallelism_factor(1)
trained_model = trainer.train(training_set)

In [36]:
# Fetch the evaluation metrics.
accuracy = evaluate_accuracy(trained_model)
dt = trainer.get_training_time()
# Add the metrics to the results.
add_result('eamsgd', accuracy, dt)

Trainer: eamsgd
 - Accuracy: 0.403062805506
 - Training time: 37.2395269871


In [37]:
#Asynchronous EASGD
trainer = AEASGD(keras_model=model, worker_optimizer=optimizer, loss=loss, num_workers=num_workers, 
                 batch_size=32, features_col="features_normalized", label_col="label", num_epoch=1,
                 communication_window=32, rho=5.0, learning_rate=0.1)
trainer.set_parallelism_factor(1)
trained_model = trainer.train(training_set)

In [38]:
# Fetch the evaluation metrics.
accuracy = evaluate_accuracy(trained_model)
dt = trainer.get_training_time()
# Add the metrics to the results.
add_result('aeasgd', accuracy, dt)

Trainer: aeasgd
 - Accuracy: 0.403062805506
 - Training time: 51.4485421181
