
remove mandatory index key from output of metric_function in DataAnalysis map operation #5112

Merged

merged 14 commits into microsoft:master from data_analysis_remove_key_index Feb 15, 2024

Conversation


@bm-synth bm-synth commented Feb 10, 2024

When performing the map operation required for curriculum learning, the output of `metric_function` requires an `index` field:

```
    def update_metric_results(self, data, metric_types, metric_dtypes, metric_functions, metric_results):
        for m_idx in range(len(metric_types)):
            [...]
            if metric_type == 'single_value_per_sample':
                for row in range(metric_values.size()[0]):
                    metric_result["sample_to_metric_builder"].add_item(metric_values[row].reshape(-1))
                    metric_result["metric_to_sample_dict"][metric_values[row].item()].append(
                        data['index'][row][0].item())  # <------- data['index']??
```

The documentation makes no mention of this, i.e. it never specifies that the output of `metric_function` should be a dict/DataFrame (?) with an `index` key/column. To make things worse, there is no way for a user to specify a proper `index` value for each sample, because the distribution of samples across workers/threads is not known to the user; it is computed inside `DataAnalysis`:

```
    def run_map_helper(self, thread_id):
        start_idx, end_idx = self.thread_splits[thread_id][0], \
            self.thread_splits[thread_id][1]
        logger.info(f"worker {self.worker_id} thread {thread_id}: start working " \
            f"on data subset {start_idx} to {end_idx}")
        thread_dataset = Subset(self.dataset, list(range(start_idx, end_idx)))
        sampler = BatchSampler(SequentialSampler(thread_dataset), batch_size=self.batch_size, drop_last=False)
```

Since by design you picked a `SequentialSampler`, you know beforehand the global index of each sample of each batch of each thread of each worker by looking at

```
self.worker_splits, self.thread_splits = split_dataset(self.dataset, self.num_workers, self.worker_id,
                                                       self.num_threads)
start_idx, end_idx = thread_splits[t_idx_reduce][0], thread_splits[t_idx_reduce][1]
```

and you can populate that index value correctly, instead of asking the user to provide it.

This PR removes the need for an `'index'` key in `data` and instead uses the batch, thread, and worker ids to compute the global index of each sample.
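The arithmetic behind this can be sketched as follows (a minimal illustration, not the merged DeepSpeed code; the function name and parameters are assumptions for the example):

```python
# Minimal sketch: with a SequentialSampler, a sample's global index is fully
# determined by its thread's start offset, its batch number, and its row
# within the batch. Names here are illustrative, not DeepSpeed's API.

def global_sample_index(thread_start, batch_idx, batch_size, row):
    """Global dataset index of `row` in batch `batch_idx` of a thread whose
    sequential subset begins at dataset offset `thread_start`."""
    return thread_start + batch_idx * batch_size + row

# Example: a thread covering dataset rows [100, 200) with batch_size=8;
# row 3 of batch 2 is global sample 100 + 2*8 + 3 = 119.
assert global_sample_index(100, 2, 8, 3) == 119
```

This is why no user-provided `index` is needed in the sequential case: the split bounds already determine it.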

@bm-synth bm-synth changed the title remove 'index' key from output of metric_function in DataAnalysis remove mandatory index key from output of metric_function in DataAnalysis map operation Feb 10, 2024
@bm-synth bm-synth changed the title remove mandatory index key from output of metric_function in DataAnalysis map operation remove mandatory index key from output of metric_function in DataAnalysis map operation Feb 10, 2024
@bm-synth bm-synth changed the title remove mandatory index key from output of metric_function in DataAnalysis map operation remove mandatory index key from output of metric_function in DataAnalysis map operation [ONGOING TESTING] Feb 10, 2024
@bm-synth bm-synth marked this pull request as ready for review February 10, 2024 21:35
@bm-synth
Contributor Author

@microsoft-github-policy-service agree

@bm-synth bm-synth changed the title remove mandatory index key from output of metric_function in DataAnalysis map operation [ONGOING TESTING] remove mandatory index key from output of metric_function in DataAnalysis map operation Feb 10, 2024
@conglongli
Contributor

@bm-synth Could you resolve the conflicts? Thanks.

@bm-synth
Contributor Author

> @bm-synth Could you resolve the conflicts? Thanks.

@conglongli done

@conglongli
Contributor

> @bm-synth Could you resolve the conflicts? Thanks.
>
> @conglongli done

@bm-synth Thank you. On the other hand, after reading this PR's details, I'm concerned that your PR might not be able to replace the index key. The index key exists because the user's dataset may have a shuffling feature, so we have to ask the user to always provide an index indicating, inside the data, the exact index of this sample. Otherwise we could make a wrong connection between the data sample and the curriculum difficulty value. This PR basically assumes that the data is always in order, which might not always be the case. You can refer to how I do the data analysis here https://github.com/microsoft/Megatron-DeepSpeed/blob/6d4c535eeae782daa22583fd8abac7cec3bb60f2/examples_deepspeed/data_efficiency/gpt/ds_analyze_gpt_data_map.sh#L66 where I have to add a `--return-data-index` flag to return the actual index.

@conglongli
Contributor

@bm-synth To further clarify: in Megatron-DeepSpeed and Megatron-LM, the dataset is shuffled even before reaching the sampler https://github.com/microsoft/Megatron-DeepSpeed/blob/6d4c535eeae782daa22583fd8abac7cec3bb60f2/megatron/data/gpt_dataset.py#L597. This is why, even if we use a `SequentialSampler` for data analysis, the data could still be shuffled. Thus an index key provided by the user is needed.
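The concern can be illustrated with a toy example (a hypothetical dataset, not Megatron's code): if the dataset itself permutes indices inside `__getitem__`, a sequential sampler still yields shuffled samples, so a sample's position in iteration order is not its identity.

```python
# Hypothetical dataset that shuffles internally, before any sampler runs,
# mimicking Megatron's pre-built shuffle index map (not Megatron's code).
class PreShuffledDataset:
    def __init__(self, values):
        # A fixed, non-trivial permutation standing in for the shuffle map.
        self.perm = [(i * 5) % len(values) for i in range(len(values))]
        self.values = values

    def __len__(self):
        return len(self.values)

    def __getitem__(self, idx):
        # Sequential idx 0, 1, 2, ... yields a permuted underlying sample.
        return self.values[self.perm[idx]]

ds = PreShuffledDataset(list(range(6)))
order = [ds[i] for i in range(len(ds))]
print(order)  # [0, 5, 4, 3, 2, 1] -- not 0..5: position != sample identity
```

Here the sequential positions 0..5 do not recover the original sample ids, which is exactly why an explicit `index` field is needed in this scenario.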

@conglongli
Contributor

@bm-synth After some more thinking, I think there is still value in your approach and it should work in many cases. So my proposal is: we keep the index key and use it when the user provides it; otherwise we use your approach.

@bm-synth
Contributor Author

bm-synth commented Feb 15, 2024

> @bm-synth To further clarify: in Megatron-DeepSpeed and Megatron-LM, the dataset is shuffled even before reaching the sampler https://github.com/microsoft/Megatron-DeepSpeed/blob/6d4c535eeae782daa22583fd8abac7cec3bb60f2/megatron/data/gpt_dataset.py#L597. This is why even if we use a SequentialSampler for data analysis, the data could still be shuffled. Thus an index key provided by user is needed.

@conglongli OK, I saw it in the Megatron source code:

```
        if args.return_data_index:
            sample_dict.update({'index': np.array([orig_idx], dtype=np.int64)})
```

where `orig_idx` is defined as:

```
class GPTDataset(torch.utils.data.Dataset):

    def __getitem__(self, idx):
        args = get_args()
        orig_idx = idx
```
So `orig_idx` seems to be the pre-shuffling index, i.e. the index in the `Dataset` before the dataloader shuffles the samples.
And that makes sense: for the map-reduce we're just mapping and outputting the difficulties for the samples; the order is irrelevant.

Here is a quick script I wrote to test my theory (try with `shuffle=False` and `shuffle=True`):

```
import torch

class Dataset(torch.utils.data.Dataset):
    def __init__(self):
        self.values = list(range(11))

    def __len__(self):
        return 11

    def __getitem__(self, idx):
        return idx, self.values[idx]

s = Dataset()
loader = torch.utils.data.DataLoader(s, batch_size=1, shuffle=True, num_workers=0)

for i in loader:
    print(i)
```

With `shuffle=True`:

```
[tensor([3]), tensor([3])]
[tensor([1]), tensor([1])]
[tensor([0]), tensor([0])]
[tensor([5]), tensor([5])]
[tensor([6]), tensor([6])]
[tensor([10]), tensor([10])]
[tensor([7]), tensor([7])]
[tensor([9]), tensor([9])]
[tensor([8]), tensor([8])]
[tensor([2]), tensor([2])]
[tensor([4]), tensor([4])]
```

and with `shuffle=False`:

```
[tensor([0]), tensor([0])]
[tensor([1]), tensor([1])]
[tensor([2]), tensor([2])]
[tensor([3]), tensor([3])]
[tensor([4]), tensor([4])]
[tensor([5]), tensor([5])]
[tensor([6]), tensor([6])]
[tensor([7]), tensor([7])]
[tensor([8]), tensor([8])]
[tensor([9]), tensor([9])]
[tensor([10]), tensor([10])]
```

This shows that the `idx` passed to `__getitem__` in the `Dataset` class is the global index, pre-shuffling.

Also, thinking about it: your `DataAnalyzer` only takes a dataset, which you must then pass to `deepspeed.initialize()`, which returns a `DataLoader`. Usually shuffling is specified in the user's dataloader (`DataLoader(dataset, sampler=None, ...)`). So this should still work. However,

> in Megatron-DeepSpeed and Megatron-LM, the dataset is shuffled even before reaching sampler

I looked at that particular code and I believe it changes the problem: this is a "non-standard" shuffling procedure done outside the `DataLoader`, and adding the index parameter only suits your Megatron case. (I'm not 100% sure of this, tbh; is it?)

So I added a new commit where I support:

  • the Megatron use case: if an `index` field exists in the dataset items;
  • any other user-defined ordering, if `sample_indices` is provided when constructing `DataAnalyzer`;
  • as default behaviour, the indices given by the original order.
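The resolution order above could be sketched like this (a hypothetical helper written for illustration, not the merged DeepSpeed code):

```python
def resolve_sample_index(batch, row, global_row_idx, sample_indices=None):
    """Pick the index for one sample, in the priority order described above:
    1. an 'index' field in the batch (Megatron use case),
    2. a user-supplied sample_indices mapping (user-defined ordering),
    3. the original sequential position (default behaviour)."""
    if isinstance(batch, dict) and 'index' in batch:    # Megatron use case
        return int(batch['index'][row][0])
    if sample_indices is not None:                      # user-defined ordering
        return int(sample_indices[global_row_idx])
    return global_row_idx                               # default: original order

# Default behaviour: the global position itself is the index.
assert resolve_sample_index({'data': [1, 2]}, 0, 7) == 7
# A user-provided mapping wins over the default.
assert resolve_sample_index({'data': [1, 2]}, 0, 1, sample_indices=[10, 20]) == 20
# An 'index' field in the batch wins over everything.
assert resolve_sample_index({'index': [[42]]}, 0, 0, sample_indices=[10]) == 42
```

The point of the priority order is backward compatibility: existing Megatron-style datasets keep working unchanged, while plain datasets no longer need to fabricate an index.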

@conglongli conglongli added this pull request to the merge queue Feb 15, 2024
Merged via the queue into microsoft:master with commit 2d0a6bc Feb 15, 2024
12 checks passed
@bm-synth bm-synth deleted the data_analysis_remove_key_index branch February 15, 2024 16:32
github-merge-queue bot pushed a commit that referenced this pull request Feb 16, 2024
Added missing `ininstance` check in #5112.

---------

Co-authored-by: Conglong Li <conglong.li@gmail.com>
mauryaavinash95 pushed a commit to mauryaavinash95/DeepSpeed that referenced this pull request Feb 17, 2024
…aAnalysis` map operation (microsoft#5112)

mauryaavinash95 pushed a commit to mauryaavinash95/DeepSpeed that referenced this pull request Feb 17, 2024
rraminen pushed a commit to ROCm/DeepSpeed that referenced this pull request May 9, 2024
…aAnalysis` map operation (microsoft#5112)

rraminen pushed a commit to ROCm/DeepSpeed that referenced this pull request May 9, 2024