Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions tensorflowonspark/TFCluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ def timeout_handler(signum, frame):
if len(jobs) == 0:
break

# stop reservation server
self.server.stop()

def tensorboard_url(self):
"""Utility function to get the Tensorboard URL"""
for node in self.cluster_info:
Expand Down
93 changes: 60 additions & 33 deletions tensorflowonspark/TFSparkNode.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,65 @@ def _mapfn(iter):
executor_id = i

# check that there are enough available GPUs (if using tensorflow-gpu) before committing reservation on this node
# note: for Spark 3+ w/ GPU allocation, the required number of GPUs should be guaranteed by the resource manager
if version.parse(pyspark.__version__).base_version < version.parse('3.0.0').base_version:
if gpu_info.is_gpu_available():
num_gpus = tf_args.num_gpus if 'num_gpus' in tf_args else 1
gpus_to_use = gpu_info.get_gpus(num_gpus)
def _get_gpus(cluster_spec=None):
    """Select the GPU(s) for this executor and export them via ``CUDA_VISIBLE_DEVICES``.

    GPUs assigned through the Spark 3+ resources API take precedence (optionally
    truncated to ``tf_args.num_gpus``).  Otherwise, when not running inside a
    Kubernetes pod and ``gpu_info`` reports an available GPU, devices are allocated
    via ``gpu_info.get_gpus`` (defaulting to one GPU unless ``tf_args.num_gpus`` is set).

    Reads ``tf_args``, ``job_name`` and ``task_index`` from the enclosing scope.

    Args:
      :cluster_spec: optional mapping of job name to a list of "host:port" addresses; when supplied,
        it is used to compute this node's index relative to the other nodes on the same host.

    Raises:
      Exception: if ``tf_args.num_gpus`` was set explicitly and fewer GPUs than requested were obtained.
    """
    gpus = []
    is_k8s = 'SPARK_EXECUTOR_POD_IP' in os.environ

    # handle explicitly configured tf_args.num_gpus
    user_requested = 'num_gpus' in tf_args
    requested_gpus = tf_args.num_gpus if user_requested else 0

    # first, try Spark 3 resources API, returning all visible GPUs
    # note: num_gpus arg is only used (if supplied) to limit/truncate visible devices
    # note: compare parsed versions, not base_version strings, since string comparison ranks "10.0.0" below "3.0.0"
    spark_version = version.parse(version.parse(pyspark.__version__).base_version)
    if spark_version >= version.parse("3.0.0"):
        from pyspark import TaskContext
        # TaskContext.get() returns None when not running inside a task
        context = TaskContext.get()
        resources = context.resources() if context else None
        if resources and 'gpu' in resources:
            # get all GPUs assigned by resource manager
            gpus = resources['gpu'].addresses
            logger.info("Spark gpu resources: {}".format(gpus))
            if user_requested:
                if requested_gpus < len(gpus):
                    # override/truncate list, if explicitly configured
                    logger.warning("Requested {} GPU(s), but {} available".format(requested_gpus, len(gpus)))
                    gpus = gpus[:requested_gpus]
            else:
                # implicitly requested by Spark 3
                requested_gpus = len(gpus)

    # if not in K8s pod and GPUs available, just use original allocation code (defaulting to 1 GPU if available)
    # Note: for K8s, there is a bug with the Nvidia device_plugin which can show GPUs for non-GPU pods that are hosted on GPU nodes
    if not is_k8s and gpu_info.is_gpu_available() and not gpus:
        # default to one GPU if not specified explicitly
        if not user_requested:
            requested_gpus = max(1, requested_gpus)
        if requested_gpus > 0:
            if cluster_spec:
                # compute my index relative to other nodes on the same host (for GPU allocation)
                my_addr = cluster_spec[job_name][task_index]
                my_host = my_addr.split(':')[0]
                flattened = [v for sublist in cluster_spec.values() for v in sublist]
                # match the exact host part, so that e.g. "10.0.0.1" is not counted as a peer of "10.0.0.10"
                local_peers = [p for p in flattened if p.split(':')[0] == my_host]
                my_index = local_peers.index(my_addr)
            else:
                my_index = 0

            # try to allocate a GPU
            gpus = gpu_info.get_gpus(requested_gpus, my_index, format=gpu_info.AS_LIST)

    if user_requested and len(gpus) < requested_gpus:
        raise Exception("Unable to allocate {} GPU(s) from available GPUs: {}".format(requested_gpus, gpus))

    # str() guards against non-string device identifiers in the list
    gpus_to_use = ','.join(str(gpu) for gpu in gpus)
    if gpus:
        logger.info("Requested {} GPU(s), setting CUDA_VISIBLE_DEVICES={}".format(requested_gpus if user_requested else len(gpus), gpus_to_use))
    # exported even when the list is empty (an empty value hides all GPUs from CUDA)
    os.environ['CUDA_VISIBLE_DEVICES'] = gpus_to_use

# try GPU allocation at executor startup so we can try to fail out if unsuccessful
_get_gpus()

# assign TF job/task based on provided cluster_spec template (or use default/null values)
job_name = 'default'
Expand Down Expand Up @@ -309,34 +363,7 @@ def _mapfn(iter):

# reserve GPU(s) again, just before launching TF process (in case situation has changed)
# and setup CUDA_VISIBLE_DEVICES accordingly
if gpu_info.is_gpu_available():

gpus_to_use = None
# For Spark 3+, try to get GPU resources from TaskContext first
if version.parse(pyspark.__version__).base_version >= version.parse("3.0.0").base_version:
from pyspark import TaskContext
context = TaskContext()
if 'gpu' in context.resources():
# use ALL GPUs assigned by resource manager
gpus = context.resources()['gpu'].addresses
num_gpus = len(gpus)
gpus_to_use = ','.join(gpus)

if not gpus_to_use:
# compute my index relative to other nodes on the same host (for GPU allocation)
my_addr = cluster_spec[job_name][task_index]
my_host = my_addr.split(':')[0]
flattened = [v for sublist in cluster_spec.values() for v in sublist]
local_peers = [p for p in flattened if p.startswith(my_host)]
my_index = local_peers.index(my_addr)

# default to one GPU if not specified explicitly
num_gpus = tf_args.num_gpus if 'num_gpus' in tf_args else 1
gpus_to_use = gpu_info.get_gpus(num_gpus, my_index)

gpu_str = "GPUs" if num_gpus > 1 else "GPU"
logger.info("Requested {} {}, setting CUDA_VISIBLE_DEVICES={}".format(num_gpus, gpu_str, gpus_to_use))
os.environ['CUDA_VISIBLE_DEVICES'] = gpus_to_use
_get_gpus(cluster_spec=cluster_spec)

# create a context object to hold metadata for TF
ctx = TFNodeContext(executor_id, job_name, task_index, cluster_spec, cluster_meta['default_fs'], cluster_meta['working_dir'], TFSparkNode.mgr)
Expand Down
36 changes: 9 additions & 27 deletions tensorflowonspark/gpu_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,16 @@
from __future__ import nested_scopes
from __future__ import print_function

import ctypes as ct
import logging
import platform
import random
import subprocess
import time

logger = logging.getLogger(__name__)

MAX_RETRIES = 3 #: Maximum retries to allocate GPUs


def _get_gpu():
    """*DEPRECATED*. Allocates first available GPU using cudaSetDevice(), or returns 0 otherwise."""
    # Note: this code executes, but Tensorflow subsequently complains that the "current context was not created by the StreamExecutor cuda_driver API"
    os_name = platform.system()
    if os_name not in ("Linux", "Darwin", "Windows"):
        raise NotImplementedError("Cannot identify system.")

    # load the CUDA runtime library using the platform-specific loader and file name
    if os_name == "Windows":
        cuda_rt = ct.windll.LoadLibrary("libcudart.dll")
    else:
        lib_file = "libcudart.so" if os_name == "Linux" else "libcudart.dylib"
        cuda_rt = ct.cdll.LoadLibrary(lib_file)

    num_devices = ct.c_int()
    cuda_rt.cudaGetDeviceCount(ct.byref(num_devices))

    # return the first device for which both cudaSetDevice() and cudaFree(0) report success (0)
    for device_id in range(num_devices.value):
        if cuda_rt.cudaSetDevice(device_id) == 0 and cuda_rt.cudaFree(0) == 0:
            return device_id

    # no device could be selected: fall back to device 0
    return 0
AS_STRING = 'string'  #: ``format`` value for get_gpus(): return the GPUs as a comma-delimited string
AS_LIST = 'list'  #: ``format`` value for get_gpus(): return the GPUs as a list


def is_gpu_available():
Expand All @@ -51,7 +28,7 @@ def is_gpu_available():
return False


def get_gpus(num_gpu=1, worker_index=-1):
def get_gpus(num_gpu=1, worker_index=-1, format=AS_STRING):
"""Get list of free GPUs according to nvidia-smi.

This will retry for ``MAX_RETRIES`` times until the requested number of GPUs are available.
Expand Down Expand Up @@ -112,7 +89,12 @@ def parse_gpu(gpu_str):
proposed_gpus = free_gpus[worker_index * num_gpu:(worker_index * num_gpu + num_gpu)]
logger.info("Proposed GPUs: {}".format(proposed_gpus))

return ','.join(str(x) for x in proposed_gpus)
if format == AS_STRING:
return ','.join(str(x) for x in proposed_gpus)
elif format == AS_LIST:
return proposed_gpus
else:
raise Exception("Unknown GPU format")


# Function to get the gpu information
Expand Down