diff --git a/ffcv/__init__.py b/ffcv/__init__.py index cbbcbd22..7fdfb1d6 100644 --- a/ffcv/__init__.py +++ b/ffcv/__init__.py @@ -1,5 +1,5 @@ from .loader import Loader from .writer import DatasetWriter -__version__ = '0.0.2' +__version__ = '0.0.3rc1' __all__ = ['Loader'] diff --git a/ffcv/loader/epoch_iterator.py b/ffcv/loader/epoch_iterator.py index 4bc1b9bd..b54fa96b 100644 --- a/ffcv/loader/epoch_iterator.py +++ b/ffcv/loader/epoch_iterator.py @@ -46,9 +46,8 @@ def __init__(self, loader: 'Loader', order: Sequence[int]): self.memory_bank_per_stage = defaultdict(list) - if IS_CUDA: - self.cuda_streams = [ch.cuda.Stream() - for _ in range(self.loader.batches_ahead + 2)] + self.cuda_streams = [(ch.cuda.Stream() if IS_CUDA else None) + for _ in range(self.loader.batches_ahead + 2)] # Allocate all the memory memory_allocations = {} @@ -136,7 +135,7 @@ def run_pipeline(self, b_ix, batch_indices, batch_slot, cuda_event): if first_stage: first_stage = False self.memory_context.end_batch(b_ix) - return tuple(args) + return tuple(x[:len(batch_indices)] for x in args) def __next__(self): result = self.output_queue.get() diff --git a/ffcv/loader/loader.py b/ffcv/loader/loader.py index 4fcc8678..21cdd104 100644 --- a/ffcv/loader/loader.py +++ b/ffcv/loader/loader.py @@ -93,7 +93,7 @@ def __init__(self, os_cache: bool = DEFAULT_OS_CACHE, order: ORDER_TYPE = OrderOption.SEQUENTIAL, distributed: bool = False, - seed: int = 0, # For ordering of samples + seed: int = None, # For ordering of samples indices: Sequence[int] = None, # For subset selection pipelines: Mapping[str, Sequence[Union[Operation, ch.nn.Module]]] = {}, @@ -103,6 +103,14 @@ def __init__(self, recompile: bool = False, # Recompile at every epoch ): + if distributed and order == OrderOption.RANDOM and (seed is None): + print('Warning: no ordering seed was specified with distributed=True. 
' + 'Setting seed to 0 to match PyTorch distributed sampler.') + seed = 0 + elif seed is None: + tinfo = np.iinfo('int32') + seed = np.random.randint(0, tinfo.max) + # We store the original user arguments to be able to pass it to the # filtered version of the datasets self._args = { diff --git a/ffcv/traversal_order/random.py b/ffcv/traversal_order/random.py index cef4ac24..25bc1708 100644 --- a/ffcv/traversal_order/random.py +++ b/ffcv/traversal_order/random.py @@ -24,4 +24,4 @@ def sample_order(self, epoch: int) -> Sequence[int]: self.sampler.set_epoch(epoch) - return np.array(list(self.sampler)) + return self.indices[np.array(list(self.sampler))] diff --git a/ffcv/traversal_order/sequential.py b/ffcv/traversal_order/sequential.py index 632a0059..c5bba224 100644 --- a/ffcv/traversal_order/sequential.py +++ b/ffcv/traversal_order/sequential.py @@ -27,4 +27,4 @@ def sample_order(self, epoch: int) -> Sequence[int]: self.sampler.set_epoch(epoch) - return np.array(list(self.sampler)) + return self.indices[np.array(list(self.sampler))] diff --git a/setup.py b/setup.py index 075765d5..978a6df9 100644 --- a/setup.py +++ b/setup.py @@ -37,11 +37,12 @@ def pkgconfig(package, kw): **extension_kwargs) setup(name='ffcv', - version='0.0.2', + version='0.0.3rc1', description=' FFCV: Fast Forward Computer Vision ', author='MadryLab', author_email='leclerc@mit.edu', - url='https://github.com/MadryLab/fastercv', + url='https://github.com/libffcv/ffcv', + license_files = ('LICENSE.txt',), packages=find_packages(), long_description=long_description, long_description_content_type='text/markdown', diff --git a/tests/test_augmentations.py b/tests/test_augmentations.py index 9671546d..f4a44530 100644 --- a/tests/test_augmentations.py +++ b/tests/test_augmentations.py @@ -18,7 +18,6 @@ from ffcv.pipeline.compiler import Compiler from ffcv.transforms import * - SAVE_IMAGES = True IMAGES_TMP_PATH = '/tmp/ffcv_augtest_output' if SAVE_IMAGES: @@ -74,7 +73,6 @@ def run_test(length, pipeline, 
compile=False): assert_that(tot_indices).is_equal_to(len(my_dataset)) assert_that(tot_images).is_equal_to(len(my_dataset)) - def test_cutout(): for comp in [True, False]: run_test(100, [ diff --git a/tests/test_traversal_orders.py b/tests/test_traversal_orders.py index 9b771a95..21c00b6e 100644 --- a/tests/test_traversal_orders.py +++ b/tests/test_traversal_orders.py @@ -29,24 +29,29 @@ def __getitem__(self, index): return (index, np.sin(np.array([index])).view('<f8')) -def process_work(rank, world_size, fname, order, sync_url, out_folder): +def process_work(rank, world_size, fname, order, sync_url, out_folder, indices): if world_size > 1: init_process_group('nccl', sync_url, rank=rank, world_size=world_size) loader = Loader(fname, 8, num_workers=2, order=order, drop_last=False, - distributed=world_size > 1) + distributed=world_size > 1, indices=indices) result = [] for _ in range(3): content = np.concatenate([x[0].numpy().reshape(-1).copy() for x in loader]) result.append(content) result = np.stack(result) + np.save(path.join(out_folder, f"result-{rank}.npy"), result) -def prep_and_run_test(num_workers, order): +def prep_and_run_test(num_workers, order, with_indices=False): length = 600 + indices = None + if with_indices: + indices = np.random.choice(length, length//2, replace=False) + with TemporaryDirectory() as folder: name = path.join(folder, 'dataset.beton') sync_file = path.join(folder, 'share') @@ -58,7 +63,7 @@ def prep_and_run_test(num_workers, order): writer.from_indexed_dataset(dataset) - args = (num_workers, name, order, sync_file, folder) + args = (num_workers, name, order, sync_file, folder, indices) if num_workers > 1: spawn(process_work, nprocs=num_workers, args=args) else: @@ -71,19 +76,22 @@ def prep_and_run_test(num_workers, order): results = np.concatenate(results, 1) + # For each epoch for i in range(results.shape[0]): - if order == OrderOption.SEQUENTIAL and i < results.shape[0] - 1: - assert_that((results[i] == results[i + 1]).all()).is_true() - if order != OrderOption.SEQUENTIAL and i < results.shape[0] - 1: - assert_that((results[i] == results[i + 1]).all()).is_false() - - epoch_content = Counter(results[i]) - 
indices_gotten = np.array(sorted(list(epoch_content.keys()))) - assert_that(np.all(np.arange(length) == indices_gotten)).is_true() - assert_that(min(epoch_content.values())).is_equal_to(1) - assert_that(max(epoch_content.values())).is_less_than_or_equal_to(2) - - + if not with_indices: + if order == OrderOption.SEQUENTIAL and i < results.shape[0] - 1: + assert_that((results[i] == results[i + 1]).all()).is_true() + if order != OrderOption.SEQUENTIAL and i < results.shape[0] - 1: + assert_that((results[i] == results[i + 1]).all()).is_false() + + epoch_content = Counter(results[i]) + indices_gotten = np.array(sorted(list(epoch_content.keys()))) + assert_that(np.all(np.arange(length) == indices_gotten)).is_true() + assert_that(min(epoch_content.values())).is_equal_to(1) + assert_that(max(epoch_content.values())).is_less_than_or_equal_to(2) + else: + assert_that(set(results[i])).is_equal_to(set(indices)) + def test_traversal_sequential_1(): prep_and_run_test(1, OrderOption.SEQUENTIAL) @@ -123,3 +131,13 @@ def test_traversal_quasirandom_3(): @pytest.mark.skip() def test_traversal_quasirandom_4(): prep_and_run_test(4, OrderOption.QUASI_RANDOM) + +def test_traversal_sequential_distributed_with_indices(): + prep_and_run_test(2, OrderOption.SEQUENTIAL, True) + +def test_traversal_random_distributed_with_indices(): + prep_and_run_test(2, OrderOption.RANDOM, True) + +@pytest.mark.skip() +def test_traversal_quasi_random_distributed_with_indices(): + prep_and_run_test(2, OrderOption.QUASI_RANDOM, True) \ No newline at end of file