Added Ray remote backend and Dask distributed preprocessing #1090
New file (Dask backend):

@@ -0,0 +1,41 @@
#! /usr/bin/env python
# coding=utf-8
# Copyright (c) 2020 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

from ludwig.backend.base import Backend, LocalTrainingMixin
from ludwig.constants import NAME
from ludwig.data.dataframe.dask import DaskEngine


class DaskBackend(LocalTrainingMixin, Backend):
    def __init__(self):
        super().__init__()
        self._df_engine = DaskEngine()

    def initialize(self):
        pass

    @property
    def df_engine(self):
        return self._df_engine

    @property
    def supports_multiprocessing(self):
        return False

    def check_lazy_load_supported(self, feature):
        raise ValueError(f'DaskBackend does not support lazy loading of data files at train time. '
                         f'Set preprocessing config `in_memory: True` for feature {feature[NAME]}')
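For orientation, here is a minimal usage sketch (not part of the PR) that exercises the DaskBackend interface above. The module path `ludwig.backend.dask` and the value of the `NAME` constant (`'name'`) are assumptions inferred from the imports and usage shown in the diff.

```python
# Illustrative only: exercises the DaskBackend API defined above.
from ludwig.backend.dask import DaskBackend  # module path assumed from the imports above

backend = DaskBackend()
backend.initialize()                         # no-op for the Dask backend
engine = backend.df_engine                   # DaskEngine used for distributed preprocessing
print(backend.supports_multiprocessing)      # False

# Lazy loading is rejected explicitly with an actionable error message:
try:
    backend.check_lazy_load_supported({'name': 'image_path'})  # assumes NAME == 'name'
except ValueError as err:
    print(err)
```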
New file (Ray backend):

@@ -0,0 +1,213 @@
#! /usr/bin/env python
# coding=utf-8
# Copyright (c) 2020 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

import logging
from collections import defaultdict

import dask
import ray
from horovod.ray import RayExecutor
from ray.util.dask import ray_dask_get

from ludwig.backend.base import Backend, RemoteTrainingMixin
from ludwig.constants import NAME
from ludwig.data.dataframe.dask import DaskEngine
from ludwig.models.predictor import BasePredictor, RemotePredictor
from ludwig.models.trainer import BaseTrainer, RemoteTrainer
from ludwig.utils.tf_utils import initialize_tensorflow


logger = logging.getLogger(__name__)


def get_dask_kwargs():
    # TODO ray: select this more intelligently,
    # must be greater than or equal to number of Horovod workers
    return dict(
        parallelism=int(ray.cluster_resources()['CPU'])
    )


def get_horovod_kwargs():
    # TODO ray: https://github.com/horovod/horovod/issues/2702
    resources = [node['Resources'] for node in ray.state.nodes()]
    use_gpu = int(ray.cluster_resources().get('GPU', 0)) > 0

    # Our goal is to maximize the number of training resources we can
    # form into a homogenous configuration. The priority is GPUs, but
Review thread:
- is it possible to support non-homogenous configurations?
- From Horovod's perspective: definitely. I think we would just need to rework the RayExecutor interface a little. Namely, you could imagine the user just saying …
- yeah, that sounds good. Let me make an issue on horovod then!
    # can fall back to CPUs if there are no GPUs available.
    key = 'GPU' if use_gpu else 'CPU'

    # Bucket the per node resources by the number of the target resource
    # available on that host (equivalent to number of slots).
    buckets = defaultdict(list)
    for node_resources in resources:
        buckets[int(node_resources.get(key, 0))].append(node_resources)
Review thread on lines +47 to +59:
- I think moving forward, we want to move away from the …
- What's the preferred alternative?
- I think if we get HorovodRay to just support … Generally, we're trying to move towards a more 'serverless' abstraction where programmers think about 'resources' rather than 'nodes'.
- I think it's safe to keep this here for now (at least until we support num_gpus in horovod)
- Sweet, I'll add a TODO referencing this issue.
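To make the slot-selection heuristic under discussion concrete (including the maximization step that appears just below this thread), here is a small worked example with illustrative resource numbers, not taken from the PR:

```python
# Worked example of the bucketing above: three nodes with 4, 4, and 2 GPUs.
from collections import defaultdict

resources = [{'GPU': 4.0}, {'GPU': 4.0}, {'GPU': 2.0}]  # illustrative values
key = 'GPU'

buckets = defaultdict(list)
for node_resources in resources:
    buckets[int(node_resources.get(key, 0))].append(node_resources)
# buckets == {4: [{'GPU': 4.0}, {'GPU': 4.0}], 2: [{'GPU': 2.0}]}

# Candidates: 4 slots * 2 hosts = 8 GPUs vs. 2 slots * 1 host = 2 GPUs,
# so the homogeneous pick is num_slots=4, num_hosts=2 and the 2-GPU node sits out.
best_slots, best_nodes = max(buckets.items(), key=lambda b: b[0] * len(b[1]))
print(best_slots, len(best_nodes))  # 4 2
```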
    # Maximize for the total number of the target resource = num_slots * num_workers
    def get_total_resources(bucket):
        slots, resources = bucket
        return slots * len(resources)

    best_slots, best_resources = max(buckets.items(), key=get_total_resources)
    return dict(
        num_slots=best_slots,
        num_hosts=len(best_resources),
        use_gpu=use_gpu
    )


class RayRemoteModel:
    def __init__(self, model):
        self.cls, self.args, state = list(model.__reduce__())
        self.state = ray.put(state)

    def load(self):
        obj = self.cls(*self.args)
        obj.__setstate__(ray.get(self.state))
        return obj


class RayRemoteTrainer(RemoteTrainer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def train(self, *args, **kwargs):
        results = super().train(*args, **kwargs)
        if results is not None:
            model, *stats = results
            results = (model.get_weights(), *stats)
        return results

    def train_online(self, *args, **kwargs):
        results = super().train_online(*args, **kwargs)
        if results is not None:
            results = results.get_weights()
        return results


class RayTrainer(BaseTrainer):
    def __init__(self, horovod_kwargs, trainer_kwargs):
        # TODO ray: make this more configurable by allowing YAML overrides of timeout_s, etc.
        setting = RayExecutor.create_settings(timeout_s=30)
Review thread:
- should we expose more settings here?
- Good point. I definitely want to make this more configurable via the Ludwig YAML or similar. I think we can do this in a follow-up to allow specifying the backend in a YAML file, so I will add a TODO for now. Does that seem reasonable to you?
- yeah sounds good!
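As context for the configurability discussion, a tiny sketch (illustrative values, not Ludwig code) of how explicit `horovod_kwargs` already override the auto-detected settings on the next line, since later entries win in a Python dict merge:

```python
# Sketch only: dict-merge precedence used by RayExecutor(setting, **{**auto, **user}).
auto_detected = {'num_slots': 4, 'num_hosts': 2, 'use_gpu': True}  # e.g. get_horovod_kwargs()
user_overrides = {'num_hosts': 1}                                  # e.g. a future YAML backend section
merged = {**auto_detected, **user_overrides}
print(merged)  # {'num_slots': 4, 'num_hosts': 1, 'use_gpu': True}
```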
        self.executor = RayExecutor(setting, **{**get_horovod_kwargs(), **horovod_kwargs})
        self.executor.start(executable_cls=RayRemoteTrainer, executable_kwargs=trainer_kwargs)

    def train(self, model, *args, **kwargs):
        remote_model = RayRemoteModel(model)
        results = self.executor.execute(
            lambda trainer: trainer.train(remote_model.load(), *args, **kwargs)
        )

        weights, *stats = results[0]
        model.set_weights(weights)
        return (model, *stats)

    def train_online(self, model, *args, **kwargs):
        remote_model = RayRemoteModel(model)
        results = self.executor.execute(
            lambda trainer: trainer.train_online(remote_model.load(), *args, **kwargs)
        )

        weights = results[0]
        model.set_weights(weights)
        return model

    @property
    def validation_field(self):
        return self.executor.execute_single(lambda trainer: trainer.validation_field)

    @property
    def validation_metric(self):
        return self.executor.execute_single(lambda trainer: trainer.validation_metric)

    def shutdown(self):
        self.executor.shutdown()


class RayPredictor(BasePredictor):
    def __init__(self, horovod_kwargs, predictor_kwargs):
        # TODO ray: investigate using Dask for prediction instead of Horovod
        setting = RayExecutor.create_settings(timeout_s=30)
        self.executor = RayExecutor(setting, **{**get_horovod_kwargs(), **horovod_kwargs})
        self.executor.start(executable_cls=RemotePredictor, executable_kwargs=predictor_kwargs)

    def batch_predict(self, model, *args, **kwargs):
        model = RayRemoteModel(model)
        results = self.executor.execute(
            lambda predictor: predictor.batch_predict(model.load(), *args, **kwargs)
        )
        return results[0]

    def batch_evaluation(self, model, *args, **kwargs):
        model = RayRemoteModel(model)
        results = self.executor.execute(
            lambda predictor: predictor.batch_evaluation(model.load(), *args, **kwargs)
        )
        return results[0]

    def batch_collect_activations(self, model, *args, **kwargs):
        model = RayRemoteModel(model)
        return self.executor.execute_single(
            lambda predictor: predictor.batch_collect_activations(model.load(), *args, **kwargs)
        )

    def shutdown(self):
        self.executor.shutdown()


class RayBackend(RemoteTrainingMixin, Backend):
    def __init__(self, horovod_kwargs=None):
        super().__init__()
        self._df_engine = DaskEngine()
        self._horovod_kwargs = horovod_kwargs or {}
        self._tensorflow_kwargs = {}

    def initialize(self):
        try:
            ray.init('auto', ignore_reinit_error=True)
        except ConnectionError:
            logger.info('Initializing new Ray cluster...')
            ray.init(ignore_reinit_error=True)

        dask.config.set(scheduler=ray_dask_get)
        self._df_engine.set_parallelism(**get_dask_kwargs())

    def initialize_tensorflow(self, **kwargs):
        # Make sure we don't claim any GPU resources on the head node
        initialize_tensorflow(gpus=-1)
        self._tensorflow_kwargs = kwargs

    def create_trainer(self, **kwargs):
        executable_kwargs = {**kwargs, **self._tensorflow_kwargs}
        return RayTrainer(self._horovod_kwargs, executable_kwargs)

    def create_predictor(self, **kwargs):
        executable_kwargs = {**kwargs, **self._tensorflow_kwargs}
        return RayPredictor(self._horovod_kwargs, executable_kwargs)

    @property
    def df_engine(self):
        return self._df_engine

    @property
    def supports_multiprocessing(self):
        return False

    def check_lazy_load_supported(self, feature):
        raise ValueError(f'RayBackend does not support lazy loading of data files at train time. '
                         f'Set preprocessing config `in_memory: True` for feature {feature[NAME]}')
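One pattern worth calling out: RayRemoteModel above puts the heavy model state into the Ray object store once and rebuilds the model on each worker from the lightweight (cls, args) pair plus the shared state reference. Below is a toy, self-contained sketch of that round trip; the ToyModel class is hypothetical and stands in for the Ludwig model.

```python
# Toy illustration of the (cls, args, state) round trip used by RayRemoteModel.
import ray


class ToyModel:
    def __init__(self, n_inputs):
        self.n_inputs = n_inputs
        self.weights = [0.0] * n_inputs

    def __reduce__(self):
        # (callable, args, state); state is restored through __setstate__
        return ToyModel, (self.n_inputs,), {'weights': self.weights}

    def __setstate__(self, state):
        self.weights = state['weights']


ray.init(ignore_reinit_error=True)

model = ToyModel(3)
model.weights = [1.0, 2.0, 3.0]

cls, args, state = model.__reduce__()
state_ref = ray.put(state)   # the heavy weights go to the object store once

rebuilt = cls(*args)         # cheap: only the constructor args travel with the task
rebuilt.__setstate__(ray.get(state_ref))
print(rebuilt.weights)       # [1.0, 2.0, 3.0]
```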
New file (iterable batcher):

@@ -0,0 +1,55 @@
#! /usr/bin/env python
# coding=utf-8
# Copyright (c) 2020 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
from ludwig.data.batcher.base import Batcher


class IterableBatcher(Batcher):
    def __init__(self,
                 dataset,
                 data,
                 steps_per_epoch,
                 ignore_last=False):
        self.dataset = dataset
        self.data = data
        self.data_it = iter(data)

        self.ignore_last = ignore_last
        self.steps_per_epoch = steps_per_epoch
        self.step = 0

    def next_batch(self):
        if self.last_batch():
            raise StopIteration()

        sub_batch = {}
        batch = next(self.data_it)
        for features_name in self.dataset.features:
            sub_batch[features_name] = self.dataset.get(
                features_name,
                batch
            )

        self.step += 1
        return sub_batch

    def last_batch(self):
        return self.step >= self.steps_per_epoch or (
            self.ignore_last and
            self.step + 1 >= self.steps_per_epoch)

    def set_epoch(self, epoch):
        self.step = 0
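For reference, a minimal sketch (not from the PR) of the contract IterableBatcher expects: `dataset` exposes `.features` and `.get(name, batch)`, and `data` is any iterable of batches. The toy dataset and batch values below are made up, and the class is assumed to be in scope (or importable from the new module).

```python
# Illustrative only: a toy dataset satisfying the interface IterableBatcher uses.
class ToyDataset:
    features = ['x', 'y']

    def get(self, feature_name, batch):
        # In Ludwig this would slice/transform the raw batch; here it just indexes it.
        return batch[feature_name]


batches = [{'x': [1, 2], 'y': [0, 1]},
           {'x': [3, 4], 'y': [1, 0]}]

batcher = IterableBatcher(ToyDataset(), batches, steps_per_epoch=len(batches))
while not batcher.last_batch():
    print(batcher.next_batch())   # one {'x': ..., 'y': ...} dict per step
```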
Review thread:
- @clarkzinzow does this make sense as the default `repartition` value? One partition per CPU? Not sure if there's a more reasonable heuristic for this. The one restriction we have is that for Petastorm, we must have at least one row group per Horovod worker, and the safest way to guarantee this at the moment is to repartition the dataframe.
- That's the typical heuristic, yes, under the soft constraint of those chunks/partitions fitting nicely into each worker's memory.
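To make the heuristic under discussion concrete, here is a hedged sketch (not the PR's code): one Dask partition per cluster CPU, with a floor of one partition per Horovod worker so Petastorm can hand every worker at least one row group. The worker count and toy dataframe are assumed values for illustration.

```python
# Illustrative sketch of the repartitioning heuristic discussed above.
import pandas as pd
import dask.dataframe as dd
import ray

ray.init(ignore_reinit_error=True)

num_cpus = int(ray.cluster_resources()['CPU'])
num_horovod_workers = 4   # assumed value for illustration

df = dd.from_pandas(pd.DataFrame({'a': range(1_000)}), npartitions=1)
df = df.repartition(npartitions=max(num_cpus, num_horovod_workers))
print(df.npartitions)
```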