Distributed in-memory map-reduce for data analyzer #5129

Merged
merged 39 commits into microsoft:master from distributed_data_analyzer on Feb 20, 2024

Conversation

@bm-synth (Contributor) commented Feb 14, 2024

Adds the class `DistributedDataAnalyzer`, which implements a map-reduce in distributed memory.

  • Instead of writing hundreds or thousands of temp files as intermediate storage during the map/reduce, as in `DataAnalyzer`, each node holds disjoint, ordered subsets of `(metric, sample id)` pairs as a distributed tensor.
  • It also removes the need to specify `metric_dtypes`, as they are automatically inferred from the return value of `metric_function(data)` (a minimal sketch of this follows the list).
  • It removes the need for a distributed file system that all nodes can write to: here only rank 0 does the writing.
  • It is much faster than the original map-reduce based on writing and loading several temp files to disk, requires less memory, produces no temp files, and is simpler.
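
A minimal sketch of the dtype-inference idea, assuming PyTorch; `infer_metric_dtype` and `sample` are placeholder names, not the actual `DistributedDataAnalyzer` API:

```python
import torch

def infer_metric_dtype(metric_function, sample):
    # Hypothetical helper: run the metric once and let torch pick the dtype
    # (e.g. a Python int maps to torch.int64, a Python float to torch.float32).
    value = metric_function(sample)
    return value.dtype if torch.is_tensor(value) else torch.tensor(value).dtype
```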

How does it work

  • For each metric, the only result storage is `metric_result`, a list of `(sample_id, metric_value)` tuples.
    • `metric_result` is converted to a 2D tensor once the whole dataset has been iterated.
  • `sample_idx_dtype` and `metric_value_dtype` are collected with `all_reduce` operations (`op=MIN` and `op=MAX`) across the `metric_result` of all nodes.
  • Each node holds a `metric_result` tensor of `N` samples as `N x (metric, sample)`, sorted by metric, with different metric values across nodes. E.g.:
    • node 1 holds `[[1,20], [1,30], [2,10], [2,30]]`, node 2 holds `[[3,10], [3,20], [3,15], [3,40]]`, and node 3 holds `[[4,20], [4,30], [5,25], [5,50]]`.
    • To convert the list of `(metric, sample)` pairs to a `dict{metric: [samples]}`, each node iterates only its own data, since dictionary keys do not overlap across nodes. In this example, node 1 builds `{1: [20, 30], 2: [10, 30]}`, node 2 builds `{3: [10, 20, 15, 40]}`, and node 3 builds `{4: [20, 30], 5: [25, 50]}`.
  • To write the merged files: (1) rank 0 opens the file, (2) it iteratively receives buffers of values, dict keys and dict values from the other ranks and writes them, and (3) it closes the file. A minimal sketch of the map and grouping steps follows this list.
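
A minimal sketch of the map and grouping steps described above, assuming PyTorch with `torch.distributed` already initialized, integer-valued metrics, and placeholder names `dataset_shard` and `metric_function`; the real `DistributedDataAnalyzer` code may differ in names and details:

```python
import torch
import torch.distributed as dist

def map_local_metric(dataset_shard, metric_function):
    # Map phase on one node: collect (sample_id, metric_value) tuples,
    # then convert them to a 2D tensor once the shard has been iterated.
    metric_result = [(sid, metric_function(sample)) for sid, sample in dataset_shard]
    local = torch.tensor(metric_result, dtype=torch.int64)  # shape (N, 2)

    # Agree on global minima/maxima across nodes so the smallest safe dtypes
    # (sample_idx_dtype, metric_value_dtype) can be chosen later.
    lo = local.min(dim=0).values.clone()
    hi = local.max(dim=0).values.clone()
    dist.all_reduce(lo, op=dist.ReduceOp.MIN)
    dist.all_reduce(hi, op=dist.ReduceOp.MAX)
    return local, lo, hi

def group_by_metric(sorted_rows):
    # After the distributed sort, each node holds (metric, sample_id) rows
    # sorted by metric, and metric values do not overlap across nodes, so
    # building {metric: [samples]} is a purely local operation.
    groups = {}
    for metric, sample_id in sorted_rows.tolist():
        groups.setdefault(metric, []).append(sample_id)
    return groups
```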

Future work

Ideally, one could take this and do the curriculum setup on the fly when calling `deepspeed.initialize`, i.e. without writing/loading map-reduce files and without forcing the user to call `.map()` and `.reduce()` beforehand. The whole map-reduce takes less than 10 seconds, so this is entirely feasible.

References

  • `file_write_ordered()` implements a sequential shared write similar to [`MPI_File_write_ordered`](https://www.open-mpi.org/doc/v3.0/man3/MPI_File_write_ordered.3.php). However, it is adapted to communicate and write a list of tensors instead of a single tensor, and to have only rank 0 write to the file instead of using a shared file pointer (a sketch follows this list).
  • `dist_sample_sort()` implements a distributed sample sort, as detailed [here](https://brunomaga.github.io/Distributed-Sort) and illustrated below. The ranges computed in step 3 guarantee disjoint subsets of keys (metric values) across nodes (a sketch follows the figure).
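
A minimal sketch of the rank-0 ordered write, assuming `torch.distributed` point-to-point `send`/`recv` of CPU tensors (e.g. the gloo backend), flat `int64` buffers, and raw-byte serialization; the real `file_write_ordered()` works with the dataset builders and may differ substantially:

```python
import torch
import torch.distributed as dist

def file_write_ordered(tensors, path):
    # Sequential shared write: rank 0 writes its own buffers first, then
    # receives and writes every other rank's buffers, in rank order.
    rank, world = dist.get_rank(), dist.get_world_size()
    if rank == 0:
        with open(path, "wb") as f:
            for t in tensors:
                f.write(t.flatten().to(torch.int64).cpu().numpy().tobytes())
            for src in range(1, world):
                count = torch.zeros(1, dtype=torch.int64)
                dist.recv(count, src=src)      # number of buffers rank `src` will send
                for _ in range(int(count)):
                    size = torch.zeros(1, dtype=torch.int64)
                    dist.recv(size, src=src)   # length of the next buffer
                    buf = torch.empty(int(size), dtype=torch.int64)
                    dist.recv(buf, src=src)
                    f.write(buf.numpy().tobytes())
    else:
        dist.send(torch.tensor([len(tensors)]), dst=0)
        for t in tensors:
            flat = t.flatten().to(torch.int64).cpu()
            dist.send(torch.tensor([flat.numel()]), dst=0)
            dist.send(flat, dst=0)
```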

![sample_sort](https://github.com/microsoft/DeepSpeed/assets/150697676/53828103-370f-4f3b-9074-3e3bb8603000)
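
A minimal sketch of the sample-sort idea shown above, assuming CPU `int64` tensors of `(metric, sample_id)` rows and a `torch.distributed` backend that supports `all_to_all` for them (e.g. MPI); splitter selection and edge cases are simplified relative to the real `dist_sample_sort()`:

```python
import torch
import torch.distributed as dist

def dist_sample_sort(rows):
    # rows: (N, 2) int64 tensor of (metric, sample_id) pairs on this rank.
    world = dist.get_world_size()

    # 1. sort locally by metric (column 0)
    rows = rows[rows[:, 0].argsort()]

    # 2. every rank contributes a few evenly spaced metric keys as samples
    idx = torch.linspace(0, len(rows) - 1, steps=world, dtype=torch.int64)
    local_samples = rows[idx, 0]
    all_samples = [torch.empty_like(local_samples) for _ in range(world)]
    dist.all_gather(all_samples, local_samples)

    # 3. pick world-1 splitters from the gathered, globally sorted samples;
    #    the resulting key ranges are disjoint across ranks
    samples = torch.cat(all_samples).sort().values
    splitters = samples[torch.arange(1, world) * world - 1]

    # 4. bucket local rows by destination rank and exchange them
    dest = torch.bucketize(rows[:, 0], splitters)
    send = [rows[dest == r] for r in range(world)]
    recv_sizes = [torch.zeros(1, dtype=torch.int64) for _ in range(world)]
    dist.all_to_all(recv_sizes, [torch.tensor([len(s)]) for s in send])
    recv = [torch.empty(int(n), 2, dtype=torch.int64) for n in recv_sizes]
    dist.all_to_all(recv, send)

    # 5. final local sort of everything this rank received
    out = torch.cat(recv)
    return out[out[:, 0].argsort()]
```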

@bm-synth changed the title from "[DRAFT] Distributed data analyzer" to "[DRAFT] Distributed map-reduce in data analyzer" on Feb 16, 2024
@bm-synth changed the title from "[DRAFT] Distributed map-reduce in data analyzer" to "[DRAFT] Distributed map-reduce for data analyzer" on Feb 18, 2024
@bm-synth changed the title from "[DRAFT] Distributed map-reduce for data analyzer" to "Distributed map-reduce for data analyzer" on Feb 18, 2024
@bm-synth changed the title from "Distributed map-reduce for data analyzer" to "Distributed in-memory map-reduce for data analyzer" on Feb 18, 2024
@bm-synth marked this pull request as ready for review on February 19, 2024 00:51
@conglongli self-assigned this on Feb 19, 2024
@conglongli (Contributor) left a comment:


@bm-synth Currently we don't have enough bandwidth to do a full review of this PR. But given that this is a standalone new feature, I'm approving it for now. There is one failed CI check, and I'm not sure why.

@bm-synth (Contributor, Author) commented Feb 19, 2024

@conglongli The tests show that all .bin files match; among the .idx files, only {metric_name}_index_to_sample_percentile_merged.idx is identical, i.e. {metric}_index_to_metric.idx, {metric}_index_to_sample.idx and mod/{metric}_sample_to_metric.idx differ. I believe this is because they are written in a different order and then merged, since execution using these files is identical to the baseline.

> @bm-synth Currently we don't have enough bandwidth to do a full review of this PR.

cc @mrwyattii: ideally the user should only need to edit the config file to enable curriculum; this whole map+reduce should happen behind the scenes when calling `deepspeed.initialize`. The current map-reduce is infeasible for large datasets: it takes too long to run, outputs thousands of tiny files, and requires shared storage. It is also infeasible to run several experiments with different curriculum settings (too slow, too many folders/files). Let me know if you need more documentation or a one-to-one explanation of this PR, so that we can decide how to improve this.

@bm-synth (Contributor, Author) commented:

> There is one failed CI check, and I'm not sure why.

@conglongli this is also happening in other PRs; I think it's an issue on your CI.

@conglongli (Contributor) commented:

> There is one failed CI check, and I'm not sure why.
>
> @conglongli this is also happening in other PRs; I think it's an issue on your CI.

Yeah, we will investigate.

@loadams (Contributor) commented Feb 20, 2024

> There is one failed CI check, and I'm not sure why.
>
> @conglongli this is also happening in other PRs; I think it's an issue on your CI.
>
> Yeah, we will investigate.

This should be fixed now.

@conglongli added this pull request to the merge queue on Feb 20, 2024
Merged via the queue into microsoft:master with commit e977c7d Feb 20, 2024
12 checks passed
@bm-synth deleted the distributed_data_analyzer branch on February 21, 2024 12:35
github-merge-queue bot pushed a commit that referenced this pull request Feb 22, 2024
…yzer. (#5169)

Minor improvements of #5129:
- Writes all buffers to the output file at once, instead of iteratively (`indexed_dataset.py`, method `add_items()`); see the sketch after this list.
- Fixes the incorrect initialisation of `num_workers` and `worker_id`, which were being ignored when provided by the user.
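
A hypothetical illustration of the buffering change (not the actual `add_items()` code): serialize all buffers once and issue a single write, instead of one write per tensor.

```python
import torch

def write_buffers_at_once(file_handle, tensors):
    # Join all serialized buffers and write them in one call (illustrative only).
    data = b"".join(t.cpu().numpy().tobytes() for t in tensors)
    file_handle.write(data)
```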

---------

Co-authored-by: Conglong Li <conglong.li@gmail.com>
ShellyNR pushed a commit to ShellyNR/DeepSpeed that referenced this pull request Mar 11, 2024
rraminen pushed a commit to ROCm/DeepSpeed that referenced this pull request May 9, 2024