ffcv/__init__.py (2 changes: 1 addition & 1 deletion)
@@ -1,5 +1,5 @@
from .loader import Loader
from .writer import DatasetWriter
__version__ = '0.0.2'
__version__ = '0.0.3rc1'

__all__ = ['Loader']
ffcv/loader/epoch_iterator.py (7 changes: 3 additions & 4 deletions)
@@ -46,9 +46,8 @@ def __init__(self, loader: 'Loader', order: Sequence[int]):

self.memory_bank_per_stage = defaultdict(list)

if IS_CUDA:
self.cuda_streams = [ch.cuda.Stream()
for _ in range(self.loader.batches_ahead + 2)]
self.cuda_streams = [(ch.cuda.Stream() if IS_CUDA else None)
for _ in range(self.loader.batches_ahead + 2)]

# Allocate all the memory
memory_allocations = {}
@@ -136,7 +135,7 @@ def run_pipeline(self, b_ix, batch_indices, batch_slot, cuda_event):
if first_stage:
first_stage = False
self.memory_context.end_batch(b_ix)
return tuple(args)
return tuple(x[:len(batch_indices)] for x in args)

def __next__(self):
result = self.output_queue.get()
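
The two epoch-iterator edits fix distinct issues: the stream list used to be created only when CUDA was available, leaving the attribute undefined on CPU-only machines, and `run_pipeline` used to return the full preallocated buffers, so a final partial batch exposed stale rows. A minimal standalone sketch of both behaviors, with `batches_ahead` and `finalize_batch` as hypothetical stand-ins for the iterator's internals:

```python
import torch as ch

IS_CUDA = ch.cuda.is_available()
batches_ahead = 3

# One stream per batch slot; on CPU-only machines the slots hold None
# placeholders so the rest of the pipeline can index the list uniformly.
cuda_streams = [(ch.cuda.Stream() if IS_CUDA else None)
                for _ in range(batches_ahead + 2)]

def finalize_batch(args, batch_indices):
    # Buffers are preallocated for the full batch size; the last batch of an
    # epoch can be smaller, so return only the rows that were actually written.
    return tuple(x[:len(batch_indices)] for x in args)

full_slot = ch.zeros(8, 4)       # slot sized for batch_size=8
last_batch = list(range(5))      # final batch holds only 5 samples
out, = finalize_batch((full_slot,), last_batch)
assert out.shape == (5, 4)
```
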
ffcv/loader/loader.py (10 changes: 9 additions & 1 deletion)
@@ -93,7 +93,7 @@ def __init__(self,
os_cache: bool = DEFAULT_OS_CACHE,
order: ORDER_TYPE = OrderOption.SEQUENTIAL,
distributed: bool = False,
seed: int = 0, # For ordering of samples
seed: int = None, # For ordering of samples
indices: Sequence[int] = None, # For subset selection
pipelines: Mapping[str,
Sequence[Union[Operation, ch.nn.Module]]] = {},
@@ -103,6 +103,14 @@
recompile: bool = False, # Recompile at every epoch
):

if distributed and order == OrderOption.RANDOM and (seed is None):
print('Warning: no ordering seed was specified with distributed=True. '
'Setting seed to 0 to match PyTorch distributed sampler.')
seed = 0
elif seed is None:
tinfo = np.iinfo('int32')
seed = np.random.randint(0, tinfo.max)

# We store the original user arguments to be able to pass it to the
# filtered version of the datasets
self._args = {
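
The seed change makes the default explicit rather than silently 0: with distributed random ordering, every rank must shuffle identically, so a missing seed falls back to 0 to match `torch.utils.data.DistributedSampler`; in all other cases a fresh random seed is drawn so repeated runs are shuffled differently. A standalone sketch of that policy (the function and argument names are hypothetical, not the Loader's API):

```python
import numpy as np

def resolve_seed(seed, distributed, random_order):
    # Distributed random ordering needs identical shuffles on every rank,
    # so an unspecified seed falls back to the sampler-compatible default.
    if distributed and random_order and seed is None:
        print('Warning: no ordering seed was specified with distributed=True. '
              'Setting seed to 0 to match PyTorch distributed sampler.')
        return 0
    # Single-process case: draw a fresh seed so runs differ by default.
    if seed is None:
        return int(np.random.randint(0, np.iinfo('int32').max))
    return seed

assert resolve_seed(None, distributed=True, random_order=True) == 0
assert resolve_seed(17, distributed=True, random_order=True) == 17
```
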
ffcv/traversal_order/random.py (2 changes: 1 addition & 1 deletion)
@@ -24,4 +24,4 @@ def sample_order(self, epoch: int) -> Sequence[int]:

self.sampler.set_epoch(epoch)

return np.array(list(self.sampler))
return self.indices[np.array(list(self.sampler))]
ffcv/traversal_order/sequential.py (2 changes: 1 addition & 1 deletion)
@@ -27,4 +27,4 @@ def sample_order(self, epoch: int) -> Sequence[int]:

self.sampler.set_epoch(epoch)

return np.array(list(self.sampler))
return self.indices[np.array(list(self.sampler))]
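
Both traversal orders get the same one-line fix: the wrapped sampler yields positions into the loader's index set, and when a subset was selected via `indices=...` those positions were previously returned as if they were dataset indices, so the loader read the wrong samples. A toy illustration with made-up values:

```python
import numpy as np

indices = np.array([10, 42, 7, 99])    # subset passed to the Loader
sampler_positions = [2, 0, 3, 1]       # what the sampler actually yields

old_order = np.array(sampler_positions)           # old: raw positions
new_order = indices[np.array(sampler_positions)]  # new: real dataset indices

print(old_order)  # [2 0 3 1]    -> would read the wrong samples
print(new_order)  # [ 7 10 99 42] -> reads the selected subset
```
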
setup.py (5 changes: 3 additions & 2 deletions)
@@ -37,11 +37,12 @@ def pkgconfig(package, kw):
**extension_kwargs)

setup(name='ffcv',
version='0.0.2',
version='0.0.3rc1',
description=' FFCV: Fast Forward Computer Vision ',
author='MadryLab',
author_email='leclerc@mit.edu',
url='https://github.com/MadryLab/fastercv',
url='https://github.com/libffcv/ffcv',
license_files = ('LICENSE.txt',),
packages=find_packages(),
long_description=long_description,
long_description_content_type='text/markdown',
tests/test_augmentations.py (2 changes: 0 additions & 2 deletions)
@@ -18,7 +18,6 @@
from ffcv.pipeline.compiler import Compiler
from ffcv.transforms import *


SAVE_IMAGES = True
IMAGES_TMP_PATH = '/tmp/ffcv_augtest_output'
if SAVE_IMAGES:
@@ -74,7 +73,6 @@ def run_test(length, pipeline, compile=False):
assert_that(tot_indices).is_equal_to(len(my_dataset))
assert_that(tot_images).is_equal_to(len(my_dataset))


def test_cutout():
for comp in [True, False]:
run_test(100, [
tests/test_traversal_orders.py (50 changes: 34 additions & 16 deletions)
@@ -29,24 +29,29 @@ def __getitem__(self, index):
return (index, np.sin(np.array([index])).view('<u1'))


def process_work(rank, world_size, fname, order, sync_fname, out_folder):
def process_work(rank, world_size, fname, order, sync_fname, out_folder, indices):
sync_url = f'file://{sync_fname}'
if world_size > 1:
init_process_group('nccl', sync_url, rank=rank, world_size=world_size)

loader = Loader(fname, 8, num_workers=2, order=order, drop_last=False,
distributed=world_size > 1)
distributed=world_size > 1, indices=indices)

result = []
for _ in range(3):
content = np.concatenate([x[0].numpy().reshape(-1).copy() for x in loader])
result.append(content)
result = np.stack(result)

np.save(path.join(out_folder, f"result-{rank}.npy"), result)


def prep_and_run_test(num_workers, order):
def prep_and_run_test(num_workers, order, with_indices=False):
length = 600
indices = None
if with_indices:
indices = np.random.choice(length, length//2, replace=False)

with TemporaryDirectory() as folder:
name = path.join(folder, 'dataset.beton')
sync_file = path.join(folder, 'share')
@@ -58,7 +63,7 @@ def prep_and_run_test(num_workers, order):

writer.from_indexed_dataset(dataset)

args = (num_workers, name, order, sync_file, folder)
args = (num_workers, name, order, sync_file, folder, indices)
if num_workers > 1:
spawn(process_work, nprocs=num_workers, args=args)
else:
@@ -71,19 +76,22 @@

results = np.concatenate(results, 1)

# For each epoch
for i in range(results.shape[0]):
if order == OrderOption.SEQUENTIAL and i < results.shape[0] - 1:
assert_that((results[i] == results[i + 1]).all()).is_true()
if order != OrderOption.SEQUENTIAL and i < results.shape[0] - 1:
assert_that((results[i] == results[i + 1]).all()).is_false()

epoch_content = Counter(results[i])
indices_gotten = np.array(sorted(list(epoch_content.keys())))
assert_that(np.all(np.arange(length) == indices_gotten)).is_true()
assert_that(min(epoch_content.values())).is_equal_to(1)
assert_that(max(epoch_content.values())).is_less_than_or_equal_to(2)


if not with_indices:
if order == OrderOption.SEQUENTIAL and i < results.shape[0] - 1:
assert_that((results[i] == results[i + 1]).all()).is_true()
if order != OrderOption.SEQUENTIAL and i < results.shape[0] - 1:
assert_that((results[i] == results[i + 1]).all()).is_false()

epoch_content = Counter(results[i])
indices_gotten = np.array(sorted(list(epoch_content.keys())))
assert_that(np.all(np.arange(length) == indices_gotten)).is_true()
assert_that(min(epoch_content.values())).is_equal_to(1)
assert_that(max(epoch_content.values())).is_less_than_or_equal_to(2)
else:
assert_that(set(results[i])).is_equal_to(set(indices))


def test_traversal_sequential_1():
prep_and_run_test(1, OrderOption.SEQUENTIAL)
@@ -123,3 +131,13 @@ def test_traversal_quasirandom_3():
@pytest.mark.skip()
def test_traversal_quasirandom_4():
prep_and_run_test(4, OrderOption.QUASI_RANDOM)

def test_traversal_sequential_distributed_with_indices():
prep_and_run_test(2, OrderOption.SEQUENTIAL, True)

def test_traversal_random_distributed_with_indices():
prep_and_run_test(2, OrderOption.RANDOM, True)

@pytest.mark.skip()
def test_traversal_quasi_random_distributed_with_indices():
prep_and_run_test(2, OrderOption.QUASI_RANDOM, True)
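
The reworked assertions split on whether a subset is in play: without one, an epoch must cover 0..length-1 with at most one duplicate per sample (the distributed sampler pads shards to equal length); with one, it suffices that exactly the chosen indices were visited. A condensed sketch of these checks, with made-up epoch contents:

```python
from collections import Counter
import numpy as np

length = 6
epoch = np.array([0, 1, 2, 3, 4, 5, 0])     # one padded repeat across ranks
counts = Counter(epoch)
assert sorted(counts) == list(range(length))  # every index visited
assert min(counts.values()) == 1
assert max(counts.values()) <= 2              # padding repeats at most once

subset = np.array([5, 2, 4])
epoch_subset = np.array([2, 5, 4, 2])        # padding may repeat an index
assert set(epoch_subset) == set(subset)      # exactly the subset is visited
```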