
ValueError: cannot reshape array of size 24 into shape (8,newaxis,8) in Dataloader #535

Open
germanjke opened this issue Dec 15, 2023 · 16 comments

Comments


germanjke commented Dec 15, 2023

Hi, it looks like a recent version of llm-foundry (updated from master within the last week or two) has a problem.
I get an error like this:

train_loader:
  dataset:
    max_seq_len: 2048
    shuffle: true
    shuffle_seed: 17
    streams:
      stream:
        local: 1
        remote: 2
        repeat: 1.0
        split: train
  drop_last: true
  name: text
  num_workers: 8
/usr/lib/python3/dist-packages/composer/trainer/trainer.py:1886 in fit       │
│                                                                              │
│   1883 │   │   │   self.state.scaler = ClosureGradScaler() if self._use_clos │
│   1884 │   │                                                                 │
│   1885 │   │   self.first_batch_complete = False                             │
│ ❱ 1886 │   │   self._train_loop()                                            │
│   1887 │                                                                     │
│   1888 │   def close(self):                                                  │
│   1889 │   │   """Shutdown the trainer.                                      │
│                                                                              │
│ /usr/lib/python3/dist-packages/composer/trainer/trainer.py:2007 in           │
│ _train_loop                                                                  │
│                                                                              │
│   2004 │   │   use_grad_scaling = self._use_grad_scaling(self.state.precisio │
│   2005 │   │                                                                 │
│   2006 │   │   if self.spin_dataloaders:                                     │
│ ❱ 2007 │   │   │   self._spin_dataloaders_to_cur_epoch()                     │
│   2008 │   │                                                                 │
│   2009 │   │   if self.state.timestamp.batch_in_epoch == 0 and self._rng_sta │
│   2010 │   │   │   # Only restore the rng state here if the step in the curr │
│                                                                              │
│ /usr/lib/python3/dist-packages/composer/trainer/trainer.py:1957 in           │
│ _spin_dataloaders_to_cur_epoch                                               │
│                                                                              │
│   1954 │   │   │   if isinstance(dataloader, DataLoader) and isinstance(data │
│   1955 │   │   │   │   dataloader.sampler.set_epoch(0)                       │
│   1956 │   │   │   if evaluator.label not in eval_state:                     │
│ ❱ 1957 │   │   │   │   for _ in dataloader:                                  │
│   1958 │   │   │   │   │   break                                             │
│   1959 │   │                                                                 │
│   1960 │   │   # spin the train dataloader's sampler to get to the state of  │
│                                                                              │
│ /usr/lib/python3/dist-packages/torch/utils/data/dataloader.py:630 in         │
│ __next__                                                                     │
│                                                                              │
│    627 │   │   │   if self._sampler_iter is None:                            │
│    628 │   │   │   │   # TODO(https://github.com/pytorch/pytorch/issues/7675 │
│    629 │   │   │   │   self._reset()  # type: ignore[call-arg]               │
│ ❱  630 │   │   │   data = self._next_data()                                  │
│    631 │   │   │   self._num_yielded += 1                                    │
│    632 │   │   │   if self._dataset_kind == _DatasetKind.Iterable and \      │
│    633 │   │   │   │   │   self._IterableDataset_len_called is not None and  │
│                                                                              │
│ /usr/lib/python3/dist-packages/torch/utils/data/dataloader.py:1345 in        │
│ _next_data                                                                   │
│                                                                              │
│   1342 │   │   │   │   self._task_info[idx] += (data,)                       │
│   1343 │   │   │   else:                                                     │
│   1344 │   │   │   │   del self._task_info[idx]                              │
│ ❱ 1345 │   │   │   │   return self._process_data(data)                       │
│   1346 │                                                                     │
│   1347 │   def _try_put_index(self):                                         │
│   1348 │   │   assert self._tasks_outstanding < self._prefetch_factor * self │
│                                                                              │
│ /usr/lib/python3/dist-packages/torch/utils/data/dataloader.py:1371 in        │
│ _process_data                                                                │
│                                                                              │
│   1368 │   │   self._rcvd_idx += 1                                           │
│   1369 │   │   self._try_put_index()                                         │
│   1370 │   │   if isinstance(data, ExceptionWrapper):                        │
│ ❱ 1371 │   │   │   data.reraise()                                            │
│   1372 │   │   return data                                                   │
│   1373 │                                                                     │
│   1374 │   def _mark_worker_as_unavailable(self, worker_id, shutdown=False): │
│                                                                              │
│ /usr/lib/python3/dist-packages/torch/_utils.py:694 in reraise                │
│                                                                              │
│   691 │   │   │   # If the exception takes multiple arguments, don't try to  │
│   692 │   │   │   # instantiate since we don't know how to                   │
│   693 │   │   │   raise RuntimeError(msg) from None                          │
│ ❱ 694 │   │   raise exception                                                │
│   695                                                                        │
│   696                                                                        │
│   697 def _get_available_device_type():                                      │
╰──────────────────────────────────────────────────────────────────────────────╯
ValueError: Caught ValueError in DataLoader worker process 0.
Original Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/torch/utils/data/_utils/worker.py", line 
308, in _worker_loop
    data = fetcher.fetch(index)
  File "/usr/lib/python3/dist-packages/torch/utils/data/_utils/fetch.py", line 
32, in fetch
    data.append(next(self.dataset_iter))
  File "/usr/lib/python3/dist-packages/streaming/base/dataset.py", line 1392, in
__iter__
    sample_ids = self._get_work(world, epoch, sample_in_epoch)
  File "/usr/lib/python3/dist-packages/streaming/base/dataset.py", line 934, in 
_get_work
    epoch_sample_ids = generate_work(self.batching_method, self, world, epoch,
  File "/usr/lib/python3/dist-packages/streaming/base/batching/__init__.py", 
line 43, in generate_work
    return get(dataset, world, epoch, sample_in_epoch)
  File "/usr/lib/python3/dist-packages/streaming/base/batching/random.py", line 
54, in generate_work_random_batching
    big_ids = get_partitions(dataset.partition_algo, dataset.epoch_size,
  File "/usr/lib/python3/dist-packages/streaming/base/partition/__init__.py", 
line 55, in get_partitions
    return get(num_samples, num_canonical_nodes, num_physical_nodes, 
ranks_per_node,
  File "/usr/lib/python3/dist-packages/streaming/base/partition/relaxed.py", 
line 65, in get_partitions_relaxed
    return get_partitions_orig(num_samples, num_canonical_nodes, 
num_physical_nodes,
  File "/usr/lib/python3/dist-packages/streaming/base/partition/orig.py", line 
133, in get_partitions_orig
    ids = ids.reshape(num_physical_nodes, -1, ranks_per_node)
ValueError: cannot reshape array of size 24 into shape (8,newaxis,8)

Changing drop_last to False doesn't help.
If I change num_workers to some other value, will I lose efficiency? I have always launched with 8.
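For reference, a minimal numpy sketch of the failing step (just my reading of the error message, not llm-foundry or streaming code): reshaping a 24-element array into (8, -1, 8) needs the size to be a multiple of 8 × 8 = 64, which 24 is not.

import numpy as np

# 24 sample IDs cannot be split as (8 physical nodes, -1, 8 ranks per node),
# because that shape needs a multiple of 64 elements.
ids = np.arange(24, dtype=np.int64)
ids.reshape(8, -1, 8)  # ValueError: cannot reshape array of size 24 into shape (8,newaxis,8)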

@germanjke (Author)

Everything is OK with:

git branch
* (HEAD detached at 6c41241)
composer==0.15.0
mosaicml-streaming==0.6.1

and not OK with:

* (HEAD detached at 4638092)
mosaicml-streaming==0.7.1

I don't know where the problem is.

@germanjke (Author)

I can't launch with num_workers: 0 because of prefetch_factor.
With num_workers: 1 I get the same error as in the title.
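For context, this is the DataLoader constraint behind the num_workers: 0 problem (a minimal sketch, not llm-foundry code; in recent PyTorch versions an explicit prefetch_factor is rejected when num_workers is 0):

from torch.utils.data import DataLoader, Dataset

class Tiny(Dataset):
    # Placeholder dataset, only here to construct a DataLoader.
    def __len__(self):
        return 4

    def __getitem__(self, i):
        return i

# An explicit prefetch_factor requires num_workers > 0 (exact behavior depends on
# the PyTorch version), so this raises a ValueError instead of starting.
loader = DataLoader(Tiny(), num_workers=0, prefetch_factor=2)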

@karan6181 (Collaborator)

Hey @germanjke, sorry for the late response. Can you please share the values below so we can debug this issue?

num_samples (int): Dataset size.
num_canonical_nodes (int): Number of canonical nodes.
num_physical_nodes (int): Number of physical nodes.
ranks_per_node (int): Number of ranks per node.
workers_per_rank (int): Number of worker partitions per rank.
batch_size (int, optional): The batch size of its DataLoader, which affects how the dataset is partitioned over the workers.
drop_first (int): Number of samples seen already, which are dropped. This will be zero if you are not resuming from the checkpoint.
initial_physical_nodes (int, optional): Number of physical nodes at the start of training. Defaults to ``None``.

@germanjke (Author)

Hey @karan6181, do you know how I can get these values from index.json, or maybe from the llm-foundry wandb log?

@germanjke (Author)

train_loader:
  dataset:
    max_seq_len: 2048
    shuffle: true
    shuffle_seed: 17
    streams: (45 collapsed)
  drop_last: false
  name: "text"
  num_workers: 8

@germanjke (Author)

composer_commit_hash: "None"
composer_version: "0.17.1"
console_log_interval: "100ba"
device_eval_batch_size: 16
device_train_batch_size: 32
device_train_grad_accum: 2
device_train_microbatch_size: 16

@karan6181 (Collaborator)

Hey @germanjke, let me help you get that information. Also, what streaming dataset version are you using?

num_samples (int): Open index.json and add up the samples values from all shard entries (see the sketch after this list).
num_canonical_nodes (int): From the issue description above, you are not passing a value, so it will be the default.
num_physical_nodes (int): How many nodes are you running this on?
ranks_per_node (int): Number of GPUs per node?
workers_per_rank (int): This is 8 based on the above value. num_workers: 8.
batch_size (int, optional): What batch_size value are you passing to StreamingDataset? Is it 16 or 32?
drop_first (int): Number of samples seen already, which are dropped. This could be zero if you are not resuming from the checkpoint.
initial_physical_nodes (int, optional): Number of physical nodes at the start of training. This could be None if you are not resuming from the checkpoint.
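For num_samples, a minimal sketch (assuming a local copy of the dataset's index.json in the usual MDS layout, i.e. a top-level shards list whose entries each carry a samples count):

import json

# Add up the per-shard sample counts from index.json.
with open('index.json') as f:
    index = json.load(f)
num_samples = sum(shard['samples'] for shard in index['shards'])
print(num_samples)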

An easier way to find those values is to clone the streaming repo, create a test branch, add a print statement at this line to print all the params, push a commit, install that custom branch, and run your job.
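Roughly, that print statement would look like this (a sketch only, placed at the top of get_partitions_orig in streaming/base/partition/orig.py, the function shown in the traceback above):

# Inside get_partitions_orig(), before any partitioning logic:
print(f'num_samples: {num_samples}, '
      f'num_canonical_nodes: {num_canonical_nodes}, '
      f'num_physical_nodes: {num_physical_nodes}, '
      f'ranks_per_node: {ranks_per_node}, '
      f'workers_per_rank: {workers_per_rank}, '
      f'batch_size: {batch_size}, '
      f'drop_first: {drop_first}, '
      f'initial_physical_nodes: {initial_physical_nodes}')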


germanjke commented Dec 20, 2023

Hi @karan6181,
Thank you for the advice. I was trying to reproduce this, so I just copied your file and pasted a print in there.
And everything works fine now!

[batch=1/10000]:
         Train time/epoch: 0
         Train time/batch: 0
         Train time/sample: 0
         Train time/batch_in_epoch: 0
         Train time/sample_in_epoch: 0
         Train time/token: 0
         Train time/token_in_epoch: 0
         Train memory/current_allocated_mem: 11.1450
         Train memory/current_active_mem: 11.1450
         Train memory/current_inactive_mem: 3.7532
         Train memory/current_reserved_mem: 63.3420
         Train memory/peak_allocated_mem: 45.1670
         Train memory/peak_active_mem: 45.6680
         Train memory/peak_inactive_mem: 14.9810
         Train memory/peak_reserved_mem: 63.3420
         Train memory/alloc_retries: 0
         Train trainer/device_train_microbatch_size: 16
         Train loss/train/total: 7.5101
         Train metrics/train/LanguageCrossEntropy: 7.5101
         Train metrics/train/LanguagePerplexity: 1826.4814
         Train lr-DecoupledAdamW/group0: 0.0000
         Train time/train: 0.0211
         Train time/val: 0.0000
         Train time/total: 0.0211

So the only change is that I replaced my orig.py file with yours; my llm-foundry version is

* (HEAD detached at 4638092)

So I guess some changes were made recently just in this file?

The streaming version is the same, mosaicml-streaming==0.7.1.

@germanjke (Author)

Also, here are my prints. I have 3 validation dataloaders and 1 train dataloader:

num_samples: 21,
num_physical_nodes: 8,
ranks_per_node: 8,
workers_per_rank: 8,
batch_size: 16,
drop_first: 0,
initial_physical_nodes: None
/usr/lib/python3/dist-packages/streaming/base/partition/orig.py:163: UserWarning: Attempting to partition 3 samples per physical node over 8 gpus. This will result in many samples being repeated, and depending on your batching method, batches being completely dropped. Check if your dataset has the expected number of samples.
  warnings.warn(f'Attempting to partition {ids.shape[1]} samples per physical node ' +
num_samples: 402,
num_physical_nodes: 8,
ranks_per_node: 8,
workers_per_rank: 8,
batch_size: 16,
drop_first: 0,
initial_physical_nodes: None
num_samples: 1088115,
num_physical_nodes: 8,
ranks_per_node: 8,
workers_per_rank: 8,
batch_size: 16,
drop_first: 0,
initial_physical_nodes: None
num_samples: 121149865,
num_physical_nodes: 8,
ranks_per_node: 8,
workers_per_rank: 8,
batch_size: 32,
drop_first: 0,
initial_physical_nodes: None


germanjke commented Dec 20, 2023

Can we deliver this version of the file to llm-foundry so it is built from setup.py in the llm-foundry repo?
I'm using llm-foundry as a submodule.


germanjke commented Dec 20, 2023

This is the old version of the file, the one where I get the bug:

# Copyright 2023 MosaicML Streaming authors
# SPDX-License-Identifier: Apache-2.0

"""Apportion shards/samples to nodes/ranks/workers for elastically deterministic sample order."""

import logging
import math
from typing import Optional

import numpy as np
from numpy.typing import NDArray

logger = logging.getLogger(__name__)


def get_partitions_orig(num_samples: int,
                        num_canonical_nodes: int,
                        num_physical_nodes: int,
                        ranks_per_node: int,
                        workers_per_rank: int,
                        batch_size: Optional[int] = None,
                        drop_first: int = 0,
                        initial_physical_nodes: Optional[int] = None) -> NDArray[np.int64]:
    """Partition the given number of samples to nodes, ranks, and workers.

    Either canonical or physical nodes must be evenly divisible by the other.

    It is suggested to set num_canonical_nodes higher than your expected number of physical nodes,
    because scaling your number of nodes below that level may result in more shards being used
    across node boundaries due to preserving the same global sample order.

    Args:
        num_samples (int): Dataset size.
        num_canonical_nodes (int): Number of canonical nodes.
        num_physical_nodes (int): Number of physical nodes.
        ranks_per_node (int): Number of ranks per node.
        workers_per_rank (int): Number of worker partitions per rank.
        batch_size (int, optional): Batch size of its DataLoader, which affects how the dataset is
            partitioned over the workers. Defaults to ``None``.
        drop_first (int): Number of samples seen already, which are dropped. Defaults to ``0``.
        initial_physical_nodes (int, optional): Number of physical nodes at the start of training.
            Defaults to ``None``.

    Returns:
        NDArray[np.int64]: Partitions of shape (physical nodes, ranks per node, workers per rank,
            batches per worker, batch size).
    """
    if num_samples <= drop_first:
        raise ValueError(f'Resuming further into the dataset ({drop_first}) than it has samples ' +
                         f'({num_samples})')

    if num_canonical_nodes < num_physical_nodes:
        if num_physical_nodes % num_canonical_nodes:
            raise ValueError('Either canonical or physical nodes must be evenly divisible by ' +
                             'the other, otherwise striping slices of shards over nodes may ' +
                             'lead to each node downloading all shards')
    elif num_physical_nodes < num_canonical_nodes:
        if num_canonical_nodes % num_physical_nodes:
            raise ValueError('Either canonical or physical nodes must be evenly divisible by ' +
                             'the other, otherwise striping slices of shards over nodes may ' +
                             'lead to each node downloading all shards')

    batch_size = batch_size or 1

    # If drop_first isn't a multiple of num_physical_nodes, round down to make it divisible.
    if drop_first % num_physical_nodes:
        logger.warning(
            '`drop_first` was not divisible by `num_physical_nodes`. Rounding it down ' +
            'to make it divisible.')
        drop_first -= drop_first % num_physical_nodes

    # Divide the full dataset sample range into a sample range per canonical node.
    samples_per_canonical_node = (num_samples + num_canonical_nodes - 1) // num_canonical_nodes
    node_ratio = 0
    padding = 0
    if num_canonical_nodes < num_physical_nodes:
        node_ratio = num_physical_nodes // num_canonical_nodes
        overflow = samples_per_canonical_node % node_ratio
        if overflow:
            padding = node_ratio - overflow
    padded_samples_per_canonical_node = samples_per_canonical_node + padding

    # Create the initial sample ID matrix.
    #
    # ids: (canonical nodes, padded samples per canonical node).
    ids = np.arange(num_canonical_nodes * padded_samples_per_canonical_node, dtype=np.int64)
    ids = ids.reshape(num_canonical_nodes, padded_samples_per_canonical_node)

    # Adjust row offsets to ignore the padding.
    #
    # row_offsets: (canonical nodes, 1).
    row_offsets = np.arange(num_canonical_nodes) * padding
    row_offsets = np.expand_dims(row_offsets, 1)
    ids -= row_offsets

    # Reconfigure where each row starts iterating for irregular-sized rows.
    #
    # row_starts: (canonical nodes, 1).
    row_starts = np.arange(num_canonical_nodes) * num_samples // num_canonical_nodes
    row_starts = np.expand_dims(row_starts, 1)
    ids += row_starts - ids[:, :1]

    # For short rows (length not evenly divisible), repeat the last ID to get even length.
    #
    # row_stops: (canonical nodes, 1).
    row_stops = np.arange(1, 1 + num_canonical_nodes) * num_samples // num_canonical_nodes
    row_stops = np.expand_dims(row_stops, 1)
    are_rows_short = row_stops - row_starts < samples_per_canonical_node
    ids[:, samples_per_canonical_node - 1:samples_per_canonical_node] -= are_rows_short

    # If padding was needed, repeat samples to populate it.
    if padding:
        ids[:, -padding:] = ids[:, -padding - node_ratio + 1 - padding:-padding - node_ratio + 1]

    # Flatten, drop samples that have already been seen, reshape back.
    #
    # ids: (physical nodes, samples per node).
    ids = ids.transpose()
    ids = ids.flatten()
    ids = ids[drop_first:]
    ids = ids.reshape(-1, num_physical_nodes)
    ids = ids.transpose()

    # Interleave the node sample ranges over each node's ranks, padding by repeating the last
    # sample.
    #
    # ids: (physical nodes, samples per rank, ranks per node).
    overflow = ids.shape[1] % ranks_per_node
    if overflow:
        underflow = ranks_per_node - overflow
        last = ids[:, -ranks_per_node - underflow + 1:-ranks_per_node + 1]
        ids = np.concatenate([ids, last], 1)
    ids = ids.reshape(num_physical_nodes, -1, ranks_per_node)

    # Pad with -1 adequately for reshaping across workers.
    #
    # ids: (physical nodes, samples per rank, ranks per node).
    overflow = ids.shape[1] % workers_per_rank
    rounded_num_samples = math.ceil(
        ids.shape[1] / (workers_per_rank * batch_size)) * (workers_per_rank * batch_size)
    overflow = rounded_num_samples - ids.shape[1]
    if overflow:
        last = np.full((num_physical_nodes, overflow, ranks_per_node), -1, np.int64)
        ids = np.concatenate([ids, last], 1)

    # Interleave each rank's padded samples across its workers.
    #
    # ids: (physical nodes, ranks per node, workers per rank, batches per worker, batch size).
    ids = ids.transpose(0, 2, 1)
    ids = ids.reshape(num_physical_nodes, ranks_per_node, -1, workers_per_rank, batch_size)
    return ids.transpose(0, 1, 3, 2, 4)
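Tracing my printed values through this old file (just my own reading, not a confirmed diagnosis): with 8 physical nodes, 8 ranks per node, and only 3 samples per physical node, the padding slice before the failing reshape comes out empty, so nothing gets appended and the reshape of 24 elements into (8, -1, 8) fails exactly as in the title:

import numpy as np

# 3 samples per physical node, 8 nodes, 8 ranks per node (the case from the warning above).
ids = np.arange(24, dtype=np.int64).reshape(-1, 8).transpose()        # shape (8, 3)
ranks_per_node = 8
overflow = ids.shape[1] % ranks_per_node                              # 3
underflow = ranks_per_node - overflow                                 # 5
last = ids[:, -ranks_per_node - underflow + 1:-ranks_per_node + 1]    # ids[:, -12:-7]
print(last.shape)                                                     # (8, 0): empty, no padding added
ids = np.concatenate([ids, last], 1)                                  # still (8, 3), 24 elements
ids.reshape(8, -1, ranks_per_node)                                    # ValueError from the title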


germanjke commented Dec 20, 2023

This one is good.


germanjke commented Dec 21, 2023

Not a relevant comment anymore (edited): resuming training is fine with the new file.

@germanjke (Author)

@karan6181 What do you think, is it some conflict in llm-foundry 0.4.0? Maybe it needs a quick fix or something?

@karan6181 (Collaborator)

Hey @germanjke, sorry for my late response. Can you try the latest streaming dataset version? We have fixed some issues recently.

@germanjke (Author)

Hi @karan6181, thank you, everything works fine on the latest streaming and llm-foundry versions.
