
feat: Implement read coalescing algorithm #1198

Merged: 10 commits, May 10, 2024
Conversation

@nsmith- (Collaborator) commented Apr 17, 2024

This PR implements two read optimization strategies:

Coalesce nearby byte ranges into one read range

For example, two ranges in a request, [(100, 140), (142, 160)], become one range, [(100, 160)], that is then split back apart on the receiving side. The motivation is that, on the server side, experience from CMSSW with xrootd shows that servers sometimes handle the two ranges as separate backend calls in a way that lowers overall throughput.
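The coalescing idea can be sketched as follows. This is a minimal illustration, not the PR's actual implementation; the function name is mine, and only the max_gap parameter mirrors the spirit of the max_range_gap setting introduced below.

```python
# Illustrative sketch of gap-based range coalescing (not the PR's exact code).
def coalesce_ranges(ranges, max_gap=32 * 1024):
    """Merge sorted (start, stop) byte ranges whose gap is at most max_gap."""
    merged = []
    for start, stop in sorted(ranges):
        if merged and start - merged[-1][1] <= max_gap:
            # Close enough to the previous range: extend it instead of
            # issuing a separate request.
            merged[-1] = (merged[-1][0], max(merged[-1][1], stop))
        else:
            merged.append((start, stop))
    return merged

print(coalesce_ranges([(100, 140), (142, 160)]))  # [(100, 160)]
```

The receiving side would then slice the single returned buffer back into the originally requested sub-ranges.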

Split very large requests into smaller chunks

Since #1162 we went from one request per range using fs.cat_file to one request for several ranges using fs.cat_ranges. This was motivated by the large overhead we observed with xrootd for separate read requests, due to needing to open and close a (stateful!) xrootd file handle. Arguably, CoffeaTeam/fsspec-xrootd#54 fixed this in a different way, but there is still reason to prefer a vector read call (used in the fsspec-xrootd implementation of cat_ranges) over individual read calls in xrootd.

One consequence of this change is that the future that notifies the downstream work will not resolve until all data is received. When we request several TBaskets for several branches, this can become a large read request (megabytes). If we separate a bulk multi-range request into a few smaller requests (each still substantial compared to the individual ranges in the request), we can do work on (e.g. decompress) a subset of the expected data while the rest is in flight. For example, a 20 MB request might be composed of two hundred 100 kB ranges, and waiting for the whole thing adds overall latency:

Before:
0                                                     1
1     2     3     4     5     6     7     8     9     0     1     2
|--        read 20MB          --||--decompress 20MB--|

After:
0                                                     1
1     2     3     4     5     6     7     8     9     0     1     2
|-- read 10MB --||-- read 10MB--|
                |-d 10MB-|      |-d 10MB-|
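The splitting side can be sketched similarly. This is an illustrative simplification, not the PR's exact logic; the parameter names echo the CoalesceConfig fields shown under "Tuning" below, but the function itself is hypothetical.

```python
# Illustrative sketch: batch a list of (already coalesced) byte ranges into
# separate requests, each staying under a byte budget and a range-count
# budget, so earlier batches can be decompressed while later ones are in
# flight. Not the PR's actual implementation.
def split_requests(ranges, max_request_bytes=10 * 1024 * 1024,
                   max_request_ranges=1024):
    requests, current, current_bytes = [], [], 0
    for start, stop in ranges:
        size = stop - start
        if current and (current_bytes + size > max_request_bytes
                        or len(current) >= max_request_ranges):
            # Budget exceeded: close out the current batch.
            requests.append(current)
            current, current_bytes = [], 0
        current.append((start, stop))
        current_bytes += size
    if current:
        requests.append(current)
    return requests
```

Each returned batch would be issued as its own cat_ranges call, allowing downstream work to start as soon as the first batch lands.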

Tuning

The PR introduces a configuration object, tunable via, e.g.:

import uproot
from uproot.source.coalesce import CoalesceConfig

url = "https://raw.githubusercontent.com/CoffeaTeam/coffea/master/tests/samples/nano_dy.root"
config = CoalesceConfig(
    max_range_gap=32 * 1024,
    max_request_ranges=1024,
    max_request_bytes=10 * 1024 * 1024,
    min_first_request_bytes=32 * 1024,
)

with uproot.open(url, coalesce_config=config) as fin:
    a = fin["Events"].arrays()

The defaults are not tuned at the moment. Further studies will be needed to choose good defaults.
edit: now that I think about it, min_first_request_bytes is probably not super important. We probably want to set it the same as max_request_bytes, and both should be maybe a factor of 10 smaller than the total size of the data to read in a single source.chunks() call.

@nsmith- (Collaborator, Author) commented Apr 17, 2024

The removal of the stateful handle is motivated by #1157 (comment)

@nsmith- (Collaborator, Author) commented Apr 22, 2024

Only in python 3.11+ do we see

tests/test_0692_fsspec_reading.py::test_open_fsspec_s3[FSSpecSource] FAILED [ 70%]
...
E               ValueError: not a ROOT file: first four bytes are b'\xec_wu'
E               in file s3://pivarski-princeton/pythia_ppZee_run17emb.picoDst.root

looking into it...

edit: because in other versions the test is skipped:

tests/test_0692_fsspec_reading.py::test_open_fsspec_s3[FSSpecSource] SKIPPED [ 70%]
tests/test_0692_fsspec_reading.py::test_open_fsspec_s3[S3Source] SKIPPED [ 70%]
tests/test_0692_fsspec_reading.py::test_open_fsspec_s3[None] SKIPPED     [ 70%]

The test was disabled for versions below 3.11 in #1012.

@nsmith- (Collaborator, Author) commented May 6, 2024

The problem is the signature of fs.cat_file. For the file

import fsspec

fs, path = fsspec.url_to_fs("s3://pivarski-princeton/pythia_ppZee_run17emb.picoDst.root", anon=True)

If I create a file-object handle and read the first 4 bytes, I get:

fh = fs.open(path)
fh.seek(0)
assert fh.read(4) == b"root"

If I instead use cat_file, I find:

data = fs.cat_file(path, 0, 4)
assert len(data) == 4  # fails, len(data) is 978299156
assert data == b"root" # also fails, data[:4] is b'\x00\x00\xf1l'

So there was a mismatch in behavior that initially puzzled me. Then I realized that, depending on the implementation, fs.cat_file can have different signatures, and the s3fs one has a different order of keyword arguments. I filed a UX issue as fsspec/filesystem_spec#1600 and used keyword arguments here.
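The pitfall can be shown without any network access. The two signatures below are hypothetical stand-ins for fsspec implementations whose positional parameter order differs (as s3fs's cat_file does from the base class):

```python
# Two hypothetical cat_file-like signatures with different positional
# parameter order (illustrative; not the real fsspec/s3fs code).
def cat_file_a(path, start=None, end=None):
    return ("a", start, end)

def cat_file_b(path, version_id=None, start=None, end=None):
    return ("b", start, end)

# Positional arguments bind differently in the two implementations:
assert cat_file_a("f", 0, 4) == ("a", 0, 4)
assert cat_file_b("f", 0, 4) == ("b", 4, None)   # 0 silently became version_id

# Keyword arguments are safe with both:
assert cat_file_a("f", start=0, end=4) == ("a", 0, 4)
assert cat_file_b("f", start=0, end=4) == ("b", 0, 4)
```

Hence the PR's fix: always pass start and end by keyword.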

@nsmith- nsmith- requested a review from jpivarski May 7, 2024 12:56
@jpivarski (Member) left a comment

As I understand it, src/uproot/source/coalesce.py is a suite of utility functions used only by FSSpecSource to merge nearby but not necessarily adjacent byte requests: download a bit too much data, but have (far?) fewer requests and responses in flight, which is a net savings depending on how a network's throughput and latency compare.

Presumably, it would need to be tuned for each network. How are the default settings? (It looks to me like the coalesce functions are always used by FSSpecSource, so this will affect the default case.) The most conservative default is to only coalesce adjacent byte ranges, but the most common network scenario (grad student in a basement, trying to get data from CERN) is more latency-bound than throughput-bound. Maybe the default could accept 25% over-read or so.

I see that this also only affects FSSpecSource. It could, in principle, be applied to the pre-FSSpecSources, but there's less reason to do that. FSSpecSource has the many-requests-in-flight model built into it, and the old sources attempt to do vector reads (only one request in flight).

I'm in favor of this. If @lobis is available, it would be great to hear what you think! If we don't hear from you by Thursday, I think we should go ahead with this PR.

@nsmith- (Collaborator, Author) commented May 7, 2024

The defaults are similar in spirit to the CMSSW defaults, though the use case is certainly different. I was hoping to gain some experience in the wild with how different these are.

As for implementing it in the pre-fsspec sources, that is indeed an option, and one I think may be useful. For example, even in the case of vector reads (which fsspec now does through the cat_ranges interface, if the protocol supports it), there may be value in issuing several reasonably sized vector read requests rather than one large one.

@jpivarski (Member) commented
It's Thursday; I think we should go ahead with this PR. I'll update to main and then merge.

@jpivarski jpivarski enabled auto-merge (squash) May 10, 2024 00:03
@jpivarski jpivarski merged commit 13087b0 into scikit-hep:main May 10, 2024
24 checks passed