
GetStarted_YARN

leewyang edited this page Oct 22, 2020 · 30 revisions

Getting Started with TensorFlowOnSpark on a Hadoop Cluster

Before you start, you should already be familiar with TensorFlow and have access to a Hadoop grid with Spark installed. If your grid has GPU nodes, they must have CUDA installed locally.

Install Python (w/ grid node access):

If you have access privileges to install/update the Python distribution on your grid nodes, just ensure that you have Python 3.5+ installed on each node. Then, you can pip install tensorflow tensorflowonspark on each node (along with any other dependencies you might need for your application).
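As a rough sketch of the version requirement above, a small shell helper can compare the interpreter version on a node against the 3.5 minimum before anything is installed. The function name version_ge is made up for this illustration, and it assumes a sort that supports -V (GNU coreutils).

```shell
# Succeeds when version $1 is greater than or equal to version $2.
# Assumes GNU sort, which provides -V (version sort).
version_ge() {
  [ "$(printf '%s\n%s\n' "$2" "$1" | sort -V | head -n 1)" = "$2" ]
}

# Possible usage on a grid node (left commented out so nothing is installed
# just by sourcing this snippet):
# installed="$(python3 -c 'import platform; print(platform.python_version())')"
# version_ge "$installed" 3.5 && python3 -m pip install tensorflow tensorflowonspark
```

Comparing with sort -V rather than a plain string test matters for versions such as 3.10, which would sort before 3.5 lexically.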

Install Python (w/o grid node access):

If you do not have access to install/update Python on your grid nodes (e.g. hosted environments), you can create a Python.zip "distribution" that can be shipped to the Spark executors at runtime, as follows.

Download/Compile Python

From your grid gateway, download/install Python into a local folder.

# download and extract Python
export PYTHON_ROOT=~/Python
export VERSION=3.6.9

curl -O https://www.python.org/ftp/python/$VERSION/Python-$VERSION.tgz
tar -xvf Python-$VERSION.tgz

# install dependencies (RHEL/CentOS)
yum install openssl-devel zlib-devel

# compile into local PYTHON_ROOT
pushd Python-$VERSION
./configure CPPFLAGS='-I/opt/zlib/include' LDFLAGS='-L/opt/zlib/lib' --prefix="${PYTHON_ROOT}"
make
make install
popd

# install pip
pushd "${PYTHON_ROOT}"
curl -O https://bootstrap.pypa.io/get-pip.py
bin/python3.6 get-pip.py
rm get-pip.py

# OPTIONAL: add any extra packages here, e.g.
# ${PYTHON_ROOT}/bin/pip install pyarrow
popd

Note: @matiji66 has reported potential issues with compiling Python with ZLIB and SSL, which can manifest as an MNIST job hanging. If you see something similar, you can try following the instructions in that thread.

Install TensorFlow (w/o RDMA Support)

If you do not need RDMA support, you can just use ${PYTHON_ROOT}/bin/pip install tensorflow to install the current version of the public TensorFlow package into your Python distribution. If you need a specific build (e.g. nightly, CPU vs. GPU, etc.), you can follow the instructions on the TensorFlow site.

Compile and Install TensorFlow w/ RDMA Support

If you have an environment which supports RDMA, and you wish to take advantage of that, you will need to compile from source and enable the verbs option.

Create a Python.zip package for Spark

pushd "${PYTHON_ROOT}"
zip -r Python.zip *
popd

# copy this Python distribution into HDFS
hadoop fs -put ${PYTHON_ROOT}/Python.zip

Install TensorFlowOnSpark

If you did not pip install tensorflowonspark into your Python distribution, you can clone this repo and build a zip package for Spark that can be shipped at execution time. This has the advantage that you can make updates to the code without re-installing it on all your grid nodes:

git clone git@github.com:yahoo/TensorFlowOnSpark.git
pushd TensorFlowOnSpark
zip -r tfspark.zip tensorflowonspark
popd

Install and compile Hadoop InputFormat/OutputFormat for TFRecords

Note: This step is only required if you wish to read/write TFRecords directly from Spark.

git clone https://github.com/tensorflow/ecosystem.git
# follow build instructions to generate tensorflow-hadoop-1.0-SNAPSHOT.jar
# copy jar to HDFS for easier reference
hadoop fs -put tensorflow-hadoop-1.0-SNAPSHOT.jar

Run MNIST example

Download/zip the MNIST dataset

mkdir ${HOME}/mnist
pushd ${HOME}/mnist >/dev/null
curl -O "http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz"
curl -O "http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz"
curl -O "http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz"
curl -O "http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz"
zip -r mnist.zip *
popd >/dev/null

Convert the MNIST zip files into HDFS files

Note: TensorFlow requires the paths to the libcuda*.so, libjvm.so, and libhdfs.so libraries to be set in spark.executorEnv.LD_LIBRARY_PATH. If these files are in different locations in your environment, just modify the export lines below. In some environments, you may also need to set spark.executorEnv.CLASSPATH=$(hadoop classpath --glob).
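Since a wrong library path tends to surface only once the executors start, it can help to check the directories on a representative node first. The helper below is a minimal sketch of such a check; has_lib is a hypothetical name, not part of TensorFlowOnSpark, and it only confirms that a matching file exists in the directory you pass in.

```shell
# Returns success if directory $1 contains at least one file matching glob $2.
has_lib() {
  lib_dir=$1
  lib_pattern=$2
  # The pattern is deliberately unquoted so the shell expands the glob.
  for lib_file in "$lib_dir"/$lib_pattern; do
    [ -e "$lib_file" ] && return 0
  done
  return 1
}

# Possible usage once LIB_HDFS, LIB_JVM, and LIB_CUDA have been exported:
# has_lib "$LIB_HDFS" 'libhdfs.so*' || echo "libhdfs.so not found in $LIB_HDFS" >&2
# has_lib "$LIB_JVM"  'libjvm.so*'  || echo "libjvm.so not found in $LIB_JVM" >&2
# has_lib "$LIB_CUDA" 'libcuda*.so*' || echo "libcuda*.so not found in $LIB_CUDA" >&2
```

This only checks the node where it runs; the executors need the same paths to be valid on every grid node.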

# set environment variables (if not already done)
export PYTHON_ROOT=./Python
export LD_LIBRARY_PATH=${PATH}
export PYSPARK_PYTHON=${PYTHON_ROOT}/bin/python
export SPARK_YARN_USER_ENV="PYSPARK_PYTHON=Python/bin/python"
export PATH=${PYTHON_ROOT}/bin/:$PATH
export QUEUE=gpu

# set paths to libjvm.so, libhdfs.so, and libcuda*.so
#export LIB_HDFS=/opt/cloudera/parcels/CDH/lib64                      # for CDH (per @wangyum)
export LIB_HDFS=$HADOOP_PREFIX/lib/native/Linux-amd64-64              # path to libhdfs.so, for TF access to HDFS
export LIB_JVM=$JAVA_HOME/jre/lib/amd64/server                        # path to libjvm.so
export LIB_CUDA=/usr/local/cuda-7.5/lib64                             # for GPUs only

# for CPU mode:
# export QUEUE=default
# remove references to $LIB_CUDA

# save images and labels as CSV files
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 4 \
--executor-memory 4G \
--archives hdfs:///user/${USER}/Python.zip#Python,mnist/mnist.zip#mnist \
--conf spark.executorEnv.LD_LIBRARY_PATH=$LIB_CUDA \
--driver-library-path=$LIB_CUDA \
TensorFlowOnSpark/examples/mnist/mnist_data_setup.py \
--output mnist/csv \
--format csv

# save images and labels as TFRecords (OPTIONAL)
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 4 \
--executor-memory 4G \
--archives hdfs:///user/${USER}/Python.zip#Python,mnist/mnist.zip#mnist \
--jars hdfs:///user/${USER}/tensorflow-hadoop-1.0-SNAPSHOT.jar \
--conf spark.executorEnv.LD_LIBRARY_PATH=$LIB_CUDA \
--driver-library-path=$LIB_CUDA \
TensorFlowOnSpark/examples/mnist/mnist_data_setup.py \
--output mnist/tfr \
--format tfr

Run distributed MNIST training (using InputMode.SPARK)

# for CPU mode:
# export QUEUE=default
# remove references to $LIB_CUDA

# For TensorFlow 1.x (git checkout v1.4.4)
# hadoop fs -rm -r mnist_model
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 4 \
--executor-memory 27G \
--py-files TensorFlowOnSpark/tfspark.zip,TensorFlowOnSpark/examples/mnist/spark/mnist_dist.py \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--archives hdfs:///user/${USER}/Python.zip#Python \
--conf spark.executorEnv.LD_LIBRARY_PATH=$LIB_CUDA:$LIB_JVM:$LIB_HDFS \
--driver-library-path=$LIB_CUDA \
TensorFlowOnSpark/examples/mnist/spark/mnist_spark.py \
--images mnist/csv/train/images \
--labels mnist/csv/train/labels \
--mode train \
--model mnist_model
# to use infiniband, add --rdma

# For TensorFlow 2.x (git checkout master)
# hadoop fs -rm -r mnist_model
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 4 \
--executor-memory 27G \
--py-files TensorFlowOnSpark/tfspark.zip \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--archives hdfs:///user/${USER}/Python.zip#Python \
--conf spark.executorEnv.LD_LIBRARY_PATH=$LIB_CUDA:$LIB_JVM:$LIB_HDFS \
--driver-library-path=$LIB_CUDA \
TensorFlowOnSpark/examples/mnist/keras/mnist_spark.py \
--images_labels hdfs:///user/${USER}/mnist/csv/train \
--model_dir mnist_model
# to use infiniband, add --rdma

Run distributed MNIST inference (using InputMode.SPARK)

# For TensorFlow 1.x (git checkout v1.4.4)
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 4 \
--executor-memory 27G \
--py-files TensorFlowOnSpark/tfspark.zip,TensorFlowOnSpark/examples/mnist/spark/mnist_dist.py \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--archives hdfs:///user/${USER}/Python.zip#Python \
--conf spark.executorEnv.LD_LIBRARY_PATH=$LIB_CUDA:$LIB_JVM:$LIB_HDFS \
--driver-library-path=$LIB_CUDA \
TensorFlowOnSpark/examples/mnist/spark/mnist_spark.py \
--images mnist/csv/test/images \
--labels mnist/csv/test/labels \
--mode inference \
--model mnist_model \
--output predictions

Run distributed MNIST training (using InputMode.TENSORFLOW)

# for CPU mode:
# export QUEUE=default
# remove references to $LIB_CUDA

# For TensorFlow 1.x (git checkout v1.4.4)
# hadoop fs -rm -r mnist_model
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 4 \
--executor-memory 27G \
--py-files TensorFlowOnSpark/tfspark.zip,TensorFlowOnSpark/examples/mnist/tf/mnist_dist.py \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--archives hdfs:///user/${USER}/Python.zip#Python \
--conf spark.executorEnv.LD_LIBRARY_PATH=$LIB_CUDA:$LIB_JVM:$LIB_HDFS \
--driver-library-path=$LIB_CUDA \
TensorFlowOnSpark/examples/mnist/tf/mnist_spark.py \
--images mnist/tfr/train \
--format tfr \
--mode train \
--model mnist_model
# to use infiniband, replace the last line with --model mnist_model --rdma

# For TensorFlow 2.x (git checkout master)
# hadoop fs -rm -r mnist_model
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 4 \
--executor-memory 27G \
--py-files TensorFlowOnSpark/tfspark.zip \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--archives hdfs:///user/${USER}/Python.zip#Python \
--conf spark.executorEnv.LD_LIBRARY_PATH=$LIB_CUDA:$LIB_JVM:$LIB_HDFS \
--driver-library-path=$LIB_CUDA \
TensorFlowOnSpark/examples/mnist/keras/mnist_tf_ds.py \
--images_labels hdfs:///user/${USER}/mnist/tfr/train/part-* \
--format tfos \
--model mnist_model

Run distributed MNIST inference (using InputMode.TENSORFLOW)

# For TensorFlow 1.x (git checkout v1.4.4)
# hadoop fs -rm -r predictions
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 4 \
--executor-memory 27G \
--py-files TensorFlowOnSpark/tfspark.zip,TensorFlowOnSpark/examples/mnist/tf/mnist_dist.py \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--archives hdfs:///user/${USER}/Python.zip#Python \
--conf spark.executorEnv.LD_LIBRARY_PATH=$LIB_CUDA:$LIB_JVM:$LIB_HDFS \
--driver-library-path=$LIB_CUDA \
TensorFlowOnSpark/examples/mnist/tf/mnist_spark.py \
--images mnist/tfr/test \
--mode inference \
--model mnist_model \
--output predictions

Run TensorFlowOnSpark using Spark Streaming

Spark also includes a streaming mode, which allows you to feed data to your Spark applications in an online/streaming fashion instead of reading a static list of files from disk. In this mode, Spark watches a location on disk (or listens on a network port) for new data to arrive and batches the incoming data into a sequence of RDDs for your application.

Convert the MNIST zip files to image-label records

Since streaming data arrives over time, we need to produce a version of the data that encodes the images with the labels. For simplicity, we concatenate the label with the image CSV, joined by a pipe '|' character.
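To make the record layout concrete, the following sketch joins a labels file and an images file line by line with a pipe. It is only an illustration of the label|image shape described above: the file names, the tiny four-pixel "images", and the plain-digit labels are all made up, and the exact label encoding written by mnist_data_setup.py may differ.

```shell
# Join a labels file and an images file line by line with a '|' separator.
# Both file layouts are assumptions made for this illustration.
join_records() {
  paste -d '|' "$1" "$2"
}

# Tiny made-up inputs: one label per line, one flattened image per line.
demo_dir=$(mktemp -d)
printf '7\n2\n' > "$demo_dir/labels.csv"
printf '0,0,255,0\n0,128,0,0\n' > "$demo_dir/images.csv"

join_records "$demo_dir/labels.csv" "$demo_dir/images.csv"
# prints:
# 7|0,0,255,0
# 2|0,128,0,0
```

Keeping the label and image in a single line means each streamed record is self-contained, so a batch can be consumed without matching it against a second file.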

# set environment variables (if not already done)
export PYTHON_ROOT=./Python
export PYSPARK_PYTHON=${PYTHON_ROOT}/bin/python
export SPARK_YARN_USER_ENV="PYSPARK_PYTHON=Python/bin/python"
export PATH=${PYTHON_ROOT}/bin/:$PATH
export QUEUE=gpu

# set paths to libjvm.so, libhdfs.so, and libcuda*.so
#export LIB_HDFS=/opt/cloudera/parcels/CDH/lib64                      # for CDH (per @wangyum)
export LIB_HDFS=$HADOOP_PREFIX/lib/native/Linux-amd64-64
export LIB_JVM=$JAVA_HOME/jre/lib/amd64/server
export LIB_CUDA=/usr/local/cuda-7.5/lib64

# for CPU mode:
# export QUEUE=default
# remove references to $LIB_CUDA

# hadoop fs -rm -r mnist/csv2
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 4 \
--executor-memory 4G \
--archives hdfs:///user/${USER}/Python.zip#Python,mnist/mnist.zip#mnist \
--conf spark.executorEnv.LD_LIBRARY_PATH=$LIB_CUDA \
--driver-library-path=$LIB_CUDA \
TensorFlowOnSpark/examples/mnist/mnist_data_setup.py \
--output mnist/csv2 \
--format csv2

Run distributed MNIST training (using Spark Streaming)

# for CPU mode:
# export QUEUE=default
# remove references to $LIB_CUDA

# create a folder for new streaming data to arrive
hadoop fs -mkdir stream_data

# For TensorFlow 1.x (git checkout v1.4.4)
# hadoop fs -rm -r mnist_model stream_data/*
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 4 \
--executor-memory 27G \
--py-files TensorFlowOnSpark/tfspark.zip,TensorFlowOnSpark/examples/mnist/streaming/mnist_dist.py \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--conf spark.streaming.stopGracefullyOnShutdown=true \
--archives hdfs:///user/${USER}/Python.zip#Python \
--conf spark.executorEnv.LD_LIBRARY_PATH=$LIB_CUDA:$LIB_JVM:$LIB_HDFS \
--driver-library-path=$LIB_CUDA \
TensorFlowOnSpark/examples/mnist/streaming/mnist_spark.py \
--images stream_data \
--format csv2 \
--mode train \
--model mnist_model

# wait for spark job to be RUNNING, then simulate arrival of NEW data in stream by:
# 1. making a copy of the data (to get a recent timestamp).
# 2. moving it into the stream atomically (to avoid spark picking up a partial file).
# for more info, see: http://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources
# monitor spark logs after each command to view behavior.
hadoop fs -mkdir temp
hadoop fs -cp mnist/csv2/train/part-00000 temp; hadoop fs -mv temp/* stream_data
hadoop fs -cp mnist/csv2/train/part-00001 temp; hadoop fs -mv temp/* stream_data
hadoop fs -cp mnist/csv2/train/part-0000[2-9] temp; hadoop fs -mv temp/* stream_data

# shutdown job, since this normally runs forever, waiting for new data to arrive
# the host and port of the reservation server will be in the driver logs, e.g.
# "listening for reservations at ('gpbl191n01.blue.ygrid.yahoo.com', 38254)"
${PYTHON_ROOT}/bin/python TensorFlowOnSpark/src/com/yahoo/ml/tf/reservation_client.py <host> <port>
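If you would rather not copy the host and port out of the driver logs by hand, a small filter can pull them from the "listening for reservations at" line quoted above. This is a sketch that assumes the log line has exactly that shape; parse_reservation is a hypothetical helper name.

```shell
# Read log text on stdin and print "<host> <port>" for each line of the form:
#   listening for reservations at ('<host>', <port>)
parse_reservation() {
  sed -n "s/.*listening for reservations at ('\([^']*\)', *\([0-9][0-9]*\)).*/\1 \2/p"
}

# Possible usage, assuming you know the YARN application id:
# yarn logs -applicationId <appId> | parse_reservation | head -n 1
```

The output is a single space-separated pair, so it can be passed straight to reservation_client.py as its two arguments.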

Run distributed MNIST inference (using Spark Streaming)

# for CPU mode:
# export QUEUE=default
# remove references to $LIB_CUDA

# For TensorFlow 1.x (git checkout v1.4.4)
# hadoop fs -rm -r -skipTrash predictions/* stream_data/*
${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--queue ${QUEUE} \
--num-executors 4 \
--executor-memory 27G \
--py-files TensorFlowOnSpark/tfspark.zip,TensorFlowOnSpark/examples/mnist/streaming/mnist_dist.py \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.yarn.maxAppAttempts=1 \
--conf spark.streaming.stopGracefullyOnShutdown=true \
--archives hdfs:///user/${USER}/Python.zip#Python \
--conf spark.executorEnv.LD_LIBRARY_PATH=$LIB_CUDA:$LIB_JVM:$LIB_HDFS \
--driver-library-path=$LIB_CUDA \
TensorFlowOnSpark/examples/mnist/streaming/mnist_spark.py \
--images stream_data \
--format csv2 \
--mode inference \
--model mnist_model \
--output predictions/batch

# wait for spark job to be RUNNING, then simulate arrival of NEW data in stream by:
# 1. making a copy of the data (to get a recent timestamp).
# 2. moving it into the stream atomically (to avoid spark picking up a partial file).
# for more info, see: http://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources
# monitor spark logs after each command to view behavior.
hadoop fs -cp mnist/csv2/test/part-00000 temp; hadoop fs -mv temp/* stream_data
hadoop fs -cp mnist/csv2/test/part-00001 temp; hadoop fs -mv temp/* stream_data
hadoop fs -cp mnist/csv2/test/part-0000[2-9] temp; hadoop fs -mv temp/* stream_data

# shutdown job, since this normally runs forever, waiting for new data to arrive
# Note: the host and port of the reservation server will be in the driver logs, e.g.
# "listening for reservations at ('<host>', <port>)"
${PYTHON_ROOT}/bin/python TensorFlowOnSpark/src/com/yahoo/ml/tf/reservation_client.py <host> <port>
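The copy-then-move pattern used to feed both streaming jobs can be wrapped in a small helper so each new file takes one command. This is a minimal sketch under the same assumptions as the commands above (a staging directory named temp and a watched directory named stream_data, both overridable); stream_part is a hypothetical name.

```shell
# Copy one HDFS path into a staging directory (to get a fresh timestamp), then
# move it into the watched directory so the streaming job never sees a
# partially written file.
#   $1 = source path (may contain an HDFS glob; quote it when calling)
#   $2 = staging directory (default: temp)
#   $3 = watched directory (default: stream_data)
stream_part() {
  src_path=$1
  staging_dir=${2:-temp}
  watched_dir=${3:-stream_data}
  # The glob is quoted so that the hadoop fs shell expands it, not the local shell.
  hadoop fs -cp "$src_path" "$staging_dir" && hadoop fs -mv "$staging_dir/*" "$watched_dir"
}

# Possible usage:
# stream_part mnist/csv2/test/part-00000
# stream_part 'mnist/csv2/test/part-0000[2-9]'
```

Unlike the `;`-separated commands above, the helper uses `&&`, so the move is skipped if the copy fails.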