[v3] Generalized NDArray support #1751

Closed
jhamman opened this issue Apr 5, 2024 · 7 comments

@jhamman
Member

jhamman commented Apr 5, 2024

Recent discussions at the Refactor meeting have indicated that adding generalized array support to the v3 roadmap would be useful.

@akshaysubr - can I ask you to take this ticket and push it forward?

@jhamman jhamman added the V3 Related to compatibility with V3 spec label Apr 5, 2024
@jhamman jhamman added this to the 3.0.0 milestone Apr 5, 2024
@akshaysubr

I would propose renaming this issue to "Generalized NDArray support" to disambiguate between zarr arrays and the underlying ndarray data structures they use.

Proposal for adding generalized ndarray support to zarr

  • Create a native zarr NDArray class for typing and to interface with existing protocols. This includes:

    • Buffer protocol (bytes arrays, etc.)
    • __array_interface__ (Numpy)
    • __cuda_array_interface__ (PyTorch, CuPy, etc.)
    • DLPack (TensorFlow, JAX, etc.)
    • Raw pointers
  • What this generalized ndarray class might look like (a rough pure-Python sketch also follows after this list):

    namespace zarr
    {

    namespace py = pybind11;
    using namespace py::literals;

    class Array
    {
    public:
        Array(zarrArrayInfo_t* array_info, int device_id);
        Array(py::object o, intptr_t cuda_stream = 0);

        py::dict array_interface() const;
        py::dict cuda_interface() const;

        py::tuple shape() const;
        py::tuple strides() const; // Strides of axes in bytes
        py::object dtype() const;

        zarrArrayBufferKind_t getBufferKind() const; // Device or Host buffer
        py::capsule dlpack(py::object stream) const; // Export to DLPack

        py::object cpu(); // Move array to CPU
        py::object cuda(bool synchronize, int device_id) const; // Move array to GPU

        const zarrArrayInfo_t& getArrayInfo() const
        {
            return array_info_;
        }

        static void exportToPython(py::module& m);

    private:
        zarrArrayInfo_t array_info_; // storage assumed; the original sketch omits this member
    };

    } // namespace zarr
  • Interoperability with NumPy

    ascending = np.arange(0, 4096, dtype=np.int32)
    zarray_h = zarr.ndarray.as_array(ascending)
    
    print(ascending.__array_interface__)
    print(zarray_h.__array_interface__)
    print(zarray_h.__cuda_array_interface__)
    print(zarray_h.buffer_size)
    print(zarray_h.buffer_kind)
    print(zarray_h.ndim)
    print(zarray_h.dtype)
  • Interoperability with CuPy

    data_gpu = cp.array(ascending)
    zarray_d = zarr.ndarray.as_array(data_gpu)
    print(data_gpu.__cuda_array_interface__)
    print(zarray_d.__cuda_array_interface__)
    print(zarray_d.buffer_kind)
    print(zarray_d.ndim)
    print(zarray_d.dtype)
  • Convert CPU to GPU

    zarray_d_cnv = zarray_h.cuda()
    print(zarray_d_cnv.__cuda_array_interface__)
  • Convert GPU to CPU

    zarray_h_cnv = zarray_d.cpu()
    print(zarray_h_cnv.__array_interface__)
  • Anything that supports the buffer protocol

    with open('file.txt', "rb") as f:
        text = f.read()
    
    zarray_txt_h = zarr.ndarray.as_array(text)
    print(zarray_txt_h.__array_interface__)
    
    zarray_txt_d = zarray_txt_h.cuda()
    print(zarray_txt_d.__cuda_array_interface__)
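
As a rough pure-Python illustration of the class sketched above (all names here are hypothetical, and strides, read-only buffers, and streams are ignored), something like the following could wrap host and device arrays behind one type:

    import numpy as np

    class NDArray:
        """Hypothetical pure-Python wrapper over a host (NumPy) or device (CuPy) array."""

        def __init__(self, data):
            self._data = data

        @classmethod
        def as_array(cls, obj):
            # Device buffers first, then the buffer protocol, then NumPy coercion.
            if hasattr(obj, "__cuda_array_interface__"):
                import cupy as cp  # optional dependency, only needed for device data
                return cls(cp.asarray(obj))
            if isinstance(obj, (bytes, bytearray, memoryview)):
                return cls(np.frombuffer(obj, dtype=np.uint8))
            return cls(np.asarray(obj))

        @property
        def __array_interface__(self):
            return self._data.__array_interface__

        @property
        def __cuda_array_interface__(self):
            return self._data.__cuda_array_interface__

        shape = property(lambda self: self._data.shape)
        dtype = property(lambda self: self._data.dtype)
        ndim = property(lambda self: self._data.ndim)

        def cpu(self):
            # Move to host memory (no-op for host-backed data).
            if isinstance(self._data, np.ndarray):
                return self
            import cupy as cp
            return NDArray(cp.asnumpy(self._data))

        def cuda(self):
            # Move to device memory.
            import cupy as cp
            return NDArray(cp.asarray(self._data))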

How to adapt zarr v3 implementations using this interface

Changes to codec pipelines

No change to how the BatchedCodecPipeline (or any other CodecPipeline) works:

        for bb_codec, chunk_spec_batch in bb_codecs_with_spec[::-1]:
            chunk_bytes_batch = await bb_codec.decode_batch(
                zip(chunk_bytes_batch, chunk_spec_batch), runtime_configuration
            )

        ab_codec, chunk_spec_batch = ab_codec_with_spec
        chunk_array_batch = await ab_codec.decode_batch(
            zip(chunk_bytes_batch, chunk_spec_batch), runtime_configuration
        )

        for aa_codec, chunk_spec_batch in aa_codecs_with_spec[::-1]:
            chunk_array_batch = await aa_codec.decode_batch(
                zip(chunk_array_batch, chunk_spec_batch), runtime_configuration
            )

Changes to codecs

The codec definitions would have to change a bit, though. They would all accept this new zarr.ndarray object and return objects of the same type, so plain byte streams would simply become 1D zarr.ndarray objects with dtype U1.

class ArrayBytesCodec(Codec):
    @abstractmethod
    async def decode(
        self,
        chunk_bytes: zarr.ndarray,
        chunk_spec: ArraySpec,
        runtime_configuration: RuntimeConfiguration,
    ) -> zarr.ndarray:
        pass

    @abstractmethod
    async def encode(
        self,
        chunk_array: zarr.ndarray,
        chunk_spec: ArraySpec,
        runtime_configuration: RuntimeConfiguration,
    ) -> Optional[zarr.ndarray]:
        pass

Changes to stores

Stores would also need to accept/return zarr.ndarray objects instead of just bytes objects.

class Store(ABC):
    @abstractmethod
    async def get(
        self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None
    ) -> Optional[zarr.ndarray]:
        """Retrieve the value associated with a given key.

        Parameters
        ----------
        key : str
        byte_range : tuple[int, Optional[int]], optional

        Returns
        -------
        zarr.ndarray
        """
        ...

    @abstractmethod
    async def set(self, key: str, value: zarr.ndarray) -> None:
        """Store a (key, value) pair.

        Parameters
        ----------
        key : str
        value : zarr.ndarray
        """
        ...

User-facing APIs using this interface

Option 1: Return/accept zarr.ndarray objects, forcing users to convert to the desired type using one of the above interoperability methods

data = np.arange(100)
data_z = zarr.ndarray.as_array(data)

z = zarr.array(data_z)
out = z[:5]  # zarr.ndarray
out_np = np.asarray(out)
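
Continuing the example above, a GPU consumer would convert through CuPy instead of NumPy (zero-copy via __cuda_array_interface__ or DLPack, assuming the returned zarr.ndarray is device-backed):

import cupy as cp

out = z[:5]               # zarr.ndarray
out_cp = cp.asarray(out)  # zero-copy conversion on the device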

Option 2: Add a meta_array parameter to the ArrayMetaData

This way, the meta array used when creating an array gets encoded (assuming we define a JSON encoder for it) and stored in the ZARR_JSON file, and AsyncArray.open(...) will repopulate the metadata with the same meta_array used at creation time. However, this might not be ideal, since the compute environment used to create zarr arrays might differ from the one used to read them (CPU jobs for data pre-processing, GPU jobs for training on that data, for example). So the ability to override that meta_array at read time would be useful:

    @classmethod
    async def open(
        cls,
        store: StoreLike,
        meta_array: Optional[ArrayLike] = None,
        runtime_configuration: RuntimeConfiguration = RuntimeConfiguration(),
    ) -> AsyncArray:
        store_path = make_store_path(store)
        zarr_json_bytes = await (store_path / ZARR_JSON).get()
        assert zarr_json_bytes is not None
        zarr_json = json.loads(zarr_json_bytes)
        if meta_array is not None:
            zarr_json['meta_array'] = meta_array
        return cls.from_dict(
            store_path,
            zarr_json,
            runtime_configuration=runtime_configuration,
        )

This will then allow handling the conversion between zarr.ndarray and meta_array (say, np.ndarray) internally, transparently to the user.
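
For example, a GPU read job could override the stored meta_array at open time. This is a sketch only; the meta_array parameter, the import path, and the store path are assumptions based on the proposal above, not an existing API:

import asyncio

import cupy as cp
from zarr import AsyncArray  # import path assumed for the v3 refactor


async def main():
    # Open an array written by a CPU job, but materialize chunks as CuPy
    # arrays by overriding meta_array at read time (hypothetical parameter).
    arr = await AsyncArray.open("data/example.zarr", meta_array=cp.empty(()))
    # ... read selections; chunks would come back as CuPy-backed arrays ...


asyncio.run(main())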

@jhamman jhamman changed the title from [v3] Generalized array support to [v3] Generalized NDArray support on Apr 10, 2024
@normanrz
Contributor

Thanks @akshaysubr!

Your proposal looks good. If there is a way we could achieve the same results in pure Python, that would be preferred. I don't know enough to say whether that is possible with the currently available library APIs.

With the v3 refactoring, we aim to keep the end-user API as close to v2 as possible. Option 1 would be a significant change in that users would need to write np.asarray(z[:5]) instead of just z[:5].
That is why option 2 seems more appealing. However, it would not be spec-compliant to persist meta_array in the JSON. Therefore, it should be user-provided to create and open. It could even be part of the runtime_configuration because it doesn't affect the storage of the array data.
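
A minimal sketch of that last idea (field names assumed): meta_array lives in the runtime configuration rather than in the persisted metadata, so nothing non-spec is written to zarr.json.

from dataclasses import dataclass, field
from typing import Any, Optional

import numpy as np


@dataclass(frozen=True)
class RuntimeConfiguration:
    order: str = "C"
    concurrency: Optional[int] = None
    # Template array deciding which in-memory array type chunks are
    # materialized as; it is never serialized into the stored metadata.
    meta_array: Any = field(default_factory=lambda: np.empty(()))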

@akshaysubr

Thanks for the feedback @normanrz. I think we should be able to do this in pure Python, but we would need to add cupy as a dependency to zarr. That could maybe be made optional for CPU-only installs, raising an appropriate exception when trying to access GPU functionality?
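
A minimal sketch of that optional-dependency pattern (the helper and the extra name are hypothetical):

# Hypothetical sketch: treat cupy as an optional dependency. GPU code paths
# import it lazily and raise an informative error on CPU-only installs.
try:
    import cupy as cp
except ImportError:  # CPU-only install
    cp = None


def _require_cupy():
    if cp is None:
        raise ImportError(
            "cupy is required for GPU support; install it alongside zarr "
            "(e.g. via a hypothetical `zarr[gpu]` extra)."
        )
    return cp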

@normanrz
Contributor

> Thanks for the feedback @normanrz. I think we should be able to do this in pure Python, but we would need to add cupy as a dependency to zarr. That could maybe be made optional for CPU-only installs, raising an appropriate exception when trying to access GPU functionality?

That sounds great! Adding cupy as an optional dependency would be a good solution.

@madsbk
Contributor

madsbk commented Apr 23, 2024

I propose we take it a step further and implement an output-by-caller policy. That is, all components in Zarr-python take an output buffer/array argument and write their results in said buffer/array instead of returning a new buffer/array.

Con

  • Every component needs to have a function that returns the output size (exact or an upper bound) given a specific input.

Pros

  • A caller can read multiple chunks into a contiguous memory buffer without any copying.
  • A caller can read into existing buffers such as communication buffers, shared memory buffers, CUDA managed memory, etc.
  • The caller decides how the memory is allocated, e.g. through a memory pool.
  • The caller decides the memory type, such as CUDA device memory or host memory.
  • Components can optimize based on the input and output type.

I suggest that we introduce an NDBufferView that can be used both as the chunk_array argument (like @akshaysubr suggests above) and as the output argument.
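
A minimal sketch of what such an NDBufferView might carry (attribute names assumed): a typed, shaped view over caller-owned host or device memory that codecs and stores write into rather than allocate themselves.

from dataclasses import dataclass
from typing import Any, Tuple


@dataclass
class NDBufferView:
    """Hypothetical view over caller-owned memory (host or device)."""

    buffer: Any                # exposes the buffer protocol or __cuda_array_interface__
    shape: Tuple[int, ...]
    dtype: str                 # e.g. "int32"; raw byte streams are 1D "uint8"
    strides: Tuple[int, ...]   # in bytes

    def copy_from(self, other: "NDBufferView") -> None:
        # Real implementations would dispatch on buffer kind (host vs. device).
        raise NotImplementedError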

Changes to codecs and stores

class ArrayBytesCodec(Codec):
    @abstractmethod
    async def decode(
        self,
        chunk_input: NDBufferView,
        chunk_output: NDBufferView,
        chunk_spec: ArraySpec,
        runtime_configuration: RuntimeConfiguration,
    ) -> None:
        pass

    @abstractmethod
    async def encode(
        self,
        chunk_input: NDBufferView,
        chunk_output: NDBufferView,
        chunk_spec: ArraySpec,
        runtime_configuration: RuntimeConfiguration,
    ) -> None:
        pass
class Store(ABC):
    @abstractmethod
    async def get(
        self, 
        key: str, 
        output: NDBufferView,
        byte_range: Optional[Tuple[int, Optional[int]]] = None
    ) -> None:
        ...

    @abstractmethod
    async def set(self, key: str, value: NDBufferView) -> None:
        ...

User-facing APIs using this interface

The user can set the output buffer explicitly when calling a .setitem() method on the zarr array, or use the array's default allocator to provide the output buffer implicitly.
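
On the read path, that might look roughly like this (the out= keyword on the selection API is an assumption for illustration, not a confirmed interface):

import numpy as np
import zarr

# Build a small array to read back.
z = zarr.array(np.arange(8 * 1024, dtype="int32").reshape(8, 1024), chunks=(2, 1024))

# Hypothetical output-by-caller read: decode all chunks straight into one
# contiguous, caller-owned buffer instead of allocating per chunk.
out = np.empty((8, 1024), dtype="int32")
z.get_basic_selection(Ellipsis, out=out)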

@akshaysubr

@madsbk The output-by-caller policy is definitely very useful, especially from a copy-optimization and memory-allocation standpoint. The one con that you called out is where I am stuck right now. This challenge was also discussed in this numcodecs issue: zarr-developers/numcodecs#316

For each codec, we can implement what you're suggesting for the write/encode path by having each compressor provide an upper bound for the compressed size given the uncompressed size. Almost all compressors do this already and we'd just have to expose that in the Codec API. But for the read/decode path, this is much harder to do since we don't keep track of the compression ratio or compressed size at each codec stage in the codec pipeline. We can potentially solve that by adding this additional metadata through a zarr spec extension, but in the existing version of the spec, that doesn't seem feasible.
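
A minimal sketch of how that upper bound could be surfaced in the Codec API (the method and class names here are assumed, not part of the current proposal text):

from abc import ABC, abstractmethod


class Codec(ABC):
    @abstractmethod
    async def encode(self, chunk_bytes: bytes) -> bytes:
        ...

    @abstractmethod
    def max_encoded_size(self, input_nbytes: int) -> int:
        """Upper bound on the encoded size for `input_nbytes` bytes of input."""


def lz4_compress_bound(input_nbytes: int) -> int:
    # Mirrors LZ4's worst-case bound (LZ4_compressBound): incompressible
    # input grows by at most input/255 plus a small constant.
    return input_nbytes + input_nbytes // 255 + 16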

This decode path is a general issue for compressors, and finding a good longer-term solution for it would be very useful. For example, the numcodecs LZ4 codec currently adds the uncompressed size as a 4-byte header, but this makes the LZ4 codec streams incompatible with other LZ4 decompressor implementations. The Gzip codec speculatively allocates some memory and pauses decompression to allocate a larger buffer and copy over data before continuing decompression. Most of the GPU codec implementations decompress once without writing anything out just to find the output size, so the caller can allocate that much memory and then re-run decompression with output.
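
For reference, the size-header workaround looks roughly like this (a simplified stand-in, not the exact numcodecs LZ4 framing):

import struct


def add_size_header(compressed: bytes, uncompressed_nbytes: int) -> bytes:
    # Prepend the uncompressed size so the decoder can allocate the exact
    # output buffer, at the cost of a non-standard stream format.
    return struct.pack("<I", uncompressed_nbytes) + compressed


def read_size_header(framed: bytes) -> int:
    (uncompressed_nbytes,) = struct.unpack_from("<I", framed, 0)
    return uncompressed_nbytes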

@madsbk
Contributor

madsbk commented Jun 5, 2024

I think we can close this issue now that #1910 has been merged?

@normanrz normanrz closed this as completed Jun 5, 2024