Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototype of object-store-based Store implementation #1661

Open
wants to merge 7 commits into
base: main
Choose a base branch
from

Conversation

kylebarron
Copy link

@kylebarron kylebarron commented Feb 8, 2024

Prototype of object-store based store.

object-store is a rust crate for interoperating with remote object stores like S3, GCS, Azure, etc. See the highlights section of its docs. It doesn't even try to implement a filesystem interface, instead focusing on the core atomic operations supported across object stores. This makes it a good candidate for use with a Zarr v3 Store.

object-store-python is a Python binding to object-store. With roeap/object-store-python#6, I added async methods to the library. So the underlying Rust binary will return a Python coroutine that can be awaited.

That and related PRs haven't been merged yet, but you can try this out locally by installing

pip install git+https://github.com/kylebarron/object-store-python.git@dev#subdirectory=object-store

note that you need the Rust compiler on your computer. Install that by following these docs.

TODO:

  • Implement multiple-range support in the underlying object-store-python library
  • Examples
  • Add unit tests and/or doctests in docstrings
  • Add docstrings and API docs for any new/modified user-facing classes and functions
  • New/modified features documented in docs/tutorial.rst
  • Changes documented in docs/release.rst
  • GitHub Actions have all passed
  • Test coverage is 100% (Codecov passes)

@jhamman
Copy link
Member

jhamman commented Feb 8, 2024

Amazing @kylebarron! I'll spend some time playing with this today.

@kylebarron
Copy link
Author

With roeap/object-store-python#9 it should be possible to fetch multiple ranges within a file concurrently with range coalescing (using get_ranges_async). Note that this object-store API accepts multiple ranges within one object, which is still not 100% aligned with the Zarr get_partial_values because that allows fetches across multiple objects.

That PR also adds a get_opts function which now supports "offset" and "suffix" ranges, of the sort Range:N- and Range:-N, which would allow removing the raise NotImplementedError on line 37.

@martindurant
Copy link
Member

martindurant/rfsspec#3

async def get_partial_values(
self, key_ranges: List[Tuple[str, Tuple[int, int]]]
) -> List[bytes]:
# TODO: use rust-based concurrency inside object-store
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Author

@kylebarron kylebarron Feb 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

object-store has a built-in function for this: get_ranges. With the caveat that it only manages multiple ranges in a single file.

get_ranges also automatically handles request merging for nearby ranges in a file.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I know, but mine already did the whole thing, so I am showing how I did that.

@normanrz
Copy link
Contributor

Great work @kylebarron!
What are everbody's thoughts on having this in zarr-python vs. spinning it out as a separate package?

@martindurant
Copy link
Member

What are everbody's thoughts on having this in zarr-python vs. spinning it out as a separate package?

I suggest we see whether it makes any improvements first, so it's author's choice for now.

@kylebarron
Copy link
Author

While @rabernat has seen some impressive perf improvements in some settings when making many requests with Rust's tokio runtime, which would possibly also trickle down to a Python binding, the biggest advantage I see is improved ease of use in installation.

A common hurdle I've seen is handling dependency management, especially around boto3, aioboto3, etc dependencies. Versions need to be compatible at runtime with any other libraries the user also has in their environment. And Python doesn't allow multiple versions of the same dependency at the same time in one environment. With a Python library wrapping a statically-linked Rust binary, you can remove all Python dependencies and remove this class of hardship.

The underlying Rust object-store crate is stable and under open governance via the Apache Arrow project. We'll just have to wait on some discussion in object-store-python for exactly where that should live.

I don't have an opinion myself on where this should live, but it should be on the order of 100 lines of code wherever it is (unless the v3 store api changes dramatically)

@jhamman
Copy link
Member

jhamman commented Feb 12, 2024

I suggest we see whether it makes any improvements first, so it's author's choice for now.

👍

What are everbody's thoughts on having this in zarr-python vs. spinning it out as a separate package?

I want to keep an open mind about what the core stores provided by Zarr-Python are. My current thinking is that we should just do a MemoryStore and a LocalFilesystemStore. Everything else can be opt-in by installing a 3rd party package. That said, I like having a few additional stores in the mix as we develop the store interface since it helps us think about the design more broadly.

@martindurant
Copy link
Member

A common hurdle I've seen is handling dependency management, especially around boto3, aioboto3, etc dependencies.

This is no longer an issue, s3fs has much more relaxed deps than it used to. Furthermore, it's very likely to be already part of an installation environment.

@normanrz
Copy link
Contributor

I want to keep an open mind about what the core stores provided by Zarr-Python are. My current thinking is that we should just do a MemoryStore and a LocalFilesystemStore. Everything else can be opt-in by installing a 3rd party package.

I agree with that. I think it is beneficial to keep the number of dependencies of core zarr-python small. But, I am open for discussion.

That said, I like having a few additional stores in the mix as we develop the store interface since it helps us think about the design more broadly.

Sure! That is certainly useful.

@jhamman jhamman added the V3 Affects the v3 branch label Feb 13, 2024
@itsgifnotjiff
Copy link

This is awesome work, thank you all!!!

Co-authored-by: Deepak Cherian <dcherian@users.noreply.github.com>
@kylebarron
Copy link
Author

The object-store-python package is not very well maintained roeap/object-store-python#24, so I took a few days to implement my own wrapper around the Rust object_store crate: https://github.com/developmentseed/object-store-rs

I'd like to update this PR soonish to use that library instead.

@martindurant
Copy link
Member

If the zarr group prefers object-store-rs, we can move it into the zarr-developers org, if you like. I would like to be involved in developing it, particularly if it can grow more explicit fsspec compatible functionality.

@kylebarron
Copy link
Author

kylebarron commented Oct 22, 2024

I have a few questions because the Store API has changed a bit since the spring.

  • There's a new BufferPrototype object. Is the BufferPrototype chosen by the store implementation or the caller? It would be very nice if this prototype could be chosen by the store implementation, because then we could return a RustBuffer object that implements the Python buffer protocol, but doesn't need to copy the buffer into Python memory.
  • Similarly for puts, is Buffer guaranteed to implement the buffer protocol? Contrary to fetching, we can't do zero-copy puts right now with object-store

I like that list now returns an AsyncGenerator. That aligns well with the underlying object-store rust API, but for technical reasons we can't expose that as an async iterable to Python yet (apache/arrow-rs#6587), even though we do expose the readable stream to Python as an async iterable.

@TomAugspurger
Copy link
Contributor

Is the BufferPrototype chosen by the store implementation or the caller? It would be very nice if this prototype could be chosen by the store implementation, because then we could return a RustBuffer object that implements the Python buffer protocol, but doesn't need to copy the buffer into Python memory.

This came up in the discussion at https://github.com/zarr-developers/zarr-python/pull/2426/files/5e0ffe80d039d9261517d96ce87220ce8d48e4f2#diff-bb6bb03f87fe9491ef78156256160d798369749b4b35c06d4f275425bdb6c4ad. By default, it's passed as default_buffer_prototype though I think the user can override at the call site or globally.

Does it look compatible with what you need?

@kylebarron
Copy link
Author

I think I'm confused why it's a parameter at all. Why shouldn't it return a protocol, and the store can implement whatever interface is most convenient to return data.

Put another way: when the store chooses the return interface, it can ensure no memory copies, and then the caller of the store can decide whether they need to copy the memory elsewhere.

@TomAugspurger
Copy link
Contributor

TomAugspurger commented Oct 22, 2024

Yeah, I'm not familiar with that. Looks like @madsbk added it in #1910, so presumably it's related to whether or not the data will end up on the GPU? I guess that's one bit of context the Store won't necessarily have, assuming it can place the data in host or device memory, and so it being a parameter might be necessary.

@d-v-b
Copy link
Contributor

d-v-b commented Oct 22, 2024

I do think making the concrete return type of store.get(key, prototype) depend on its prototype argument is a bit of an API smell. If we had no other constraints, the obvious play would be to let store.get return what the underlying storage medium returns, and ask the caller of store.get to convert that return value into the buffer of choice. I am guessing there is some reason today why we don't do this, but I wonder what abstractions we should add / change to remove this reason.

@kylebarron kylebarron mentioned this pull request Oct 22, 2024
6 tasks
@kylebarron
Copy link
Author

It makes sense that we'll always need a copy for CPU -> GPU, but I'd like to avoid situations where a store must copy data for CPU -> CPU. Right now that could be unavoidable depending on the buffer class the user passes in. Are we saying that the user needs to know the copy semantics of the underlying store?

@martindurant
Copy link
Member

It makes sense that we'll always need a copy for CPU -> GPU

Actually no, I fully expect that the rapids team should be able to make a direct object-store/NIC->GPU store class and also to do filter decoding on the GPU ( https://docs.rapids.ai/api/kvikio/stable/zarr/ ). Whether any of that ends up here, is another matter.

@kylebarron
Copy link
Author

Sure, I really meant to say "if the store loads data into the CPU, then we'll need to make a copy for CPU to GPU". I'm not surprised that it's possible to make direct to GPU readers.

@jhamman
Copy link
Member

jhamman commented Oct 22, 2024

@kylebarron - in terms of testing this, you should take a look at how we're doing this for other stores.

Basically, we've created a reusable test harness in zarr.testing.store.StoreTests. You can subclass that and it will run a bunch of store-only tests for you.

class TestRemoteStoreS3(StoreTests[RemoteStore, cpu.Buffer]):
store_cls = RemoteStore
buffer_cls = cpu.Buffer
@pytest.fixture
def store_kwargs(self, request) -> dict[str, str | bool]:
fs, path = fsspec.url_to_fs(
f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False, asynchronous=True
)
return {"fs": fs, "path": path, "mode": "r+"}
@pytest.fixture
def store(self, store_kwargs: dict[str, str | bool]) -> RemoteStore:
return self.store_cls(**store_kwargs)
async def get(self, store: RemoteStore, key: str) -> Buffer:
# make a new, synchronous instance of the filesystem because this test is run in sync code
new_fs = fsspec.filesystem(
"s3", endpoint_url=store.fs.endpoint_url, anon=store.fs.anon, asynchronous=False
)
return self.buffer_cls.from_bytes(new_fs.cat(f"{store.path}/{key}"))
async def set(self, store: RemoteStore, key: str, value: Buffer) -> None:
# make a new, synchronous instance of the filesystem because this test is run in sync code
new_fs = fsspec.filesystem(
"s3", endpoint_url=store.fs.endpoint_url, anon=store.fs.anon, asynchronous=False
)
new_fs.write_bytes(f"{store.path}/{key}", value.to_bytes())

@madsbk
Copy link
Contributor

madsbk commented Oct 23, 2024

The idea with BufferPrototype is to let the user tell how he/she wants the data to the whole stack. Stores and codecs can then optimize for a specific buffer type or just call .as_numpy_array() to get host memory access (zero-copy if the data is already on in host memory). Similarly, they can use .from_bytes() to create a new Buffer from a memoryview (zero-copy). See MemoryStore.get().

@kylebarron you should be able to create a Buffer from a RustBuffer zero-copy like:

async def get(
        self,
        key: str,
        prototype: BufferPrototype,
        byte_range: tuple[int | None, int | None] | None = None,
    ) -> Buffer | None:

    the_rust_buffer: RustBuffer = # load data into a rust buffer
    return prototype.buffer.from_buffer(memoryview(the_rust_buffer))

Now, if the user request a GPU buffer, a later codec can decide to move the data to the GPU and maybe use nvCOMP to decompress the data etc.

@kylebarron
Copy link
Author

Can someone detail the semantics of ByteRangeRequest? It's a type hint of

ByteRangeRequest: TypeAlias = tuple[int | None, int | None]

But that type hint on its own isn't fully descriptive, and I can't find any documentation about it. This is what I think it means:

  • Tuple[int, int]: This is a byte range starting with the first int and ending (exclusive) with the second int. This is a range, not a start and a length, right?
  • Tuple[None, None]. I assume this is invalid?
  • Tuple[int, None]: This is an "offset" request? All the bytes after the first int?
  • Tuple[None, int]: This is what I don't really know. Is this the same as [0, int]? Or is this a suffix request saying the last int bytes of the file?

@kylebarron
Copy link
Author

The idea with BufferPrototype is to let the user tell how he/she wants the data to the whole stack.

I'm not really a fan of this API, but I don't know the GPU side well enough to propose something else.

@martindurant
Copy link
Member

This is what I think it means:

That is certainly what it means when used in fsspec; None in the first place is the same as 0 and None in the second place is the same as "end"/"size". Note that they can can be negative, so a suffix range would be (-N, None).

I can't guarantee if the same convention is used here, but zarr blocks are either whole (None, None) or the exact range (start, stop) is known.

@kylebarron
Copy link
Author

I can't guarantee if the same convention is used here, but zarr blocks are either whole (None, None) or the exact range (start, stop) is known.

If zarr blocks are either whole or known, then shouldn't the type hint for the store be

ByteRangeRequest: TypeAlias = tuple[int, int] | None

?

@kylebarron
Copy link
Author

@d-v-b it looks like you added the type hint in #2065, can you shed some light on this?

@martindurant
Copy link
Member

or maybe tuple[int, int] | tuple[None, None] to fit fsspec's convention. I don't think there's any plausible use for suffix "the last N bytes" (fsspec uses this for things like parquet footers).

@normanrz
Copy link
Contributor

The suffix request is required for sharding. In shard files, the index containing the byte ranges of the chunks is, by default, at the end of the file. The size of the index can be statically determined from the array metadata. The size of the shard file can not be inferred in the general case. To avoid a preflight request to determine the file size, the suffix request is required. Most HTTP servers including Object Storage services support suffix requests.

@d-v-b
Copy link
Contributor

d-v-b commented Oct 24, 2024

@d-v-b it looks like you added the type hint in #2065, can you shed some light on this?

I introduced that type so that we could have exactly this conversation -- prior to its definition, we had various functions across the codebase that were taking a byte range parameter, but the type of that parameter wasn't defined in a central place. I'm not attached to this particular type! We can totally change it to something nicer, provided the semantics of that type covers all required use cases.

@kylebarron
Copy link
Author

The suffix request is required for sharding.

Is there an example of a suffix request somewhere in this repo, so we can see how the range is passed as an argument to the store?

Most HTTP servers including Object Storage services support suffix requests.

Except for Azure 😢. The default implementation of the object-store crate is to use suffix requests for other stores, but two requests for azure.

@kylebarron
Copy link
Author

we had various functions across the codebase that were taking a byte range parameter, but the type of that parameter wasn't defined in a central place.

I think it's worth considering changing it to a dataclass, because the semantics of the tuple are not always clear. And elsewhere in the codebase, a "chunk slice" refers to start and length, not start and end.

@d-v-b
Copy link
Contributor

d-v-b commented Oct 24, 2024

we had various functions across the codebase that were taking a byte range parameter, but the type of that parameter wasn't defined in a central place.

I think it's worth considering changing it to a dataclass, because the semantics of the tuple are not always clear. And elsewhere in the codebase, a "chunk slice" refers to start and length, not start and end.

Agreed, we definitely need to bump up the literacy of this type. I opened #2437 for this discussion.

@normanrz
Copy link
Contributor

The suffix request is required for sharding.

Is there an example of a suffix request somewhere in this repo, so we can see how the range is passed as an argument to the store?

https://github.com/zarr-developers/zarr-python/blob/main/src/zarr/codecs/sharding.py#L700-L702

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
V3 Affects the v3 branch
Projects
Status: In review
Development

Successfully merging this pull request may close these issues.

9 participants