# TensorFlowOnSpark with InputMode.SPARK

This notebook demonstrates TensorFlowOnSpark using `InputMode.SPARK`, which feeds a Spark RDD to a distributed TensorFlow cluster via the `feed_dict` argument of TensorFlow's `Session.run()`.

This mode gives existing Spark users a simple way to feed Spark RDDs into TensorFlow. Note that it carries a performance penalty, because the RDD partitions are transferred from disk through Spark to the TensorFlow processes. If I/O performance is a concern, use `InputMode.TENSORFLOW` instead, where the TensorFlow processes read the data directly from disk.
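Conceptually, in this mode each Spark task hands the records of its RDD partition to the TensorFlow process on the same executor, which consumes them in fixed-size batches and passes each batch to the model through `feed_dict`. The sketch below illustrates only that batching step in plain Python. `next_batches` is a hypothetical helper, not part of the TensorFlowOnSpark API; the real hand-off happens inside the library.

```python
from itertools import islice


def next_batches(records, batch_size):
    """Yield lists of up to batch_size records from an iterable.

    Hypothetical helper: it mimics how a TensorFlow worker might pull
    fixed-size batches from the records of one RDD partition.
    """
    it = iter(records)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


# Example: one "partition" of 250 (image, label) records, batch_size=100
partition = [([0] * 784, [0.0] * 10) for _ in range(250)]
sizes = [len(b) for b in next_batches(partition, 100)]
print(sizes)  # [100, 100, 50]
```

The last batch is simply shorter when the partition size is not a multiple of the batch size.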

Note: this notebook assumes that you have already followed the [instructions to download and convert the MNIST data](https://github.com/yahoo/TensorFlowOnSpark/wiki/GetStarted_Standalone#download-mnist-data).

### Start a Spark Standalone Cluster

First, in a terminal/shell window, start a single-machine Spark Standalone Cluster with three workers:
```
export MASTER=spark://$(hostname):7077
export SPARK_WORKER_INSTANCES=3
export CORES_PER_WORKER=1
export TOTAL_CORES=$((${CORES_PER_WORKER}*${SPARK_WORKER_INSTANCES}))
# start the master, then SPARK_WORKER_INSTANCES workers with CORES_PER_WORKER cores and 3 GB of memory each
${SPARK_HOME}/sbin/start-master.sh; ${SPARK_HOME}/sbin/start-slave.sh -c $CORES_PER_WORKER -m 3G ${MASTER}
```
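The shell arithmetic above sizes the cluster: `TOTAL_CORES` is the number of cores Spark may use, and because each task requests `spark.task.cpus` cores (set to `CORES_PER_WORKER` in the `pyspark` command), it also bounds how many tasks, and therefore TensorFlow nodes, can run at once. A quick check of the numbers in Python, assuming the values exported above:

```python
# Values copied from the shell exports
cores_per_worker = 1
spark_worker_instances = 3

# Same arithmetic as TOTAL_CORES=$((CORES_PER_WORKER * SPARK_WORKER_INSTANCES))
total_cores = cores_per_worker * spark_worker_instances

# spark.task.cpus = CORES_PER_WORKER, so each task occupies a whole worker
task_cpus = cores_per_worker
concurrent_tasks = total_cores // task_cpus
print(total_cores, concurrent_tasks)  # 3 3
```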

### Launch the Spark Jupyter Notebook

Now, in the same terminal window, launch a PySpark Jupyter notebook:
```
# export TFoS_HOME=</path/to/TensorFlowOnSpark>
cd ${TFoS_HOME}/examples/mnist
PYSPARK_DRIVER_PYTHON="jupyter" \
PYSPARK_DRIVER_PYTHON_OPTS="notebook" \
pyspark --master ${MASTER} \
--conf spark.cores.max=${TOTAL_CORES} \
--conf spark.task.cpus=${CORES_PER_WORKER} \
--py-files ${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME"
```

This should open a Jupyter page in your browser that lists the directory where this notebook is hosted.
Click on this notebook and execute its cells in order.

NOTE: the `SparkContext` should be available as the `sc` variable. Evaluating `sc` displays a link to the Spark UI; its "Executors" tab contains the logs for each Spark executor. For TensorFlowOnSpark, each executor corresponds to a specific TensorFlow instance in the cluster, and the TensorFlow logs are reported in that executor's `stderr` log.

In [None]:
sc

In [None]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import subprocess
from tensorflowonspark import TFCluster

# main TensorFlow code for this example
import mnist_dist

In [None]:
parser = argparse.ArgumentParser()
parser.add_argument("--batch_size", help="number of examples per batch", type=int, default=100)
parser.add_argument("--epochs", help="number of epochs", type=int, default=1)
parser.add_argument("--format", help="example format", choices=["csv","pickle","tfr"], default="csv")
parser.add_argument("--images", help="HDFS path to MNIST images in parallelized format")
parser.add_argument("--labels", help="HDFS path to MNIST labels in parallelized format")
parser.add_argument("--mode", help="train|inference", default="train")
parser.add_argument("--model", help="HDFS path to save/load model during train/inference", default="mnist_model")
parser.add_argument("--output", help="HDFS path to save test/inference output", default="predictions")
parser.add_argument("--readers", help="number of reader/enqueue threads", type=int, default=1)
parser.add_argument("--rdma", help="use rdma connection", action="store_true")
parser.add_argument("--steps", help="maximum number of steps", type=int, default=1000)
parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true")
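A note on boolean options: `argparse` does not convert strings to booleans. An option declared with `default=False` and no `type` stores whatever string follows it, so `--rdma False` would yield the non-empty, and therefore truthy, string `'False'`. On/off flags such as `--tensorboard` use `action="store_true"` instead. A minimal demonstration with a throwaway parser (the option names are hypothetical; it is called `demo_parser` so it does not overwrite `parser`):

```python
import argparse

# Hypothetical demo parser, separate from the notebook's `parser`
demo_parser = argparse.ArgumentParser()
# No type: the string after the flag is stored as-is
demo_parser.add_argument("--flag_as_string", default=False)
# store_true: False unless the flag is present, then True
demo_parser.add_argument("--flag_as_switch", action="store_true")

demo = demo_parser.parse_args(["--flag_as_string", "False"])
print(repr(demo.flag_as_string), bool(demo.flag_as_string))  # 'False' True
print(demo.flag_as_switch)  # False
print(demo_parser.parse_args(["--flag_as_switch"]).flag_as_switch)  # True
```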


In [None]:
num_executors = sc.defaultParallelism
num_executors

### Run Distributed Training

In [None]:
# verify training images
train_images_files = "csv/train/images"
print(subprocess.check_output(["ls", "-l", train_images_files]).decode("utf-8"))

In [None]:
# verify training labels
train_labels_files = "csv/train/labels"
print(subprocess.check_output(["ls", "-l", train_labels_files]).decode("utf-8"))

In [None]:
# parse arguments for training
args = parser.parse_args(['--mode', 'train', 
                          '--steps', '600', 
                          '--epochs', '1',
                          '--images', train_images_files, 
                          '--labels', train_labels_files])
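With the default `--batch_size` of 100, the 600 steps requested here correspond to 60,000 examples, which is the size of the standard MNIST training set, i.e. roughly one epoch. This assumes each step consumes exactly one batch; how `--steps` is counted across the two workers is determined by `mnist_dist.py`.

```python
batch_size = 100          # parser default for --batch_size
steps = 600               # value passed via --steps for training
mnist_train_size = 60000  # images in the standard MNIST training set

examples_seen = batch_size * steps
epochs_covered = examples_seen / mnist_train_size
print(examples_seen, epochs_covered)  # 60000 1.0
```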

In [None]:
# remove any existing models
subprocess.call(["rm", "-rf", args.model])
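Shelling out to `rm -rf` works on Linux and macOS. A pure-Python alternative built on the standard library's `shutil.rmtree` is sketched below; it assumes the path is on the local filesystem, as in this single-machine setup (an HDFS path would need HDFS tooling instead). `remove_if_exists` is a hypothetical helper name.

```python
import os
import shutil
import tempfile


def remove_if_exists(path):
    """Delete a local file or directory tree; do nothing if it is missing."""
    if os.path.isdir(path) and not os.path.islink(path):
        shutil.rmtree(path)
    elif os.path.lexists(path):
        os.remove(path)


# Example: create a throwaway directory, then remove it twice (second call is a no-op)
demo_dir = os.path.join(tempfile.mkdtemp(), "mnist_model")
os.makedirs(os.path.join(demo_dir, "checkpoints"))
remove_if_exists(demo_dir)
remove_if_exists(demo_dir)
print(os.path.exists(demo_dir))  # False
```

In the notebook this would be called as `remove_if_exists(args.model)`.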

In [None]:
# start the cluster for training
cluster = TFCluster.run(sc, mnist_dist.map_fun, args, num_executors, 1, args.tensorboard, TFCluster.InputMode.SPARK)
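The positional arguments to `TFCluster.run` are, in order: the `SparkContext`, the function each executor runs (`mnist_dist.map_fun`), its arguments, the number of executors, the number of parameter servers (`1`), whether to launch TensorBoard, and the input mode. With three executors and one parameter server, one executor hosts the parameter server and the other two run as workers. A simplified sketch of that split, assuming the parameter servers take the lowest executor IDs; `assign_roles` is hypothetical and is not the library's code:

```python
def assign_roles(num_executors, num_ps):
    """Hypothetical: map executor IDs to (job_name, task_index) pairs."""
    roles = {}
    for executor_id in range(num_executors):
        if executor_id < num_ps:
            # the first num_ps executors act as parameter servers
            roles[executor_id] = ("ps", executor_id)
        else:
            # the remaining executors act as workers, numbered from 0
            roles[executor_id] = ("worker", executor_id - num_ps)
    return roles


print(assign_roles(3, 1))
# {0: ('ps', 0), 1: ('worker', 0), 2: ('worker', 1)}
```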

In [None]:
# feed data via Spark RDD
images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
dataRDD = images.zip(labels)
cluster.train(dataRDD, args.epochs)
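The two `map` calls parse each CSV line into a list of numbers, and `zip` pairs the i-th image with the i-th label. Spark's `RDD.zip` requires both RDDs to have the same number of partitions and the same number of elements in each partition, so the images and labels files must be written with matching partitioning. The same parsing on plain Python lists, using short made-up lines (real MNIST image lines have 784 pixel values, and the labels are assumed here to be one-hot vectors):

```python
def parse_image(line):
    # pixel intensities stored as integers
    return [int(x) for x in line.split(',')]


def parse_label(line):
    # label vector stored as floats
    return [float(x) for x in line.split(',')]


# Made-up, shortened sample lines (not real MNIST records)
image_lines = ["0,128,255", "12,0,7"]
label_lines = ["0,1,0", "1,0,0"]

# Local equivalent of images.zip(labels) on the parsed RDDs
data = list(zip(map(parse_image, image_lines), map(parse_label, label_lines)))
print(data[0])  # ([0, 128, 255], [0.0, 1.0, 0.0])
```

Named functions like these can also be passed to Spark in place of the lambdas, e.g. `sc.textFile(args.images).map(parse_image)`.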

In [None]:
# shut down the cluster
# NOTE: this will block until all RDD data has been fed via the previous step
cluster.shutdown()

In [None]:
print(subprocess.check_output(["ls", "-l", args.model]).decode("utf-8"))

### Run Distributed Inference

In [None]:
test_images_files = "csv/test/images"
print(subprocess.check_output(["ls", "-l", test_images_files]).decode("utf-8"))

In [None]:
test_labels_files = "csv/test/labels"
print(subprocess.check_output(["ls", "-l", test_labels_files]).decode("utf-8"))

In [None]:
# parse arguments for inference
args = parser.parse_args(['--mode', 'inference',
                          '--images', test_images_files, 
                          '--labels', test_labels_files])
args

In [None]:
# remove any existing output
subprocess.call(["rm", "-rf", args.output])

In [None]:
# start the cluster for inference
cluster = TFCluster.run(sc, mnist_dist.map_fun, args, num_executors, 1, False, TFCluster.InputMode.SPARK)

In [None]:
# feed data for inference
images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
dataRDD = images.zip(labels)
predictions = cluster.inference(dataRDD)
predictions.saveAsTextFile(args.output)

In [None]:
cluster.shutdown()

In [None]:
print(subprocess.check_output(["ls", "-l", args.output]).decode("utf-8"))
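`saveAsTextFile` writes a directory rather than a single file: one `part-NNNNN` file per partition, usually accompanied by a `_SUCCESS` marker once the job completes. A minimal sketch for reading all prediction lines back on the driver, assuming the output is on the local filesystem. `read_part_files` is a hypothetical helper, and it does not interpret the line format, which is defined by `mnist_dist.py`.

```python
import glob
import os
import tempfile


def read_part_files(output_dir):
    """Return all lines from the part-* files in output_dir, in file-name order."""
    lines = []
    for path in sorted(glob.glob(os.path.join(output_dir, "part-*"))):
        with open(path) as f:
            lines.extend(line.rstrip("\n") for line in f)
    return lines


# Example with a fake output directory containing two partitions
demo_out = tempfile.mkdtemp()
for i, content in enumerate(["a\nb\n", "c\n"]):
    with open(os.path.join(demo_out, "part-%05d" % i), "w") as f:
        f.write(content)
open(os.path.join(demo_out, "_SUCCESS"), "w").close()

print(read_part_files(demo_out))  # ['a', 'b', 'c']
```

In the notebook, `read_part_files(args.output)[:5]` would show the first few predictions.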

### Shutdown

In your terminal/shell window, press `<ctrl-C>` to stop the Jupyter notebook server.

Then, stop the Standalone Cluster via:
```
${SPARK_HOME}/sbin/stop-slave.sh; ${SPARK_HOME}/sbin/stop-master.sh
```