Perform inference with Dask when using the Ray backend #1128
Conversation
so cool feature

)
return postprocessed

# Save any new columns but do not save the original columns again
Why do we need this twice? Can't we do it only once, at the end?
Will likely also remove the need for saved_keys
At least in the original implementation, existing columns were written to numpy before postprocessing, while new columns were written after postprocessing. This change is meant to preserve that behavior, but we can definitely remove the earlier lines if changing it is acceptable.
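As a toy sketch of that ordering (save_numpy and postprocess are hypothetical stand-ins for the real helpers, not Ludwig's actual code):

```python
import numpy as np
import pandas as pd


def save_numpy(name, values):
    # Hypothetical stand-in for the numpy-saving helper.
    np.save(f"{name}.npy", np.asarray(values))


def postprocess(df):
    # Hypothetical postprocessing step that adds a new column.
    out = df.copy()
    out["predictions_str"] = out["predictions"].astype(str)
    return out


predictions = pd.DataFrame({"predictions": [0, 1, 1]})

# Existing columns are written to numpy before postprocessing...
saved_keys = set(predictions.columns)
for key in saved_keys:
    save_numpy(key, predictions[key])

postprocessed = postprocess(predictions)

# ...and only columns created by postprocessing are written afterwards,
# which is why saved_keys is needed to skip the originals.
for key in postprocessed.columns:
    if key not in saved_keys:
        save_numpy(key, postprocessed[key])
```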
if top_k_col in predictions:
    if 'idx2str' in metadata:
        predictions[top_k_col] = backend.df_engine.map_objects(
            predictions[top_k_col],
Not 100% sure what the content of the top-k column looks like, but before, each entry was a list of values, so at line 431 there was a loop over each value, while here it seems like there's only a loop over preds and not inside each pred.
Good catch, every row here should be a pred_top_k. Fixed.
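For reference, a hedged sketch of the fixed per-row mapping (assuming map_objects applies the given function to each element of the column; the lambda is illustrative, not the exact code):

```python
# Each row of the top-k column is itself a list of class indices
# (a pred_top_k), so the idx2str lookup must loop inside each row.
if top_k_col in predictions:
    if 'idx2str' in metadata:
        predictions[top_k_col] = backend.df_engine.map_objects(
            predictions[top_k_col],
            lambda pred_top_k: [metadata['idx2str'][idx]
                                for idx in pred_top_k],
        )
```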
ludwig/backend/ray.py (Outdated)
        self.metrics.append(metrics)

    def collect(self):
        return sum_dicts(
This still does not convince me 100%, as most of the metrics summed / averaged across partitions do not make much sense. Maybe we can deactivate evaluation in the Ray case as well, think it through, and then reactivate it.
I agree, though this implies it is also currently broken in the normal Horovod case:

ludwig/ludwig/models/predictor.py, line 289 in 51e357d:

merged_output_metrics = sum_dicts(
I believe that once we adopt the Metrics package for PyTorch, which natively supports distributed aggregation, this problem will be addressed. Until then, I agree that disabling makes sense.
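For context, a minimal sketch of what that would look like with torchmetrics (assumed here to be the package in question; exact class names vary across versions):

```python
import torch
from torchmetrics.classification import MulticlassAccuracy

metric = MulticlassAccuracy(num_classes=10)

# Each worker only accumulates local state (correct / total counts)...
preds = torch.randint(0, 10, (32,))
target = torch.randint(0, 10, (32,))
metric.update(preds, target)

# ...and compute() synchronizes that state across processes before
# dividing, yielding the true global accuracy rather than an average
# of per-worker accuracies.
accuracy = metric.compute()
```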
I think it depends on what we are aggregating: if it's accuracies, for instance, this wouldn't be correct; if it's aggregating by summing the counts of correct predictions and all predictions, and then later dividing correct predictions by all predictions, this would be correct. But I believe we are doing the former.
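A quick worked example of that distinction (numbers made up):

```python
# Two partitions of very different sizes.
correct = [9, 10]   # correct predictions per partition
total = [10, 90]    # examples per partition

# Averaging per-partition accuracies weights both partitions equally
# and gives the wrong answer: (0.9 + 0.111...) / 2 ≈ 0.506.
avg_of_accs = sum(c / t for c, t in zip(correct, total)) / 2

# Summing the counts first and dividing once gives the true accuracy:
# 19 / 100 = 0.19.
true_acc = sum(correct) / sum(total)
```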
This PR introduces a new PartitionedDataset constructed by the backend when calling create_inference_dataset within preprocess_for_prediction. Unlike other datasets, the PartitionedDataset runs the batch prediction in parallel on each partition of the distributed DataFrame.

A consequence of this rearchitecture is that instead of Predictor.batch_predict returning a dict, it now returns a DataFrame (as the dataset could be large enough to not fit into memory). Therefore, all the postprocessing now happens on the DataFrame with a flat structure, instead of the nested dict structure. A rough sketch of the per-partition idea is shown below.
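This is only a minimal sketch, assuming a Dask DataFrame backend; the class body, predict_partition, and the pandas-in/pandas-out contract are illustrative assumptions, not Ludwig's actual internals:

```python
import dask.dataframe as dd


class PartitionedDataset:
    # Illustrative stand-in for the dataset described above.
    def __init__(self, df: dd.DataFrame):
        self.df = df

    def batch_predict(self, model) -> dd.DataFrame:
        def predict_partition(pdf):
            # Each worker runs its own model replica on one pandas
            # partition and returns a pandas DataFrame of predictions.
            return model.predict(pdf)

        # The result stays a lazy, distributed DataFrame, so it never
        # has to fit into driver memory; postprocessing then operates
        # on flat DataFrame columns rather than a nested dict.
        return self.df.map_partitions(predict_partition)
```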
There are a few API limitations to the current implementation:

Another limitation we should address soon is that this implementation does not optimize for GPU inference. Because it runs a separate model replica on each partition, we have no way of preventing multiple model replicas from landing on the same GPU. With standalone Dask, this is possible when configuring your workers (see here). However, this approach doesn't work for Ray at the moment. One possibility is to create workers for each GPU and then use ray.remote to send partitions to GPU workers; a sketch of that idea follows. If we go this route, we need to be smart about how we route partitions to model replicas to ensure minimal network overhead. We can do this in a follow-up PR.
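To make the routing idea concrete, here is a minimal, non-authoritative sketch assuming one Ray actor per GPU (GPUPredictor and the round-robin assignment are hypothetical, not part of this PR):

```python
import ray

ray.init()


@ray.remote(num_gpus=1)  # Ray reserves one GPU per actor
class GPUPredictor:
    def predict(self, partition):
        # Stand-in for real inference; a real implementation would load
        # a model replica onto this actor's GPU in __init__.
        return [x * 2 for x in partition]


# One actor per available GPU (assumes the cluster has at least one).
num_gpus = int(ray.cluster_resources().get("GPU", 0))
actors = [GPUPredictor.remote() for _ in range(num_gpus)]

# Naive round-robin routing of partitions to GPU actors; a smarter
# router would prefer actors co-located with each partition's data to
# minimize network overhead.
partitions = [[1, 2], [3, 4], [5, 6]]
results = ray.get([
    actors[i % len(actors)].predict.remote(part)
    for i, part in enumerate(partitions)
])
```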