Ray data migration #1260
Conversation
ludwig/backend/ray.py
Outdated
```diff
@@ -176,16 +177,20 @@ def batch_predict(self, model, dataset, *args, **kwargs):
         predictor_kwargs = self.predictor_kwargs
         output_columns = get_output_columns(model.output_features)

-        def batch_predict_partition(dataset):
+        def batch_predict_partition(df):
```
This will be more efficient if we turn it into a (stateful) actor:

```python
class BatchInferModel:
    def __init__(self):
        self.model = remote_model.load()
        self.predictor = Predictor(**predictor_kwargs)

    def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
        pd_ds = PandasDataset(df, dataset.features, dataset.data_hdf5_fp)
        predictions = self.predictor.batch_predict(self.model, pd_ds, *args, **kwargs)
        ordered_predictions = predictions[output_columns]
        return ordered_predictions

return dataset.ds.map_batches(BatchInferModel, ...)
```

This allows us to cache the model and predictor in memory between calls.
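To make the caching benefit concrete, here is a minimal, Ray-free sketch of the stateful-actor pattern. `FakeModel` and `run_batches` are hypothetical stand-ins: Ray's `map_batches` constructs one instance of the callable class per worker and reuses it for every batch, so the expensive `load()` runs once instead of once per batch.

```python
# Hypothetical stand-ins to illustrate the stateful callable-class pattern.
LOAD_CALLS = 0

class FakeModel:
    def load(self):
        # Pretend this is an expensive model load.
        global LOAD_CALLS
        LOAD_CALLS += 1
        return self

    def predict(self, batch):
        return [x * 2 for x in batch]

class BatchInferModel:
    def __init__(self):
        # Expensive setup runs once per worker, not once per batch.
        self.model = FakeModel().load()

    def __call__(self, batch):
        return self.model.predict(batch)

def run_batches(fn_cls, batches):
    # Mimics the map_batches contract: instantiate the callable class once,
    # then invoke it for every batch.
    fn = fn_cls()
    return [fn(b) for b in batches]

results = run_batches(BatchInferModel, [[1, 2], [3]])
```

With a plain function instead of a class, the load would repeat for every batch; the class moves it into `__init__`.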
Thanks for the suggestion, done!
ludwig/backend/ray.py
Outdated

```python
            meta=[(c, 'object') for c in output_columns]

        num_gpus = int(ray.cluster_resources().get('GPU', 0) > 0)
        return dataset.ds.map_batches(
```
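The `num_gpus` line above requests one GPU per inference task only when the cluster reports any GPU capacity. A small sketch of that calculation, with `gpus_per_task` as a hypothetical helper name and the dict standing in for the output of `ray.cluster_resources()`:

```python
def gpus_per_task(cluster_resources: dict) -> int:
    # Request exactly one GPU per task if the cluster has any GPUs at all,
    # otherwise zero; mirrors int(... .get('GPU', 0) > 0) in the diff.
    return int(cluster_resources.get('GPU', 0) > 0)

cpu_only = gpus_per_task({'CPU': 8.0})
with_gpus = gpus_per_task({'CPU': 8.0, 'GPU': 2.0})
```

Note this caps the request at one GPU per task regardless of how many GPUs the cluster has; parallelism across GPUs comes from running multiple tasks.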
We may need to do a conversion to Dask DataFrame here:

```python
dataset.ds.map_batches(...).to_dask()
```

The other option is to carry through the RayDataset and then only call the methods to save to a particular format at the end. This might be a better long-term solution, but it might be too involved for this PR, since the code already assumes a Dask DataFrame will be used downstream. What do you think?
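The design trade-off here can be sketched without Ray or Dask: either convert eagerly at each call site, or keep the native dataset type through the pipeline and convert only at the boundary (analogous to calling `.to_dask()` once at the end). `NativeDataset` and `to_downstream` are hypothetical names for illustration only.

```python
class NativeDataset:
    """Stand-in for a Ray Dataset carried through the pipeline."""

    def __init__(self, rows):
        self.rows = rows

    def map_batches(self, fn):
        # Transformations stay in the native representation.
        return NativeDataset(fn(self.rows))

    def to_downstream(self):
        # Single boundary conversion, analogous to Dataset.to_dask().
        return list(self.rows)

ds = NativeDataset([1, 2, 3]).map_batches(lambda rows: [r + 1 for r in rows])
out = ds.to_downstream()  # convert only at the edge of the pipeline
```

Carrying the native type avoids repeated conversions mid-pipeline, at the cost of touching every downstream consumer, which is why deferring it to a follow-up PR is reasonable.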
Yeah, I agree. I really like the idea of carrying Ray Datasets throughout prediction, and I can do that in a new PR.
ludwig/backend/ray.py
Outdated
```diff
@@ -163,6 +166,22 @@ def shutdown(self):
         self.executor.shutdown()


+class BatchInferModel:
+    def __init__(self, remote_model, predictor_kwargs, output_columns, features, data_hdf5_fp, *args, **kwargs):
+        self.model = remote_model.load()
```
We need to define this class in-line because we don't want to load the remote model until we're running in the worker context.
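The deferred-loading constraint can be shown with a small closure-based sketch. `DummyRemoteModel` and `make_batch_infer_model` are hypothetical names: the point is that the remote model handle is only captured at definition time, and `load()` does not run until a "worker" actually constructs the class.

```python
def make_batch_infer_model(remote_model):
    # The class closes over remote_model; nothing is loaded yet.
    class BatchInferModel:
        def __init__(self):
            # Deferred: runs only when the worker instantiates the class.
            self.model = remote_model.load()

        def __call__(self, batch):
            return [self.model.scale * x for x in batch]

    return BatchInferModel

class DummyRemoteModel:
    def __init__(self, scale):
        self.scale = scale
        self.loaded = False

    def load(self):
        self.loaded = True
        return self

rm = DummyRemoteModel(10)
cls = make_batch_infer_model(rm)
assert not rm.loaded      # defining the class did not load the model
infer = cls()             # the "worker" constructs the callable
out = infer([1, 2])
```

If the model were loaded at module or driver scope instead, it would be serialized and shipped to every worker, which is exactly what deferring the `load()` avoids.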
ludwig/data/dataset/tfrecord.py
Outdated
```diff
@@ -190,6 +190,7 @@ def create(self, dataset, config, training_set_metadata):
         )

+    def create_inference_dataset(self, dataset, tag, config, training_set_metadata):
+        # TODO(shreya): Confirm if this needs to be updated to RayDatasets too.
```
Probably we want to remove PartitionedDataset here and just raise an exception like:

```python
raise ValueError('Batch inference not supported with TFRecord format at this time')
```
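A minimal sketch of what that guard could look like as a helper, assuming a hypothetical `data_format` string argument and helper name (neither is in the PR):

```python
def check_batch_inference_supported(data_format: str) -> None:
    # Fail fast with a clear message instead of silently misbehaving
    # on an unsupported dataset format.
    if data_format == 'tfrecord':
        raise ValueError(
            'Batch inference not supported with TFRecord format at this time'
        )

check_batch_inference_supported('parquet')  # supported formats pass silently
```

Raising early keeps the unsupported path explicit until RayDatasets support lands for TFRecord.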
ludwig/api.py
Outdated
```diff
@@ -872,6 +872,7 @@ def evaluate(
         # calculate the overall metrics
         if collect_overall_stats:
             # TODO ray: support calculating stats on partitioned datasets
+            # TODO(shreya): Confirm what's needed to enable this.
```
Change `isinstance` check to `RayDataset`
Nice!
Nice work figuring out the flatten stuff. Just a couple small things, then we should be good to land.
The last thing we need to double-check is the optional dependency imports. I added a few comments about how we can work around that. Let me know if it makes sense.
LGTM! Awesome work.
Code Pull Requests
This PR adds support for Ray Datasets for distributed prediction.
Documentation Pull Requests
Note that the documentation HTML files are in `docs/` while the Markdown sources are in `mkdocs/docs`. If you are proposing a modification to the documentation you should change only the Markdown files.
`api.md` is automatically generated from the docstrings in the code, so if you want to change something in that file, first modify the `ludwig/api.py` docstring, then run `mkdocs/code_docs_autogen.py`, which will create `mkdocs/docs/api.md`.