# Dist-Keras running on a remote YARN environment, leveraging Watson Studio Python libraries

---

Watson Studio Hadoop Integration allows users to securely connect to a remote HDP/CDH environment. One of its features lets the Watson Studio admin "push" a runtime environment image to a user-readable HDFS location.

When users start their remote Livy sessions, they can select one of the pushed image runtimes and define custom Spark configurations for the session.


---

### 0. Prepare Livy/Spark Session Properties

In [1]:
# Show registered Hadoop Systems, and associated images 
import dsx_core_utils
DSXHI_SYSTEMS = dsx_core_utils.get_dsxhi_info(showSummary=True)

Available Hadoop systems: 

   systemName  LIVYSPARK  LIVYSPARK2                  imageId
0  cdh513mjou  livyspark                                     
1       bendy  livyspark  livyspark2                         
2    asgardia             livyspark2  dsx-scripted-ml-python2
3      becks1             livyspark2                         
4     yingcdh  livyspark  livyspark2                         
5       zinc1  livyspark  livyspark2  dsx-scripted-ml-python2
6       kabob             livyspark2                         
7       matzo             livyspark2                         


In [2]:
# Additional spark configs can be provided, as per https://github.com/apache/incubator-livy/blob/master/docs/rest-api.md
myConfig={"queue": "default",
         "driverMemory":"3G",
         "numExecutors":3
         }

# Setup necessary session properties
dsx_core_utils.setup_livy_sparkmagic(
    system="asgardia", 
    livy="livyspark2", 
    imageId="dsx-scripted-ml-python2",
    addlConfig=myConfig)

sparkmagic has been configured to use https://asgardian-edge.fyre.ibm.com:8443/gateway/kanchdsx-310-master-1/livy2/v1 with image Jupyter with Python 2.7, Scala 2.11, R 3.4.3
success configuring sparkmagic livy.
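The keys in `myConfig` correspond to fields of the Livy "create session" request documented at the link above. As a minimal sketch, the hypothetical helper below (`check_livy_config` is not part of `dsx_core_utils`) checks a config dictionary against the session fields listed in the Livy REST API documentation before it is passed on. Whether `setup_livy_sparkmagic` performs any validation of its own is not known here.

```python
# Session fields accepted by Livy's POST /sessions request, per the Livy REST API docs.
LIVY_SESSION_FIELDS = {
    "kind", "proxyUser", "jars", "pyFiles", "files", "driverMemory",
    "driverCores", "executorMemory", "executorCores", "numExecutors",
    "archives", "queue", "name", "conf", "heartbeatTimeoutInSecond",
}

def check_livy_config(config):
    """Hypothetical helper: reject keys that Livy does not document as session fields."""
    unknown = sorted(set(config) - LIVY_SESSION_FIELDS)
    if unknown:
        raise ValueError("Unknown Livy session fields: " + ", ".join(unknown))
    return dict(config)

my_config = check_livy_config({"queue": "default", "driverMemory": "3G", "numExecutors": 3})
```

Spark properties that are not top-level Livy fields (for example `spark.executor.memoryOverhead`) would go under the nested `conf` dictionary instead.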


In [3]:
%reload_ext sparkmagic.magics

In [4]:
# Start a remote Livy session; the Livy endpoint is shown in the output of the setup cell above
session_name = 'DistKeras_Sample'
livy_endpoint = "https://asgardian-edge.fyre.ibm.com:8443/gateway/kanchdsx-310-master-1/livy2/v1"
%spark add -s $session_name -l python -k -u $livy_endpoint

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
98,application_1533834262477_0022,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [5]:
# Show spark config used for this session
%spark info

Info for running Spark:
    Sessions:
        Name: DistKeras_Sample	Session id: 98	YARN id: application_1533834262477_0022	Kind: pyspark	State: idle
	Spark UI: http://shad2.fyre.ibm.com:8088/proxy/application_1533834262477_0022/
	Driver Log: http://shad4.fyre.ibm.com:8042/node/containerlogs/container_e11_1533834262477_0022_01_000001/user1
    Session configs:
        {'queue': 'default', 'numExecutors': 3, 'archives': ['/user/dsxhi/environments/7d47bdd5b4037a18ccfef3afd7e7399ed1859fae8a3c92588783aea56c341095/dsx-scripted-ml-python2.tar.gz'], 'conf': {'spark.yarn.appMasterEnv.PYSPARK_PYTHON': 'dsx-scripted-ml-python2.tar.gz/anaconda2/bin/python2.7', 'spark.yarn.appMasterEnv.PYTHONPATH': 'dsx-scripted-ml-python2.tar.gz/usr/local/spark-2.0.2-bin-hadoop2.7/python:dsx-scripted-ml-python2.tar.gz/user-home/.scripts/common-helpers/batch/pmml:dsx-scripted-ml-python2.tar.gz/user-home/.scripts/common-helpers/saas:dsx-scripted-ml-python2.tar.gz/user-home/_global_/python-2.7:'}, 'proxyUser': u'use

## 1. Run sample job to verify Watson Studio Virtual Environment Loaded Properly
Load Watson Studio packages from the specified image on the remote YARN NodeManagers.
Packages such as `sklearn`, `numpy`, `pandas`, and `keras` are not included in the default Python environments.

Each cell that contains `%%spark` as its first line runs against the remote Spark session.

In [6]:
%%spark 
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

import keras
from keras.models import Sequential
from keras.layers import Dense, Activation

x = (5*np.random.rand(6**5))
y = (5*np.random.rand(6**5))
df = pd.DataFrame({'X':x,'Y':y})

def determine_z(row):
    # Bucket sin(X) + sin(Y) into four classes.
    value = float(np.sin(row['X']) + np.sin(row['Y']))
    if value < -1.0:
        return 0
    if -1.0 <= value < 0.0:
        return 1
    if 0.0 <= value < 1.0:
        return 2
    if value >= 1.0:
        return 3

df['Z'] = df.apply(determine_z, axis=1)
import socket
print("Test Job Ran on YARN NodeManager: \n " + socket.gethostname() )
print(df.head(4))
X_train, X_test, y_train, y_test = train_test_split(df[['X','Y']],df['Z'])

Test Job Ran on YARN NodeManager: 
 shad4.fyre.ibm.com
          X         Y  Z
0  3.658685  1.811328  2
1  0.302049  1.468605  3
2  4.339292  4.807158  0
3  0.931340  1.003244  3
  from ._conv import register_converters as _register_converters
Using TensorFlow backend.

---
## 2. Work with data on HDFS

#### 2.1 Sending Data to HDFS

Note: The next two cells are run **without** `%%spark`, so they execute locally within Watson Studio.
Dataset path (running as user1):
```
hdfs:///user/user1/datasets/atlas_higgs.csv
```

In [7]:
# Show registered WebHDFS Secure URLS which logged in user has access to:
import dsx_core_utils
dsx_core_utils.list_dsxhi_webhdfs_endpoints();

['https://asgardian-edge.fyre.ibm.com:8443/gateway/kanchdsx-310-master-1/webhdfs/v1', 'https://becks1.fyre.ibm.com:8443/gateway/kanchdsx-310-master-1/webhdfs/v1', 'https://bendy1.fyre.ibm.com:8443/gateway/kanchdsx-310-master-1/webhdfs/v1', 'https://cdh513edge11.fyre.ibm.com:8443/gateway/kanchdsx-310-master-1/webhdfs/v1', 'https://kabob3.fyre.ibm.com:8443/gateway/kanchdsx-310-master-1/webhdfs/v1', 'https://matzo1.fyre.ibm.com:8443/gateway/kanchdsx-310-master-1/webhdfs/v1', 'https://yccdh5.fyre.ibm.com:8443/gateway/kanchdsx-310-master-1/webhdfs/v1', 'https://zinc1.fyre.ibm.com:8443/gateway/kanchdsx-310-master-1/webhdfs/v1']


Here, we see that Watson Studio has 8 different HDFS clusters registered to it.
Choose the endpoint of the cluster to which the data should be sent.

In [8]:
dsxlocal_file_location="../datasets/atlas_higgs.csv"
dsxhi_upload_hdfs_location="/user/user1/datasets/atlas_higgs.csv"
webhdfs_endpoint="https://asgardian-edge.fyre.ibm.com:8443/gateway/kanchdsx-310-master-1/webhdfs/v1"

dsx_core_utils.upload_hdfs_file(webhdfs_endpoint, dsxlocal_file_location, dsxhi_upload_hdfs_location)

upload success
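For orientation, the WebHDFS REST API creates a file with a `PUT` request carrying `op=CREATE` against the endpoint plus the target path. The sketch below only builds that URL; `webhdfs_create_url` is a hypothetical helper, and it is an assumption that `upload_hdfs_file` issues an equivalent request internally (authentication and the gateway redirect are not shown).

```python
try:
    from urllib.parse import quote, urlencode  # Python 3
except ImportError:
    from urllib import quote, urlencode        # Python 2

def webhdfs_create_url(endpoint, hdfs_path, overwrite=False):
    """Hypothetical helper: build the WebHDFS URL for an op=CREATE request."""
    path = quote(hdfs_path.lstrip("/"))
    query = urlencode([("op", "CREATE"),
                       ("overwrite", "true" if overwrite else "false")])
    return "{0}/{1}?{2}".format(endpoint.rstrip("/"), path, query)

url = webhdfs_create_url(
    "https://asgardian-edge.fyre.ibm.com:8443/gateway/kanchdsx-310-master-1/webhdfs/v1",
    "/user/user1/datasets/atlas_higgs.csv")
print(url)
```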


#### 2.2 Load the CSV into a remote DataFrame

In [9]:
%%spark
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
raw_dataset = sqlContext.read.format('com.databricks.spark.csv').options(
header='true', inferschema='true').load("hdfs:///user/user1/datasets/atlas_higgs.csv")

---
## 3. Run dist-keras on Spark 

#### 3.1 dist-keras imports
**dist-keras** is a distributed deep learning framework built on Keras and Apache Spark. It provides several distributed optimization algorithms for training Keras models on a Spark cluster.

Demo inspired by: https://github.com/cerndb/dist-keras

In [10]:
%%spark
import numpy as np
import time, requests

from distkeras.trainers import *
from distkeras.predictors import *
from distkeras.transformers import *
from distkeras.evaluators import *
from distkeras.utils import *

from keras.optimizers import *
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation

from pyspark import SparkContext
from pyspark import SparkConf

from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics

import keras
import h5py

#### 3.2 Remote Dataset preprocessing and normalization
We now preprocess the data from HDFS, concatenating all the features into a single Vector column. More information on Spark MLlib feature transformers can be found here: http://spark.apache.org/docs/latest/ml-guide.html.


In [11]:
%%spark
raw_dataset.printSchema()

root
 |-- EventId: integer (nullable = true)
 |-- DER_mass_MMC: double (nullable = true)
 |-- DER_mass_transverse_met_lep: double (nullable = true)
 |-- DER_mass_vis: double (nullable = true)
 |-- DER_pt_h: double (nullable = true)
 |-- DER_deltaeta_jet_jet: double (nullable = true)
 |-- DER_mass_jet_jet: double (nullable = true)
 |-- DER_prodeta_jet_jet: double (nullable = true)
 |-- DER_deltar_tau_lep: double (nullable = true)
 |-- DER_pt_tot: double (nullable = true)
 |-- DER_sum_pt: double (nullable = true)
 |-- DER_pt_ratio_lep_tau: double (nullable = true)
 |-- DER_met_phi_centrality: double (nullable = true)
 |-- DER_lep_eta_centrality: double (nullable = true)
 |-- PRI_tau_pt: double (nullable = true)
 |-- PRI_tau_eta: double (nullable = true)
 |-- PRI_tau_phi: double (nullable = true)
 |-- PRI_lep_pt: double (nullable = true)
 |-- PRI_lep_eta: double (nullable = true)
 |-- PRI_lep_phi: double (nullable = true)
 |-- PRI_met: double (nullable = true)
 |-- PRI_met_phi: double (nu

In [12]:
%%spark
# First, we would like to extract the desired features from the raw dataset.
# We do this by constructing a list with all desired columns.
features = raw_dataset.columns
features.remove('EventId')
features.remove('Weight')
features.remove('Label')
# Next, we use Spark's VectorAssembler to "assemble" (create) a vector of all desired features.
# http://spark.apache.org/docs/latest/ml-features.html#vectorassembler
vector_assembler = VectorAssembler(inputCols=features, outputCol="features")
# This transformer will take all columns specified in features, and create an additional column "features" which will contain all the desired features aggregated into a single vector.
dataset = vector_assembler.transform(raw_dataset)

# Show what happened after applying the vector assembler.
# Note: "features" column got appended to the end.
dataset.select("features").take(1)

[Row(features=DenseVector([138.47, 51.655, 97.827, 27.98, 0.91, 124.711, 2.666, 3.064, 41.928, 197.76, 1.582, 1.396, 0.2, 32.638, 1.017, 0.381, 51.626, 2.273, -2.414, 16.824, -0.277, 258.733, 2.0, 67.435, 2.15, 0.444, 46.062, 1.24, -2.475, 113.497]))]

In [13]:
%%spark
standard_scaler = StandardScaler(inputCol="features", outputCol="features_normalized", withStd=True, withMean=True)
standard_scaler_model = standard_scaler.fit(dataset)
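# Apply the fitted scaler so that the "features_normalized" column used by the trainers below exists.
dataset = standard_scaler_model.transform(dataset)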
# If we look at the dataset, the Label column consists of 2 entries, i.e., b (background), and s (signal).
# Our neural network will not be able to handle these characters, so instead, we convert it to an index so we can indicate that output neuron with index 0 is background, and 1 is signal.
# http://spark.apache.org/docs/latest/ml-features.html#stringindexer
label_indexer = StringIndexer(inputCol="Label", outputCol="label_index").fit(dataset)
dataset = label_indexer.transform(dataset)

# Show the result of the label transformation.
dataset.select("Label", "label_index").take(5)


[Row(Label=u's', label_index=1.0), Row(Label=u'b', label_index=0.0), Row(Label=u'b', label_index=0.0), Row(Label=u'b', label_index=0.0), Row(Label=u'b', label_index=0.0)]
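To make the effect of `StandardScaler(withStd=True, withMean=True)` concrete, here is a minimal NumPy sketch of the same arithmetic on a small made-up array. It is an illustration only, not Spark's implementation; it assumes the corrected sample standard deviation (`ddof=1`), which is what the Spark documentation describes.

```python
import numpy as np

def standardize(features):
    """Center each column to zero mean and scale it to unit sample standard deviation."""
    features = np.asarray(features, dtype=float)
    mean = features.mean(axis=0)
    std = features.std(axis=0, ddof=1)   # corrected sample standard deviation
    std[std == 0.0] = 1.0                # constant columns stay at 0 after centering
    return (features - mean) / std

# Toy data: two feature columns on very different scales.
toy = [[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]]
normalized = standardize(toy)
print(normalized)
```

After scaling, both columns carry the same values, which keeps features with large raw magnitudes from dominating the network's first layer.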

In [14]:
%%spark
nb_classes = 2 # Number of output classes (signal and background)
nb_features = len(features)
# Shuffle the dataset.
dataset = shuffle(dataset)
# Note: dist-keras also supports shuffling data in its Trainer implementation by default.
# However, since this would require a shuffle for every model we train on the dataset, we perform the shuffle in advance here.

# Create a training set and a test set.
(training_set, test_set) = dataset.randomSplit([0.6, 0.4])
training_set.cache()
test_set.cache()

# Create a temp view, accessible via spark sql
test_set.createOrReplaceTempView("test_set_df")


#### 3.3 Model Construction


In [15]:
%%spark
model = Sequential()
hidden_layer_size = 25
model.add(Dense(hidden_layer_size, input_shape=(nb_features,)))
model.add(Activation('relu'))
model.add(Dropout(0.4))
model.add(Dense(hidden_layer_size))
model.add(Activation('relu'))
model.add(Dense(nb_classes))
model.add(Activation('softmax'))

model.summary()


_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_1 (Dense)              (None, 25)                775       
_________________________________________________________________
activation_1 (Activation)    (None, 25)                0         
_________________________________________________________________
dropout_1 (Dropout)          (None, 25)                0         
_________________________________________________________________
dense_2 (Dense)              (None, 25)                650       
_________________________________________________________________
activation_2 (Activation)    (None, 25)                0         
_________________________________________________________________
dense_3 (Dense)              (None, 2)                 52        
_________________________________________________________________
activation_3 (Activation)    (None, 2)                 0         
Total para
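The per-layer parameter counts in the summary can be checked by hand: a `Dense` layer with a bias holds `inputs * units + units` weights. The short sketch below reproduces the three counts, assuming 30 input features (the length of the assembled feature vector shown earlier).

```python
def dense_params(inputs, units):
    """Weights plus biases of a fully connected layer."""
    return inputs * units + units

# Layer widths: 30 input features, two hidden layers of 25 units, 2 output classes.
layer_sizes = [30, 25, 25, 2]
counts = [dense_params(i, o) for i, o in zip(layer_sizes[:-1], layer_sizes[1:])]
print(counts)  # [775, 650, 52]
```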

#### 3.4 Set Worker Optimizer and Loss
In order to evaluate the gradient on the model replicas, we have to specify an optimizer and a loss function. dist-keras supports the same optimizers and loss functions as Keras, so we may simply refer to the Keras API documentation for optimizers and objective functions.

In [16]:
%%spark
optimizer = 'adagrad'
loss = 'categorical_crossentropy'
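As a reminder of what `categorical_crossentropy` measures, the following NumPy sketch computes it for one-hot targets and softmax-style predictions. This is a plain illustration of the formula, not the Keras implementation; `one_hot` and the clipping constant `eps` are choices made for this example.

```python
import numpy as np

def one_hot(indices, num_classes):
    """Turn class indices such as [1, 0] into one-hot rows."""
    indices = np.asarray(indices, dtype=int)
    encoded = np.zeros((len(indices), num_classes))
    encoded[np.arange(len(indices)), indices] = 1.0
    return encoded

def categorical_crossentropy(y_true, y_pred, eps=1e-7):
    """Per-sample loss: -sum_k y_true[k] * log(y_pred[k])."""
    y_pred = np.clip(y_pred, eps, 1.0 - eps)
    return -np.sum(y_true * np.log(y_pred), axis=1)

targets = one_hot([1, 0], num_classes=2)           # signal, background
predictions = np.array([[0.5, 0.5], [0.9, 0.1]])
losses = categorical_crossentropy(targets, predictions)
print(losses)
```

The loss expects a target vector per sample rather than a single index, which is why the label index has to be expanded to a one-hot vector before training with this loss.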

---

## 4. Model Training and Evaluation

- Define the `evaluate_accuracy` function
- Train a model using the `SingleTrainer` and `AEASGD` trainers


In [17]:
%%spark
# Define some functions to use in our trainers
def evaluate_accuracy(model, test_set):
    # Allocate a Distributed Keras Accuracy evaluator.
    evaluator = AccuracyEvaluator(prediction_col="prediction_index", label_col="label_index")
    # Keep only the columns needed for prediction and scoring.
    test_set = test_set.select("features_normalized", "label_index", "label")
    # Predict with the model passed in as an argument (not a global variable).
    predictor = ModelPredictor(keras_model=model, features_col="features_normalized")
    test_set = predictor.predict(test_set)
    # Allocate an index transformer.
    index_transformer = LabelIndexTransformer(output_dim=nb_classes)
    # Transform the prediction vector to an indexed label.
    test_set = index_transformer.transform(test_set)
    # Fetch the score.
    score = evaluator.evaluate(test_set)
    return score

# Number of training epochs to run for each Trainer
TRAIN_EPOCHS = 1
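
Conceptually, the evaluation above reduces each prediction vector to the index of its largest entry and compares that index with `label_index`. A minimal NumPy sketch of that computation on made-up values (it does not use the dist-keras classes, and `accuracy` is a name chosen for this example):

```python
import numpy as np

def accuracy(prediction_vectors, label_indices):
    """Fraction of samples whose arg-max prediction equals the label index."""
    predicted = np.argmax(np.asarray(prediction_vectors), axis=1)
    return float(np.mean(predicted == np.asarray(label_indices)))

# Three toy predictions over the two classes (background=0, signal=1).
score = accuracy([[0.9, 0.1], [0.2, 0.8], [0.6, 0.4]], [0, 1, 1])
print(score)
```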
    

### 4.1 Baseline: Single-executor training

In [18]:
%%spark
trainer = SingleTrainer(keras_model=model, worker_optimizer=optimizer,
                        loss=loss, features_col="features_normalized",
                        label_col="label", num_epoch=TRAIN_EPOCHS, batch_size=32)
trained_model = trainer.train(training_set)

### 4.2 Asynchronous EASGD
EASGD-based methods, proposed by Zhang et al., transmit the complete parametrization instead of the gradient. They "average" the difference between the center variable and the backpropagated worker variable. This difference is used to compute a new center (master) variable, which the worker nodes use during the next iteration of backpropagation.

Asynchronous EASGD updates model parameters asynchronously: once a worker node has processed a fixed number of mini-batches (referred to as the "communication window"), it communicates its parameters to the parameter server. The parameter server then updates the center variable immediately, without waiting for other workers.
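
The elastic update can be illustrated with a small NumPy simulation on a toy quadratic loss. This is a sketch of the update rule from Zhang et al. under simplifying assumptions (workers are simulated in a sequential loop, the loss and all names are made up for the example), and it is not the dist-keras implementation. The elastic coefficient is taken as `alpha = learning_rate * rho`.

```python
import numpy as np

def elastic_exchange(worker, center, alpha):
    """Move worker and center toward each other by the same elastic difference."""
    elastic_diff = alpha * (worker - center)
    return worker - elastic_diff, center + elastic_diff

def simulate_aeasgd(num_workers=3, steps=400, communication_window=4,
                    learning_rate=0.1, rho=1.0, seed=0):
    rng = np.random.RandomState(seed)
    target = np.array([1.0, -2.0])      # minimiser of the toy loss 0.5 * ||w - target||^2
    center = np.zeros(2)                # center variable held by the parameter server
    workers = [rng.randn(2) for _ in range(num_workers)]
    alpha = learning_rate * rho
    for step in range(1, steps + 1):
        for i in range(num_workers):
            # Every `communication_window` local steps, exchange with the center.
            if step % communication_window == 0:
                workers[i], center = elastic_exchange(workers[i], center, alpha)
            grad = workers[i] - target  # gradient of the toy loss
            workers[i] = workers[i] - learning_rate * grad
    return center, workers

center, workers = simulate_aeasgd()
print(center)
```

Each exchange updates the center immediately with a single worker's contribution, which mirrors the "without waiting for other workers" behaviour described above.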



In [19]:
%%spark
num_workers = sc.defaultParallelism
print(num_workers)

3

In [20]:
%%spark
trainer = AEASGD(keras_model=model, worker_optimizer=optimizer, loss=loss, num_workers=num_workers, 
                 batch_size=32, features_col="features_normalized", label_col="label", num_epoch=TRAIN_EPOCHS,
                 communication_window=32, rho=5.0, learning_rate=0.1)
trainer.set_parallelism_factor(num_workers)
trained_model = trainer.train(training_set)

Sample YARN logs from a successful run:
```
18/05/21 20:58:32 INFO TaskSetManager: Finished task 0.0 in stage 30.0 (TID 910) in 11805 ms on shad2.fyre.ibm.com (executor 2) (1/3)
18/05/21 20:58:32 INFO TaskSetManager: Finished task 1.0 in stage 30.0 (TID 911) in 11809 ms on shad1.fyre.ibm.com (executor 3) (2/3)
18/05/21 20:58:32 INFO TaskSetManager: Finished task 2.0 in stage 30.0 (TID 912) in 11809 ms on shad4.fyre.ibm.com (executor 5) (3/3)
18/05/21 20:58:32 INFO YarnClusterScheduler: Removed TaskSet 30.0, whose tasks have all completed, from pool 
18/05/21 20:58:32 INFO DAGScheduler: ResultStage 30 (collect at build/bdist.linux-x86_64/egg/distkeras/trainers.py:633) finished in 11.812 s
18/05/21 20:58:32 INFO DAGScheduler: Job 15 finished: collect at build/bdist.linux-x86_64/egg/distkeras/trainers.py:633, took 11.834807 s
```

In [21]:
%%spark
trained_model.get_config()

[{'class_name': 'Dense', 'config': {'kernel_initializer': {'class_name': 'VarianceScaling', 'config': {'distribution': u'uniform', 'scale': 1.0, 'seed': None, 'mode': u'fan_avg'}}, 'name': u'dense_1', 'kernel_constraint': None, 'bias_regularizer': None, 'bias_constraint': None, 'dtype': u'float32', 'activation': 'linear', 'trainable': True, 'kernel_regularizer': None, 'bias_initializer': {'class_name': 'Zeros', 'config': {}}, 'units': 25, 'batch_input_shape': (None, 30), 'use_bias': True, 'activity_regularizer': None}}, {'class_name': 'Activation', 'config': {'activation': 'relu', 'trainable': True, 'name': u'activation_1'}}, {'class_name': 'Dropout', 'config': {'rate': 0.4, 'noise_shape': None, 'trainable': True, 'seed': None, 'name': u'dropout_1'}}, {'class_name': 'Dense', 'config': {'kernel_initializer': {'class_name': 'VarianceScaling', 'config': {'distribution': u'uniform', 'scale': 1.0, 'seed': None, 'mode': u'fan_avg'}}, 'name': u'dense_2', 'kernel_constraint': None, 'bias_regul

---

When you are done, run `%spark cleanup` to remove any idle sessions.

In [26]:
%spark cleanup