Jaliyae/chunk distributed #16624
Conversation
apaszke left a comment
ChunkSelectors are Samplers that have an independent class hierarchy. I don't think they serve any different purpose, so I'd rather avoid adding them unnecessarily.
Just for random sampling we would need all of these versions: RandomSampler, ThreadSafeRandomSampler, ThreadSafeDistributedRandomSampler, and similarly for sequential sampling. Instead, this class is specific to chunk loading, so no new sampler needs those extra flavors; any regular sampler can still be used for shuffling data within a chunk. In other words, we still use the regular sampler for selecting examples, and this one is only for selecting chunks. In addition, since the Sampler base class is templated, we needed another layer of wrapping for pybind, and this eliminates that need too. As always, we can change to a better solution; currently I don't see one with the existing samplers that is as clean as this.
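For illustration, a minimal sketch of that split, with hypothetical names (none of these are from the PR): a chunk-level helper picks this rank's chunks, seeded by the epoch so every process agrees on the order, while a plain shuffle handles the examples inside a loaded chunk.

```cpp
#include <algorithm>
#include <cstddef>
#include <numeric>
#include <random>
#include <vector>

// Hypothetical helper (not from the PR): pick this rank's chunks. The epoch
// seeds the shuffle, so every process computes the same global order and
// then takes its own strided slice of it.
std::vector<std::size_t> chunks_for_rank(std::size_t total_chunks,
                                         std::size_t rank,
                                         std::size_t num_replicas,
                                         std::size_t epoch) {
  std::vector<std::size_t> order(total_chunks);
  std::iota(order.begin(), order.end(), std::size_t{0});
  std::minstd_rand rng(static_cast<unsigned>(epoch));
  std::shuffle(order.begin(), order.end(), rng);
  std::vector<std::size_t> mine;
  for (std::size_t i = rank; i < total_chunks; i += num_replicas) {
    mine.push_back(order[i]);  // strided partition across replicas
  }
  return mine;
}

// Within a loaded chunk, an ordinary shuffle of the examples is enough; no
// distributed or thread-safe sampler flavor is needed for this step.
void shuffle_examples_in_chunk(std::vector<int>& examples, std::size_t seed) {
  std::minstd_rand rng(static_cast<unsigned>(seed));
  std::shuffle(examples.begin(), examples.end(), rng);
}
```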
size_t cache_size = 2048)
: preloader_count_(preloader_count),
batch_size_(batch_size),
: batch_size_(batch_size),
Just curious, is there a specific reason we want to switch the order?
I think only batch_size is needed to create a chunk dataset; preloader_count and cache_size can have default values.
Makes sense.
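As a sketch of that suggestion, assuming a hypothetical options-style holder (this is not the merged signature), only batch_size would be a required argument:

```cpp
#include <cstddef>

// Hypothetical sketch: batch_size is required, the other knobs default.
class ChunkDatasetOptions {
 public:
  explicit ChunkDatasetOptions(std::size_t batch_size,
                               std::size_t preloader_count = 1,
                               std::size_t cache_size = 2048)
      : batch_size_(batch_size),
        preloader_count_(preloader_count),
        cache_size_(cache_size) {}

 private:
  std::size_t batch_size_;
  std::size_t preloader_count_;
  std::size_t cache_size_;
};
```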
The new update removed the chunk selectors and added distributed samplers under the samplers folder. I also removed the atomic constraint on the distributed sampler and pushed that to the chunk dataset.
apaszke left a comment
Not a complete review, will take a look later
/// Return the number of samples available for this instance. This is
/// different to the `size` as each sampler needs to handle only a subset of
/// samples.
TORCH_API size_t local_sample_count();
Hmm, why do we need this? Samplers as of today have no way to retrieve their length, while this adds that to the API.
It is a long explanation, but let me try. In the main dataloader, enumeration (the next() method) is driven by a single thread, i.e. the application. For the chunk dataset, however, next() is called by all the dataloader worker threads, so we needed a way to know how many chunks to load in order to stop waiting for next() batches; this is the chunk count. When running in distributed mode, we also need to know the current partition size; that is the local sample count. Splitting this definition between the dataset and the sampler is ugly, so I added this method to the distributed sampler.
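A rough sketch of the control flow being described, with hypothetical names (this is an assumption about the intent, not code from the PR): the preloader threads share a counter, and the per-rank chunk count from local_sample_count() tells the dataset when everything it owns has been handed out.

```cpp
#include <atomic>
#include <cstddef>
#include <optional>

// Hypothetical sketch of the described control flow: preloader threads share
// a counter, and the per-rank chunk count (local_sample_count()) tells the
// dataset when every chunk it owns has been handed out.
class ChunkDispatcher {
 public:
  explicit ChunkDispatcher(std::size_t chunks_to_load)
      : chunks_to_load_(chunks_to_load) {}

  // Called concurrently by preloader threads; returns nullopt once the
  // per-rank quota is reached, which is the signal to stop waiting.
  std::optional<std::size_t> next_chunk() {
    const std::size_t i = next_index_.fetch_add(1);
    if (i >= chunks_to_load_) {
      return std::nullopt;
    }
    return i;
  }

 private:
  std::atomic<std::size_t> next_index_{0};
  std::size_t chunks_to_load_;  // e.g. chunk_sampler->local_sample_count()
};
```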
size_t num_replicas_;
size_t rank_;
size_t epoch_;
size_t local_sample_count_;
It doesn't seem like size_ has to be part of this class. local_sample_count_ can be computed with a single division operation, so it seems very wasteful to pay 8B for caching it. Let's make base classes lean unless we have values that are expensive to compute.
That change would do the following:
- instead of local_sample_count_, the class needs to add allow_duplicate_ as a member variable;
- instead of a member variable, all calculations would go through a method call that performs integer divisions.
Accessing the count is not on the critical path (next()), so I'm not sure what we are optimizing here.
}

size_t DistributedSampler::local_sample_count() {
return local_sample_count_;
You even have a method, so just put the logic that computes it in here
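For reference, a sketch of that suggestion as a free function (the member names from the discussion above are passed as parameters; the allow_duplicates behavior is an assumption based on the author's comment):

```cpp
#include <cstddef>

// Sketch of the suggestion: derive the per-rank count on demand instead of
// caching it. allow_duplicates decides whether the count rounds up (some
// samples may be repeated so every rank gets the same amount) or down (the
// remainder is dropped). Behavior is assumed from the discussion above.
std::size_t local_sample_count(std::size_t size, std::size_t num_replicas,
                               bool allow_duplicates) {
  if (allow_duplicates) {
    return (size + num_replicas - 1) / num_replicas;  // ceil(size / replicas)
  }
  return size / num_replicas;  // floor(size / replicas)
}
```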
populate_indices();
}
std::minstd_rand rand(epoch_);
std::shuffle(all_indices_.begin(), all_indices_.end(), rand);
I feel like we could use torch::randperm with a custom generator in here, like we do in Python. That might be faster.
I looked at that and I'm not sure unless we really measure it, especially since we would need to copy data out of the tensor or use tensor slicing operations.
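To illustrate the copy-out cost being discussed, a hedged sketch of the randperm route; wiring in a custom, epoch-seeded generator is omitted because the generator API differs between libtorch versions.

```cpp
#include <torch/torch.h>

#include <cstddef>
#include <vector>

// Hedged sketch of the randperm route and the copy-out step in question.
std::vector<std::size_t> randperm_indices(int64_t size) {
  // torch::randperm returns an int64 permutation tensor by default.
  torch::Tensor perm = torch::randperm(size);
  std::vector<std::size_t> indices(static_cast<std::size_t>(size));
  auto accessor = perm.accessor<int64_t, 1>();
  for (int64_t i = 0; i < size; ++i) {
    indices[static_cast<std::size_t>(i)] =
        static_cast<std::size_t>(accessor[i]);
  }
  return indices;
}
```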
}

void DistributedSequentialSampler::populate_indices() {
begin_index_ = rank_ * local_sample_count_;
I honestly don't get why we even keep this thing around. It's a single multiplication that you save at every reset()!
This class could easily have been written keeping only one or two of those variables.
Yes, for this class we can do the calculation in one place, but I don't want this calculation repeated in every sampler again and again, hence it is done once in DistributedSampler; then we either remember the value or use a method call. As I have mentioned, a method call requires knowing whether to duplicate or not.
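A compact sketch of the per-rank window computation under discussion, written as a free function (the clamping at the dataset size is an assumption, not the PR's exact code):

```cpp
#include <algorithm>
#include <cstddef>
#include <utility>

// Sketch of the per-rank window computed on the fly: begin is
// rank * local_sample_count, and both bounds are clamped to the dataset
// size so the last rank does not run past the end.
std::pair<std::size_t, std::size_t> rank_window(std::size_t size,
                                                std::size_t rank,
                                                std::size_t num_replicas) {
  const std::size_t local = (size + num_replicas - 1) / num_replicas;
  const std::size_t begin = std::min(rank * local, size);
  const std::size_t end = std::min(begin + local, size);
  return {begin, end};
}
```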
ExampleSamplerType example_sampler_;

// Selects chunks and their order for this reader.
std::shared_ptr<samplers::DistributedSampler> chunk_sampler_;
uuuuuuh that's a very weird specialization. It's not like ChunkDataset is specific to distributed workloads
This is the reason for introducing ChunkSelectors originally in the PR. Either the sampler or ChunkDataset needs to know it is operating in a distributed setting. We cannot use the sampler base type here because it does not have all the methods we need, which leads us back to the local_chunk_count() method. Let's discuss offline and see if we can resolve this in a better way.
std::atomic<bool> quit_worker_;

// mutex to synchronize chunk sampler next() call.
std::mutex chunk_index_guard_;
Can we please separate different changes into different patches? It would be easier to review the distributed samplers and the removal of the locked sampler independently. I guess we can leave it as is for now, just a small request for future PRs.
// In distributed training, local chunk count could be different to total
// chunks availble. Chunk sampler holds the truth.
size_t chunks_to_load = chunk_sampler_->local_sample_count();
No, no, no, this is a clear leakage of abstraction. We should make it such that the dataset stops loading as soon as the sampler runs out of indices. The size is completely unnecessary here.
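To illustrate the suggested alternative, a generic sketch against a sampler whose next() returns an empty optional when it runs out (the load_chunk callback is a hypothetical stand-in for the chunk reader):

```cpp
#include <cstddef>

// Generic sketch of the suggested alternative: keep pulling one chunk index
// at a time and stop as soon as the sampler is exhausted, so the dataset
// never needs to ask for a count.
template <typename ChunkSamplerT, typename LoadChunkFn>
void load_all_chunks(ChunkSamplerT& chunk_sampler, LoadChunkFn load_chunk) {
  while (auto indices = chunk_sampler.next(/*batch_size=*/1)) {
    if (indices->empty()) {
      break;  // defensive: treat an empty batch as exhaustion too
    }
    load_chunk(indices->front());
  }
}
```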
/// utterances, e.t.c.
/// struct that contains a raw unwrapped batch unit. An unwrapped batch unit
/// is the raw data without 'optional' wrapper. It can be a collection of
/// images, utterances, e.t.c.
Please avoid small formatting changes like this. They greatly increase the size of the PR, which means it takes 2x longer for me to review it, because I need to see every single changed line... If you really dislike your previous formatting, please submit it as a separate patch...
ChunkReader chunk_reader,
ChunkSampler chunk_sampler,
ExampleSampler example_sampler,
std::shared_ptr<samplers::DistributedSampler> chunk_sampler,
Shall this be a unique_ptr? If it is a shared_ptr, the user may change the sampler state outside ChunkDataset, which is undesired. With a unique_ptr, after it is moved inside ChunkDataset, the object is uniquely owned by ChunkDataset. What do you think?
The user needs to call set_epoch() on the sampler and so needs to keep a reference. There are several alternatives to this approach: we could move set_epoch() to the StatefulDataloader, or automatically increment the epoch value with each iteration. I followed the Python API in PyTorch.
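A hypothetical usage sketch of the pattern being described, assuming the distributed sampler type and set_epoch() under torch::data::samplers (the exact constructor arguments and surrounding setup are assumptions, not the merged API):

```cpp
#include <torch/torch.h>

#include <cstddef>
#include <memory>

// Hypothetical usage: the caller keeps the shared_ptr so it can bump the
// epoch between iterations; with the same epoch on every rank, each replica
// reshuffles its chunks identically.
void run_training(std::size_t total_chunks, std::size_t num_epochs) {
  auto chunk_sampler =
      std::make_shared<torch::data::samplers::DistributedRandomSampler>(
          total_chunks);
  // ... construct the chunk dataset / data loader with chunk_sampler ...
  for (std::size_t epoch = 0; epoch < num_epochs; ++epoch) {
    chunk_sampler->set_epoch(epoch);
    // ... iterate one epoch of the data loader ...
  }
}
```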
Hi @jaliyae! Thank you for your pull request. We require contributors to sign our Contributor License Agreement, and yours needs attention. You currently have a record in our system, but we do not have a signature on file. In order for us to review and merge your code, please sign at https://code.facebook.com/cla. If you are contributing on behalf of someone else (eg your employer), the individual CLA may not be sufficient and your employer may need to sign the corporate CLA. If you have received this in error or have any questions, please contact us at cla@fb.com. Thanks!
Thank you for signing our Contributor License Agreement. We can now accept your code for this (and any) Facebook open source project. Thanks!
Looks like this PR hasn't been updated in a while, so we're going to go ahead and mark this as Stale.
Add support for running ChunkDataSet with distributed training. Here is a brief description of the change.
ChunkSelector - Introduced to replace the ChunkSampler, which was a regular sampler before this change. This also eliminates the need for LockedSampler, and this change deletes that code. ChunkSelector performs two tasks: (1) selecting which chunks to load, and (2) the order in which chunks are loaded. For example, if we have 10 chunks and two distributed processes, each process needs to load only 5 chunks. Added two built-in selectors, RandomChunkSelector and SequentialChunkSelector, which order chunks randomly and sequentially respectively.
With this, we can simplify the ChunkDataSet constructor and avoid the LockedSampler behavior. It also allows the user to set the epoch number for different iterations, letting the chunk selectors shuffle data using the epoch number as a seed so that all distributed processes get the same shuffle order. A sketch of the described interface follows below.
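A hedged sketch of the ChunkSelector interface described above; this design was later replaced by distributed samplers in the updated PR, so the names and signatures here are illustrative only.

```cpp
#include <cstddef>
#include <vector>

// Illustrative ChunkSelector interface, as described in the PR summary.
class ChunkSelector {
 public:
  virtual ~ChunkSelector() = default;

  // Seed for shuffling, so all distributed processes agree on the order.
  virtual void set_epoch(std::size_t epoch) = 0;

  // Task (1): select the chunks this process should load.
  // Task (2): return them in loading order.
  virtual std::vector<std::size_t> select_chunks(std::size_t total_chunks,
                                                 std::size_t rank,
                                                 std::size_t num_replicas) = 0;
};
```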