Skip to content

Commit

Permalink
feat: Implement read coalescing algorithm (#1198)
Browse files Browse the repository at this point in the history
* Implement read coalescing algorithm

Tuning to be investigated

* style: pre-commit fixes

* Rename test file

* style: pre-commit fixes

* Ruff

* Drop usage of stateful file handle

* Use keyword args for start and end in fs.cat_file

* Add configuration options

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Jim Pivarski <jpivarski@users.noreply.github.com>
  • Loading branch information
3 people committed May 10, 2024
1 parent fcb5eec commit 13087b0
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 57 deletions.
132 changes: 132 additions & 0 deletions src/uproot/source/coalesce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""Read coalescing algorithms
Inspired in part by https://github.com/cms-sw/cmssw/blob/master/IOPool/TFileAdaptor/src/ReadRepacker.h
"""

from __future__ import annotations

import queue
from concurrent.futures import Future
from dataclasses import dataclass
from typing import Callable

import uproot.source.chunk


@dataclass
class CoalesceConfig:
max_range_gap: int = 32 * 1024
max_request_ranges: int = 1024
max_request_bytes: int = 10 * 1024 * 1024
min_first_request_bytes: int = 32 * 1024


DEFAULT_CONFIG = CoalesceConfig()


class SliceFuture:
def __init__(self, parent: Future, s: slice | int):
self._parent = parent
self._s = s

def add_done_callback(self, callback, *, context=None):
self._parent.add_done_callback(callback)

def result(self, timeout=None):
return self._parent.result(timeout=timeout)[self._s]


@dataclass
class RangeRequest:
start: int
stop: int
future: Future | None


@dataclass
class Cluster:
ranges: list[RangeRequest]

@property
def start(self):
# since these are built from sorted ranges, this is the min start
return self.ranges[0].start

@property
def stop(self):
return max(range.stop for range in self.ranges)

def __len__(self):
return self.stop - self.start

def set_future(self, future: Future):
for range in self.ranges:
local_start = range.start - self.start
local_stop = range.stop - self.start
range.future = SliceFuture(future, slice(local_start, local_stop))


@dataclass
class CoalescedRequest:
clusters: list[Cluster]

def ranges(self):
return [(cluster.start, cluster.stop) for cluster in self.clusters]

def set_future(self, future: Future):
for i, cluster in enumerate(self.clusters):
cluster.set_future(SliceFuture(future, i))


def _merge_adjacent(ranges: list[RangeRequest], config: CoalesceConfig):
sorted_ranges = sorted(ranges, key=lambda r: r.start)
cluster = Cluster([])
for current_range in sorted_ranges:
if cluster.ranges and current_range.start - cluster.stop > config.max_range_gap:
yield cluster
cluster = Cluster([])
cluster.ranges.append(current_range)
if cluster.ranges:
yield cluster


def _coalesce(ranges: list[RangeRequest], config: CoalesceConfig):
clusters: list[Cluster] = []
request_bytes: int = 0
first_request = True
for cluster in _merge_adjacent(ranges, config):
if clusters and (
len(clusters) + 1 >= config.max_request_ranges
or request_bytes + len(cluster) >= config.max_request_bytes
or (first_request and request_bytes >= config.min_first_request_bytes)
):
yield CoalescedRequest(clusters)
clusters = []
request_bytes = 0
first_request = False
clusters.append(cluster)
request_bytes += len(cluster)
if clusters:
yield CoalescedRequest(clusters)


def coalesce_requests(
ranges: list[tuple[int, int]],
submit_fn: Callable[[list[tuple[int, int]]], Future],
source: uproot.source.chunk.Source,
notifications: queue.Queue,
config: CoalesceConfig | None = None,
):
if config is None:
config = DEFAULT_CONFIG
all_requests = [RangeRequest(start, stop, None) for start, stop in ranges]
for merged_request in _coalesce(all_requests, config):
future = submit_fn(merged_request.ranges())
merged_request.set_future(future)

def chunkify(req: RangeRequest):
chunk = uproot.source.chunk.Chunk(source, req.start, req.stop, req.future)
req.future.add_done_callback(uproot.source.chunk.notifier(chunk, notifications))
return chunk

return list(map(chunkify, all_requests))
73 changes: 24 additions & 49 deletions src/uproot/source/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,14 @@
import uproot
import uproot.source.chunk
import uproot.source.futures


class PartFuture:
"""For splitting the result of fs._cat_ranges into its components"""

def __init__(self, parent_future: concurrent.futures.Future, part_index: int):
self._parent = parent_future
self._part_index = part_index

def add_done_callback(self, callback, *, context=None):
self._parent.add_done_callback(callback)

def result(self, timeout=None):
return self._parent.result(timeout=timeout)[self._part_index]
from uproot.source.coalesce import CoalesceConfig, coalesce_requests


class FSSpecSource(uproot.source.chunk.Source):
"""
Args:
file_path (str): A URL for the file to open.
coalesce_config (struct, optional): Configuration options for read coalescing
**kwargs (dict): any extra arguments to be forwarded to the particular
FileSystem instance constructor. This might include S3 access keys,
or HTTP headers, etc.
Expand All @@ -40,8 +28,11 @@ class FSSpecSource(uproot.source.chunk.Source):
to get many chunks in one request.
"""

def __init__(self, file_path: str, **options):
def __init__(
self, file_path: str, coalesce_config: CoalesceConfig | None = None, **options
):
super().__init__()
self._coalesce_config = coalesce_config
self._fs, self._file_path = fsspec.core.url_to_fs(
file_path, **self.extract_fsspec_options(options)
)
Expand All @@ -50,7 +41,6 @@ def __init__(self, file_path: str, **options):
self._async_impl = self._fs.async_impl

self._file = None
self._fh = None

self._open()

Expand All @@ -75,25 +65,20 @@ def __repr__(self):
return f"<{type(self).__name__} {path} at 0x{id(self):012x}>"

def __getstate__(self):
self._fh = None
state = dict(self.__dict__)
state.pop("_executor")
state.pop("_file")
state.pop("_fh")
return state

def __setstate__(self, state):
self.__dict__ = state
self._file = None
self._fh = None
self._open()

def __enter__(self):
self._fh = self._file.__enter__()
return self

def __exit__(self, exception_type, exception_value, traceback):
self._fh = None
self._file.__exit__(exception_type, exception_value, traceback)
self._executor.shutdown()

Expand All @@ -110,20 +95,16 @@ def chunk(self, start: int, stop: int) -> uproot.source.chunk.Chunk:
self._num_requests += 1
self._num_requested_chunks += 1
self._num_requested_bytes += stop - start
if self._fh:
self._fh.seek(start)
data = self._fh.read(stop - start)
else:
data = self._fs.cat_file(self._file_path, start, stop)
data = self._fs.cat_file(self._file_path, start=start, end=stop)
future = uproot.source.futures.TrivialFuture(data)
return uproot.source.chunk.Chunk(self, start, stop, future)

def chunks(
self, ranges: list[(int, int)], notifications: queue.Queue
self, ranges: list[tuple[int, int]], notifications: queue.Queue
) -> list[uproot.source.chunk.Chunk]:
"""
Args:
ranges (list of (int, int) 2-tuples): Intervals to fetch
ranges (list of tuple[int, int] 2-tuples): Intervals to fetch
as (start, stop) pairs in a single request, if possible.
notifications (``queue.Queue``): Indicator of completed
chunks. After each gets filled, it is ``put`` on the
Expand Down Expand Up @@ -171,29 +152,23 @@ async def async_wrapper_thread(blocking_func, *args, **kwargs):
# TODO: when python 3.8 is dropped, use `asyncio.to_thread` instead (also remove the try/except block above)
return await to_thread(blocking_func, *args, **kwargs)

paths = [self._file_path] * len(ranges)
starts = [start for start, _ in ranges]
ends = [stop for _, stop in ranges]
# _cat_ranges is async while cat_ranges is not.
coroutine = (
self._fs._cat_ranges(paths=paths, starts=starts, ends=ends)
if self._async_impl
else async_wrapper_thread(
self._fs.cat_ranges, paths=paths, starts=starts, ends=ends
def submit(request_ranges: list[tuple[int, int]]):
paths = [self._file_path] * len(request_ranges)
starts = [start for start, _ in request_ranges]
ends = [stop for _, stop in request_ranges]
# _cat_ranges is async while cat_ranges is not.
coroutine = (
self._fs._cat_ranges(paths=paths, starts=starts, ends=ends)
if self._async_impl
else async_wrapper_thread(
self._fs.cat_ranges, paths=paths, starts=starts, ends=ends
)
)
)

future = self._executor.submit(coroutine)
return self._executor.submit(coroutine)

chunks = []
for index, (start, stop) in enumerate(ranges):
chunk_future = PartFuture(future, index)
chunk = uproot.source.chunk.Chunk(self, start, stop, chunk_future)
chunk_future.add_done_callback(
uproot.source.chunk.notifier(chunk, notifications)
)
chunks.append(chunk)
return chunks
return coalesce_requests(
ranges, submit, self, notifications, config=self._coalesce_config
)

@property
def async_impl(self) -> bool:
Expand Down
8 changes: 0 additions & 8 deletions tests/test_0692_fsspec_reading.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,6 @@ def test_open_fsspec_local():
)
def test_open_fsspec_s3(handler):
pytest.importorskip("s3fs")
if sys.version_info < (3, 11):
pytest.skip(
"https://github.com/scikit-hep/uproot5/pull/1012",
)

with uproot.open(
"s3://pivarski-princeton/pythia_ppZee_run17emb.picoDst.root:PicoDst",
Expand Down Expand Up @@ -461,10 +457,6 @@ def test_fsspec_globbing_xrootd_no_files(handler):
)
def test_fsspec_globbing_s3(handler):
pytest.importorskip("s3fs")
if sys.version_info < (3, 11):
pytest.skip(
"https://github.com/scikit-hep/uproot5/pull/1012",
)

iterator = uproot.iterate(
{"s3://pivarski-princeton/pythia_ppZee_run17emb.*.root": "PicoDst"},
Expand Down
39 changes: 39 additions & 0 deletions tests/test_1198_coalesce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import pytest
from uproot.source.coalesce import CoalesceConfig, RangeRequest, _coalesce, Future


@pytest.mark.parametrize(
"config",
[
CoalesceConfig(),
CoalesceConfig(max_range_gap=2, max_request_ranges=1),
],
ids=["default", "tiny"],
)
@pytest.mark.parametrize(
"ranges",
[
[(1, 3), (4, 6), (10, 20)],
[(1, 3), (10, 20), (4, 6), (9, 10)],
[(1, 3), (10, 20), (6, 15)],
[(1, 3), (10, 20), (6, 25)],
],
ids=["sorted", "jumbled", "overlapped", "nested"],
)
def test_coalesce(ranges, config):
data = b"abcdefghijklmnopqurstuvwxyz"

all_requests = [RangeRequest(start, stop, None) for start, stop in ranges]
nreq = 0
for merged_request in _coalesce(all_requests, config):
future = Future()
future.set_result([data[start:stop] for start, stop in merged_request.ranges()])
merged_request.set_future(future)
nreq += 1

if config.max_range_gap == 2:
assert nreq > 1

for req in all_requests:
assert req.future
assert req.future.result() == data[req.start : req.stop]

0 comments on commit 13087b0

Please sign in to comment.