From 6ca75e0d980ca6fccf586d96270ed404875c1f1c Mon Sep 17 00:00:00 2001 From: Lee Yang Date: Tue, 21 Jan 2020 13:17:45 -0800 Subject: [PATCH] detect TF version w/o importing --- tensorflowonspark/TFNode.py | 8 ++++++-- tensorflowonspark/TFSparkNode.py | 13 ++++++------- tensorflowonspark/gpu_info.py | 9 +++++++++ tensorflowonspark/pipeline.py | 14 +++++++++----- tensorflowonspark/util.py | 4 ++-- test/README.md | 2 +- 6 files changed, 33 insertions(+), 17 deletions(-) diff --git a/tensorflowonspark/TFNode.py b/tensorflowonspark/TFNode.py index 6bbed0bb..d7106712 100644 --- a/tensorflowonspark/TFNode.py +++ b/tensorflowonspark/TFNode.py @@ -16,12 +16,14 @@ import getpass import logging +import pkg_resources from packaging import version from six.moves.queue import Empty from . import compat, marker logger = logging.getLogger(__name__) +TF_VERSION = pkg_resources.get_distribution('tensorflow').version def hdfs_path(ctx, path): @@ -79,11 +81,10 @@ def start_cluster_server(ctx, num_gpus=1, rdma=False): A tuple of (cluster_spec, server) """ import os - import tensorflow as tf import time from . import gpu_info - if version.parse(tf.__version__) >= version.parse("2.0.0"): + if version.parse(TF_VERSION) >= version.parse("2.0.0"): raise Exception("DEPRECATED: Use higher-level APIs like `tf.keras` or `tf.estimator`") logging.info("{0}: ======== {1}:{2} ========".format(ctx.worker_num, ctx.job_name, ctx.task_index)) @@ -115,6 +116,9 @@ def start_cluster_server(ctx, num_gpus=1, rdma=False): # Set GPU device to use for TensorFlow os.environ['CUDA_VISIBLE_DEVICES'] = gpus_to_use + # Import tensorflow after gpu allocation + import tensorflow as tf + # Create a cluster from the parameter server and worker hosts. cluster = tf.train.ClusterSpec(cluster_spec) diff --git a/tensorflowonspark/TFSparkNode.py b/tensorflowonspark/TFSparkNode.py index e188c6e3..97ee4a08 100755 --- a/tensorflowonspark/TFSparkNode.py +++ b/tensorflowonspark/TFSparkNode.py @@ -12,6 +12,7 @@ import logging import multiprocessing import os +import pkg_resources import platform import socket import subprocess @@ -19,17 +20,18 @@ import uuid import time import traceback +from packaging import version from threading import Thread from . import TFManager from . import TFNode -from . import compat from . import gpu_info from . import marker from . import reservation from . import util logger = logging.getLogger(__name__) +TF_VERSION = pkg_resources.get_distribution('tensorflow').version class TFNodeContext: @@ -137,15 +139,12 @@ def run(fn, tf_args, cluster_meta, tensorboard, log_dir, queues, background): A nodeRDD.mapPartitions() function. """ def _mapfn(iter): - import tensorflow as tf - from packaging import version - # Note: consuming the input iterator helps Pyspark re-use this worker, for i in iter: executor_id = i # check that there are enough available GPUs (if using tensorflow-gpu) before committing reservation on this node - if compat.is_gpu_available(): + if gpu_info.is_gpu_available(): num_gpus = tf_args.num_gpus if 'num_gpus' in tf_args else 1 gpus_to_use = gpu_info.get_gpus(num_gpus) @@ -227,7 +226,7 @@ def _mapfn(iter): raise Exception("Unable to find 'tensorboard' in: {}".format(search_path)) # launch tensorboard - if version.parse(tf.__version__) >= version.parse('2.0.0'): + if version.parse(TF_VERSION) >= version.parse('2.0.0'): tb_proc = subprocess.Popen([pypath, tb_path, "--reload_multifile=True", "--logdir=%s" % logdir, "--port=%d" % tb_port], env=os.environ) else: tb_proc = subprocess.Popen([pypath, tb_path, "--logdir=%s" % logdir, "--port=%d" % tb_port], env=os.environ) @@ -296,7 +295,7 @@ def _mapfn(iter): os.environ['TF_CONFIG'] = tf_config # reserve GPU(s) again, just before launching TF process (in case situation has changed) - if compat.is_gpu_available(): + if gpu_info.is_gpu_available(): # compute my index relative to other nodes on the same host (for GPU allocation) my_addr = cluster_spec[job_name][task_index] my_host = my_addr.split(':')[0] diff --git a/tensorflowonspark/gpu_info.py b/tensorflowonspark/gpu_info.py index 69ffdbc1..b275c7dd 100644 --- a/tensorflowonspark/gpu_info.py +++ b/tensorflowonspark/gpu_info.py @@ -42,6 +42,15 @@ def _get_gpu(): return gpu +def is_gpu_available(): + """Determine if GPUs are available on the host""" + try: + subprocess.check_output(["nvidia-smi", "--list-gpus"]) + return True + except Exception: + return False + + def get_gpus(num_gpu=1, worker_index=-1): """Get list of free GPUs according to nvidia-smi. diff --git a/tensorflowonspark/pipeline.py b/tensorflowonspark/pipeline.py index d97fe39e..159bdda4 100755 --- a/tensorflowonspark/pipeline.py +++ b/tensorflowonspark/pipeline.py @@ -23,14 +23,15 @@ import argparse import copy import logging +import pkg_resources import sys -import tensorflow as tf from . import TFCluster, util from packaging import version logger = logging.getLogger(__name__) +TF_VERSION = pkg_resources.get_distribution('tensorflow').version # TensorFlowOnSpark Params @@ -370,7 +371,7 @@ def __init__(self, train_fn, tf_args, export_fn=None): self.train_fn = train_fn self.args = Namespace(tf_args) - master_node = 'chief' if version.parse(tf.__version__) >= version.parse("2.0.0") else None + master_node = 'chief' if version.parse(TF_VERSION) >= version.parse("2.0.0") else None self._setDefault(input_mapping={}, cluster_size=1, num_ps=0, @@ -413,7 +414,7 @@ def _fit(self, dataset): cluster.shutdown(grace_secs=self.getGraceSecs()) if self.export_fn: - if version.parse(tf.__version__) < version.parse("2.0.0"): + if version.parse(TF_VERSION) < version.parse("2.0.0"): # For TF1.x, run export function, if provided assert local_args.export_dir, "Export function requires --export_dir to be set" logging.info("Exporting saved_model (via export_fn) to: {}".format(local_args.export_dir)) @@ -480,7 +481,7 @@ def _transform(self, dataset): tf_args = self.args.argv if self.args.argv else local_args - _run_model = _run_model_tf1 if version.parse(tf.__version__) < version.parse("2.0.0") else _run_model_tf2 + _run_model = _run_model_tf1 if version.parse(TF_VERSION) < version.parse("2.0.0") else _run_model_tf2 rdd_out = dataset.select(input_cols).rdd.mapPartitions(lambda it: _run_model(it, local_args, tf_args)) # convert to a DataFrame-friendly format @@ -516,7 +517,7 @@ def _run_model_tf1(iterator, args, tf_args): output_tensor_names = [tensor for tensor, col in sorted(args.output_mapping.items())] # if using a signature_def_key, get input/output tensor info from the requested signature - if version.parse(tf.__version__) < version.parse("2.0.0") and args.signature_def_key: + if version.parse(TF_VERSION) < version.parse("2.0.0") and args.signature_def_key: assert args.export_dir, "Inferencing with signature_def_key requires --export_dir argument" logging.info("===== loading meta_graph_def for tag_set ({0}) from saved_model: {1}".format(args.tag_set, args.export_dir)) meta_graph_def = get_meta_graph_def(args.export_dir, args.tag_set) @@ -534,6 +535,7 @@ def _run_model_tf1(iterator, args, tf_args): sess = global_sess else: # otherwise, create new session and load graph from disk + import tensorflow as tf tf.reset_default_graph() sess = tf.Session(graph=tf.get_default_graph()) if args.export_dir: @@ -584,6 +586,8 @@ def _run_model_tf2(iterator, args, tf_args): """mapPartitions function (for TF2.x) to run single-node inferencing from a saved_model, using input/output mappings.""" single_node_env(tf_args) + import tensorflow as tf + logger.info("===== input_mapping: {}".format(args.input_mapping)) logger.info("===== output_mapping: {}".format(args.output_mapping)) input_tensor_names = [tensor for col, tensor in sorted(args.input_mapping.items())] diff --git a/tensorflowonspark/util.py b/tensorflowonspark/util.py index 0d6c1dff..0d6c662a 100644 --- a/tensorflowonspark/util.py +++ b/tensorflowonspark/util.py @@ -13,7 +13,7 @@ import subprocess import errno from socket import error as socket_error -from . import compat, gpu_info +from . import gpu_info logger = logging.getLogger(__name__) @@ -28,7 +28,7 @@ def single_node_env(num_gpus=1, worker_index=-1, nodes=[]): os.environ['CLASSPATH'] = classpath + os.pathsep + hadoop_classpath os.environ['TFOS_CLASSPATH_UPDATED'] = '1' - if compat.is_gpu_available() and num_gpus > 0: + if gpu_info.is_gpu_available() and num_gpus > 0: # reserve GPU(s), if requested if worker_index >= 0 and len(nodes) > 0: # compute my index relative to other nodes on the same host, if known diff --git a/test/README.md b/test/README.md index e6d3f4e6..32900cb3 100644 --- a/test/README.md +++ b/test/README.md @@ -16,7 +16,7 @@ Note: the tests that use Spark will require a local Spark Standalone cluster (vs export SPARK_HOME= export TFoS_HOME= export PYTHONPATH=${SPARK_HOME}/python - export SPARK_CLASSPATH= + export SPARK_CLASSPATH=${TFoS_HOME}/lib/tensorflow-hadoop-1.0-SNAPSHOT.jar ``` 2a. Run script to automatically start Spark Standalone cluster, run all tests, and shutdown the cluster, OR ```