Added Ray remote backend and Dask distributed preprocessing #1090

Merged: 128 commits, Mar 19, 2021

Conversation

tgaddair (Collaborator) commented on Feb 4, 2021

No description provided.

# TODO ray: select this more intelligently,
# must be greater than or equal to the number of Horovod workers
return dict(
    parallelism=int(ray.cluster_resources()['CPU'])
)
tgaddair (Collaborator, Author):

@clarkzinzow does this make sense as the default repartition value? One partition per CPU? Not sure if there's a more reasonable heuristic for this. The one restriction we have is that for Petastorm, we must have at least one row group per Horovod worker, and the safest way to guarantee this at the moment is to repartition the dataframe.

That's the typical heuristic, yes, under the soft constraint of those chunks/partitions fitting nicely into each worker's memory.
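
A minimal sketch of the heuristic discussed above (helper names are hypothetical, not the PR's actual code): default the Dask partition count to the number of CPUs reported by the Ray cluster, so the written dataframe ends up with at least one row group per Horovod worker.

import ray
import dask.dataframe as dd

def default_parallelism() -> int:
    # ray.cluster_resources() returns a dict of resource name -> total quantity,
    # e.g. {'CPU': 16.0, ...}; use the CPU count as the default partition count.
    return int(ray.cluster_resources()['CPU'])

def repartition_for_petastorm(df: dd.DataFrame) -> dd.DataFrame:
    # One partition per CPU. Each Dask partition is written as at least one
    # Parquet row group, which satisfies Petastorm's requirement of at least
    # one row group per Horovod worker (assuming workers <= CPUs).
    return df.repartition(npartitions=default_parallelism())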

w4nderlust (Collaborator) left a comment

Absolutely amazing job with this! Added a couple of very minor comments.

Comment on lines +75 to +81
features = get_combined_features(config)
for feature in features:
    name = feature[NAME]
    proc_column = feature[PROC_COLUMN]
    reshape = training_set_metadata[name].get('reshape')
    if reshape is not None:
        dataset[proc_column] = self.map_objects(dataset[proc_column], lambda x: x.reshape(-1))
w4nderlust (Collaborator):

Curious about this: what is it a workaround for? (Probably worth adding a comment about it too.)

tgaddair (Collaborator, Author):

Basically, PyArrow cannot serialize the data as multi-dimensional tensors, so we need to store the shape so that, when we read it back, we can reshape it into its correct form. Will add a comment to this effect.
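
A hedged illustration of the pattern described here (hypothetical helpers, not Ludwig's actual functions): flatten each array before writing it through PyArrow, remember its original shape in the training set metadata, and reshape on read.

import numpy as np

def flatten_for_arrow(value, metadata, name):
    # Record the original shape so the column can be restored after reading.
    metadata.setdefault(name, {})['reshape'] = list(value.shape)
    return value.reshape(-1)

def restore_shape(value, metadata, name):
    reshape = metadata.get(name, {}).get('reshape')
    return value.reshape(reshape) if reshape is not None else value

# Example:
#   meta = {}
#   flat = flatten_for_arrow(np.zeros((8, 8, 3)), meta, 'image_feature')
#   restored = restore_shape(flat, meta, 'image_feature')  # shape (8, 8, 3) again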

t = getattr(sample, feature_name)
reshape_dim = self.reshape_features.get(feature_name)
if reshape_dim is not None:
    t = tf.reshape(t, reshape_dim)
w4nderlust (Collaborator):

I guess my previous question had to do with this, right? Multi-dimensional tensors get squashed and then put back into shape, right?

tgaddair (Collaborator, Author):

Yes, exactly. I can add a comment here.
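
A minimal sketch of the read-side step being confirmed here, assuming a dict-of-tensors sample format and a hypothetical reshape_features mapping (feature name -> original shape): flattened features coming out of the dataset are reshaped back to their original dimensions.

import tensorflow as tf

reshape_features = {'image_feature': (28, 28, 1)}  # hypothetical mapping

def restore_shapes(sample):
    out = {}
    for feature_name, t in sample.items():
        reshape_dim = reshape_features.get(feature_name)
        # Reshape only the features that were flattened for serialization.
        out[feature_name] = tf.reshape(t, reshape_dim) if reshape_dim is not None else t
    return out

# dataset = dataset.map(restore_shapes)  # applied to a tf.data.Dataset of dicts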

tests/integration_tests/test_ray.py (outdated, resolved)