From 4c059d00c56510a5af2f71a2eeca249b84f67c73 Mon Sep 17 00:00:00 2001 From: winstonq Date: Mon, 4 Dec 2017 11:44:53 -0800 Subject: [PATCH 1/6] Allow running PS nodes on the spark driver --- examples/mnist/spark/mnist_spark.py | 4 +++- examples/mnist/spark/mnist_spark_dataset.py | 4 +++- examples/mnist/spark/mnist_spark_pipeline.py | 2 ++ examples/mnist/streaming/mnist_spark.py | 4 +++- examples/mnist/tf/mnist_spark.py | 4 +++- examples/mnist/tf/mnist_spark_dataset.py | 4 +++- examples/mnist/tf/mnist_spark_pipeline.py | 2 ++ tensorflowonspark/TFCluster.py | 24 ++++++++++++++++++-- tensorflowonspark/pipeline.py | 9 +++++++- 9 files changed, 49 insertions(+), 8 deletions(-) diff --git a/examples/mnist/spark/mnist_spark.py b/examples/mnist/spark/mnist_spark.py index 8fe131e1..513a9fcd 100755 --- a/examples/mnist/spark/mnist_spark.py +++ b/examples/mnist/spark/mnist_spark.py @@ -40,6 +40,7 @@ parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true") parser.add_argument("-X", "--mode", help="train|inference", default="train") parser.add_argument("-c", "--rdma", help="use rdma connection", default=False) +parser.add_argument("-p", "--driver_ps_nodes", help="run tensorflow PS node on driver locally", default=False) args = parser.parse_args() print("args:",args) @@ -67,7 +68,8 @@ def toNumpy(bytestr): print("zipping images and labels") dataRDD = images.zip(labels) -cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.SPARK, log_dir=args.model) +cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.SPARK, + driver_ps_nodes=args.driver_ps_nodes, log_dir=args.model) if args.mode == "train": cluster.train(dataRDD, args.epochs) else: diff --git a/examples/mnist/spark/mnist_spark_dataset.py b/examples/mnist/spark/mnist_spark_dataset.py index 2192bf00..96452f0e 100755 --- 
a/examples/mnist/spark/mnist_spark_dataset.py +++ b/examples/mnist/spark/mnist_spark_dataset.py @@ -36,6 +36,7 @@ parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true") parser.add_argument("-X", "--mode", help="train|inference", default="train") parser.add_argument("-c", "--rdma", help="use rdma connection", default=False) +parser.add_argument("-p", "--driver_ps_nodes", help="run tensorflow PS node on driver locally", default=False) args = parser.parse_args() print("args:",args) @@ -59,7 +60,8 @@ def toNumpy(bytestr): print("zipping images and labels") dataRDD = images.zip(labels) -cluster = TFCluster.run(sc, mnist_dist_dataset.map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.SPARK) +cluster = TFCluster.run(sc, mnist_dist_dataset.map_fun, args, args.cluster_size, num_ps, args.tensorboard, + TFCluster.InputMode.SPARK, driver_ps_nodes=args.driver_ps_nodes) if args.mode == "train": cluster.train(dataRDD, args.epochs) else: diff --git a/examples/mnist/spark/mnist_spark_pipeline.py b/examples/mnist/spark/mnist_spark_pipeline.py index 4314399a..72a40547 100644 --- a/examples/mnist/spark/mnist_spark_pipeline.py +++ b/examples/mnist/spark/mnist_spark_pipeline.py @@ -37,6 +37,7 @@ parser.add_argument("--export_dir", help="HDFS path to export model", type=str) parser.add_argument("--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors) parser.add_argument("--num_ps", help="number of PS nodes in cluster", type=int, default=1) +parser.add_argument("--driver_ps_nodes", help="run tensorflow PS node on driver locally", default=False) parser.add_argument("--protocol", help="Tensorflow network protocol (grpc|rdma)", default="grpc") parser.add_argument("--steps", help="maximum number of steps", type=int, default=1000) parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true") @@ -80,6 +81,7 @@ .setExportDir(args.export_dir) \ 
.setClusterSize(args.cluster_size) \ .setNumPS(args.num_ps) \ + .setDriverPSNodes(args.driver_ps_nodes) \ .setProtocol(args.protocol) \ .setTensorboard(args.tensorboard) \ .setEpochs(args.epochs) \ diff --git a/examples/mnist/streaming/mnist_spark.py b/examples/mnist/streaming/mnist_spark.py index 2d55743d..2f343ccc 100755 --- a/examples/mnist/streaming/mnist_spark.py +++ b/examples/mnist/streaming/mnist_spark.py @@ -40,6 +40,7 @@ parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true") parser.add_argument("-X", "--mode", help="train|inference", default="train") parser.add_argument("-c", "--rdma", help="use rdma connection", default=False) +parser.add_argument("-p", "--driver_ps_nodes", help="run tensorflow PS node on driver locally", default=False) args = parser.parse_args() print("args:",args) @@ -55,7 +56,8 @@ def parse(ln): stream = ssc.textFileStream(args.images) imageRDD = stream.map(lambda ln: parse(ln)) -cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.SPARK) +cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size, num_ps, args.tensorboard, + TFCluster.InputMode.SPARK, driver_ps_nodes=args.driver_ps_nodes) if args.mode == "train": cluster.train(imageRDD) else: diff --git a/examples/mnist/tf/mnist_spark.py b/examples/mnist/tf/mnist_spark.py index a6aae1ce..565bf2a5 100644 --- a/examples/mnist/tf/mnist_spark.py +++ b/examples/mnist/tf/mnist_spark.py @@ -38,12 +38,14 @@ parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true") parser.add_argument("-X", "--mode", help="train|inference", default="train") parser.add_argument("-c", "--rdma", help="use rdma connection", default=False) +parser.add_argument("-p", "--driver_ps_nodes", help="run tensorflow PS node on driver locally", default=False) args = parser.parse_args() print("args:",args) print("{0} ===== 
Start".format(datetime.now().isoformat())) -cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.TENSORFLOW, log_dir=args.model) +cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.TENSORFLOW, + driver_ps_nodes=args.driver_ps_nodes, log_dir=args.model) cluster.shutdown() print("{0} ===== Stop".format(datetime.now().isoformat())) diff --git a/examples/mnist/tf/mnist_spark_dataset.py b/examples/mnist/tf/mnist_spark_dataset.py index deb96368..c1156b8a 100644 --- a/examples/mnist/tf/mnist_spark_dataset.py +++ b/examples/mnist/tf/mnist_spark_dataset.py @@ -34,12 +34,14 @@ parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true") parser.add_argument("-X", "--mode", help="train|inference", default="train") parser.add_argument("-c", "--rdma", help="use rdma connection", default=False) +parser.add_argument("-p", "--driver_ps_nodes", help="run tensorflow PS node on driver locally", default=False) args = parser.parse_args() print("args:",args) print("{0} ===== Start".format(datetime.now().isoformat())) -cluster = TFCluster.run(sc, mnist_dist_dataset.map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.TENSORFLOW) +cluster = TFCluster.run(sc, mnist_dist_dataset.map_fun, args, args.cluster_size, num_ps, args.tensorboard, + TFCluster.InputMode.TENSORFLOW, driver_ps_nodes=args.driver_ps_nodes) cluster.shutdown() print("{0} ===== Stop".format(datetime.now().isoformat())) diff --git a/examples/mnist/tf/mnist_spark_pipeline.py b/examples/mnist/tf/mnist_spark_pipeline.py index 3da1645d..7884111d 100644 --- a/examples/mnist/tf/mnist_spark_pipeline.py +++ b/examples/mnist/tf/mnist_spark_pipeline.py @@ -38,6 +38,7 @@ parser.add_argument("--tfrecord_dir", help="HDFS path to temporarily save DataFrame to disk", type=str) parser.add_argument("--cluster_size", help="number of nodes in the 
cluster", type=int, default=num_executors) parser.add_argument("--num_ps", help="number of PS nodes in cluster", type=int, default=1) +parser.add_argument("--driver_ps_nodes", help="run tensorflow PS node on driver locally", default=False) parser.add_argument("--protocol", help="Tensorflow network protocol (grpc|rdma)", default="grpc") parser.add_argument("--readers", help="number of reader/enqueue threads", type=int, default=1) parser.add_argument("--steps", help="maximum number of steps", type=int, default=1000) @@ -81,6 +82,7 @@ .setExportDir(args.export_dir) \ .setClusterSize(args.cluster_size) \ .setNumPS(args.num_ps) \ + .setDriverPSNodes(args.driver_ps_nodes) \ .setInputMode(TFCluster.InputMode.TENSORFLOW) \ .setTFRecordDir(args.tfrecord_dir) \ .setProtocol(args.protocol) \ diff --git a/tensorflowonspark/TFCluster.py b/tensorflowonspark/TFCluster.py index f81282d9..4637b6e8 100644 --- a/tensorflowonspark/TFCluster.py +++ b/tensorflowonspark/TFCluster.py @@ -194,7 +194,8 @@ def reserve(sc, num_executors, num_ps, tensorboard=False, input_mode=InputMode.T """*DEPRECATED*. use run() method instead of reserve/start.""" raise Exception("DEPRECATED: use run() method instead of reserve/start.") -def run(sc, map_fun, tf_args, num_executors, num_ps, tensorboard=False, input_mode=InputMode.TENSORFLOW, log_dir=None, queues=['input', 'output']): +def run(sc, map_fun, tf_args, num_executors, num_ps, tensorboard=False, input_mode=InputMode.TENSORFLOW, + log_dir=None, driver_ps_nodes=False, queues=['input', 'output']): """Starts the TensorFlowOnSpark cluster and Runs the TensorFlow "main" function on the Spark executors Args: @@ -206,6 +207,7 @@ def run(sc, map_fun, tf_args, num_executors, num_ps, tensorboard=False, input_mo :tensorboard: boolean indicating if the chief worker should spawn a Tensorboard server. :input_mode: TFCluster.InputMode :log_dir: directory to save tensorboard event logs. If None, defaults to a fixed path on local filesystem. 
+ :driver_ps_nodes: run the PS nodes on the driver locally instead of on the spark executors; this help maximizing computing resources (esp. GPU). :queues: *INTERNAL_USE* Returns: @@ -218,6 +220,7 @@ def run(sc, map_fun, tf_args, num_executors, num_ps, tensorboard=False, input_mo cluster_template = {} cluster_template['ps'] = range(num_ps) cluster_template['worker'] = range(num_ps, num_executors) + logging.info("worker node range %s, ps node range %s" % (cluster_template['worker'], cluster_template['ps'])) # get default filesystem from spark defaultFS = sc._jsc.hadoopConfiguration().get("fs.defaultFS") @@ -242,7 +245,24 @@ def run(sc, map_fun, tf_args, num_executors, num_ps, tensorboard=False, input_mo 'working_dir': working_dir, 'server_addr': server_addr } - nodeRDD = sc.parallelize(range(num_executors), num_executors) + if driver_ps_nodes: + nodeRDD = sc.parallelize(range(num_ps, num_executors), num_executors - num_ps) + else: + nodeRDD = sc.parallelize(range(num_executors), num_executors) + + if driver_ps_nodes: + def _start_ps(node_index): + logging.info("starting ps node locally %d" % node_index) + TFSparkNode.run(map_fun, + tf_args, + cluster_meta, + tensorboard, + log_dir, + queues, + background=(input_mode == InputMode.SPARK))([node_index]) + for i in cluster_template['ps']: + ps_thread = threading.Thread(target=lambda: _start_ps(i)) + ps_thread.start() # start TF on a background thread (on Spark driver) to allow for feeding job def _start(): diff --git a/tensorflowonspark/pipeline.py b/tensorflowonspark/pipeline.py index 8b45c642..71ea7148 100755 --- a/tensorflowonspark/pipeline.py +++ b/tensorflowonspark/pipeline.py @@ -100,12 +100,17 @@ def getModelDir(self): class HasNumPS(Params): num_ps = Param(Params._dummy(), "num_ps", "Number of PS nodes in cluster", typeConverter=TypeConverters.toInt) + driver_ps_nodes = Param(Params._dummy(), "driver_ps_nodes", "Run PS nodes on driver locally", typeConverter=TypeConverters.toBoolean) def __init__(self): 
super(HasNumPS, self).__init__() def setNumPS(self, value): return self._set(num_ps=value) def getNumPS(self): return self.getOrDefault(self.num_ps) + def setDriverPSNodes(self, value): + return self._set(driver_ps_nodes=value) + def getDriverPSNodes(self): + return self.getOrDefault(self.driver_ps_nodes) class HasOutputMapping(Params): output_mapping = Param(Params._dummy(), "output_mapping", "Mapping of output tensor to output DataFrame column", typeConverter=TFTypeConverters.toDict) @@ -258,6 +263,7 @@ def __init__(self, train_fn, tf_args, tf_argv=None, export_fn=None): self._setDefault(input_mapping={}, cluster_size=1, num_ps=0, + driver_ps_nodes=False, input_mode=TFCluster.InputMode.SPARK, protocol='grpc', tensorboard=False, @@ -301,7 +307,8 @@ def _fit(self, dataset): logging.info("Done saving") tf_args = self.argv if self.argv else local_args - cluster = TFCluster.run(sc, self.train_fn, tf_args, local_args.cluster_size, local_args.num_ps, local_args.tensorboard, local_args.input_mode) + cluster = TFCluster.run(sc, self.train_fn, tf_args, local_args.cluster_size, local_args.num_ps, + local_args.tensorboard, local_args.input_mode, driver_ps_nodes=local_args.driver_ps_nodes) if local_args.input_mode == TFCluster.InputMode.SPARK: # feed data, using a deterministic order for input columns (lexicographic by key) input_cols = sorted(self.getInputMapping().keys()) From 8f1f69ad8eba1a265ef1e256ee8a90e1fb4bb6eb Mon Sep 17 00:00:00 2001 From: winstonq Date: Thu, 18 Jan 2018 14:20:53 -0800 Subject: [PATCH 2/6] Daemonize the PS thread and process --- tensorflowonspark/TFCluster.py | 1 + tensorflowonspark/TFSparkNode.py | 1 + 2 files changed, 2 insertions(+) diff --git a/tensorflowonspark/TFCluster.py b/tensorflowonspark/TFCluster.py index 4637b6e8..3acf6e57 100644 --- a/tensorflowonspark/TFCluster.py +++ b/tensorflowonspark/TFCluster.py @@ -262,6 +262,7 @@ def _start_ps(node_index): background=(input_mode == InputMode.SPARK))([node_index]) for i in 
cluster_template['ps']: ps_thread = threading.Thread(target=lambda: _start_ps(i)) + ps_thread.daemon = True ps_thread.start() # start TF on a background thread (on Spark driver) to allow for feeding job diff --git a/tensorflowonspark/TFSparkNode.py b/tensorflowonspark/TFSparkNode.py index fe7a9c69..cf7eb53c 100644 --- a/tensorflowonspark/TFSparkNode.py +++ b/tensorflowonspark/TFSparkNode.py @@ -250,6 +250,7 @@ def _mapfn(iter): # invoke the TensorFlow main function in a background thread logging.info("Starting TensorFlow {0}:{1} on cluster node {2} on background process".format(job_name, task_index, worker_num)) p = multiprocessing.Process(target=fn, args=(tf_args, ctx)) + p.daemon = True p.start() # for ps nodes only, wait indefinitely in foreground thread for a "control" event (None == "stop") From 94c8142a235bc67f7f80dba1641ccdea6a2ced66 Mon Sep 17 00:00:00 2001 From: winstonq Date: Thu, 18 Jan 2018 16:20:10 -0800 Subject: [PATCH 3/6] fix merge error; enable only for TENSORFLOW mode --- examples/mnist/spark/mnist_spark.py | 36 +++++++++++-------------- examples/mnist/streaming/mnist_spark.py | 4 +-- tensorflowonspark/TFCluster.py | 27 ++++++++++++++++--- tensorflowonspark/TFSparkNode.py | 5 +++- tensorflowonspark/pipeline.py | 9 ++++++- 5 files changed, 52 insertions(+), 29 deletions(-) diff --git a/examples/mnist/spark/mnist_spark.py b/examples/mnist/spark/mnist_spark.py index 513a9fcd..c5e69056 100755 --- a/examples/mnist/spark/mnist_spark.py +++ b/examples/mnist/spark/mnist_spark.py @@ -10,12 +10,8 @@ from pyspark.conf import SparkConf import argparse -import os import numpy -import sys import tensorflow as tf -import threading -import time from datetime import datetime from tensorflowonspark import TFCluster @@ -27,20 +23,19 @@ num_ps = 1 parser = argparse.ArgumentParser() -parser.add_argument("-b", "--batch_size", help="number of records per batch", type=int, default=100) -parser.add_argument("-e", "--epochs", help="number of epochs", type=int, default=1) 
-parser.add_argument("-f", "--format", help="example format: (csv|pickle|tfr)", choices=["csv","pickle","tfr"], default="csv") -parser.add_argument("-i", "--images", help="HDFS path to MNIST images in parallelized format") -parser.add_argument("-l", "--labels", help="HDFS path to MNIST labels in parallelized format") -parser.add_argument("-m", "--model", help="HDFS path to save/load model during train/inference", default="mnist_model") -parser.add_argument("-n", "--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors) -parser.add_argument("-o", "--output", help="HDFS path to save test/inference output", default="predictions") -parser.add_argument("-r", "--readers", help="number of reader/enqueue threads", type=int, default=1) -parser.add_argument("-s", "--steps", help="maximum number of steps", type=int, default=1000) -parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true") -parser.add_argument("-X", "--mode", help="train|inference", default="train") -parser.add_argument("-c", "--rdma", help="use rdma connection", default=False) -parser.add_argument("-p", "--driver_ps_nodes", help="run tensorflow PS node on driver locally", default=False) +parser.add_argument("--batch_size", help="number of records per batch", type=int, default=100) +parser.add_argument("--epochs", help="number of epochs", type=int, default=1) +parser.add_argument("--format", help="example format: (csv|pickle|tfr)", choices=["csv","pickle","tfr"], default="csv") +parser.add_argument("--images", help="HDFS path to MNIST images in parallelized format") +parser.add_argument("--labels", help="HDFS path to MNIST labels in parallelized format") +parser.add_argument("--model", help="HDFS path to save/load model during train/inference", default="mnist_model") +parser.add_argument("--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors) +parser.add_argument("--output", help="HDFS path to save 
test/inference output", default="predictions") +parser.add_argument("--readers", help="number of reader/enqueue threads", type=int, default=1) +parser.add_argument("--steps", help="maximum number of steps", type=int, default=1000) +parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true") +parser.add_argument("--mode", help="train|inference", default="train") +parser.add_argument("--rdma", help="use rdma connection", default=False) args = parser.parse_args() print("args:",args) @@ -62,14 +57,13 @@ def toNumpy(bytestr): if args.format == "csv": images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')]) labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')]) - else: # args.format == "pickle": + else: # args.format == "pickle": images = sc.pickleFile(args.images) labels = sc.pickleFile(args.labels) print("zipping images and labels") dataRDD = images.zip(labels) -cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.SPARK, - driver_ps_nodes=args.driver_ps_nodes, log_dir=args.model) +cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.SPARK, log_dir=args.model) if args.mode == "train": cluster.train(dataRDD, args.epochs) else: diff --git a/examples/mnist/streaming/mnist_spark.py b/examples/mnist/streaming/mnist_spark.py index 2f343ccc..2d55743d 100755 --- a/examples/mnist/streaming/mnist_spark.py +++ b/examples/mnist/streaming/mnist_spark.py @@ -40,7 +40,6 @@ parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true") parser.add_argument("-X", "--mode", help="train|inference", default="train") parser.add_argument("-c", "--rdma", help="use rdma connection", default=False) -parser.add_argument("-p", "--driver_ps_nodes", help="run tensorflow PS node on driver locally", default=False) args = parser.parse_args() 
print("args:",args) @@ -56,8 +55,7 @@ def parse(ln): stream = ssc.textFileStream(args.images) imageRDD = stream.map(lambda ln: parse(ln)) -cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size, num_ps, args.tensorboard, - TFCluster.InputMode.SPARK, driver_ps_nodes=args.driver_ps_nodes) +cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.SPARK) if args.mode == "train": cluster.train(imageRDD) else: diff --git a/tensorflowonspark/TFCluster.py b/tensorflowonspark/TFCluster.py index 1958c51e..b06f0188 100644 --- a/tensorflowonspark/TFCluster.py +++ b/tensorflowonspark/TFCluster.py @@ -186,7 +186,8 @@ def tensorboard_url(self): tb_url = "http://{0}:{1}".format(node['host'], node['tb_port']) return tb_url -def run(sc, map_fun, tf_args, num_executors, num_ps, tensorboard=False, input_mode=InputMode.TENSORFLOW, log_dir=None, queues=['input', 'output']): +def run(sc, map_fun, tf_args, num_executors, num_ps, tensorboard=False, input_mode=InputMode.TENSORFLOW, + log_dir=None, driver_ps_nodes=False, queues=['input', 'output']): """Starts the TensorFlowOnSpark cluster and Runs the TensorFlow "main" function on the Spark executors Args: @@ -198,6 +199,7 @@ def run(sc, map_fun, tf_args, num_executors, num_ps, tensorboard=False, input_mo :tensorboard: boolean indicating if the chief worker should spawn a Tensorboard server. :input_mode: TFCluster.InputMode :log_dir: directory to save tensorboard event logs. If None, defaults to a fixed path on local filesystem. + :driver_ps_nodes: run the PS nodes on the driver locally instead of on the spark executors; this help maximizing computing resources (esp. GPU). 
:queues: *INTERNAL_USE* Returns: @@ -210,6 +212,7 @@ def run(sc, map_fun, tf_args, num_executors, num_ps, tensorboard=False, input_mo cluster_template = {} cluster_template['ps'] = range(num_ps) cluster_template['worker'] = range(num_ps, num_executors) + logging.info("worker node range %s, ps node range %s" % (cluster_template['worker'], cluster_template['ps'])) # get default filesystem from spark defaultFS = sc._jsc.hadoopConfiguration().get("fs.defaultFS") @@ -234,7 +237,25 @@ def run(sc, map_fun, tf_args, num_executors, num_ps, tensorboard=False, input_mo 'working_dir': working_dir, 'server_addr': server_addr } - nodeRDD = sc.parallelize(range(num_executors), num_executors) + if driver_ps_nodes: + nodeRDD = sc.parallelize(range(num_ps, num_executors), num_executors - num_ps) + else: + nodeRDD = sc.parallelize(range(num_executors), num_executors) + + if driver_ps_nodes: + def _start_ps(node_index): + logging.info("starting ps node locally %d" % node_index) + TFSparkNode.run(map_fun, + tf_args, + cluster_meta, + tensorboard, + log_dir, + queues, + background=(input_mode == InputMode.SPARK))([node_index]) + for i in cluster_template['ps']: + ps_thread = threading.Thread(target=lambda: _start_ps(i)) + ps_thread.daemon = True + ps_thread.start() # start TF on a background thread (on Spark driver) to allow for feeding job def _start(): @@ -244,7 +265,7 @@ def _start(): tensorboard, log_dir, queues, - (input_mode == InputMode.SPARK))) + background=(input_mode == InputMode.SPARK))) t = threading.Thread(target=_start) t.start() diff --git a/tensorflowonspark/TFSparkNode.py b/tensorflowonspark/TFSparkNode.py index cea6d458..6e09f187 100644 --- a/tensorflowonspark/TFSparkNode.py +++ b/tensorflowonspark/TFSparkNode.py @@ -270,8 +270,11 @@ def wrapper_fn(args, context): if job_name == 'ps' or background: # invoke the TensorFlow main function in a background thread - logging.info("Starting TensorFlow {0}:{1} on cluster node {2} on background process".format(job_name, 
task_index, worker_num)) + logging.info("Starting TensorFlow {0}:{1} as {2} on cluster node {3} on background process".format( + job_name, task_index, job_name, worker_num)) p = multiprocessing.Process(target=wrapper_fn, args=(tf_args, ctx)) + if job_name == 'ps': + p.daemon = True p.start() # for ps nodes only, wait indefinitely in foreground thread for a "control" event (None == "stop") diff --git a/tensorflowonspark/pipeline.py b/tensorflowonspark/pipeline.py index 9f90f753..7846ec6a 100755 --- a/tensorflowonspark/pipeline.py +++ b/tensorflowonspark/pipeline.py @@ -101,12 +101,17 @@ def getModelDir(self): class HasNumPS(Params): num_ps = Param(Params._dummy(), "num_ps", "Number of PS nodes in cluster", typeConverter=TypeConverters.toInt) + driver_ps_nodes = Param(Params._dummy(), "driver_ps_nodes", "Run PS nodes on driver locally", typeConverter=TypeConverters.toBoolean) def __init__(self): super(HasNumPS, self).__init__() def setNumPS(self, value): return self._set(num_ps=value) def getNumPS(self): return self.getOrDefault(self.num_ps) + def setDriverPSNodes(self, value): + return self._set(driver_ps_nodes=value) + def getDriverPSNodes(self): + return self.getOrDefault(self.driver_ps_nodes) class HasOutputMapping(Params): output_mapping = Param(Params._dummy(), "output_mapping", "Mapping of output tensor to output DataFrame column", typeConverter=TFTypeConverters.toDict) @@ -276,6 +281,7 @@ def __init__(self, train_fn, tf_args, export_fn=None): self._setDefault(input_mapping={}, cluster_size=1, num_ps=0, + driver_ps_nodes=False, input_mode=TFCluster.InputMode.SPARK, protocol='grpc', tensorboard=False, @@ -319,7 +325,8 @@ def _fit(self, dataset): logging.info("Done saving") tf_args = self.args.argv if self.args.argv else local_args - cluster = TFCluster.run(sc, self.train_fn, tf_args, local_args.cluster_size, local_args.num_ps, local_args.tensorboard, local_args.input_mode) + cluster = TFCluster.run(sc, self.train_fn, tf_args, local_args.cluster_size, 
local_args.num_ps, + local_args.tensorboard, local_args.input_mode, driver_ps_nodes=local_args.driver_ps_nodes) if local_args.input_mode == TFCluster.InputMode.SPARK: # feed data, using a deterministic order for input columns (lexicographic by key) input_cols = sorted(self.getInputMapping().keys()) From a99a67939b2ac871695e0dcce854daa8d18b1026 Mon Sep 17 00:00:00 2001 From: winstonq Date: Thu, 18 Jan 2018 16:27:19 -0800 Subject: [PATCH 4/6] enable only for TENSORFLOW mode --- examples/mnist/spark/mnist_spark_dataset.py | 4 +--- tensorflowonspark/TFCluster.py | 3 +++ 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/examples/mnist/spark/mnist_spark_dataset.py b/examples/mnist/spark/mnist_spark_dataset.py index 96452f0e..2192bf00 100755 --- a/examples/mnist/spark/mnist_spark_dataset.py +++ b/examples/mnist/spark/mnist_spark_dataset.py @@ -36,7 +36,6 @@ parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true") parser.add_argument("-X", "--mode", help="train|inference", default="train") parser.add_argument("-c", "--rdma", help="use rdma connection", default=False) -parser.add_argument("-p", "--driver_ps_nodes", help="run tensorflow PS node on driver locally", default=False) args = parser.parse_args() print("args:",args) @@ -60,8 +59,7 @@ def toNumpy(bytestr): print("zipping images and labels") dataRDD = images.zip(labels) -cluster = TFCluster.run(sc, mnist_dist_dataset.map_fun, args, args.cluster_size, num_ps, args.tensorboard, - TFCluster.InputMode.SPARK, driver_ps_nodes=args.driver_ps_nodes) +cluster = TFCluster.run(sc, mnist_dist_dataset.map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.SPARK) if args.mode == "train": cluster.train(dataRDD, args.epochs) else: diff --git a/tensorflowonspark/TFCluster.py b/tensorflowonspark/TFCluster.py index b06f0188..da4d9124 100644 --- a/tensorflowonspark/TFCluster.py +++ b/tensorflowonspark/TFCluster.py @@ -208,6 +208,9 @@ def run(sc, map_fun, 
tf_args, num_executors, num_ps, tensorboard=False, input_mo logging.info("Reserving TFSparkNodes {0}".format("w/ TensorBoard" if tensorboard else "")) assert num_ps < num_executors + if driver_ps_nodes and input_mode != InputMode.TENSORFLOW: + raise Exception('running PS nodes on driver locally is only supported in InputMode.TENSORFLOW') + # build a cluster_spec template using worker_nums cluster_template = {} cluster_template['ps'] = range(num_ps) From d5c322076a43fd398a140b7f4547cd71ee3c0016 Mon Sep 17 00:00:00 2001 From: winstonq Date: Thu, 18 Jan 2018 16:29:43 -0800 Subject: [PATCH 5/6] enable only for TENSORFLOW mode --- examples/mnist/spark/mnist_spark_pipeline.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/examples/mnist/spark/mnist_spark_pipeline.py b/examples/mnist/spark/mnist_spark_pipeline.py index 72a40547..4314399a 100644 --- a/examples/mnist/spark/mnist_spark_pipeline.py +++ b/examples/mnist/spark/mnist_spark_pipeline.py @@ -37,7 +37,6 @@ parser.add_argument("--export_dir", help="HDFS path to export model", type=str) parser.add_argument("--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors) parser.add_argument("--num_ps", help="number of PS nodes in cluster", type=int, default=1) -parser.add_argument("--driver_ps_nodes", help="run tensorflow PS node on driver locally", default=False) parser.add_argument("--protocol", help="Tensorflow network protocol (grpc|rdma)", default="grpc") parser.add_argument("--steps", help="maximum number of steps", type=int, default=1000) parser.add_argument("--tensorboard", help="launch tensorboard process", action="store_true") @@ -81,7 +80,6 @@ .setExportDir(args.export_dir) \ .setClusterSize(args.cluster_size) \ .setNumPS(args.num_ps) \ - .setDriverPSNodes(args.driver_ps_nodes) \ .setProtocol(args.protocol) \ .setTensorboard(args.tensorboard) \ .setEpochs(args.epochs) \ From 8e690a192f17008d16045479af74d75525548f1f Mon Sep 17 00:00:00 2001 From: winstonq Date: Thu, 25 Jan 2018 
11:17:19 -0800 Subject: [PATCH 6/6] add usage comment for driver_ps_nodes --- examples/mnist/tf/mnist_spark_dataset.py | 3 ++- examples/mnist/tf/mnist_spark_pipeline.py | 3 ++- tensorflowonspark/TFCluster.py | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/examples/mnist/tf/mnist_spark_dataset.py b/examples/mnist/tf/mnist_spark_dataset.py index c1156b8a..6dd3cda4 100644 --- a/examples/mnist/tf/mnist_spark_dataset.py +++ b/examples/mnist/tf/mnist_spark_dataset.py @@ -34,7 +34,8 @@ parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true") parser.add_argument("-X", "--mode", help="train|inference", default="train") parser.add_argument("-c", "--rdma", help="use rdma connection", default=False) -parser.add_argument("-p", "--driver_ps_nodes", help="run tensorflow PS node on driver locally", default=False) +parser.add_argument("-p", "--driver_ps_nodes", help="""run tensorflow PS node on driver locally. + You will need to set cluster_size = num_executors + num_ps""", default=False) args = parser.parse_args() print("args:",args) diff --git a/examples/mnist/tf/mnist_spark_pipeline.py b/examples/mnist/tf/mnist_spark_pipeline.py index 7884111d..558d627d 100644 --- a/examples/mnist/tf/mnist_spark_pipeline.py +++ b/examples/mnist/tf/mnist_spark_pipeline.py @@ -38,7 +38,8 @@ parser.add_argument("--tfrecord_dir", help="HDFS path to temporarily save DataFrame to disk", type=str) parser.add_argument("--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors) parser.add_argument("--num_ps", help="number of PS nodes in cluster", type=int, default=1) -parser.add_argument("--driver_ps_nodes", help="run tensorflow PS node on driver locally", default=False) +parser.add_argument("-p", "--driver_ps_nodes", help="""run tensorflow PS node on driver locally. 
+ You will need to set cluster_size = num_executors + num_ps""", default=False) parser.add_argument("--protocol", help="Tensorflow network protocol (grpc|rdma)", default="grpc") parser.add_argument("--readers", help="number of reader/enqueue threads", type=int, default=1) parser.add_argument("--steps", help="maximum number of steps", type=int, default=1000) diff --git a/tensorflowonspark/TFCluster.py b/tensorflowonspark/TFCluster.py index da4d9124..382da4c6 100644 --- a/tensorflowonspark/TFCluster.py +++ b/tensorflowonspark/TFCluster.py @@ -199,7 +199,7 @@ def run(sc, map_fun, tf_args, num_executors, num_ps, tensorboard=False, input_mo :tensorboard: boolean indicating if the chief worker should spawn a Tensorboard server. :input_mode: TFCluster.InputMode :log_dir: directory to save tensorboard event logs. If None, defaults to a fixed path on local filesystem. - :driver_ps_nodes: run the PS nodes on the driver locally instead of on the spark executors; this help maximizing computing resources (esp. GPU). + :driver_ps_nodes: run the PS nodes on the driver locally instead of on the spark executors; this helps maximize computing resources (esp. GPU). You will need to set cluster_size = num_executors + num_ps :queues: *INTERNAL_USE* Returns: