# TensorFlowOnSpark with Spark ML Pipelines

[Spark ML Pipelines](https://spark.apache.org/docs/latest/ml-pipeline.html) provide high-level APIs (inspired by [scikit-learn](http://scikit-learn.org)) for Spark-based machine learning algorithms.

This notebook demonstrates support for these APIs within TensorFlowOnSpark via the introduction of a new [pipeline](https://github.com/yahoo/TensorFlowOnSpark/blob/master/tensorflowonspark/pipeline.py) module consisting of two main classes: 

1. [TFEstimator](https://yahoo.github.io/TensorFlowOnSpark/tensorflowonspark.pipeline.html#tensorflowonspark.pipeline.TFEstimator) - A Spark ML Estimator which wraps a distributed TensorFlowOnSpark cluster for training.
2. [TFModel](https://yahoo.github.io/TensorFlowOnSpark/tensorflowonspark.pipeline.html#tensorflowonspark.pipeline.TFModel) - A Spark ML Model which represents a TensorFlow model checkpoint or [saved_model](https://www.tensorflow.org/programmers_guide/saved_model#apis_to_build_and_load_a_savedmodel) on disk.  **Note**: due to architectural limitations, transform/inferencing is conducted on the executors as parallel instances of a single-node TensorFlow application (vs. a distributed TensorFlow cluster), so the model must fit in the memory of a single executor.
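The two classes follow the standard Spark ML contract: an Estimator's `fit()` returns a Model, and the Model's `transform()` produces new data. The sketch below shows only that shape, using hypothetical in-memory classes (`MeanEstimator`, `MeanModel`) and plain lists of dicts in place of DataFrames. It does not use the TensorFlowOnSpark API.

```python
# Hypothetical, in-memory sketch of the Spark ML Estimator/Model contract.
# These classes are NOT the TensorFlowOnSpark API; they only show the shape:
# Estimator.fit(data) -> Model, and Model.transform(data) -> new data.
from typing import Dict, List

Row = Dict[str, float]


class MeanEstimator:
    """Toy Estimator: 'trains' by computing the mean of one input column."""

    def __init__(self, input_col: str, output_col: str) -> None:
        self.input_col = input_col
        self.output_col = output_col

    def fit(self, rows: List[Row]) -> "MeanModel":
        mean = sum(r[self.input_col] for r in rows) / len(rows)
        # The fitted state is captured in a separate Model object.
        return MeanModel(self.input_col, self.output_col, mean)


class MeanModel:
    """Toy Model: appends an output column derived from the fitted state."""

    def __init__(self, input_col: str, output_col: str, mean: float) -> None:
        self.input_col = input_col
        self.output_col = output_col
        self.mean = mean

    def transform(self, rows: List[Row]) -> List[Row]:
        # Each row is handled independently, which is why a real Model can run
        # as parallel single-node instances on the executors.
        return [{**r, self.output_col: r[self.input_col] - self.mean} for r in rows]


train = [{"x": 1.0}, {"x": 2.0}, {"x": 3.0}]
model = MeanEstimator("x", "centered").fit(train)
print(model.transform([{"x": 4.0}]))  # [{'x': 4.0, 'centered': 2.0}]
```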

In addition, there is a new [dfutil](https://yahoo.github.io/TensorFlowOnSpark/tensorflowonspark.dfutil.html) module which provides helper functions to convert from TensorFlow TFRecords to Spark DataFrames and vice versa.


### Start a Spark Standalone Cluster

First, in a terminal/shell window, start a single-machine Spark Standalone Cluster with three workers:
```
export MASTER=spark://$(hostname):7077
export SPARK_WORKER_INSTANCES=3
export CORES_PER_WORKER=1
export TOTAL_CORES=$((${CORES_PER_WORKER}*${SPARK_WORKER_INSTANCES})) 
${SPARK_HOME}/sbin/start-master.sh; ${SPARK_HOME}/sbin/start-slave.sh -c $CORES_PER_WORKER -m 3G ${MASTER}
```

### Launch the Spark Jupyter Notebook

Now, in the same window, launch a PySpark Jupyter notebook:
```
cd ${TFoS_HOME}/examples/mnist
PYSPARK_DRIVER_PYTHON="jupyter" \
PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip=`hostname`" \
pyspark  --master ${MASTER} \
--conf spark.cores.max=${TOTAL_CORES} \
--conf spark.task.cpus=${CORES_PER_WORKER} \
--py-files ${TFoS_HOME}/examples/mnist/spark/mnist_dist_pipeline.py \
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME"
```
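The flags above appear intended to give each executor exactly one task slot: `spark.task.cpus` equals the cores per worker, so each single-core executor runs at most one task at a time, and `spark.cores.max` caps the cluster at three concurrent tasks. This matches `num_executors = 3` below. The arithmetic is sketched here with the same values as the shell variables. It assumes one executor per worker, which is the default for this standalone setup.

```python
# Sketch of the resource arithmetic behind the launch flags, assuming the same
# values as the shell variables (3 workers x 1 core, one executor per worker).
SPARK_WORKER_INSTANCES = 3
CORES_PER_WORKER = 1

TOTAL_CORES = CORES_PER_WORKER * SPARK_WORKER_INSTANCES  # spark.cores.max
TASK_CPUS = CORES_PER_WORKER                             # spark.task.cpus

# A task needs TASK_CPUS cores, so each executor fits this many tasks at once:
tasks_per_executor = CORES_PER_WORKER // TASK_CPUS
# ...and the whole cluster runs at most this many tasks concurrently:
max_concurrent_tasks = TOTAL_CORES // TASK_CPUS

print(tasks_per_executor, max_concurrent_tasks)  # 1 3
```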

This should open a browser window with Jupyter pointing to the directory where this notebook is hosted.
Click on the `TFOS_pipeline.ipynb` file and begin executing the steps of the notebook.

In [None]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

import argparse
import os
import subprocess
import sys
from datetime import datetime

import tensorflow as tf
from tensorflowonspark import dfutil
from tensorflowonspark.pipeline import TFEstimator, TFModel

import mnist_dist_pipeline

In [None]:
# configure to match your cluster
num_executors = 3

**Note**: for a Spark Standalone cluster on a single machine, the executors run from different working directories, so relative paths won't resolve consistently across the cluster.  The following cell maps each relative path to an absolute path under this notebook's current working directory.

In [None]:
cwd = os.getcwd()
model_dir = os.path.join(cwd, "mnist_model")       # path to TensorFlow model/checkpoint
export_dir = os.path.join(cwd, "mnist_export")     # path to TensorFlow saved_model export
output = os.path.join(cwd, "predictions")          # path to output of inferencing

print(model_dir)
print(export_dir)
print(output)

In [None]:
# Clean up any prior artifacts
subprocess.call(["rm", "-rf", model_dir])
subprocess.call(["rm", "-rf", export_dir])
subprocess.call(["rm", "-rf", output])


In [None]:
# Set up parser for command-line options
parser = argparse.ArgumentParser()

## TFoS/cluster
parser.add_argument("--batch_size", help="number of records per batch", type=int, default=100)
parser.add_argument("--epochs", help="number of epochs", type=int, default=1)
parser.add_argument("--model_dir", help="HDFS path to save/load model during train/inference", type=str)
parser.add_argument("--export_dir", help="HDFS path to export saved_model", type=str)
parser.add_argument("--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
parser.add_argument("--num_ps", help="number of PS nodes in cluster", type=int, default=1)
parser.add_argument("--protocol", help="TensorFlow network protocol (grpc|rdma)", default="grpc")
parser.add_argument("--steps", help="maximum number of steps", type=int, default=1000)
parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")

# Spark input/output
parser.add_argument("--format", help="example format: (csv|tfr)", choices=["csv","tfr"], default="csv")
parser.add_argument("--images", help="HDFS path to MNIST images in parallelized format")
parser.add_argument("--labels", help="HDFS path to MNIST labels in parallelized format")
parser.add_argument("--output", help="HDFS path to save test/inference output", default="predictions")

# Execution Modes
parser.add_argument("--train", help="train a model using Estimator", action="store_true")
parser.add_argument("--inference_mode", help="type of inferencing (none|checkpoint|signature|direct)", choices=["none","signature","direct","checkpoint"], default="none")
parser.add_argument("--inference_output", help="output type for inferencing (predictions|features)", choices=["predictions","features"], default="predictions")


In [None]:
# Declare arguments for this session
args = parser.parse_args(["--model_dir", model_dir,
                          "--export_dir", export_dir,
                          "--output", output,
                          "--images", "csv/train/images",
                          "--labels", "csv/train/labels",
                          "--train",
                          "--inference_mode", "checkpoint",
                          "--inference_output", "predictions"])
print(args)

The following code supports reading the MNIST data as both TFRecords and CSV files.  It is assumed that you've already converted the MNIST binary data to either of these formats.  If not, you can refer to the [Spark Standalone example](https://github.com/yahoo/TensorFlowOnSpark/wiki/GetStarted_Standalone#convert-the-mnist-zip-files-using-spark) for instructions.

For TFRecords, this leverages the `dfutil` module to load the TFRecords at `args.images` as a Spark DataFrame.  This conversion assumes a flat TFRecord structure, i.e. a simple list of features consisting of standard types, that can be easily mapped to DataFrame columns.  Deeply nested structures and variable schemas are not currently supported, so for those datasets, you may need to write a custom loader/converter.
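To make the "flat structure" assumption concrete, the hypothetical helper below (not part of `dfutil`) accepts a record only if every feature is a single list of one standard type. This mirrors how a `tf.train.Feature` holds one `int64_list`, `float_list`, or `bytes_list`, so each feature can become one DataFrame column. Nested messages and lists of lists are rejected, and those are the cases that would need a custom loader.

```python
# Hypothetical helper (not part of dfutil) illustrating the "flat" structure
# assumption: each feature is one list of a single standard type, mirroring
# tf.train.Feature's int64_list / float_list / bytes_list.
from typing import Any, Dict

STANDARD_TYPES = (int, float, bytes)


def is_flat_record(features: Dict[str, Any]) -> bool:
    """Return True if every feature is a flat, homogeneous list of standard types."""
    for value in features.values():
        if not isinstance(value, list):
            return False  # nested messages / dicts are not supported
        if not all(isinstance(v, STANDARD_TYPES) for v in value):
            return False  # e.g. a list of lists (nested/variable structure)
        if len({type(v) for v in value}) > 1:
            return False  # mixed element types cannot share one column type
    return True


print(is_flat_record({"image": [0, 128, 255], "label": [0.0, 1.0]}))  # True
print(is_flat_record({"image": [[0, 128], [255]]}))                   # False
print(is_flat_record({"meta": {"source": b"mnist"}}))                 # False
```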

For CSV, this just uses traditional Spark RDD APIs to read/transform the text files, zip the images with the labels, and then convert the resulting RDD into a DataFrame.  Note: this uses a trivial CSV parser to keep the code simple.
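The CSV branch can be mimicked without Spark to show what each step produces. In this pure-Python sketch, a list of strings stands in for each `textFile` RDD, the same trivial `split(',')` parser is applied, and the built-in `zip` plays the role of `RDD.zip()`. The sample lines are made up and much shorter than real MNIST rows.

```python
# Pure-Python sketch of the CSV branch (no Spark). The input lines are
# hypothetical and shortened; real MNIST rows have 784 pixels and 10 labels.
image_lines = ["0,0,255,128", "12,0,0,7"]   # flattened pixel values
label_lines = ["0,0,1,0", "1,0,0,0"]        # one-hot labels

images = [[int(x) for x in ln.split(",")] for ln in image_lines]
labels = [[float(x) for x in ln.split(",")] for ln in label_lines]

# zip() pairs elements positionally. Spark's RDD.zip() additionally requires
# both RDDs to have the same number of partitions and elements per partition.
rows = [{"image": img, "label": lbl} for img, lbl in zip(images, labels)]
print(rows[0])  # {'image': [0, 0, 255, 128], 'label': [0.0, 0.0, 1.0, 0.0]}
```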

In [None]:
if args.format == "tfr":
    df = dfutil.loadTFRecords(sc, args.images)
elif args.format == "csv":
    images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
    labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
    dataRDD = images.zip(labels)
    df = spark.createDataFrame(dataRDD, ['image', 'label'])
else:
    raise Exception("Unsupported format: {}".format(args.format))


Now, train the model using a `TFEstimator`.  This class supports ML Params for arguments and hyper-parameters that are common across TensorFlow applications.  The constructor accepts the TensorFlow "map_fun" (i.e. the "main" function converted to the expected TensorFlowOnSpark API signature) along with an optional dictionary of application-specific hyper-parameters.  Note: this shows application-specific hyper-parameters for the Inception network only as an example, since the MNIST network does not have specific hyper-parameters.
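One way to picture how the ML Params and the application-specific dictionary reach the TensorFlow code is sketched below. It assumes the estimator merges both into a single namespace and passes it as the `args` argument of a TensorFlowOnSpark-style `map_fun(args, ctx)`. The `merge_args_params` helper and its precedence rule are hypothetical and are not taken from the library.

```python
# Minimal sketch, assuming ML Params and tf_args are merged into one namespace
# before map_fun is called. merge_args_params() is hypothetical.
import argparse
from typing import Any, Dict


def merge_args_params(tf_args: Dict[str, Any], params: Dict[str, Any]) -> argparse.Namespace:
    merged = dict(tf_args)
    merged.update(params)  # in this sketch, ML Params override tf_args on conflicts
    return argparse.Namespace(**merged)


def map_fun(args: argparse.Namespace, ctx: Any) -> None:
    """Stand-in for a TensorFlowOnSpark "main" function with signature (args, ctx)."""
    print(args.batch_size, args.initial_learning_rate)


tf_args = {"initial_learning_rate": 0.045, "num_epochs_per_decay": 2.0,
           "learning_rate_decay_factor": 0.94}
params = {"batch_size": 100, "epochs": 1, "steps": 1000}
map_fun(merge_args_params(tf_args, params), ctx=None)  # prints: 100 0.045
```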

When `TFEstimator.fit()` is invoked, it will launch a TensorFlowOnSpark cluster for distributed training, with the model checkpoint persisted on disk.  If an `--export_dir` is supplied above, this TensorFlow application will also export a saved_model to that directory.  At the end of training, the TensorFlowOnSpark cluster will be automatically shut down.

If the `--train` argument is not supplied above, this code will skip training and just construct a `TFModel` instance using the same arguments to represent a model checkpoint and/or saved_model already available on disk.

In [None]:
if args.train:
    # train a model using Spark Estimator fitted to a DataFrame
    # dummy tf args (from imagenet/inception example)
    tf_args = { 'initial_learning_rate': 0.045, 'num_epochs_per_decay': 2.0, 'learning_rate_decay_factor': 0.94 }
    estimator = TFEstimator(mnist_dist_pipeline.map_fun, tf_args) \
          .setInputMapping({'image':'image', 'label':'label'}) \
          .setModelDir(args.model_dir) \
          .setExportDir(args.export_dir) \
          .setClusterSize(args.cluster_size) \
          .setNumPS(args.num_ps) \
          .setProtocol(args.protocol) \
          .setTensorboard(args.tensorboard) \
          .setEpochs(args.epochs) \
          .setBatchSize(args.batch_size) \
          .setSteps(args.steps)
    model = estimator.fit(df)
else:
    # use a previously trained/exported model
    model = TFModel(args) \
        .setExportDir(args.export_dir) \
        .setBatchSize(args.batch_size)
        

In [None]:
# Check the model checkpoint
print(subprocess.check_output(["ls", "-l", model_dir]).decode())

In [None]:
# Check the exported saved_model
print(subprocess.check_output(["ls", "-lR", export_dir]).decode())

At this point, the model checkpoint and the exported saved_model are persisted on disk.  The following code demonstrates several different ways to load and use these models for inferencing.  The different modes, described below, are specified by the `--inference_mode` command-line argument:

1. **none** - skip inferencing entirely (useful when debugging the training step).
2. **checkpoint** - load the model directly from the TensorFlow checkpoint, and map the DataFrame columns to specific tensors.
3. **signature** - load the model from the saved_model export and use the exported input/output signatures.  Note: these signatures provide a level of indirection between the signature's tensor "aliases" and the actual tensors.  This is intended to provide stable gRPC signatures for TensorFlow Serving calls, even when the underlying model changes.  In this mode, the DataFrame columns are mapped to these provided "aliases".
4. **direct** - load the model from the saved_model export, but ignore the exported signatures and tensor "aliases".  In this mode, DataFrame columns are mapped directly to the underlying tensors.  This can be useful if the user has a previously trained/exported saved_model but wants to access tensors that weren't originally mapped to a published signature.
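The difference between mapping to tensors directly and mapping through signature aliases can be shown without TensorFlow. In the hypothetical sketch below, the model is faked as a function keyed by tensor names, and a `signature` dict stands in for the saved_model's alias-to-tensor tables. As in the notebook, input mappings go from column to tensor (or alias), and output mappings go from tensor (or alias) to column.

```python
# Hypothetical, TensorFlow-free sketch of how input/output mappings resolve.
from typing import Dict, List, Optional


def fake_model(feeds: Dict[str, list]) -> Dict[str, int]:
    # Pretend the network's 'prediction' tensor is the argmax of input 'x'.
    pixels = feeds["x"]
    return {"prediction": max(range(len(pixels)), key=pixels.__getitem__)}


def run(rows: List[dict], input_mapping: Dict[str, str],
        output_mapping: Dict[str, str], signature: Optional[dict] = None) -> List[dict]:
    if signature is not None:
        # signature mode: mapping keys/values are aliases, resolved to tensor names
        in_names = {col: signature["inputs"][alias] for col, alias in input_mapping.items()}
        out_names = {signature["outputs"][alias]: col for alias, col in output_mapping.items()}
    else:
        # checkpoint/direct mode: mappings name the tensors themselves
        in_names, out_names = dict(input_mapping), dict(output_mapping)
    results = []
    for row in rows:
        feeds = {tensor: row[col] for col, tensor in in_names.items()}
        fetched = fake_model(feeds)
        results.append({col: fetched[tensor] for tensor, col in out_names.items()})
    return results


rows = [{"image": [0, 3, 9, 1]}]
sig = {"inputs": {"image": "x"}, "outputs": {"prediction": "prediction"}}
print(run(rows, {"image": "x"}, {"prediction": "col_out"}))            # [{'col_out': 2}]
print(run(rows, {"image": "image"}, {"prediction": "col_out"}, sig))   # [{'col_out': 2}]
```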

In [None]:
# NO INFERENCING
if args.inference_mode == 'none':
    sys.exit(0)
    
# INFER FROM TENSORFLOW CHECKPOINT
elif args.inference_mode == 'checkpoint':
    model.setModelDir(args.model_dir)                         # load model from checkpoint at args.model_dir
    model.setExportDir(None)                                  # don't use a saved_model
    model.setInputMapping({'image':'x'})                      # map DataFrame 'image' column to the 'x' input tensor
    if args.inference_output == 'predictions':
        model.setOutputMapping({'prediction':'col_out'})      # map 'prediction' output tensor to output DataFrame 'col_out' column
    else:  # args.inference_output == 'features':
        model.setOutputMapping({'prediction':'col_out', 'Relu':'col_out2'})   # add 'Relu' output tensor to output DataFrame 'col_out2' column

# INFER USING TENSORFLOW SAVED_MODEL WITH EXPORTED SIGNATURES
elif args.inference_mode == 'signature':
    model.setModelDir(None)                                   # don't use the model checkpoint
    model.setExportDir(args.export_dir)                       # load saved_model from args.export_dir
    model.setTagSet(tf.saved_model.tag_constants.SERVING)     # using default SERVING tagset
    model.setInputMapping({'image':'image'})                  # map DataFrame 'image' column to the 'image' input tensor alias of signature
    if args.inference_output == 'predictions':
        model.setSignatureDefKey(tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY)   # default signature def key, i.e. 'predict'
        model.setOutputMapping({'prediction':'col_out'})      # map 'prediction' output tensor alias to output DataFrame 'col_out' column
    else:  # args.inference_output == 'features'
        model.setSignatureDefKey('featurize')                 # custom signature def key
        model.setOutputMapping({'features':'col_out'})        # map 'features' output tensor alias to output DataFrame 'col_out' column

# INFER USING TENSORFLOW SAVED_MODEL, IGNORING EXPORTED SIGNATURES
else:  # args.inference_mode == 'direct':
    model.setModelDir(None)                                   # don't use the model checkpoint
    model.setExportDir(args.export_dir)                       # load saved_model from args.export_dir
    model.setTagSet(tf.saved_model.tag_constants.SERVING)     # using default SERVING tagset
    model.setInputMapping({'image':'x'})                      # map DataFrame 'image' column to the 'x' input tensor
    if args.inference_output == 'predictions':
        model.setOutputMapping({'prediction': 'col_out'})     # map 'prediction' output tensor to output DataFrame 'col_out' column
    else:  # args.inference_output == 'features'
        model.setOutputMapping({'prediction': 'col_out', 'Relu': 'col_out2'})   # add 'Relu' output tensor to output DataFrame 'col_out2' column


Now, invoke the `TFModel.transform()` method and save the output DataFrame.  **Note**: Spark "transformations" are "lazy" by design, so no actual inferencing will occur until an "action" is invoked on the output DataFrame `preds`, which in this case is the `write.json` call below to save the output to disk.
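Spark's lazy evaluation has a close analogy in plain Python iterators. In this sketch (not Spark code), `map` plays the role of a transformation: it only builds a pipeline. Consuming it with `list` plays the role of an action, and that is the point where the work actually runs.

```python
# Pure-Python analogy (not Spark): map() is lazy like a Spark transformation,
# and nothing runs until the iterator is consumed, like a Spark action.
calls = []


def infer(record):
    calls.append(record)  # record that "inference" actually ran
    return record * 10


records = [1, 2, 3]
preds = map(infer, records)  # "transformation": builds a lazy iterator
print(len(calls))            # 0 -> no inference has happened yet

output = list(preds)         # "action": forces evaluation
print(len(calls), output)    # 3 [10, 20, 30]
```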

In [None]:
print("{0} ===== Model.transform()".format(datetime.now().isoformat()))
preds = model.transform(df)
preds.write.json(args.output)

In [None]:
print(subprocess.check_output(["ls", "-l", output]).decode())

### Shutdown

In your terminal/shell window, press `Ctrl-C` to stop the Jupyter Notebook server.

Then, stop the Standalone Cluster via:
```
${SPARK_HOME}/sbin/stop-slave.sh; ${SPARK_HOME}/sbin/stop-master.sh
```