Skip to content

Commit

Permalink
Merge 729bdb3 into d142cb3
Browse files Browse the repository at this point in the history
  • Loading branch information
Carreau committed May 20, 2020
2 parents d142cb3 + 729bdb3 commit 87106d8
Showing 1 changed file with 77 additions and 71 deletions.
148 changes: 77 additions & 71 deletions numcodecs/tests/test_blosc.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,15 @@
np.random.randint(-2**63, -2**63 + 20, size=1000, dtype='i8').view('m8[m]'),
]

@pytest.fixture(scope='module', params=[True, False, None])
def use_threads(request):
    """Parametrize tests over the three ``blosc.use_threads`` settings.

    Every test that accepts this fixture runs once per value: True, False,
    and None. Module-scoped so the three values are cycled per module, not
    per test.
    """
    return request.param

@pytest.mark.parametrize('array', arrays)
@pytest.mark.parametrize('codec', codecs)
def test_encode_decode(array, codec):
    """Round-trip one test array through one codec.

    The scraped diff left two definitions of this test (the pre-merge
    ``itertools.product`` loop and the post-merge parametrized form); the
    first was dead code shadowed by the second. Only the parametrized
    version is kept: each (array, codec) pair is reported as its own test
    case instead of one monolithic loop.
    """
    check_encode_decode(array, codec)


def test_config():
Expand Down Expand Up @@ -88,30 +93,33 @@ def test_eq():
assert Blosc(cname='lz4') != 'foo'


def test_compress_blocksize_default(use_threads):
    """Compressing with the default blocksize yields a positive blocksize.

    Covers both the implicit default (blocksize argument omitted) and the
    explicit default (blocksize passed as 0). Reconstructed post-merge
    version: the scrape interleaved it with the pre-merge loop over
    use_threads, which is now supplied by the ``use_threads`` fixture.
    """
    arr = np.arange(1000, dtype='i4')

    blosc.use_threads = use_threads

    # default blocksize
    enc = blosc.compress(arr, b'lz4', 1, Blosc.NOSHUFFLE)
    _, _, blocksize = blosc.cbuffer_sizes(enc)
    assert blocksize > 0

    # explicit default blocksize
    enc = blosc.compress(arr, b'lz4', 1, Blosc.NOSHUFFLE, 0)
    _, _, blocksize = blosc.cbuffer_sizes(enc)
    assert blocksize > 0

@pytest.mark.parametrize('bs', (2**7, 2**8))
def test_compress_blocksize(use_threads, bs):
    """An explicitly requested blocksize is reported back unchanged.

    Reconstructed post-merge version: the scrape interleaved the old
    ``for bs in 2**7, 2**8`` loop with the new parametrized form; the
    blocksize values are now pytest parameters.
    """
    arr = np.arange(1000, dtype='i4')

    blosc.use_threads = use_threads

    # custom blocksize
    enc = blosc.compress(arr, b'lz4', 1, Blosc.NOSHUFFLE, bs)
    _, _, blocksize = blosc.cbuffer_sizes(enc)
    assert blocksize == bs


def test_compress_complib():
def test_compress_complib(use_threads):
arr = np.arange(1000, dtype='i4')
expected_complibs = {
'lz4': 'LZ4',
Expand All @@ -121,48 +129,45 @@ def test_compress_complib():
'zlib': 'Zlib',
'zstd': 'Zstd',
}
for use_threads in True, False, None:
blosc.use_threads = use_threads
for cname in blosc.list_compressors():
enc = blosc.compress(arr, cname.encode(), 1, Blosc.NOSHUFFLE)
complib = blosc.cbuffer_complib(enc)
expected_complib = expected_complibs[cname]
assert complib == expected_complib
with pytest.raises(ValueError):
# capitalized cname
blosc.compress(arr, b'LZ4', 1)
with pytest.raises(ValueError):
# bad cname
blosc.compress(arr, b'foo', 1)


@pytest.mark.parametrize('dtype', ['i1', 'i2', 'i4', 'i8'])
def test_compress_metainfo(dtype, use_threads):
arr = np.arange(1000, dtype=dtype)
for shuffle in Blosc.NOSHUFFLE, Blosc.SHUFFLE, Blosc.BITSHUFFLE:
blosc.use_threads = use_threads
for cname in blosc.list_compressors():
enc = blosc.compress(arr, cname.encode(), 1, Blosc.NOSHUFFLE)
complib = blosc.cbuffer_complib(enc)
expected_complib = expected_complibs[cname]
assert complib == expected_complib
with pytest.raises(ValueError):
# capitalized cname
blosc.compress(arr, b'LZ4', 1)
with pytest.raises(ValueError):
# bad cname
blosc.compress(arr, b'foo', 1)


@pytest.mark.parametrize('dtype', ['i1', 'i2', 'i4', 'i8'])
def test_compress_metainfo(dtype, use_threads):
    """Compressed buffers record the item size and the shuffle filter used.

    Reconstructed post-merge version assembled from the visible added diff
    lines: dtype moved from an inner loop to a pytest parameter, and
    use_threads comes from the fixture instead of a loop.
    """
    arr = np.arange(1000, dtype=dtype)
    for shuffle in Blosc.NOSHUFFLE, Blosc.SHUFFLE, Blosc.BITSHUFFLE:
        blosc.use_threads = use_threads
        for cname in blosc.list_compressors():
            enc = blosc.compress(arr, cname.encode(), 1, shuffle)
            typesize, did_shuffle, _ = blosc.cbuffer_metainfo(enc)
            # metainfo must round-trip the dtype width and the filter choice
            assert typesize == arr.dtype.itemsize
            assert did_shuffle == shuffle


def test_compress_autoshuffle(use_threads):
    """AUTOSHUFFLE selects BITSHUFFLE for 1-byte items, SHUFFLE otherwise.

    Reconstructed post-merge version: the pre-merge inner loop over
    use_threads (visible in the interleaved scrape) is replaced by the
    ``use_threads`` fixture.
    """
    arr = np.arange(8000)
    for dtype in 'i1', 'i2', 'i4', 'i8', 'f2', 'f4', 'f8', 'bool', 'S10':
        varr = arr.view(dtype)
        blosc.use_threads = use_threads
        for cname in blosc.list_compressors():
            enc = blosc.compress(varr, cname.encode(), 1, Blosc.AUTOSHUFFLE)
            typesize, did_shuffle, _ = blosc.cbuffer_metainfo(enc)
            assert typesize == varr.dtype.itemsize
            # auto mode: bit-level shuffle only pays off for 1-byte items
            if typesize == 1:
                assert did_shuffle == Blosc.BITSHUFFLE
            else:
                assert did_shuffle == Blosc.SHUFFLE


def test_config_blocksize():
Expand Down Expand Up @@ -196,28 +201,29 @@ def _decode_worker(enc):
return data


@pytest.mark.parametrize('pool', (Pool, ThreadPool))
def test_multiprocessing(use_threads, pool):
    """Encoding and decoding work when dispatched to worker pools.

    Parametrized over a process pool and a thread pool (post-merge form;
    the scrape interleaved it with the pre-merge nested loops). The
    ``finally`` restores the module-global ``blosc.use_threads`` so later
    tests are not affected.
    """
    data = np.arange(1000000)
    enc = _encode_worker(data)

    pool = pool(5)

    try:
        blosc.use_threads = use_threads

        # test encoding
        enc_results = pool.map(_encode_worker, [data] * 5)
        assert all([len(enc) == len(e) for e in enc_results])

        # test decoding
        dec_results = pool.map(_decode_worker, [enc] * 5)
        assert all([data.nbytes == len(d) for d in dec_results])

        # tidy up
        pool.close()
        pool.join()

    finally:
        blosc.use_threads = None  # restore default
Expand Down

0 comments on commit 87106d8

Please sign in to comment.