LRU cache for decoded chunks #278

Open
alimanfoo opened this issue Jul 20, 2018 · 20 comments

Comments

@alimanfoo
Member

Via @calclavia from #265 (comment), breaking this out as a separate issue:

I have a related use case that I believe would benefit from introducing a chunk-based decompressed cache. I use zarr for storing data that will be used for training neural networks. For this use case, you often want to sample random (or almost random) rows from the dataset. If the sampling is mostly localized within a chunk, it would be great if the LRUCache could cache an entire chunk so we can take advantage of spatial locality.

For example, I would like to sample data points [1, 5, 8, 3, 2], and because these all reside in the same compressed chunk (cached by the LRU), only reading the first sample should be slow; the rest should already be cached in memory.

N.B., this differs from the LRUStoreCache class already implemented, because that caches the encoded chunk data, whereas the proposal here is to add a layer that caches the decoded chunk data (and therefore avoids having to decode the same chunk multiple times).
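To make the distinction concrete, here is a minimal sketch of what such a decoded-chunk layer could look like; the DecodedChunkCache name and interface are purely illustrative, not an existing zarr API:

from collections import OrderedDict

class DecodedChunkCache:
    """Illustrative LRU mapping from chunk key to the *decoded* NumPy chunk."""

    def __init__(self, max_chunks):
        self._max_chunks = max_chunks
        self._cache = OrderedDict()

    def __getitem__(self, ckey):
        chunk = self._cache.pop(ckey)    # KeyError -> caller falls back to store + decode
        self._cache[ckey] = chunk        # re-insert as most recently used
        return chunk

    def __setitem__(self, ckey, chunk):
        self._cache[ckey] = chunk
        if len(self._cache) > self._max_chunks:
            self._cache.popitem(last=False)  # evict the least recently used chunk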

@shikharsg
Member

shikharsg commented Aug 27, 2018

As I understand it, all the mutable mappings implemented are oblivious to compression/decompression, since all of that is handled by the zarr.Array class.

I was able to implement an interface like this here, with a new parameter called encode_decode_by_store (if this parameter is True, both compression and decompression are handled by the chunk_store) to pass to array creation and opening:

base_store = zarr.DirectoryStore('data/foo')
lru_store = zarr.LRUStoreCache(store=base_store, max_size=2**32, compressor=LZMA())
g = zarr.Group(store=base_store, chunk_store=lru_store, encode_decode_by_store=True)
z = g.create_dataset('bar', shape=(100, 100), compressor=LZMA(), encode_decode_by_store=True)

and when opening:

base_store = zarr.DirectoryStore('data/foo')
lru_store = zarr.LRUStoreCache(store=base_store, max_size=2**32, compressor=LZMA())
z = zarr.Group(store=base_store, chunk_store=lru_store, encode_decode_by_store=True)['bar']

But this seems like too much work for the user, who has to specify the compressor too many times in different places.

How about I try implementing something like this?

base_store = zarr.DirectoryStore('data/foo')
lru_store = zarr.LRUStoreCache(store=base_store, max_size=2**32)
g = zarr.Group(store=base_store, chunk_store=lru_store)
z = g.create_dataset('bar', shape=(100, 100), compressor=LZMA(), encode_decode_by_store=True)

Now when encode_decode_by_store=True is passed into create_dataset, zarr.create will check whether the chunk_store supports compression/decompression, and if not, an error is raised (saying something like "the underlying store does not implement compression"). It will do this by checking whether the chunk_store has a method called (say) set_compressor, through which the compressor of the store can be set. And when opening it will just be:

base_store = zarr.DirectoryStore('data/foo')
lru_store = zarr.LRUStoreCache(store=base_store, max_size=2**32)
z = zarr.Group(store=base_store, chunk_store=lru_store, encode_decode_by_store=True)['bar']

Here the Array class will check whether the underlying store implements compression/decompression by checking for the existence of the same set_compressor method on the store.
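For illustration only, the duck-typing check described above might look roughly like the following sketch (not actual zarr code; the helper name is made up):

def check_store_compression(chunk_store, compressor, encode_decode_by_store):
    """Hypothetical helper: only delegate encode/decode to the chunk_store if it
    advertises a set_compressor method; otherwise raise a clear error."""
    if not encode_decode_by_store:
        return
    if not hasattr(chunk_store, 'set_compressor'):
        raise ValueError('the underlying store does not implement compression')
    chunk_store.set_compressor(compressor)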

@martindurant
Member

It seems to me that the simplest thing you can do is

lru_store = zarr.LRUStoreCache(store=base_store, max_size=2**32, compressor=...)
..
z = g.create_dataset('bar', shape=(100, 100), compressor=None)

i.e., if the compression is handled by the store (either the back-end or the caching layer), then zarr doesn't need to know. The disadvantage here is that the metadata will indicate that the data is uncompressed, so loading it via another store won't work.

@shikharsg
Member

Well, I don't think it would be good if the data were unreadable via other stores. How about we keep compression/decompression by the zarr.Array class optional via the encode_decode_by_store parameter, and if this parameter is True, we make sure the zarr.Array class's chunk_store attribute implements compression/decompression?

@alimanfoo
Member Author

alimanfoo commented Aug 28, 2018 via email

@shikharsg
Member

shikharsg commented Aug 28, 2018

If I've understood you correctly, do you mean that the mutable mapping (either the store or the chunk_cache) won't implement encode/decode at all? And that in _chunk_getitem(), before trying to get from the store and then decode, we would first try to get from the chunk_cache (if one has been provided), and on a KeyError fall back to getting from the store and then decoding?
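Roughly, the read path being described might look like the sketch below (illustrative only, not the actual _chunk_getitem code; decode stands in for zarr's decompression/filter pipeline and is not a real zarr function):

def get_decoded_chunk(ckey, store, chunk_cache, decode):
    """Sketch of the proposed lookup order: try the decoded-chunk cache first,
    fall back to the store plus decode on a miss, then populate the cache."""
    if chunk_cache is not None:
        try:
            return chunk_cache[ckey]
        except KeyError:
            pass
    chunk = decode(store[ckey])
    if chunk_cache is not None:
        chunk_cache[ckey] = chunk
    return chunk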

@alimanfoo
Member Author

alimanfoo commented Aug 28, 2018 via email

@martindurant
Member

Yes, I agree - @alimanfoo's idea seems a good deal simpler than mine :)

@shikharsg
Member

I have managed to implement this (I think) in a fork here; see the diff with master here. Note: I haven't implemented this for 0-d arrays yet.

I tested ChunkCache with Array via the usual TestArray subclass:

class TestArrayWithChunkCache(TestArray):
    @staticmethod
    def create_array(read_only=False, **kwargs):
        store = dict()
        kwargs.setdefault('compressor', Zlib(level=1))
        cache_metadata = kwargs.pop('cache_metadata', True)
        cache_attrs = kwargs.pop('cache_attrs', True)
        init_array(store, **kwargs)
        return Array(store, read_only=read_only, cache_metadata=cache_metadata,
                     cache_attrs=cache_attrs, chunk_cache=ChunkCache(max_size=None))

These 3 tests fail (running Ubuntu 16, Python 3.5) for reasons I don't know yet (apologies if the errors are hard to read):

TestArrayWithChunkCache.test_dtypes

self = <zarr.tests.test_core.TestArrayWithChunkCache testMethod=test_dtypes>

def test_dtypes(self):

    # integers
    for dtype in 'u1', 'u2', 'u4', 'u8', 'i1', 'i2', 'i4', 'i8':
        z = self.create_array(shape=10, chunks=3, dtype=dtype)
        assert z.dtype == np.dtype(dtype)
        a = np.arange(z.shape[0], dtype=dtype)
        z[:] = a
        assert_array_equal(a, z[:])

    # floats
    for dtype in 'f2', 'f4', 'f8':
        z = self.create_array(shape=10, chunks=3, dtype=dtype)
        assert z.dtype == np.dtype(dtype)
        a = np.linspace(0, 1, z.shape[0], dtype=dtype)
        z[:] = a
        assert_array_almost_equal(a, z[:])

    # datetime, timedelta
    for base_type in 'Mm':
        for resolution in 'D', 'us', 'ns':
            dtype = '{}8[{}]'.format(base_type, resolution)
            z = self.create_array(shape=100, dtype=dtype, fill_value=0)
            assert z.dtype == np.dtype(dtype)
            a = np.random.randint(0, np.iinfo('u8').max, size=z.shape[0],
                                  dtype='u8').view(dtype)
          z[:] = a

zarr/tests/test_core.py:884:
zarr/core.py:1104: in __setitem__
self.set_basic_selection(selection, value, fields=fields)
zarr/core.py:1199: in set_basic_selection
return self._set_basic_selection_nd(selection, value, fields=fields)
zarr/core.py:1490: in _set_basic_selection_nd
self._set_selection(indexer, value, fields=fields)
zarr/core.py:1538: in _set_selection
self._chunk_setitem(chunk_coords, chunk_selection, chunk_value, fields=fields)
zarr/core.py:1670: in _chunk_setitem
fields=fields)
zarr/core.py:1744: in _chunk_setitem_nosync
self._chunk_cache[ckey] = np.copy(chunk)
zarr/storage.py:1999: in __setitem__
self._cache_value(key, value)
zarr/storage.py:1955: in _cache_value
value_size = buffer_size(value)

v = array([ '14085111320240194-10-23', '-13259135811763820-12-02',
'6427807593149837-05-23', '-22834526793892519-...'13291045639810559-03-07',
'19280470240624792-05-02', '-1766211532600209-01-17'],
dtype='datetime64[D]')

def buffer_size(v):
    from array import array as _stdlib_array
    if PY2 and isinstance(v, _stdlib_array):  # pragma: py3 no cover
        # special case array.array because does not support buffer
        # interface in PY2
        return v.buffer_info()[1] * v.itemsize
    else:  # pragma: py2 no cover
      v = memoryview(v)

E ValueError: cannot include dtype 'M' in a buffer

zarr/util.py:323: ValueError

TestArrayWithChunkCache.test_object_arrays

self = <zarr.tests.test_core.TestArrayWithChunkCache testMethod=test_object_arrays>

def test_object_arrays(self):

    # an object_codec is required for object arrays
    with pytest.raises(ValueError):
        self.create_array(shape=10, chunks=3, dtype=object)

    # an object_codec is required for object arrays, but allow to be provided via
    # filters to maintain API backwards compatibility
    with pytest.warns(FutureWarning):
        self.create_array(shape=10, chunks=3, dtype=object, filters=[MsgPack()])

    # create an object array using msgpack
    z = self.create_array(shape=10, chunks=3, dtype=object, object_codec=MsgPack())
    z[0] = 'foo'
    assert z[0] == 'foo'
    z[1] = b'bar'
  assert z[1] == 'bar'  # msgpack gets this wrong

E AssertionError: assert b'bar' == 'bar'
zarr/tests/test_core.py:909: AssertionError

TestArrayWithChunkCache.test_object_arrays_vlen_array

self = <zarr.tests.test_core.TestArrayWithChunkCache testMethod=test_object_arrays_vlen_array>

def test_object_arrays_vlen_array(self):

    data = np.array([np.array([1, 3, 7]),
                     np.array([5]),
                     np.array([2, 8, 12])] * 1000, dtype=object)

    def compare_arrays(expected, actual, item_dtype):
        assert isinstance(actual, np.ndarray)
        assert actual.dtype == object
        assert actual.shape == expected.shape
        for ev, av in zip(expected.flat, actual.flat):
            assert isinstance(av, np.ndarray)
            assert_array_equal(ev, av)
            assert av.dtype == item_dtype

    codecs = VLenArray(int), VLenArray('<u4')
    for codec in codecs:
        z = self.create_array(shape=data.shape, dtype=object, object_codec=codec)
        z[0] = np.array([4, 7])
        assert_array_equal(np.array([4, 7]), z[0])
        z[:] = data
        a = z[:]
        assert a.dtype == object
      compare_arrays(data, a, codec.dtype)

zarr/tests/test_core.py:1040:

expected = array([array([1, 3, 7]), array([5]), array([ 2, 8, 12]), ...,
array([1, 3, 7]), array([5]), array([ 2, 8, 12])], dtype=object)
actual = array([array([1, 3, 7]), array([5]), array([ 2, 8, 12]), ...,
array([1, 3, 7]), array([5]), array([ 2, 8, 12])], dtype=object), item_dtype = dtype('uint32')

def compare_arrays(expected, actual, item_dtype):
    assert isinstance(actual, np.ndarray)
    assert actual.dtype == object
    assert actual.shape == expected.shape
    for ev, av in zip(expected.flat, actual.flat):
        assert isinstance(av, np.ndarray)
        assert_array_equal(ev, av)
      assert av.dtype == item_dtype

E AssertionError: assert dtype('int64') == dtype('uint32')
E + where dtype('int64') = array([1, 3, 7]).dtype

zarr/tests/test_core.py:1030: AssertionError

@alimanfoo
Member Author

alimanfoo commented Aug 29, 2018 via email

@shikharsg
Member

An update on the above 3 errors. All 3 happen because I am trying to implement a "write cache" too, i.e. when writing to a chunk, instead of invalidating the relevant chunk key in the ChunkCache (which is what would happen with a read-only cache), the chunk being written is itself cached in the ChunkCache object.
Here is the explanation of the errors:

  • the first one, TestArrayWithChunkCache.test_dtypes, which fails with ValueError: cannot include dtype 'M' in a buffer, is a legitimate bug, which @alimanfoo fixed in numcodecs here
  • the second one, TestArrayWithChunkCache.test_object_arrays, fails on these two lines:
     z[1] = b'bar'
     assert z[1] == 'bar'  # msgpack gets this wrong
    with the error AssertionError: assert b'bar' == 'bar', which is again a problem with msgpack.
  • the third one, TestArrayWithChunkCache.test_object_arrays_vlen_array, I'll explain in a little more detail. We have a variable-length numpy array and create a zarr array with the VLenArray('<u4') codec:
    data = np.array([np.array([1, 3, 7]), np.array([5]), np.array([2, 8, 12])] * 1000, dtype=object)
    z = create_array(shape=data.shape, dtype=object, object_codec=VLenArray('<u4'))
    z[:] = data
    The dtype of each sub-array in z should be uint32, but because I try to implement the write cache, when I do z[:] = data the data does not go through the encode/decode functions, so the dtype comparison fails with: AssertionError: assert dtype('int64') == dtype('uint32')

All 3 errors above would not exist if we implemented just a read cache and no write cache, which makes me wonder whether I should do away with the idea of a write cache altogether?
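For reference, the two policies differ only in what the write path does with the cache; a minimal sketch (illustrative only, not the fork's actual code):

def handle_chunk_write(chunk_cache, ckey, decoded_chunk, write_cache=False):
    """Illustrative sketch of the two caching policies discussed above.

    write_cache=True keeps the freshly written (decoded) chunk in the cache,
    bypassing the encode/decode round trip and causing the dtype/object-codec
    mismatches above; write_cache=False simply invalidates the entry, so the
    next read goes through the store and the normal decode path.
    """
    if chunk_cache is None:
        return
    if write_cache:
        chunk_cache[ckey] = decoded_chunk
    else:
        chunk_cache.pop(ckey, None)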

@alimanfoo
Member Author

alimanfoo commented Sep 27, 2018 via email

@shikharsg
Member

Hi, I've just made another small change to the chunk cache part (here). It turns out that an np.copy wasn't needed in one place, and it was slowing down retrieval of data quite a bit. Would it be appropriate for me to submit a PR now so everyone can see the diff more easily?

@alimanfoo
Member Author

alimanfoo commented Oct 9, 2018 via email

@jakirkham
Member

Just to clarify a point here...

  • the first one, TestArrayWithChunkCache.test_dtypes, which fails with ValueError: cannot include dtype 'M' in a buffer, is a legitimate bug, which @alimanfoo fixed in numcodecs here

This isn't so much a bug as an unsupported format (datetime64, and likewise timedelta64) in Python proper. Under the hood we use the buffer protocol to serialize data efficiently. The buffer format must be a valid struct format string, which can contain any format character in this list, and none of those covers the datetime64 or timedelta64 types. NumPy supports these types, but Python does not. To clarify this point, please see the following code...

In [1]: import struct                                                           

In [2]: import numpy as np                                                      

In [3]: np.dtype(np.datetime64).char                                            
Out[3]: 'M'

In [4]: struct.Struct(np.dtype(np.datetime64).char)                             
---------------------------------------------------------------------------
error                                     Traceback (most recent call last)
<ipython-input-4-3bff20f12f13> in <module>
----> 1 struct.Struct(np.dtype(np.datetime64).char)

error: bad char in struct format

So, following this suggestion, we cast to int64, which is a valid format, as a workaround for end users. The result is the following...

In [1]: import struct                                                           

In [2]: import numpy as np                                                      

In [3]: np.dtype(np.int64).char                                                 
Out[3]: 'l'

In [4]: struct.Struct(np.dtype(np.int64).char)                                  
Out[4]: <Struct at 0x11d4aedf8>
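As an aside, the effect of that cast can be illustrated directly with NumPy views (an illustrative sketch only; the actual workaround lives in numcodecs):

import numpy as np

# datetime64 values are stored as 64-bit integers under the hood, so viewing
# them as 'i8' yields a buffer-compatible array; viewing back restores the dtype.
dates = np.array(['2018-07-20', '2018-08-27'], dtype='M8[D]')
as_ints = dates.view('i8')
memoryview(as_ints)               # works, whereas memoryview(dates) raises ValueError
restored = as_ints.view('M8[D]')
assert (restored == dates).all()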

@ssanderson

I have a related use case that I believe would benefit from introducing a chunk-based decompressed cache. I use zarr for storing data that will be used for training neural networks. For this use case, you often want to sample random (or almost random) rows from the dataset. If the sampling is mostly localized within a chunk, it would be great if the LRUCache could cache an entire chunk so we can take advantage of spatial locality.

I hit this issue with exactly the same use-case today. I'm trying to use a zarr array to store data that's used to train a tensorflow model, and I want to be able to stream data from the array in a (pseudo-)random order. The data I'm using is relatively compressible, so it would be nice to be able to use a compression filter, but the overhead of decompressing each block is prohibitive right now. I was expecting LRUStoreCache to solve my problem, but because it caches the encoded chunks, every access still pays the decompression cost.

@joshmoore
Member

@ssanderson: have you seen #306? If you end up trying it out, let us know how it goes.

@ssanderson

ssanderson commented Aug 26, 2021

@joshmoore I saw that #306 is open, but it wasn't clear to me what the status/stability of that PR is.

For my use-case, I'm only ever chunking and querying along the first axis, so I ended up implementing a simple chunk caching scheme using functools.lru_cache. Paraphrasing slightly, my workaround looks like this:

import functools

cache_size = 128  # illustrative value: how many decoded chunks to keep


class MyStorage:

    def __init__(self, zarr_array):
        self._zarr_array = zarr_array
        self._chunksize = zarr_array.chunks[0]

    def load(self, i):
        """Load the ith element from the managed array."""
        chunknum, chunk_idx = divmod(i, self._chunksize)
        return self._load_chunk(chunknum)[chunk_idx]

    @functools.lru_cache(cache_size)
    def _load_chunk(self, chunknum):
        # slice out (and decode) one whole chunk; lru_cache keeps the decoded result
        return self._zarr_array[self._chunksize * chunknum:self._chunksize * (chunknum + 1)]

This seems to work quite well, and it isn't too complicated, but it does seem unfortunate that I have to know how zarr arranges its data internally to get good performance, so I'd definitely be interested in removing my hack in favor of #306 when it lands.
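For completeness, usage would look roughly like this (an illustrative example assuming an array chunked only along the first axis; shapes and sizes are arbitrary):

import numpy as np
import zarr

# toy array chunked along the first axis only
z = zarr.zeros((10000, 128), chunks=(1000, 128), dtype='f4')
z[:] = np.random.rand(10000, 128).astype('f4')

storage = MyStorage(z)
for i in np.random.permutation(10000)[:64]:
    row = storage.load(int(i))  # repeated hits within the same chunk are served from lru_cache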

@joshmoore
Member

My understanding of #306 is that it needs some testing and review. I've updated it against the latest Zarr release (v2.9.3) if you would like to give it a try.

@FarzanT

FarzanT commented May 20, 2023

Hi, is this feature abandoned? I think avoiding the slow-downs caused by decompression when training neural networks (as mentioned in the original issue) would increase Zarr's utility in such applications.
The MyStorage class approach works well, but I wish Zarr had a simple switch for keeping in-RAM chunks compressed or decompressed.

@rabernat
Contributor

Hi @FarzanT and thanks for your interest! This feature is still of great interest to the Zarr community. It would be great to revive #306.
