Merged

Changes from all commits (42 commits)
23016ad  reduce hyperopt verbosity (richardliaw, Sep 15, 2018)
5c9ed75  Some trainable tweaks (richardliaw, Sep 15, 2018)
e75e1c8  fix examples (richardliaw, Sep 15, 2018)
53ce264  fix (richardliaw, Sep 15, 2018)
5027b2c  get rid of extra output (richardliaw, Sep 15, 2018)
2c5531c  Merge branch 'master' into tune_other_fixes (richardliaw, Sep 26, 2018)
08a1167  small fixes (richardliaw, Sep 26, 2018)
76fb1ee  better error message (richardliaw, Sep 26, 2018)
f8a2f52  more trainable tweaks (richardliaw, Sep 27, 2018)
1262b64  in progress custom resources (richardliaw, Sep 27, 2018)
d6a0e00  finish bookkeepping (richardliaw, Sep 27, 2018)
2efc404  Merge branch 'master' into custom_resources (richardliaw, Jan 18, 2019)
fbddb77  fixup (richardliaw, Jan 18, 2019)
fb23101  revert weird dict change (richardliaw, Jan 18, 2019)
65bde05  positivity (richardliaw, Jan 18, 2019)
0242ad9  better names (richardliaw, Jan 18, 2019)
a89c36e  some tests (richardliaw, Jan 18, 2019)
19ab952  flake' (richardliaw, Jan 18, 2019)
31474c9  flake' (richardliaw, Jan 21, 2019)
623522c  yapf (richardliaw, Jan 21, 2019)
935cc2c  Merge branch 'master' into custom_resources (richardliaw, Jan 28, 2019)
b0900d1  fix bug (richardliaw, Jan 28, 2019)
ef3c44b  testsing (richardliaw, Jan 28, 2019)
9e24190  flake (richardliaw, Jan 28, 2019)
7045470  test (richardliaw, Jan 28, 2019)
eafe591  tests (richardliaw, Feb 2, 2019)
426cda5  add tests (richardliaw, Feb 2, 2019)
7b0a8ae  fix tests (richardliaw, Feb 2, 2019)
1f1a9ab  test (richardliaw, Feb 2, 2019)
ee41d53  Merge branch 'master' into custom_resources (richardliaw, Feb 2, 2019)
653c5d6  fix_comment (richardliaw, Feb 3, 2019)
827fcb9  nit (richardliaw, Feb 3, 2019)
d41a0f5  Merge branch 'master' into custom_resources (richardliaw, Feb 3, 2019)
c325c31  fix (richardliaw, Feb 3, 2019)
e21310b  Merge branch 'master' into custom_resources (richardliaw, Feb 4, 2019)
6d68e7a  fix_test (richardliaw, Feb 4, 2019)
0792f10  Merge branch 'custom_resources' of github.com:richardliaw/ray into cu… (richardliaw, Feb 4, 2019)
5d24dab  tests (richardliaw, Feb 4, 2019)
112210f  Merge branch 'custom_resources' of github.com:richardliaw/ray into cu… (richardliaw, Feb 4, 2019)
cb0d9cd  fix (richardliaw, Feb 4, 2019)
ba4298b  Update python/ray/tune/ray_trial_executor.py (ericl, Feb 5, 2019)
4e88960  flake (richardliaw, Feb 5, 2019)
python/ray/tune/ray_trial_executor.py (86 changes: 67 additions & 19 deletions)
@@ -35,7 +35,9 @@ def __init__(self, queue_trials=False):
     def _setup_runner(self, trial):
         cls = ray.remote(
             num_cpus=trial.resources.cpu,
-            num_gpus=trial.resources.gpu)(trial._get_trainable_cls())
+            num_gpus=trial.resources.gpu,
+            resources=trial.resources.custom_resources)(
+                trial._get_trainable_cls())
 
         trial.init_logger()
         # We checkpoint metadata here to try mitigating logdir duplication
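
Note: this hunk is where custom resources actually reach Ray. The trainable's remote class is now built with the `resources=` argument of `ray.remote`, so the trial's actor is only scheduled on a node advertising those resources. A minimal standalone sketch of the same pattern (the resource name "hi" and the trainable are made up for illustration):

    import ray
    from ray.tune import Trainable

    class MyTrainable(Trainable):
        def _train(self):
            return {"timesteps_this_iter": 1, "done": True}

    ray.init(resources={"hi": 3})

    # Approximately what _setup_runner does: bind CPU/GPU and any custom
    # resources to the remote actor class before instantiating it.
    cls = ray.remote(num_cpus=1, num_gpus=0, resources={"hi": 2})(MyTrainable)
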
@@ -229,16 +231,37 @@ def fetch_result(self, trial):
         return result
 
     def _commit_resources(self, resources):
+        committed = self._committed_resources
+        all_keys = set(resources.custom_resources).union(
+            set(committed.custom_resources))
+
+        custom_resources = {
+            k: committed.get(k) + resources.get_res_total(k)
+            for k in all_keys
+        }
+
         self._committed_resources = Resources(
-            self._committed_resources.cpu + resources.cpu_total(),
-            self._committed_resources.gpu + resources.gpu_total())
+            committed.cpu + resources.cpu_total(),
+            committed.gpu + resources.gpu_total(),
+            custom_resources=custom_resources)
 
     def _return_resources(self, resources):
+        committed = self._committed_resources
+
+        all_keys = set(resources.custom_resources).union(
+            set(committed.custom_resources))
+
+        custom_resources = {
+            k: committed.get(k) - resources.get_res_total(k)
+            for k in all_keys
+        }
         self._committed_resources = Resources(
-            self._committed_resources.cpu - resources.cpu_total(),
-            self._committed_resources.gpu - resources.gpu_total())
-        assert self._committed_resources.cpu >= 0
-        assert self._committed_resources.gpu >= 0
+            committed.cpu - resources.cpu_total(),
+            committed.gpu - resources.gpu_total(),
+            custom_resources=custom_resources)
+
+        assert self._committed_resources.is_nonnegative(), (
+            "Resource invalid: {}".format(resources))
 
     def _update_avail_resources(self, num_retries=5):
         for i in range(num_retries):
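
Note: the new bookkeeping mirrors the existing CPU/GPU accounting: committing adds, returning subtracts, and both iterate over the union of custom-resource keys so a key present on only one side is treated as zero. The arithmetic reduces to this sketch with plain dicts (names are illustrative, not Tune's API):

    def merge_custom(committed, requested, sign=1):
        # Union-of-keys add/subtract, as in _commit_resources/_return_resources.
        all_keys = set(committed) | set(requested)
        return {k: committed.get(k, 0) + sign * requested.get(k, 0)
                for k in all_keys}

    committed = {"a": 1}
    requested = {"a": 1, "b": 2}
    committed = merge_custom(committed, requested)           # {"a": 2, "b": 2}
    committed = merge_custom(committed, requested, sign=-1)  # {"a": 1, "b": 0}
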
@@ -247,28 +270,37 @@ def _update_avail_resources(self, num_retries=5):
             logger.warning("Cluster resources not detected. Retrying...")
             time.sleep(0.5)
 
-        num_cpus = resources["CPU"]
-        num_gpus = resources["GPU"]
+        resources = resources.copy()
+        num_cpus = resources.pop("CPU")
+        num_gpus = resources.pop("GPU")
+        custom_resources = resources
 
-        self._avail_resources = Resources(int(num_cpus), int(num_gpus))
+        self._avail_resources = Resources(
+            int(num_cpus), int(num_gpus), custom_resources=custom_resources)
         self._resources_initialized = True
 
     def has_resources(self, resources):
         """Returns whether this runner has at least the specified resources."""
         self._update_avail_resources()
-        cpu_avail = self._avail_resources.cpu - self._committed_resources.cpu
-        gpu_avail = self._avail_resources.gpu - self._committed_resources.gpu
+        currently_available = Resources.subtract(self._avail_resources,
+                                                 self._committed_resources)
 
-        have_space = (resources.cpu_total() <= cpu_avail
-                      and resources.gpu_total() <= gpu_avail)
+        have_space = (
+            resources.cpu_total() <= currently_available.cpu
+            and resources.gpu_total() <= currently_available.gpu and all(
+                resources.get_res_total(res) <= currently_available.get(res)
+                for res in resources.custom_resources))
 
         if have_space:
             return True
 
         can_overcommit = self._queue_trials
 
-        if (resources.cpu_total() > 0 and cpu_avail <= 0) or \
-           (resources.gpu_total() > 0 and gpu_avail <= 0):
+        if (resources.cpu_total() > 0 and currently_available.cpu <= 0) or \
+           (resources.gpu_total() > 0 and currently_available.gpu <= 0) or \
+           any((resources.get_res_total(res_name) > 0
+                and currently_available.get(res_name) <= 0)
+               for res_name in resources.custom_resources):
             can_overcommit = False  # requested resource is already saturated
 
         if can_overcommit:
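
Note: `has_resources` now forms a single `currently_available = avail - committed` snapshot and requires every dimension (CPU, GPU, and each custom resource) to fit; the `queue_trials` overcommit path is likewise blocked when any requested dimension is already saturated. A sketch of the core check (plain dicts, illustrative only):

    def has_space(requested, available):
        # Every requested dimension must fit; a missing key counts as 0.
        return all(amount <= available.get(key, 0)
                   for key, amount in requested.items())

    available = {"CPU": 3, "GPU": 2, "a": 2}
    print(has_space({"CPU": 1, "a": 2}, available))  # True
    print(has_space({"CPU": 1, "a": 3}, available))  # False
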
@@ -287,18 +319,34 @@ def debug_string(self):
         """Returns a human readable message for printing to the console."""
 
         if self._resources_initialized:
-            return "Resources requested: {}/{} CPUs, {}/{} GPUs".format(
+            status = "Resources requested: {}/{} CPUs, {}/{} GPUs".format(
                 self._committed_resources.cpu, self._avail_resources.cpu,
                 self._committed_resources.gpu, self._avail_resources.gpu)
+            customs = ", ".join([
+                "{}/{} {}".format(
+                    self._committed_resources.get_res_total(name),
+                    self._avail_resources.get_res_total(name), name)
+                for name in self._avail_resources.custom_resources
+            ])
+            if customs:
+                status += " ({})".format(customs)
+            return status
         else:
             return "Resources requested: ?"
 
     def resource_string(self):
         """Returns a string describing the total resources available."""
 
         if self._resources_initialized:
-            return "{} CPUs, {} GPUs".format(self._avail_resources.cpu,
-                                             self._avail_resources.gpu)
+            res_str = "{} CPUs, {} GPUs".format(self._avail_resources.cpu,
+                                                self._avail_resources.gpu)
+            if self._avail_resources.custom_resources:
+                custom = ", ".join(
+                    "{} {}".format(
+                        self._avail_resources.get_res_total(name), name)
+                    for name in self._avail_resources.custom_resources)
+                res_str += " ({})".format(custom)
+            return res_str
         else:
             return "? CPUs, ? GPUs"
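
Note: with these two formatting changes, custom resources show up in the console output alongside CPUs and GPUs. For a cluster exporting a custom resource `a`, the status line would look something like (numbers illustrative):

    Resources requested: 1/4 CPUs, 0/2 GPUs (2/2 a)
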
python/ray/tune/test/trial_runner_test.py (138 changes: 137 additions & 1 deletion)
@@ -23,7 +23,8 @@
 from ray.tune.logger import Logger
 from ray.tune.util import pin_in_object_store, get_pinned_object
 from ray.tune.experiment import Experiment
-from ray.tune.trial import Trial, Resources, ExportFormat
+from ray.tune.trial import (Trial, ExportFormat, Resources, resources_to_json,
+                            json_to_resources)
 from ray.tune.trial_runner import TrialRunner
 from ray.tune.suggest import grid_search, BasicVariantGenerator
 from ray.tune.suggest.suggestion import (_MockSuggestionAlgorithm,
@@ -736,6 +737,28 @@ def _train(self):
         for trial in trials:
             self.assertEqual(trial.status, Trial.TERMINATED)
 
+    def testCustomResources(self):
+        ray.shutdown()
+        ray.init(resources={"hi": 3})
+
+        class train(Trainable):
+            def _train(self):
+                return {"timesteps_this_iter": 1, "done": True}
+
+        trials = run_experiments({
+            "foo": {
+                "run": train,
+                "resources_per_trial": {
+                    "cpu": 1,
+                    "custom_resources": {
+                        "hi": 2
+                    }
+                }
+            }
+        })
+        for trial in trials:
+            self.assertEqual(trial.status, Trial.TERMINATED)
+
     def testCustomLogger(self):
         class CustomLogger(Logger):
             def on_result(self, result):
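
Note: beyond regression coverage, this test doubles as the user-facing recipe: per-trial custom resources go under the `custom_resources` key of `resources_per_trial`. Outside the test harness that would look roughly like the sketch below (the function trainable and the "hi" resource are illustrative):

    import ray
    from ray import tune

    def train_fn(config, reporter):
        reporter(timesteps_this_iter=1, done=True)

    ray.init(resources={"hi": 3})
    tune.run_experiments({
        "my_exp": {
            "run": train_fn,
            "resources_per_trial": {
                "cpu": 1,
                "custom_resources": {"hi": 2},
            },
        },
    })
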
@@ -1083,6 +1106,62 @@ def testExtraResources(self):
         self.assertEqual(trials[0].status, Trial.TERMINATED)
         self.assertEqual(trials[1].status, Trial.PENDING)
 
+    def testCustomResources(self):
+        ray.init(num_cpus=4, num_gpus=2, resources={"a": 2})
+        runner = TrialRunner(BasicVariantGenerator())
+        kwargs = {
+            "stopping_criterion": {
+                "training_iteration": 1
+            },
+            "resources": Resources(cpu=1, gpu=0, custom_resources={"a": 2}),
+        }
+        trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)]
+        for t in trials:
+            runner.add_trial(t)
+
+        runner.step()
+        self.assertEqual(trials[0].status, Trial.RUNNING)
+        self.assertEqual(trials[1].status, Trial.PENDING)
+
+        runner.step()
+        self.assertEqual(trials[0].status, Trial.TERMINATED)
+        self.assertEqual(trials[1].status, Trial.PENDING)
+
+    def testExtraCustomResources(self):
+        ray.init(num_cpus=4, num_gpus=2, resources={"a": 2})
+        runner = TrialRunner(BasicVariantGenerator())
+        kwargs = {
+            "stopping_criterion": {
+                "training_iteration": 1
+            },
+            "resources": Resources(
+                cpu=1, gpu=0, extra_custom_resources={"a": 2}),
+        }
+        trials = [Trial("__fake", **kwargs), Trial("__fake", **kwargs)]
+        for t in trials:
+            runner.add_trial(t)
+
+        runner.step()
+        self.assertEqual(trials[0].status, Trial.RUNNING)
+        self.assertEqual(trials[1].status, Trial.PENDING)
+
+        runner.step()
+        self.assertTrue(sum(t.status == Trial.RUNNING for t in trials) < 2)
+        self.assertEqual(trials[0].status, Trial.TERMINATED)
+        self.assertEqual(trials[1].status, Trial.PENDING)
+
+    def testCustomResources2(self):
+        ray.init(num_cpus=4, num_gpus=2, resources={"a": 2})
+        runner = TrialRunner(BasicVariantGenerator())
+        resource1 = Resources(cpu=1, gpu=0, extra_custom_resources={"a": 2})
+        self.assertTrue(runner.has_resources(resource1))
+        resource2 = Resources(cpu=1, gpu=0, custom_resources={"a": 2})
+        self.assertTrue(runner.has_resources(resource2))
+        resource3 = Resources(cpu=1, gpu=0, custom_resources={"a": 3})
+        self.assertFalse(runner.has_resources(resource3))
+        resource4 = Resources(cpu=1, gpu=0, extra_custom_resources={"a": 3})
+        self.assertFalse(runner.has_resources(resource4))
+
     def testFractionalGpus(self):
         ray.init(num_cpus=4, num_gpus=1)
         runner = TrialRunner(BasicVariantGenerator())
@@ -1292,6 +1371,7 @@ def testFailureRecoveryNodeRemoval(self):
             resource_mock.return_value = {"CPU": 1, "GPU": 1}
             runner.step()
             self.assertEqual(trials[0].status, Trial.RUNNING)
+
             runner.step()
             self.assertEqual(trials[0].status, Trial.RUNNING)
 
@@ -1878,5 +1958,61 @@ def _suggest(self, trial_id):
         self.assertTrue("d=4" in trial.experiment_tag)
 
 
+class ResourcesTest(unittest.TestCase):
+    def testSubtraction(self):
+        resource_1 = Resources(
+            1,
+            0,
+            0,
+            1,
+            custom_resources={
+                "a": 1,
+                "b": 2
+            },
+            extra_custom_resources={
+                "a": 1,
+                "b": 1
+            })
+        resource_2 = Resources(
+            1,
+            0,
+            0,
+            1,
+            custom_resources={
+                "a": 1,
+                "b": 2
+            },
+            extra_custom_resources={
+                "a": 1,
+                "b": 1
+            })
+        new_res = Resources.subtract(resource_1, resource_2)
+        self.assertTrue(new_res.cpu == 0)
+        self.assertTrue(new_res.gpu == 0)
+        self.assertTrue(new_res.extra_cpu == 0)
+        self.assertTrue(new_res.extra_gpu == 0)
+        self.assertTrue(all(k == 0 for k in new_res.custom_resources.values()))
+        self.assertTrue(
+            all(k == 0 for k in new_res.extra_custom_resources.values()))
+
+    def testDifferentResources(self):
+        resource_1 = Resources(1, 0, 0, 1, custom_resources={"a": 1, "b": 2})
+        resource_2 = Resources(1, 0, 0, 1, custom_resources={"a": 1, "c": 2})
+        new_res = Resources.subtract(resource_1, resource_2)
+        assert "c" in new_res.custom_resources
+        assert "b" in new_res.custom_resources
+        self.assertTrue(new_res.cpu == 0)
+        self.assertTrue(new_res.gpu == 0)
+        self.assertTrue(new_res.extra_cpu == 0)
+        self.assertTrue(new_res.extra_gpu == 0)
+        self.assertTrue(new_res.get("a") == 0)
+
+    def testSerialization(self):
+        original = Resources(1, 0, 0, 1, custom_resources={"a": 1, "b": 2})
+        jsoned = resources_to_json(original)
+        new_resource = json_to_resources(jsoned)
+        self.assertEquals(original, new_resource)
+
+
 if __name__ == "__main__":
     unittest.main(verbosity=2)
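
Note: these tests pin down the `Resources.subtract` contract for custom resources: the result carries the union of both operands' keys, with a key missing on one side treated as zero. Consistent with `testDifferentResources` above (a sketch, not additional test code):

    from ray.tune.trial import Resources

    r1 = Resources(1, 0, 0, 1, custom_resources={"a": 1, "b": 2})
    r2 = Resources(1, 0, 0, 1, custom_resources={"a": 1, "c": 2})
    diff = Resources.subtract(r1, r2)
    # Both "b" and "c" survive in the result; "a" subtracts to 0.
    assert "b" in diff.custom_resources and "c" in diff.custom_resources
    assert diff.get("a") == 0
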