Contribute spark-tensorflow-distributor to the ecosystem #154
Conversation
cc @yuefengz can you help review this PR?
@yuefengz ping :)
haven't finished a full pass yet
RUN apt-get install -y python${PYTHON_INSTALL_VERSION} python${PYTHON_INSTALL_VERSION}-dev python${PYTHON_INSTALL_VERSION}-distutils && \
    apt-get clean && \
    wget https://bootstrap.pypa.io/get-pip.py && \
    python$PYTHON_INSTALL_VERSION get-pip.py && \
If we need different python versions, we should consider miniconda.
conda create -n my_env python=3.6
conda env update -n my_env -f environment.yml # do not specify python version in env.yml
""" | ||
if gpu_resource_name in resources: | ||
addresses = resources[gpu_resource_name].addresses | ||
pattern = re.compile('^[1-9][0-9]*|0$') |
Is it simpler to try `int(..)` and let it raise ValueError?
`int` will allow zero padding, which I'd prefer to disallow.
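To make the tradeoff concrete, here's a small sketch (a hypothetical standalone helper, not the PR's actual code) of a fully anchored variant of the pattern that rejects zero-padded addresses, which a bare `int()` would silently accept:

```python
import re

# Hypothetical helper, not the PR's code: validate a GPU address string.
# The pattern accepts '0' or any number without a leading zero. The
# alternation is grouped so that fullmatch anchors both branches; a bare
# '^[1-9][0-9]*|0$' anchors only one side of each alternative.
_GPU_ADDRESS = re.compile(r'(?:[1-9][0-9]*|0)')

def parse_gpu_address(address):
    """Parse a GPU address, rejecting zero-padded forms like '007'."""
    if _GPU_ADDRESS.fullmatch(address) is None:
        raise ValueError(f'invalid GPU address: {address!r}')
    return int(address)
```

Here `parse_gpu_address('12')` returns 12 and `parse_gpu_address('007')` raises, whereas `int('007')` quietly returns 7, which is the zero-padding concern above.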
@@ -0,0 +1,331 @@
import logging
Is it possible for you to follow the Google Python style guide? http://google.github.io/styleguide/pyguide.html
Generally speaking, that guide recommends using pylint, breaking lines at 80 chars, using a different form of docstrings, etc.
Used the google yapf formatter to format to google style so hopefully most of the major formatting issues are out of the way. Added yapf + pylint check to ci too.
Some suggestions but up to you:
would be nicer to break lines at column 80: http://google.github.io/styleguide/pyguide.html#32-line-length
docstring of function arguments should follow this format: http://google.github.io/styleguide/pyguide.html#doc-function-args
.. note:: See more at https://www.tensorflow.org/guide/distributed_training
"""
def __init__(self, num_slots, gpu_resource_name='gpu', use_custom_strategy=False):
Suggestion: would breaking it into two or more arguments make it clearer? Such as `num_gpus` and `num_workers`, or `num_replicas` and `local`. In tf.distribute, we use the term "replica" quite often.
We preferred to stick to a single scaling parameter to keep the simplicity of swapping between CPU and GPU workflows. A few other names for the param: `scale`, `n`. Agree with adding a `local=False` or `local_mode=False` parameter, will add that.
.. note:: See more at https://www.tensorflow.org/guide/distributed_training
"""
def __init__(self, num_slots, gpu_resource_name='gpu', use_custom_strategy=False):
Suggestion: would changing it to "use_gpu=True" make it clearer?
We would have to split it into two parameters because we can't assume the GPU resource name in the Spark conf is 'gpu', but I think that makes sense for the sake of clarity 👍
one for the user and wraps the training function in the strategy context, allowing the user to provide
non-distributed TensorFlow code that is executed as distributed code.

Example with use_custom_strategy=True:
Can we tell when users do it the wrong way (use_custom_strategy=False but create a strategy themselves) and throw an exception?
Nested tensorflow strategy scopes raise an exception by default - are you suggesting catching and re-raising a more informative exception?
ah I think that is good enough.
the Spark configuration. Note also that for GPU training, num_slots will limit the number of GPUs used
for training even if more are available, so that exactly num_slots GPUs are used in total. Spark does not
restrict CPU cores for tasks, so for CPU training, num_slots rarely needs to be greater than the
number of workers; for local mode, set num_slots=-1.
num_slots rarely needs to be greater than the number of workers

Does that mean in some cases, the number of available workers can be less than `num_slots`?
Yes, since `num_slots` represents the number of Spark task slots in CPU training, a user might want to, say, run two TensorFlow training workers on each Spark worker.
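A toy calculation (with assumed cluster numbers, not taken from the PR) of why CPU task slots can outnumber machines: each Spark worker contributes roughly `cores // spark.task.cpus` slots, so `num_slots` can validly exceed the worker count.

```python
# Illustrative sketch with assumed numbers: task-slot capacity in CPU mode.
def total_task_slots(num_spark_workers, cores_per_worker, task_cpus=1):
    """Task slots the cluster offers: each worker gives cores // task_cpus."""
    return num_spark_workers * (cores_per_worker // task_cpus)

# A 2-worker cluster with 8 cores each and spark.task.cpus=4 offers 4 slots,
# so num_slots=4 would place two TensorFlow workers on each Spark worker.
```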
import tensorflow as tf
# training code
"""
self.logger = _get_logger(self.__class__.__name__)
For private fields, we usually hide them by prepending a "_" in their names.
    'please contact your cluster administrator.'
    f'The conf `{key}` was not found in the Spark configuration.'
)
task_gpu_amount = int(self.sc.getConf().get(key))
Is it always True that in a spark cluster all workers have the same number of GPUs?
No, but Spark 3.0's resource-aware scheduling (used here) guarantees that each Spark task is allocated the same number of GPUs. This value is specified by the cluster admin in the Spark conf `spark.task.resource.gpu.amount`, and the task program can find which GPUs it is allocated by Spark with the `BarrierTaskContext.resources()` method.
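As a rough sketch of the conf lookup described above (using a plain dict standing in for the real `SparkConf`, and a hypothetical helper name):

```python
def get_task_gpu_amount(conf, key='spark.task.resource.gpu.amount'):
    """Read the per-task GPU amount from a conf mapping.

    `conf` is a plain dict used here in place of SparkConf; the runner
    itself calls self.sc.getConf().get(key).
    """
    value = conf.get(key)
    if value is None:
        raise ValueError(
            f'The conf `{key}` was not found in the Spark configuration.')
    return int(value)
```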
def set_gpus(context):
    gpus_owned = get_gpus_owned(context.resources(), gpu_resource_name)
    my_num_gpus = (num_slots // num_tasks) + (context.partitionId() < (num_slots % num_tasks))
    gpu_addresses = [str(e) for e in random.sample(gpus_owned, my_num_gpus)]
Can I assume that all GPUs on a machine will be taken except for the last machine? The reason I ask is randomly chosen GPUs may not have NVLinks in between.
Ah good point, we can't. For example, if we do GPU training with 6 GPUs (`num_slots=6`) on a 2-worker cluster, each with 4 GPUs, then the training will be done with 3 GPUs on each worker, rather than 4 GPUs on one worker and 2 on the other. Currently Spark's resource scheduling doesn't support resource groups, so I'd suggest leaving this as a TODO until we can use that Spark feature to report the NVLink groups to Spark.
@jhseu What is your recommended CI solution? This PR uses GitHub workflows. Is that okay?
return os.environ['CUDA_VISIBLE_DEVICES']

with pytest.raises(Exception):
    MirroredStrategyRunner(num_slots=2, gpu_resource_name='gpu').run(train_fn)
Should also verify users can set a conf to ignore it.
k, v = l.split(None, 1)
conf[k] = v

with open('tests/integration/spark_conf/spark-defaults.conf', 'w') as f:
minor: the names "base", "custom", "defaults" are a little confusing
For CI, GitHub workflows are fine for now, I think. We haven't set up an alternative for this repo.
The PR looks good to me overall though I have some small suggestions and questions. Thank you!
    'please contact your cluster administrator.'
    f'The conf `{key}` was not found in the Spark configuration.'
)
task_gpu_amount = int(self.sc.getConf().get(key))
Maybe you can call `_get_gpus_owned` here? Looks like there are two different ways to get the number of GPUs. Are they different? If not, could you consolidate them?
@staticmethod
def _get_gpus_owned(resources, gpu_resource_name):
    """
    Gets the number of GPUs that Spark scheduled to the calling task
Looks like this method is returning "the number of GPUs"?
@@ -0,0 +1,53 @@
import argparse
Curious what this file does? Would you mind adding some description in the beginning of this file?
Made a release here: https://pypi.org/project/spark-tensorflow-distributor/0.0.3/
@guptapriya @yuefengz @jhseu also please let me know who from TensorFlow I should add as an owner on the PyPI project :)
This PR aims to act as both a proposal and an initial version for a contribution of the spark-tensorflow-distributor Python package. As mentioned in #151, the general mandate of this package is to make it easier for users to do distributed training with TensorFlow 2 on their Spark clusters. Currently, the package primarily acts as a job launcher for starting TensorFlow servers, configuring GPU and CPU resources for the user based on Spark resource scheduling so that they can easily run their deep learning workloads.
This PR also includes CI with GitHub workflows, which acts at the repository level by default. However, the CI is set up so that the checks are only triggered by changes to this package's subdirectory in the ecosystem. This behavior is described in .github/workflows/spark-tensorflow-distributor.yml.

I'd also like to publish this package to PyPI and am wondering if there's an ecosystem-specific process for that.
Welcoming any and all feedback on this PR :)
cc @guptapriya @mengxr @husseinnagr-db