From d4cf9cc23ffb71b66129795ae4fc5bbe9ca39dec Mon Sep 17 00:00:00 2001
From: Lee Yang
Date: Tue, 18 Feb 2020 15:25:00 -0800
Subject: [PATCH] revised gpu allocation code

---
 tensorflowonspark/TFCluster.py   |  3 ++
 tensorflowonspark/TFSparkNode.py | 93 ++++++++++++++++++++------------
 tensorflowonspark/gpu_info.py    | 36 ++++---------
 3 files changed, 72 insertions(+), 60 deletions(-)

diff --git a/tensorflowonspark/TFCluster.py b/tensorflowonspark/TFCluster.py
index 7ee3a3d9..2a375f90 100755
--- a/tensorflowonspark/TFCluster.py
+++ b/tensorflowonspark/TFCluster.py
@@ -201,6 +201,9 @@ def timeout_handler(signum, frame):
       if len(jobs) == 0:
         break
 
+    # stop reservation server
+    self.server.stop()
+
   def tensorboard_url(self):
     """Utility function to get the Tensorboard URL"""
     for node in self.cluster_info:
diff --git a/tensorflowonspark/TFSparkNode.py b/tensorflowonspark/TFSparkNode.py
index 9137fb99..88406a2c 100644
--- a/tensorflowonspark/TFSparkNode.py
+++ b/tensorflowonspark/TFSparkNode.py
@@ -146,11 +146,65 @@ def _mapfn(iter):
       executor_id = i
 
     # check that there are enough available GPUs (if using tensorflow-gpu) before committing reservation on this node
-    # note: for Spark 3+ w/ GPU allocation, the required number of GPUs should be guaranteed by the resource manager
-    if version.parse(pyspark.__version__).base_version < version.parse('3.0.0').base_version:
-      if gpu_info.is_gpu_available():
-        num_gpus = tf_args.num_gpus if 'num_gpus' in tf_args else 1
-        gpus_to_use = gpu_info.get_gpus(num_gpus)
+    def _get_gpus(cluster_spec=None):
+      gpus = []
+      is_k8s = 'SPARK_EXECUTOR_POD_IP' in os.environ
+
+      # handle explicitly configured tf_args.num_gpus
+      if 'num_gpus' in tf_args:
+        requested_gpus = tf_args.num_gpus
+        user_requested = True
+      else:
+        requested_gpus = 0
+        user_requested = False
+
+      # first, try Spark 3 resources API, returning all visible GPUs
+      # note: num_gpus arg is only used (if supplied) to limit/truncate visible devices
+      if version.parse(pyspark.__version__).base_version >= version.parse("3.0.0").base_version:
+        from pyspark import TaskContext
+        context = TaskContext()
+        if 'gpu' in context.resources():
+          # get all GPUs assigned by resource manager
+          gpus = context.resources()['gpu'].addresses
+          logger.info("Spark gpu resources: {}".format(gpus))
+          if user_requested:
+            if requested_gpus < len(gpus):
+              # override/truncate list, if explicitly configured
+              logger.warn("Requested {} GPU(s), but {} available".format(requested_gpus, len(gpus)))
+              gpus = gpus[:requested_gpus]
+          else:
+            # implicitly requested by Spark 3
+            requested_gpus = len(gpus)
+
+      # if not in K8s pod and GPUs available, just use original allocation code (defaulting to 1 GPU if available)
+      # Note: for K8s, there is a bug with the Nvidia device_plugin which can show GPUs for non-GPU pods that are hosted on GPU nodes
+      if not is_k8s and gpu_info.is_gpu_available() and not gpus:
+        # default to one GPU if not specified explicitly
+        requested_gpus = max(1, requested_gpus) if not user_requested else requested_gpus
+        if requested_gpus > 0:
+          if cluster_spec:
+            # compute my index relative to other nodes on the same host (for GPU allocation)
+            my_addr = cluster_spec[job_name][task_index]
+            my_host = my_addr.split(':')[0]
+            flattened = [v for sublist in cluster_spec.values() for v in sublist]
+            local_peers = [p for p in flattened if p.startswith(my_host)]
+            my_index = local_peers.index(my_addr)
+          else:
+            my_index = 0
+
+          # try to allocate a GPU
+          gpus = gpu_info.get_gpus(requested_gpus, my_index, format=gpu_info.AS_LIST)
+
+      if user_requested and len(gpus) < requested_gpus:
+        raise Exception("Unable to allocate {} GPU(s) from available GPUs: {}".format(requested_gpus, gpus))
+
+      gpus_to_use = ','.join(gpus)
+      if gpus:
+        logger.info("Requested {} GPU(s), setting CUDA_VISIBLE_DEVICES={}".format(requested_gpus if user_requested else len(gpus), gpus_to_use))
+      os.environ['CUDA_VISIBLE_DEVICES'] = gpus_to_use
+
+    # try GPU allocation at executor startup so we can try to fail out if unsuccessful
+    _get_gpus()
 
     # assign TF job/task based on provided cluster_spec template (or use default/null values)
     job_name = 'default'
@@ -309,34 +363,7 @@ def _mapfn(iter):
 
     # reserve GPU(s) again, just before launching TF process (in case situation has changed)
     # and setup CUDA_VISIBLE_DEVICES accordingly
-    if gpu_info.is_gpu_available():
-
-      gpus_to_use = None
-      # For Spark 3+, try to get GPU resources from TaskContext first
-      if version.parse(pyspark.__version__).base_version >= version.parse("3.0.0").base_version:
-        from pyspark import TaskContext
-        context = TaskContext()
-        if 'gpu' in context.resources():
-          # use ALL GPUs assigned by resource manager
-          gpus = context.resources()['gpu'].addresses
-          num_gpus = len(gpus)
-          gpus_to_use = ','.join(gpus)
-
-      if not gpus_to_use:
-        # compute my index relative to other nodes on the same host (for GPU allocation)
-        my_addr = cluster_spec[job_name][task_index]
-        my_host = my_addr.split(':')[0]
-        flattened = [v for sublist in cluster_spec.values() for v in sublist]
-        local_peers = [p for p in flattened if p.startswith(my_host)]
-        my_index = local_peers.index(my_addr)
-
-        # default to one GPU if not specified explicitly
-        num_gpus = tf_args.num_gpus if 'num_gpus' in tf_args else 1
-        gpus_to_use = gpu_info.get_gpus(num_gpus, my_index)
-
-      gpu_str = "GPUs" if num_gpus > 1 else "GPU"
-      logger.info("Requested {} {}, setting CUDA_VISIBLE_DEVICES={}".format(num_gpus, gpu_str, gpus_to_use))
-      os.environ['CUDA_VISIBLE_DEVICES'] = gpus_to_use
+    _get_gpus(cluster_spec=cluster_spec)
 
     # create a context object to hold metadata for TF
     ctx = TFNodeContext(executor_id, job_name, task_index, cluster_spec, cluster_meta['default_fs'], cluster_meta['working_dir'], TFSparkNode.mgr)
diff --git a/tensorflowonspark/gpu_info.py b/tensorflowonspark/gpu_info.py
index b275c7dd..e5c44842 100644
--- a/tensorflowonspark/gpu_info.py
+++ b/tensorflowonspark/gpu_info.py
@@ -7,9 +7,7 @@
 from __future__ import nested_scopes
 from __future__ import print_function
 
-import ctypes as ct
 import logging
-import platform
 import random
 import subprocess
 import time
@@ -17,29 +15,8 @@
 logger = logging.getLogger(__name__)
 
 MAX_RETRIES = 3     #: Maximum retries to allocate GPUs
-
-
-def _get_gpu():
-  """*DEPRECATED*. Allocates first available GPU using cudaSetDevice(), or returns 0 otherwise."""
-  # Note: this code executes, but Tensorflow subsequently complains that the "current context was not created by the StreamExecutor cuda_driver API"
-  system = platform.system()
-  if system == "Linux":
-    libcudart = ct.cdll.LoadLibrary("libcudart.so")
-  elif system == "Darwin":
-    libcudart = ct.cdll.LoadLibrary("libcudart.dylib")
-  elif system == "Windows":
-    libcudart = ct.windll.LoadLibrary("libcudart.dll")
-  else:
-    raise NotImplementedError("Cannot identify system.")
-
-  device_count = ct.c_int()
-  libcudart.cudaGetDeviceCount(ct.byref(device_count))
-  gpu = 0
-  for i in range(device_count.value):
-    if (0 == libcudart.cudaSetDevice(i) and 0 == libcudart.cudaFree(0)):
-      gpu = i
-      break
-  return gpu
+AS_STRING = 'string'
+AS_LIST = 'list'
 
 
 def is_gpu_available():
@@ -51,7 +28,7 @@ def is_gpu_available():
     return False
 
 
-def get_gpus(num_gpu=1, worker_index=-1):
+def get_gpus(num_gpu=1, worker_index=-1, format=AS_STRING):
   """Get list of free GPUs according to nvidia-smi.
 
   This will retry for ``MAX_RETRIES`` times until the requested number of GPUs are available.
@@ -112,7 +89,12 @@ def parse_gpu(gpu_str):
 
   proposed_gpus = free_gpus[worker_index * num_gpu:(worker_index * num_gpu + num_gpu)]
   logger.info("Proposed GPUs: {}".format(proposed_gpus))
-  return ','.join(str(x) for x in proposed_gpus)
+  if format == AS_STRING:
+    return ','.join(str(x) for x in proposed_gpus)
+  elif format == AS_LIST:
+    return proposed_gpus
+  else:
+    raise Exception("Unknown GPU format")
 
 
 # Function to get the gpu information
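
Usage note (illustrative, not part of the patch): the short sketch below exercises the revised gpu_info API directly, outside of Spark. It assumes a host where nvidia-smi is on the PATH; is_gpu_available(), get_gpus() with its new format argument, and the AS_LIST/AS_STRING constants are the pieces introduced above, while the surrounding script is a hypothetical driver.

  import os

  from tensorflowonspark import gpu_info

  if gpu_info.is_gpu_available():
    # ask nvidia-smi for one free GPU; worker_index selects this worker's slice of the free list
    gpus = gpu_info.get_gpus(num_gpu=1, worker_index=0, format=gpu_info.AS_LIST)
    # format=gpu_info.AS_STRING would instead return a comma-delimited string
    os.environ['CUDA_VISIBLE_DEVICES'] = ','.join(str(g) for g in gpus)
  else:
    # no GPUs on this host: hide them from TensorFlow, mirroring the executor-side behavior above
    os.environ['CUDA_VISIBLE_DEVICES'] = ''

On Spark 3+ executors, the patched _get_gpus() helper prefers the GPU addresses assigned via TaskContext().resources()['gpu'] and only falls back to this nvidia-smi path when no resources were assigned and the executor is not running in a Kubernetes pod.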