From 200fe5f07eda15f13c654163cb3f12510abc94d2 Mon Sep 17 00:00:00 2001 From: Jerome Kelleher Date: Sat, 16 May 2026 16:05:15 +0100 Subject: [PATCH 1/2] Run the encoders in-process MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Drops the separate multiprocessing.Process encoder-server and the AF_UNIX wire protocol. The VczReader and per-fh BedEncoder / BgenEncoder instances now live in the FUSE handler process. encoder.read, encoder.close, and reader teardown run on worker threads via trio.to_thread.run_sync so the pyfuse3 trio task stays responsive. The 30s per-read timeout and 2s aclose timeout are preserved — a slow read still surfaces EIO to the kernel rather than blocking the consumer indefinitely. On read timeout the worker thread is abandoned (abandon_on_cancel=True) and the handle is marked dead; aclose drains the abandoned thread via a threading.Event before closing the encoder, or logs a warning and leaks if the encoder is permanently wedged. The pyfuse3 mount Operations layer is largely untouched — it depends on a Protocol (renamed EncoderClientProto -> EncoderHostProto) that the new EncoderHost / StreamHandle satisfy. The accompanying tests (test_encoder_ops, test_plink_apps, test_bgen_apps) track the rename. Net diff: -2335 LOC across the deleted encoder_{client,server,protocol} modules and their dedicated test files. The glibc-arena fragmentation tuning called out in notes/memory_rss_investigation.md (MALLOC_ARENA_MAX, malloc_trim) is deferred to a separate change. --- biofuse/cli.py | 52 +-- biofuse/encoder_client.py | 391 ---------------- biofuse/encoder_host.py | 262 +++++++++++ biofuse/encoder_ops.py | 69 ++- biofuse/encoder_protocol.py | 132 ------ biofuse/encoder_server.py | 337 -------------- biofuse/formats.py | 22 +- tests/test_bgen_apps.py | 41 +- tests/test_encoder_client.py | 532 --------------------- tests/test_encoder_host.py | 477 +++++++++++++++++++ tests/test_encoder_ops.py | 137 +++--- tests/test_encoder_protocol.py | 124 ----- tests/test_encoder_server.py | 812 --------------------------------- tests/test_formats.py | 2 +- tests/test_plink_apps.py | 24 +- 15 files changed, 909 insertions(+), 2505 deletions(-) delete mode 100644 biofuse/encoder_client.py create mode 100644 biofuse/encoder_host.py delete mode 100644 biofuse/encoder_protocol.py delete mode 100644 biofuse/encoder_server.py delete mode 100644 tests/test_encoder_client.py create mode 100644 tests/test_encoder_host.py delete mode 100644 tests/test_encoder_protocol.py delete mode 100644 tests/test_encoder_server.py diff --git a/biofuse/cli.py b/biofuse/cli.py index 80f7bb8..ccbdde9 100644 --- a/biofuse/cli.py +++ b/biofuse/cli.py @@ -4,14 +4,13 @@ import pathlib import signal import sys -import tempfile from functools import wraps import click import trio import vcztools -from biofuse import access_log, encoder_client, encoder_ops, formats, fuse_adapter +from biofuse import access_log, encoder_host, encoder_ops, formats, fuse_adapter logger = logging.getLogger(__name__) @@ -76,11 +75,12 @@ def biofuse_main(): def mount_plink(vcz_url, mount_dir, basename, access_log_path, **kwargs): """Mount a PLINK 1.9 view of VCZ_URL at MOUNT_DIR. - Spawns a plink-server subprocess that owns the ``VczReader`` and - serves ``.bed`` reads over an ``AF_UNIX`` socket. ``.bim`` and - ``.fam`` are precomputed once at mount time and held in the FUSE - process's memory; only ``.bed`` reads cross the wire. ``--no-bim`` - and ``--no-fam`` suppress the corresponding sidecar from the mount. + The FUSE handler owns the ``VczReader`` and one fresh + :class:`vcztools.BedEncoder` per open ``.bed`` fh; ``encoder.read`` + runs on worker threads so the pyfuse3 main loop stays responsive. + ``.bim`` and ``.fam`` are precomputed once at mount time and held + in memory; only ``.bed`` reads invoke the encoder. ``--no-bim`` and + ``--no-fam`` suppress the corresponding sidecar from the mount. The bcftools-view-style filter / backend / log options are inherited from ``vcztools view-plink``; see ``vcztools view-plink --help`` for the @@ -105,10 +105,11 @@ def mount_plink(vcz_url, mount_dir, basename, access_log_path, **kwargs): def mount_bgen(vcz_url, mount_dir, basename, access_log_path, **kwargs): """Mount an Oxford BGEN view of VCZ_URL at MOUNT_DIR. - Spawns a bgen-server subprocess that owns the ``VczReader`` and - serves ``.bgen`` reads over an ``AF_UNIX`` socket. ``.sample`` and - ``.bgen.bgi`` are precomputed once at mount time and held in the - FUSE process's memory; only ``.bgen`` reads cross the wire. + The FUSE handler owns the ``VczReader`` and one fresh + :class:`vcztools.BgenEncoder` per open ``.bgen`` fh; ``encoder.read`` + runs on worker threads so the pyfuse3 main loop stays responsive. + ``.sample`` and ``.bgen.bgi`` are precomputed once at mount time + and held in memory; only ``.bgen`` reads invoke the encoder. ``--no-sample-file`` and ``--no-bgi`` suppress the corresponding sidecar; ``--no-header-samples`` drops the sample identifiers from the ``.bgen`` header block. ``--unphased`` ignores the input's @@ -144,18 +145,15 @@ def _run_mount( log_path = pathlib.Path(access_log_path) if access_log_path is not None else None resolved_basename = basename if basename is not None else _default_basename(vcz_url) - with tempfile.TemporaryDirectory(prefix="biofuse-") as sock_dir: - sock_path = pathlib.Path(sock_dir) / f"{spec.name}.sock" - trio.run( - _amount, - spec, - vcz_url, - str(mount_dir_path), - resolved_basename, - opts, - log_path, - sock_path, - ) + trio.run( + _amount, + spec, + vcz_url, + str(mount_dir_path), + resolved_basename, + opts, + log_path, + ) async def _amount( @@ -165,17 +163,15 @@ async def _amount( basename: str, opts, log_path: pathlib.Path | None, - sock_path: pathlib.Path, ) -> None: - async with await encoder_client.EncoderClient.start( + async with await encoder_host.EncoderHost.start( vcz_url, - sock_path, spec, opts=opts, - ) as client: + ) as host: with access_log.AccessLogger(log_path) as access_logger: ops = encoder_ops.EncoderOps( - client, basename, spec, access_logger=access_logger + host, basename, spec, access_logger=access_logger ) async with fuse_adapter.mount(ops, mount_dir): click.echo(f"mounted at {mount_dir}", err=True) diff --git a/biofuse/encoder_client.py b/biofuse/encoder_client.py deleted file mode 100644 index 27d39a5..0000000 --- a/biofuse/encoder_client.py +++ /dev/null @@ -1,391 +0,0 @@ -"""Parent-side async client for the encoder-server subprocess. - -:class:`EncoderClient` connects briefly at startup to fetch the -precomputed static-sidecar bytes (in the format spec's declared -order) and the streaming-file size, then for each streaming-file -``open()`` from the FUSE layer it spins up a fresh -:class:`StreamConnection` over a new ``AF_UNIX`` socket. Each -:class:`StreamConnection` is its own conversation with its own -server-side thread and encoder. -""" - -import errno -import logging -import multiprocessing as mp -import pathlib -import socket -import time -from collections.abc import Callable - -import trio -import vcztools - -from biofuse import encoder_protocol, encoder_server, formats - -logger = logging.getLogger(__name__) - - -_SHUTDOWN_GRACE_SECONDS = 5.0 -_CONNECT_RETRY_SLEEP_S = 0.05 -_CONNECT_DEADLINE_S = 10.0 - -# Per-operation deadlines for the parent → server protocol. The FUSE -# handler must never await indefinitely on the worker; on expiry we -# surface an ``OSError`` to the FUSE layer so the kernel sees a real -# I/O error and unblocks the consumer's syscall instead of pinning it -# in uninterruptible sleep. -_REQUEST_TIMEOUT_S = 30.0 -_OPEN_TIMEOUT_S = 5.0 -_ACLOSE_TIMEOUT_S = 2.0 - - -class StreamConnection: - """One streaming-file reader: a dedicated socket to the encoder-server. - - Reads on the same connection are serialised via an internal - ``trio.Lock`` because the wire protocol is request/reply - synchronous on a single socket. Different :class:`StreamConnection` - instances are fully independent — they live in different server - threads and do not contend with each other. - """ - - def __init__( - self, - stream: trio.SocketStream, - *, - on_aclose: Callable[[float, float], None] | None = None, - ) -> None: - self._stream = stream - self._lock = trio.Lock() - self._closed = False - self._on_aclose = on_aclose - - async def read(self, off: int, size: int) -> bytes: - if self._closed: - raise OSError(errno.EIO, "stream connection is closed") - request = encoder_protocol.pack_read_request(off, size) - with trio.move_on_after(_REQUEST_TIMEOUT_S) as cs: - async with self._lock: - if self._closed: - raise OSError(errno.EIO, "stream connection is closed") - await self._stream.send_all(request) - status_buf = await _recv_exact( - self._stream, encoder_protocol.REPLY_STATUS_SIZE - ) - status = encoder_protocol.parse_status(status_buf) - if status < 0: - raise encoder_protocol.status_to_error(status) - if status == 0: - return b"" - return await _recv_exact(self._stream, status) - # Reached only if ``move_on_after`` caught a Cancelled — the - # inner block always returns or raises through. Mark the - # connection dead so other tasks queued on ``self._lock`` wake - # to an immediate EIO instead of repeating the wait against a - # known-broken socket. - if not cs.cancelled_caught: # pragma: no cover - defensive - raise RuntimeError("encoder-server read fall-through") - self._closed = True - with trio.CancelScope(shield=True): - with trio.move_on_after(_ACLOSE_TIMEOUT_S): - try: - await self._stream.aclose() - except (trio.BrokenResourceError, OSError) as exc: - logger.debug("aclose after timeout raised: %s", exc) - raise OSError(errno.EIO, "encoder-server request timed out") - - async def aclose(self) -> None: - if self._closed: - return - self._closed = True - t_start = time.monotonic() - with trio.CancelScope(shield=True): - with trio.move_on_after(_ACLOSE_TIMEOUT_S) as cs: - try: - await self._stream.send_eof() - except ( - trio.ClosedResourceError, - trio.BrokenResourceError, - OSError, - ) as exc: - logger.debug("send_eof on stream connection raised: %s", exc) - await self._stream.aclose() - if cs.cancelled_caught: - logger.debug( - "stream connection aclose timed out after %.1fs", - _ACLOSE_TIMEOUT_S, - ) - if self._on_aclose is not None: - try: - self._on_aclose(t_start, time.monotonic()) - except Exception as exc: # noqa: BLE001 - never let logging blow up cleanup - logger.debug("on_aclose hook raised: %s", exc) - - -class EncoderClient: - """Parent-side client for one mounted encoder-server subprocess. - - Construct via :meth:`EncoderClient.start` (an async classmethod - that spawns the server, runs the metadata handshake, and returns - the ready client). The instance is also an async context manager; - ``aclose()`` signals the server to stop, joins the subprocess, - and unlinks the listener socket file. - - Attributes populated by the handshake: - - - ``static_files``: dict mapping each suffix returned by - ``spec.static_suffixes(opts)`` to its precomputed bytes. - - ``stream_size``: total byte size of the streaming file. - """ - - def __init__(self, spec: formats.FormatSpec, opts) -> None: - self.spec = spec - self.opts = opts - self.static_files: dict[str, bytes] = {} - self.stream_size: int = 0 - self._proc: mp.process.BaseProcess | None = None - self._socket_path: pathlib.Path | None = None - self._stop_sock: socket.socket | None = None - self._closed = False - - @classmethod - async def start( - cls, - vcz_url: str, - socket_path: pathlib.Path, - spec: formats.FormatSpec, - *, - opts=None, - ) -> "EncoderClient": - """Spawn the server, run the metadata handshake, return client. - - The parent creates the listener and the stop-signal socketpair - itself, then hands both to the child. Multiprocessing's socket - reduction dups the fds across the spawn boundary; the parent - closes its own copies once the child has started. - - ``opts`` is the ``vcztools.ViewPlinkOptions`` (for - ``spec.name == "plink"``) or ``vcztools.ViewBgenOptions`` - dataclass parsed from the CLI. It carries the bcftools-style - filtering options forwarded to ``opts.make_reader`` in the - worker, plus the per-mount feature toggles (``no_bim``, - ``no_bgi``, …) and the log-level / log-file settings the - worker applies on startup. Defaults to a fresh instance of the - format's options dataclass (every field at its dataclass - default) when ``None``. - """ - if opts is None: - if spec.name == "plink": - opts = vcztools.ViewPlinkOptions() - else: - opts = vcztools.ViewBgenOptions() - socket_path = pathlib.Path(socket_path) - socket_path.parent.mkdir(parents=True, exist_ok=True) - if socket_path.exists(): - socket_path.unlink() - listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - listener.bind(str(socket_path)) - listener.listen(64) - parent_stop, child_stop = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) - ctx = mp.get_context("spawn") - proc: mp.process.BaseProcess = ctx.Process( - target=encoder_server._server_main, - args=(listener, child_stop, vcz_url, spec, opts), - name=f"biofuse-{spec.name}-server", - ) - try: - proc.start() - finally: - listener.close() - child_stop.close() - - self = cls(spec, opts) - self._proc = proc - self._socket_path = socket_path - self._stop_sock = parent_stop - try: - await self._handshake() - except BaseException as exc: - # If the subprocess has exited on its own during startup - # the handshake failure is a downstream symptom (refused - # connect, RST mid frame, premature EOF). Surface a single - # clean OSError so the CLI prints one error line and - # points the user at the server's own log. Check - # ``is_alive`` before ``aclose`` — the latter signals stop - # and joins the child, after which ``exitcode`` is set - # regardless of why the child exited. - child_died_on_own = not proc.is_alive() - await self.aclose() - if child_died_on_own and not isinstance(exc, KeyboardInterrupt): - raise OSError( - errno.EIO, - f"{spec.name}-server exited during startup; " - "see log above for details", - ) from exc - raise - return self - - async def __aenter__(self) -> "EncoderClient": - return self - - async def __aexit__(self, exc_type, exc, tb) -> None: - await self.aclose() - - async def open_stream( - self, - *, - on_aclose: Callable[[float, float], None] | None = None, - ) -> StreamConnection: - """Open a new dedicated socket for one streaming-file reader.""" - stream = await self._connect_stream() - return StreamConnection(stream, on_aclose=on_aclose) - - async def aclose(self) -> None: - """Tear down the server. Idempotent. - - Closes the parent end of the stop-signal socketpair so the - server's accept loop wakes via ``select`` and exits, joins - the subprocess (escalating to SIGTERM / SIGKILL after - ``_SHUTDOWN_GRACE_SECONDS``), and unlinks the listener path. - """ - if self._closed: - return - self._closed = True - if self._stop_sock is not None: - try: - self._stop_sock.close() - except OSError: - pass - self._stop_sock = None - if self._proc is not None: - await trio.to_thread.run_sync(self._sync_join_proc) - if self._socket_path is not None and self._socket_path.exists(): - try: - self._socket_path.unlink() - except OSError as exc: - logger.debug("unlink socket path raised: %s", exc) - - # -- internals ------------------------------------------------------ - - async def _handshake(self) -> None: - stream = await self._connect_stream(retry_until_listening=True) - try: - await stream.send_all(encoder_protocol.pack_get_metadata_request()) - status_buf = await _recv_exact(stream, encoder_protocol.REPLY_STATUS_SIZE) - status = encoder_protocol.parse_status(status_buf) - if status < 0: - raise encoder_protocol.status_to_error(status) - prefix = await _recv_exact(stream, encoder_protocol.META_PREFIX_SIZE) - n_static, stream_size = encoder_protocol.parse_metadata_prefix(prefix) - expected_suffixes = self.spec.static_suffixes(self.opts) - if n_static != len(expected_suffixes): - raise OSError( - errno.EIO, - f"{self.spec.name}-server reported {n_static} static files; " - f"client expected {len(expected_suffixes)}", - ) - sizes_buf = await _recv_exact( - stream, n_static * encoder_protocol.META_SIZE_ENTRY_SIZE - ) - sizes = encoder_protocol.parse_static_sizes(sizes_buf, n_static) - self.static_files = { - suffix: await _recv_exact(stream, size) - for suffix, size in zip(expected_suffixes, sizes, strict=True) - } - self.stream_size = stream_size - finally: - try: - await stream.send_eof() - except ( - trio.ClosedResourceError, - trio.BrokenResourceError, - OSError, - ): - pass - await stream.aclose() - - async def _connect_stream( - self, *, retry_until_listening: bool = False - ) -> trio.SocketStream: - """Open a fresh ``AF_UNIX`` connection to the server. - - With ``retry_until_listening=True`` the call retries briefly - while the child is still bringing up its accept loop, bounded - by ``_CONNECT_DEADLINE_S``. If the child has already exited - (typically due to a startup failure that ``_server_main`` - caught and logged) the retry loop bails out immediately - instead of waiting out the deadline. - """ - assert self._socket_path is not None - path = str(self._socket_path) - deadline = trio.current_time() + _CONNECT_DEADLINE_S - last_exc: BaseException | None = None - while True: - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - sock.setblocking(False) - trio_sock = trio.socket.from_stdlib_socket(sock) - try: - with trio.fail_after(_OPEN_TIMEOUT_S): - await trio_sock.connect(path) - return trio.SocketStream(trio_sock) - except trio.TooSlowError as exc: - trio_sock.close() - raise OSError( - errno.EIO, - f"encoder-server connect timed out after {_OPEN_TIMEOUT_S:.1f}s", - ) from exc - except (FileNotFoundError, ConnectionRefusedError, OSError) as exc: - trio_sock.close() - last_exc = exc - if not retry_until_listening: - raise - if self._proc is not None and not self._proc.is_alive(): - raise OSError( - errno.EIO, - f"{self.spec.name}-server exited during startup; " - "see log above for details", - ) from last_exc - if trio.current_time() > deadline: - raise OSError( - errno.EIO, - f"{self.spec.name}-server not listening at {path}: {exc}", - ) from last_exc - await trio.sleep(_CONNECT_RETRY_SLEEP_S) - - def _sync_join_proc(self) -> None: - proc = self._proc - if proc is None: - return - if proc.is_alive(): - proc.join(timeout=_SHUTDOWN_GRACE_SECONDS) - if proc.is_alive(): - logger.warning("%s-server did not exit; sending SIGTERM", self.spec.name) - proc.terminate() - proc.join(timeout=_SHUTDOWN_GRACE_SECONDS) - if proc.is_alive(): - logger.warning( - "%s-server still alive after SIGTERM; killing", self.spec.name - ) - proc.kill() - proc.join(timeout=_SHUTDOWN_GRACE_SECONDS) - - -async def _recv_exact(stream: trio.SocketStream, n: int) -> bytes: - """Read exactly ``n`` bytes off ``stream``, with a guaranteed checkpoint. - - Raises :class:`OSError(EIO)` on EOF, which the caller surfaces to - the FUSE layer as ``FUSEError(EIO)``. - """ - if n == 0: - await trio.lowlevel.checkpoint() - return b"" - buf = bytearray(n) - view = memoryview(buf) - got = 0 - while got < n: - chunk = await stream.receive_some(n - got) - if len(chunk) == 0: - raise OSError(errno.EIO, "encoder-server closed socket") - view[got : got + len(chunk)] = chunk - got += len(chunk) - return bytes(buf) diff --git a/biofuse/encoder_host.py b/biofuse/encoder_host.py new file mode 100644 index 0000000..e9f5f13 --- /dev/null +++ b/biofuse/encoder_host.py @@ -0,0 +1,262 @@ +"""In-process host for the per-format encoders. + +Replaces the previous ``encoder_client`` / ``encoder_server`` / +``encoder_protocol`` stack: the ``VczReader`` and the per-fh +:class:`~vcztools.format_encoder.FormatEncoder` instances live in the +FUSE handler process. Heavy blocking work (encoder construction, +``encoder.read``, ``encoder.close``, and reader teardown) is dispatched +to worker threads via :func:`trio.to_thread.run_sync` so the pyfuse3 +trio task is free to schedule other FUSE requests. + +:class:`EncoderHost` mirrors the public surface the FUSE Operations +layer expects (``static_files``, ``stream_size``, ``open_stream`` -> +:class:`StreamHandle`) so :class:`biofuse.encoder_ops.EncoderOps` is +agnostic to whether the encoder lives in-process or out-of-process. + +Per-read timeout handling matters because the trio thread cannot kill +a worker thread. The read path uses ``abandon_on_cancel=True`` so the +trio task wakes immediately on timeout, but the worker thread keeps +running ``encoder.read`` to completion. :class:`StreamHandle` tracks +the thread via a ``threading.Event`` that the worker itself sets in a +``finally`` block, and :meth:`StreamHandle.aclose` drains that event +before calling ``encoder.close`` — concurrent close+read on the same +encoder is unsafe. +""" + +import errno +import logging +import threading +import time +from collections.abc import Callable + +import trio + +from biofuse import formats + +logger = logging.getLogger(__name__) + + +# Per-request deadline for ``encoder.read``. Slow reads under high I/O +# load do legitimately happen; surfacing EIO to the FUSE syscall is +# better than pinning the consumer in uninterruptible sleep. +_REQUEST_TIMEOUT_S = 30.0 + +# Outer deadline for ``StreamHandle.aclose`` — covers draining any +# abandoned worker thread plus the ``encoder.close`` call. If the +# abandoned thread is permanently wedged we log a warning and leak the +# encoder rather than hang unmount indefinitely. +_ACLOSE_TIMEOUT_S = 2.0 + + +class StreamHandle: + """One streaming-file reader. + + Owns one :class:`~vcztools.format_encoder.FormatEncoder`. Reads + serialise on an internal ``trio.Lock`` — :class:`FormatEncoder` + mutates iterator state in place, so at most one worker thread may + enter ``encoder.read`` at a time. ``read`` and ``aclose`` + off-thread the blocking calls via :func:`trio.to_thread.run_sync`. + + On a per-read timeout the handle is marked closed and subsequent + reads return ``OSError(EIO)`` immediately; the abandoned worker + thread eventually returns, at which point :meth:`aclose` may close + the encoder. + """ + + def __init__( + self, + encoder, + *, + on_aclose: Callable[[float, float], None] | None = None, + ) -> None: + self._encoder = encoder + self._lock = trio.Lock() + self._closed = False + self._aclose_called = False + self._on_aclose = on_aclose + # Cleared on read entry, set by the worker thread on return. + # Used by ``aclose`` to drain any abandoned worker before + # closing the encoder. Initial state = idle. + self._thread_done = threading.Event() + self._thread_done.set() + + async def read(self, off: int, size: int) -> bytes: + if self._closed: + raise OSError(errno.EIO, "stream handle is closed") + async with self._lock: + if self._closed: + raise OSError(errno.EIO, "stream handle is closed") + self._thread_done.clear() + encoder = self._encoder + thread_done = self._thread_done + + def call() -> bytes: + try: + return encoder.read(off, size) + finally: + thread_done.set() + + with trio.move_on_after(_REQUEST_TIMEOUT_S) as cs: + try: + return await trio.to_thread.run_sync(call, abandon_on_cancel=True) + except OSError: + raise + except Exception as exc: + # Non-OSError encoder failures (``ValueError`` for + # haploid PLINK, ``NotImplementedError`` for mixed + # ploidy BGEN, …) become ``OSError(EIO)`` so the + # FUSE layer surfaces a real I/O error to the + # kernel rather than crashing the trio task. + logger.error("encoder.read raised; converting to EIO: %s", exc) + logger.debug("encoder.read traceback", exc_info=True) + raise OSError(errno.EIO, f"encoder.read failed: {exc}") from exc + if cs.cancelled_caught: + # Mark dead so subsequent reads on this fh return EIO + # immediately rather than queueing behind a known-stuck + # worker. The abandoned thread will eventually set + # ``_thread_done``; ``aclose`` waits for it. + self._closed = True + raise OSError(errno.EIO, "encoder read timed out") + raise RuntimeError("encoder read fall-through") # pragma: no cover + + async def aclose(self) -> None: + if self._aclose_called: + return + self._aclose_called = True + self._closed = True + t_start = time.monotonic() + # Wait for any abandoned worker thread to finish before + # calling ``encoder.close``. The encoder's own + # ``ThreadPoolExecutor`` is shut down inside ``close``; + # in-flight tasks must drain first. ``Event.wait`` carries + # its own timeout because cancelling ``to_thread.run_sync`` + # from outside cannot interrupt a thread blocked on a + # ``threading.Event``. ``abandon_on_cancel=True`` shields the + # trio task from a still-running drain thread on shutdown. + drained = await trio.to_thread.run_sync( + self._thread_done.wait, + _ACLOSE_TIMEOUT_S, + abandon_on_cancel=True, + ) + if drained: + await trio.to_thread.run_sync(self._encoder.close, abandon_on_cancel=True) + else: + logger.warning( + "encoder did not finish draining within %.1fs; " + "leaking encoder and its thread pool", + _ACLOSE_TIMEOUT_S, + ) + if self._on_aclose is not None: + try: + self._on_aclose(t_start, time.monotonic()) + except Exception as exc: # noqa: BLE001 - never let logging blow up cleanup + logger.debug("on_aclose hook raised: %s", exc) + + +class EncoderHost: + """Parent-process host for one mounted view. + + Construct via :meth:`EncoderHost.start` — an async classmethod + that opens the reader, materialises the variant filter, and builds + the static-sidecar bytes on a worker thread, returning a ready + host. The instance is also an async context manager; ``aclose`` + closes the reader. + + Attributes populated by ``start``: + + - ``static_files``: dict mapping each suffix returned by + ``spec.static_suffixes(opts)`` to its precomputed bytes. + - ``stream_size``: total byte size of the streaming file. + """ + + def __init__(self, spec: formats.FormatSpec, opts) -> None: + self.spec = spec + self.opts = opts + self.static_files: dict[str, bytes] = {} + self.stream_size: int = 0 + self._reader = None + self._closed = False + + @classmethod + async def start( + cls, + vcz_url: str, + spec: formats.FormatSpec, + *, + opts, + ) -> "EncoderHost": + """Open the reader and build the static-sidecar bytes.""" + self = cls(spec, opts) + try: + await trio.to_thread.run_sync(self._sync_start, vcz_url) + except BaseException: + await self.aclose() + raise + return self + + def _sync_start(self, vcz_url: str) -> None: + reader = self.opts.make_reader(vcz_url) + try: + reader.materialise_variant_filter() + expected_suffixes = self.spec.static_suffixes(self.opts) + static_files = self.spec.build_static_files(reader, self.opts) + missing = set(expected_suffixes) - set(static_files) + extra = set(static_files) - set(expected_suffixes) + if missing or extra: + raise ValueError( + f"{self.spec.name}: build_static_files returned keys " + f"{sorted(static_files)}; expected {list(expected_suffixes)}" + ) + with self.spec.encoder_factory(reader, self.opts) as encoder: + stream_size = int(encoder.total_size) + except BaseException: + reader.__exit__(None, None, None) + raise + # Keep ordering in the static_files dict equal to the spec's + # declared order — the FUSE adapter does not depend on this + # but it keeps logs / diagnostics stable. + self.static_files = { + suffix: static_files[suffix] for suffix in expected_suffixes + } + self.stream_size = stream_size + self._reader = reader + + async def __aenter__(self) -> "EncoderHost": + return self + + async def __aexit__(self, exc_type, exc, tb) -> None: + await self.aclose() + + async def open_stream( + self, + *, + on_aclose: Callable[[float, float], None] | None = None, + ) -> StreamHandle: + """Construct a fresh encoder and wrap it in a :class:`StreamHandle`. + + Encoder construction runs in a worker thread because it + touches the reader's metadata caches and spins up a + ``ThreadPoolExecutor``. + """ + if self._reader is None: + raise OSError(errno.EIO, "encoder host is closed") + encoder = await trio.to_thread.run_sync(self._sync_open_encoder) + return StreamHandle(encoder, on_aclose=on_aclose) + + def _sync_open_encoder(self): + cm = self.spec.encoder_factory(self._reader, self.opts) + encoder = cm.__enter__() + # We rely on ``encoder.close`` (called by ``StreamHandle.aclose``) + # to release the encoder's resources; the context-manager + # protocol on ``FormatEncoder`` is a thin wrapper around it. + return encoder + + async def aclose(self) -> None: + """Close the reader. Idempotent.""" + if self._closed: + return + self._closed = True + reader = self._reader + self._reader = None + if reader is not None: + await trio.to_thread.run_sync(reader.__exit__, None, None, None) diff --git a/biofuse/encoder_ops.py b/biofuse/encoder_ops.py index c45eb3d..0e617a0 100644 --- a/biofuse/encoder_ops.py +++ b/biofuse/encoder_ops.py @@ -6,21 +6,18 @@ - the streaming file suffix (``.bed`` / ``.bgen``) and its ``streaming_kind`` dispatch key; - the static-sidecar suffixes (``.bim``/``.fam`` for PLINK, - ``.sample``/``.bgen.bgi`` for BGEN), served from cached bytes the - client fetched once at mount time. - -The static-sidecar bytes are fetched once at mount time and held in -memory by the FUSE adapter; reads against them are served directly -from the cached bytes without crossing process boundaries. - -Each streaming-file ``open()`` from the kernel allocates a fresh -:class:`biofuse.encoder_client.StreamConnection` — a dedicated socket -to the encoder-server subprocess, where one server thread with one -encoder runs synchronously for the lifetime of the connection. -``read()`` on that fh forwards straight to the per-fh socket; the + ``.sample``/``.bgen.bgi`` for BGEN), built once at mount time and + held in the host's memory. + +Static-sidecar reads are served directly from the cached bytes. Each +streaming-file ``open()`` from the kernel allocates a fresh +:class:`biofuse.encoder_host.StreamHandle` — a dedicated +:class:`~vcztools.format_encoder.FormatEncoder` whose blocking work +runs on worker threads via :func:`trio.to_thread.run_sync`, leaving +the pyfuse3 trio task free for other FUSE requests. ``read()`` on +that fh dispatches one ``encoder.read`` call per request; the kernel's parallel readahead requests on a single fh serialise on the -``StreamConnection``'s internal lock. ``release()`` closes the socket; -the server thread sees EOF and exits. +``StreamHandle``'s internal lock. ``release()`` closes the encoder. """ import errno @@ -53,23 +50,23 @@ _STATIC_KIND = "static" -class _StreamConnectionProto(Protocol): +class _StreamHandleProto(Protocol): async def read(self, off: int, size: int) -> bytes: ... async def aclose(self) -> None: ... -class EncoderClientProto(Protocol): - """The slice of :class:`biofuse.encoder_client.EncoderClient` that +class EncoderHostProto(Protocol): + """The slice of :class:`biofuse.encoder_host.EncoderHost` that :class:`EncoderOps` depends on. - Defined as a Protocol so tests can inject a fake without spawning - a subprocess. + Defined as a Protocol so tests can inject a fake without opening + a real reader. """ static_files: dict[str, bytes] stream_size: int - async def open_stream(self) -> _StreamConnectionProto: ... + async def open_stream(self) -> _StreamHandleProto: ... class EncoderOps(pyfuse3.Operations): @@ -77,20 +74,20 @@ class EncoderOps(pyfuse3.Operations): Parameters ---------- - client - An :class:`EncoderClient` already past its metadata handshake - (or any object satisfying :class:`EncoderClientProto`). The - caller owns the client's lifetime. + host + An :class:`~biofuse.encoder_host.EncoderHost` whose ``start`` + has populated ``static_files`` and ``stream_size`` (or any + object satisfying :class:`EncoderHostProto`). The caller owns + the host's lifetime. basename Stem used for the exposed files: ``{basename}{spec.streaming_suffix}`` - plus one ``{basename}{suffix}`` per entry of the static-files - dict the client received during the metadata handshake. + plus one ``{basename}{suffix}`` per entry of ``host.static_files``. spec The active :class:`~biofuse.formats.FormatSpec`. Its ``streaming_suffix`` is exposed as the streaming filename; ``streaming_kind`` is the dispatch key for read routing. - The set of static suffixes is read off ``client.static_files``, - which is the post-options filtered set the server actually + The set of static suffixes is read off ``host.static_files``, + which is the post-options filtered set the host actually produced. max_open_stream Maximum number of concurrent open streaming-file fhs. New @@ -106,7 +103,7 @@ class EncoderOps(pyfuse3.Operations): def __init__( self, - client: EncoderClientProto, + host: EncoderHostProto, basename: str, spec: formats.FormatSpec, *, @@ -114,7 +111,7 @@ def __init__( access_logger: access_log_mod.AccessLogger | None = None, ) -> None: super().__init__() - self._client = client + self._host = host self._basename = basename self._spec = spec self._access_logger = access_logger @@ -127,13 +124,13 @@ def __init__( # the streaming file's kind is the spec's ``streaming_kind``; # all static files share the same ``_STATIC_KIND`` dispatch key. # ``_name_to_suffix`` maps each static filename to its suffix - # for lookups into ``client.static_files``. + # for lookups into ``host.static_files``. stream_name = f"{basename}{spec.streaming_suffix}" manifest: list[tuple[str, str, int]] = [ - (stream_name, spec.streaming_kind, client.stream_size) + (stream_name, spec.streaming_kind, host.stream_size) ] self._name_to_suffix: dict[str, str] = {} - for suffix, body in client.static_files.items(): + for suffix, body in host.static_files.items(): name = f"{basename}{suffix}" manifest.append((name, _STATIC_KIND, len(body))) self._name_to_suffix[name] = suffix @@ -153,7 +150,7 @@ def __init__( self._next_fh = 1 self._fh_to_kind: dict[int, str] = {} self._fh_to_name: dict[int, str] = {} - self._fh_to_conn: dict[int, _StreamConnectionProto] = {} + self._fh_to_conn: dict[int, _StreamHandleProto] = {} def _build_attrs(self, inode: int) -> pyfuse3.EntryAttributes: attrs = pyfuse3.EntryAttributes() @@ -234,7 +231,7 @@ async def open(self, inode, flags, ctx=None): self._record_event("limiter_wait", name, fh, t_limiter_start, t_limiter_end) try: on_aclose = self._make_aclose_recorder(name, fh) - conn = await self._client.open_stream(on_aclose=on_aclose) + conn = await self._host.open_stream(on_aclose=on_aclose) except OSError as exc: self._stream_limiter.release_on_behalf_of(fh) raise pyfuse3.FUSEError(exc.errno or errno.EIO) from exc @@ -272,7 +269,7 @@ async def read(self, fh, off, size): t_start = time.monotonic() if kind == _STATIC_KIND: suffix = self._name_to_suffix[name] - data = self._read_static(self._client.static_files[suffix], off, size) + data = self._read_static(self._host.static_files[suffix], off, size) elif kind == self._spec.streaming_kind: assert conn is not None try: diff --git a/biofuse/encoder_protocol.py b/biofuse/encoder_protocol.py deleted file mode 100644 index a6fd763..0000000 --- a/biofuse/encoder_protocol.py +++ /dev/null @@ -1,132 +0,0 @@ -"""Wire protocol for the encoder-server subprocess. - -A minimal request/reply protocol exchanged over a connected -``AF_UNIX`` ``SOCK_STREAM`` socket. Each socket carries one -synchronous conversation: the parent sends a request, awaits the -reply, then sends the next request. There is no ``seq`` id and no -``fh`` field — the *socket is the channel*. - -Two request shapes ------------------- - -``M`` (1 byte) — get metadata. No payload. - Reply body: ```` then - ``n_static`` × ```` then the concatenated static-file - bodies in the order declared by the format spec. ``status`` on - success is the total body length in bytes. - -``R off:Q size:Q`` (17 bytes) — read ``size`` bytes at ``off`` from - the connection's streaming file. Reply body is the data bytes; - ``status`` is the body length. - -Reply layout (common) ---------------------- - -```` followed by ``status`` bytes of body when -``status >= 0``. ``status < 0`` is an error reply: ``-status`` is a -POSIX errno and no body follows. -""" - -import errno -import struct - -TAG_GET_METADATA = b"M" -TAG_READ = b"R" - -_REQ_READ = struct.Struct("`` entries, then the concatenated static bodies. -_META_PREFIX = struct.Struct(" bytes: - return TAG_GET_METADATA - - -def pack_read_request(off: int, size: int) -> bytes: - return TAG_READ + _REQ_READ.pack(off, size) - - -def parse_read_payload(buf: bytes) -> tuple[int, int]: - """Decode the 16-byte READ payload into ``(off, size)``.""" - return _REQ_READ.unpack(buf) - - -# -- reply encoding ----------------------------------------------------- - - -def pack_metadata_reply(static_bodies: list[bytes], stream_size: int) -> bytes: - """Pack a variable-arity metadata reply. - - ``static_bodies`` is the per-static-file payload, in the spec's - declared order. The reply body is the prefix + per-static sizes + - the concatenated bodies; ``status`` is the body length. - """ - n_static = len(static_bodies) - prefix = _META_PREFIX.pack(n_static, stream_size) - sizes = b"".join(_META_SIZE_ENTRY.pack(len(b)) for b in static_bodies) - body = prefix + sizes + b"".join(static_bodies) - return _REPLY_STATUS.pack(len(body)) + body - - -def pack_read_reply(data: bytes) -> bytes: - return _REPLY_STATUS.pack(len(data)) + data - - -def pack_error_reply(err: int) -> bytes: - if err <= 0: - raise ValueError(f"err must be a positive errno (got {err})") - return _REPLY_STATUS.pack(-err) - - -# -- reply decoding ----------------------------------------------------- - - -def parse_status(buf: bytes) -> int: - (status,) = _REPLY_STATUS.unpack(buf) - return status - - -def parse_metadata_prefix(buf: bytes) -> tuple[int, int]: - """Decode the ```` prefix of a metadata reply.""" - return _META_PREFIX.unpack(buf) - - -def parse_static_sizes(buf: bytes, n_static: int) -> tuple[int, ...]: - """Decode the per-static-file size array following the prefix.""" - return tuple( - _META_SIZE_ENTRY.unpack_from(buf, offset=i * META_SIZE_ENTRY_SIZE)[0] - for i in range(n_static) - ) - - -# -- shared helpers ----------------------------------------------------- - - -def status_to_error(status: int) -> OSError: - """Build an ``OSError`` from a negative reply status.""" - if status >= 0: - raise ValueError(f"status must be negative (got {status})") - err = -status - return OSError(err, f"encoder-server reported errno {err}") - - -def errno_for_exception(exc: BaseException) -> int: - """Pick a positive errno for any exception raised in the server. - - ``OSError`` is honoured directly; everything else becomes ``EIO``. - """ - if isinstance(exc, OSError) and exc.errno is not None: - return exc.errno - return errno.EIO diff --git a/biofuse/encoder_server.py b/biofuse/encoder_server.py deleted file mode 100644 index 6c7c5cb..0000000 --- a/biofuse/encoder_server.py +++ /dev/null @@ -1,337 +0,0 @@ -"""Encoder-server subprocess. - -A standalone server that owns one ``VczReader`` and serves byte-range -requests for one output format (PLINK 1 binary / Oxford BGEN) over an -``AF_UNIX`` listening socket. The owning :class:`~biofuse.formats.FormatSpec` -selects: - -- which static sidecar files to build on demand for the metadata - handshake (``.bim``/``.fam`` for PLINK; ``.sample``/``.bgen.bgi`` - for BGEN), via :meth:`FormatSpec.build_static_files`; -- which encoder class to construct per accepted connection, via - :meth:`FormatSpec.encoder_factory`. - -Each accepted connection runs in its own daemon thread with its own -encoder, so different consumers of the same streaming file get -independent state and run concurrently. - -Wire protocol is :mod:`biofuse.encoder_protocol` — synchronous, -per-socket, no seq ids: the socket *is* the channel. Server threads -serve one request at a time on their socket and exit cleanly when the -parent half-closes. -""" - -import logging -import select -import socket -import threading -import time - -from biofuse import encoder_protocol, formats - -logger = logging.getLogger(__name__) - - -class _ServerSession: - """Server-side state shared across all connection threads. - - Holds the ``VczReader``, the active format spec, and the options - dataclass that selected this mount's feature flags - (``--no-bim``/``--no-bgi``/``--no-header-samples``/…). The static - sidecar bytes and the streaming-file size are built on demand - inside the metadata handshake (see ``_make_metadata_reply``) - rather than cached here — the parent fuse handler is the canonical - owner of that metadata after the handshake completes. Immutable - after construction; safe to read concurrently from any thread - without locking. - """ - - def __init__( - self, - reader, - spec: formats.FormatSpec, - opts, - ) -> None: - self.reader = reader - self.spec = spec - self.opts = opts - - -def _recv_exact_sync(sock: socket.socket, n: int) -> bytes: - """Read exactly ``n`` bytes off ``sock``. Returns ``b""`` on clean EOF. - - Raises :class:`EOFError` if EOF arrives mid-frame. - """ - if n == 0: - return b"" - buf = bytearray(n) - view = memoryview(buf) - got = 0 - while got < n: - chunk = sock.recv_into(view[got:], n - got) - if chunk == 0: - if got == 0: - return b"" - raise EOFError(f"socket closed mid-frame after {got}/{n} bytes") - got += chunk - return bytes(buf) - - -def _make_error_reply(exc: BaseException, context: str) -> bytes: - """Pack an errno reply for ``exc`` and log the cause. - - Non-``OSError`` causes log a one-line ERROR with the errno and - the exception text, plus a DEBUG-only traceback. Used by both - the encoder-construction failure path and the per-dispatch - exception handler. - """ - err = encoder_protocol.errno_for_exception(exc) - if not isinstance(exc, OSError): - logger.error("encoder-server %s; replying with errno %d: %s", context, err, exc) - logger.debug("encoder-server %s traceback", context, exc_info=True) - return encoder_protocol.pack_error_reply(err) - - -def _make_metadata_reply(session: "_ServerSession") -> bytes: - """Build the reply for a ``TAG_GET_METADATA`` request. - - The static sidecars and the streaming-file size are built on - demand from the format spec, serialised into the reply, then - dropped: the parent fuse handler owns the canonical copy after - the handshake. Encoder construction is I/O-free so reading - ``total_size`` here is cheap. - """ - expected_suffixes = session.spec.static_suffixes(session.opts) - static_files = session.spec.build_static_files(session.reader, session.opts) - missing = set(expected_suffixes) - set(static_files) - extra = set(static_files) - set(expected_suffixes) - if missing or extra: - raise ValueError( - f"{session.spec.name}: build_static_files returned keys " - f"{sorted(static_files)}; expected {list(expected_suffixes)}" - ) - with session.spec.encoder_factory(session.reader, session.opts) as encoder: - stream_size = int(encoder.total_size) - ordered_bodies = [static_files[suffix] for suffix in expected_suffixes] - return encoder_protocol.pack_metadata_reply(ordered_bodies, stream_size) - - -def _make_read_reply(conn_sock: socket.socket, encoder, tname: str) -> bytes | None: - """Build the reply for a ``TAG_READ`` request, or ``None`` on - truncated payload (caller terminates the connection).""" - payload = _recv_exact_sync(conn_sock, encoder_protocol.REQ_READ_PAYLOAD_SIZE) - if len(payload) < encoder_protocol.REQ_READ_PAYLOAD_SIZE: - return None - off, size = encoder_protocol.parse_read_payload(payload) - t_read = time.monotonic() - data = encoder.read(off, size) - logger.debug( - "%s: encoder.read off=%d size=%d in %.3fs", - tname, - off, - size, - time.monotonic() - t_read, - ) - return encoder_protocol.pack_read_reply(data) - - -def _send_reply(conn_sock: socket.socket, reply: bytes) -> bool: - """Send ``reply`` on the socket; return ``True`` on success and - ``False`` if the send raised ``OSError`` (already logged).""" - try: - conn_sock.sendall(reply) - return True - except OSError as exc: - logger.warning("encoder-server send failed: %s", exc) - return False - - -def _serve_connection( - conn_sock: socket.socket, - session: "_ServerSession", - encoder, - tname: str, -) -> None: - """Run the tag-dispatch loop for one connected client. - - Exits cleanly on EOF, unknown tag, truncated payload, or a send - failure. Exceptions raised inside a dispatch case are converted - to errno replies and the loop continues to the next request. - """ - while True: - try: - tag_buf = _recv_exact_sync(conn_sock, 1) - except EOFError as exc: - logger.warning("encoder-server frame error: %s", exc) - return - if len(tag_buf) == 0: - return - tag = bytes(tag_buf) - try: - if tag == encoder_protocol.TAG_GET_METADATA: - reply = _make_metadata_reply(session) - elif tag == encoder_protocol.TAG_READ: - reply = _make_read_reply(conn_sock, encoder, tname) - else: - logger.warning( - "encoder-server: unknown tag %r; closing connection", tag - ) - return - except Exception as exc: # noqa: BLE001 - any error becomes errno reply - reply = _make_error_reply(exc, "dispatch raised") - if reply is None: - return - if not _send_reply(conn_sock, reply): - return - - -def _handle_connection(conn_sock: socket.socket, session: _ServerSession) -> None: - """Run one client connection synchronously. Returns on EOF. - - A fresh encoder (PLINK ``BedEncoder`` / BGEN ``BgenEncoder``, - selected by the session's :class:`~biofuse.formats.FormatSpec`) is - constructed up front for every connection and its lifetime is bound - to the connection via a ``with`` block. If construction fails, an - errno reply is written to the socket so the client surfaces a real - ``OSError`` rather than an unexplained EOF. - """ - tname = threading.current_thread().name - logger.debug("%s: conn accepted", tname) - with conn_sock: - try: - t_enc = time.monotonic() - encoder_cm = session.spec.encoder_factory(session.reader, session.opts) - except Exception as exc: # noqa: BLE001 - any error becomes errno reply - _send_reply( - conn_sock, _make_error_reply(exc, "encoder construction failed") - ) - return - with encoder_cm as encoder: - logger.debug( - "%s: encoder created in %.3fs", tname, time.monotonic() - t_enc - ) - _serve_connection(conn_sock, session, encoder, tname) - logger.debug("%s: conn thread exit", tname) - - -def serve_forever( - listener_sock: socket.socket, - stop_sock: socket.socket, - session: _ServerSession, -) -> None: - """Accept loop. Spawns one daemon thread per connection. - - Returns when ``stop_sock`` becomes readable (the parent closed - its end of the socketpair). Closes ``listener_sock`` on the way - out. Connection threads are daemon threads; they exit on their - own once their client closes the socket. We do not join them on - shutdown — the process exit reaps them. - """ - try: - while True: - try: - readable, _, _ = select.select([listener_sock, stop_sock], [], []) - except OSError as exc: - logger.warning("encoder-server select failed: %s", exc) - return - if stop_sock in readable: - return - try: - conn_sock, _ = listener_sock.accept() - except OSError as exc: - logger.warning("encoder-server accept failed: %s", exc) - return - t = threading.Thread( - target=_handle_connection, - args=(conn_sock, session), - name=f"{session.spec.name}-conn", - daemon=True, - ) - try: - t.start() - except RuntimeError as exc: - # Per-process thread budget exhausted (e.g. cgroup pids - # limit). Decline this connection cleanly so the server - # stays alive; the client sees an EOF on the half-open - # socket and surfaces it as ``OSError`` to the FUSE - # layer. - logger.warning( - "encoder-server: thread.start() failed (%s); active=%d", - exc, - threading.active_count(), - ) - try: - conn_sock.close() - except OSError: - pass - continue - finally: - try: - listener_sock.close() - except OSError: - pass - - -def _server_main( - listener_sock: socket.socket, - stop_sock: socket.socket, - vcz_url: str, - spec: formats.FormatSpec, - opts, -) -> None: - """Subprocess entry point invoked via ``multiprocessing.Process``. - - The two sockets are handle-passed by ``multiprocessing`` (the - reduction machinery dups the fds into the child). The reader is - used as a context manager so its shared ``ThreadPoolExecutor`` - (one pool per reader, drawn on by every encoder / - ``ReadaheadPipeline``) is drained on the way out. - - ``opts`` is the ``vcztools.ViewPlinkOptions`` / ``vcztools.ViewBgenOptions`` - dataclass parsed from the CLI. Its nested ``log`` field configures - logging in the subprocess so its own ``logger.debug`` / ``logger.info`` - output reaches the same sink as the parent, and its ``make_reader`` - method opens ``vcz_url`` with the bcftools-style filtering options - (regions, samples, …) and the storage / readahead settings. - - Any exception raised before ``serve_forever`` starts (reader - construction, ``_ServerSession`` construction) is caught here so - multiprocessing's default handler does not print a traceback. The - cause is logged at ERROR (visible at default verbosity); the - traceback only surfaces at DEBUG. Static-sidecar build errors - (e.g. multi-allelic input rejection) now surface inside - ``_handle_connection`` on the first metadata request rather than - at session construction; the handler converts them to errno - replies so the parent sees a clean ``OSError`` from - ``_handshake``. - """ - opts.log.apply() - try: - with opts.make_reader(vcz_url) as reader: - # Bcftools-style filters (--max-alleles, --types, --include, - # …) configure a per-variant predicate via - # ``reader.set_variant_filter``; the format encoders refuse - # readers in that state. Resolve the predicate now into a - # fixed surviving-variant chunk plan so each connection's - # encoder sees a plain reader. No-op when no variant filter - # is configured. - reader.materialise_variant_filter() - session = _ServerSession(reader, spec, opts) - try: - serve_forever(listener_sock, stop_sock, session) - finally: - try: - stop_sock.close() - except OSError: - pass - except Exception as exc: # noqa: BLE001 - cleanly surface any startup failure - logger.error("%s-server startup failed: %s", spec.name, exc) - logger.debug("%s-server startup traceback", spec.name, exc_info=True) - try: - listener_sock.close() - except OSError: - pass - try: - stop_sock.close() - except OSError: - pass diff --git a/biofuse/formats.py b/biofuse/formats.py index f547761..00ebbd8 100644 --- a/biofuse/formats.py +++ b/biofuse/formats.py @@ -1,16 +1,16 @@ -"""Format specs for the encoder-server stack. +"""Format specs for the encoder-host stack. A :class:`FormatSpec` bundles everything the format-agnostic ``encoder_*`` modules need to serve one output format: - the suffix of the streaming file (``.bed`` / ``.bgen``), -- a callable returning the static-sidecar suffixes the parent should - expect for a given options dataclass, +- a callable returning the static-sidecar suffixes the host should + produce for a given options dataclass, - a builder that produces those static bytes from a ``VczReader`` plus options, - a factory that constructs one :class:`vcztools.BedEncoder` / - :class:`vcztools.BgenEncoder` per server-side connection, also - parameterised by the options dataclass. + :class:`vcztools.BgenEncoder` per streaming fh, also parameterised + by the options dataclass. Both :class:`vcztools.BedEncoder` and :class:`vcztools.BgenEncoder` extend :class:`vcztools.format_encoder.FormatEncoder`, so they share @@ -23,13 +23,13 @@ ``--no-fam`` suppress the corresponding PLINK sidecars, and ``--no-sample-file`` / ``--no-bgi`` suppress the BGEN sidecars. ``--no-header-samples`` flips ``embed_header_samples=False`` on the -``BgenEncoder`` so the per-connection ``.bgen`` stream omits the -sample identifiers from its header block. +``BgenEncoder`` so the per-fh ``.bgen`` stream omits the sample +identifiers from its header block. For BGEN the ``.bgen.bgi`` sidecar is a SQLite database that :func:`vcztools.write_bgi` writes to a filesystem path. We materialise -it to a tempfile at session-init time, read the bytes back, and hold -them in the server process's memory alongside the ``.sample`` text. +it to a tempfile at host startup, read the bytes back, and hold them +in the host's memory alongside the ``.sample`` text. """ import dataclasses @@ -43,7 +43,7 @@ @dataclasses.dataclass(frozen=True) class FormatSpec: - """One output format the encoder-server stack can serve.""" + """One output format the encoder-host stack can serve.""" name: str """Short identifier used in CLI / log lines (``"plink"`` / ``"bgen"``).""" @@ -69,7 +69,7 @@ class FormatSpec: encoder_factory: Callable """``(reader, opts) -> FormatEncoder``: construct one fresh encoder - for one server connection, parameterised by ``opts``.""" + for one streaming fh, parameterised by ``opts``.""" def _plink_static_suffixes(opts) -> tuple[str, ...]: diff --git a/tests/test_bgen_apps.py b/tests/test_bgen_apps.py index c74660b..7990880 100644 --- a/tests/test_bgen_apps.py +++ b/tests/test_bgen_apps.py @@ -1,10 +1,9 @@ """End-to-end tests for BGEN mounts. -Mirrors :mod:`test_plink_apps`: mount a VCZ as a BGEN fileset via the -real encoder-server subprocess, then read the mounted files. Where -``plink2`` is on PATH, ``TestPlinkTwo`` also exercises the mount as a -BGEN reader and compares its output to a side-by-side copy of the -encoder's bytes. +Mirrors :mod:`test_plink_apps`: mount a VCZ as a BGEN fileset, then +read the mounted files. Where ``plink2`` is on PATH, ``TestPlinkTwo`` +also exercises the mount as a BGEN reader and compares its output to +a side-by-side copy of the encoder's bytes. """ import contextlib @@ -19,7 +18,7 @@ import trio import vcztools -from biofuse import access_log, encoder_client, encoder_ops, formats, fuse_adapter +from biofuse import access_log, encoder_host, encoder_ops, formats, fuse_adapter def _open_reader(path) -> object: @@ -66,12 +65,13 @@ async def fx_mounted_bgen(tmp_path, fx_medium_vcz): expected = _encoder_bytes(fx_medium_vcz.path) log = access_log.AccessLogger() - sock_path = tmp_path / "bgen.sock" - async with await encoder_client.EncoderClient.start( - str(fx_medium_vcz.path), sock_path, formats.BGEN_SPEC - ) as client: + async with await encoder_host.EncoderHost.start( + str(fx_medium_vcz.path), + formats.BGEN_SPEC, + opts=vcztools.ViewBgenOptions(), + ) as host: ops = encoder_ops.EncoderOps( - client, "medium", formats.BGEN_SPEC, access_logger=log + host, "medium", formats.BGEN_SPEC, access_logger=log ) async with fuse_adapter.mount(ops, str(mnt)): await _wait_for_mount(mnt) @@ -92,18 +92,21 @@ async def _arun(cmd) -> None: async def _mount_bgen(tmp_path, vcz, opts=None): """Mount ``vcz`` as a BGEN fileset; yield ``(mnt, basename)``. - ``opts`` is the ``vcztools.ViewBgenOptions`` dataclass the - encoder-server runs under; defaults to a fresh ``ViewBgenOptions()`` - (every field at its dataclass default). + ``opts`` is the ``vcztools.ViewBgenOptions`` dataclass the host + runs under; defaults to a fresh ``ViewBgenOptions()`` (every field + at its dataclass default). """ mnt = tmp_path / "mnt" mnt.mkdir() basename = vcz.path.stem - sock_path = tmp_path / "bgen.sock" - async with await encoder_client.EncoderClient.start( - str(vcz.path), sock_path, formats.BGEN_SPEC, opts=opts - ) as client: - ops = encoder_ops.EncoderOps(client, basename, formats.BGEN_SPEC) + if opts is None: + opts = vcztools.ViewBgenOptions() + async with await encoder_host.EncoderHost.start( + str(vcz.path), + formats.BGEN_SPEC, + opts=opts, + ) as host: + ops = encoder_ops.EncoderOps(host, basename, formats.BGEN_SPEC) async with fuse_adapter.mount(ops, str(mnt)): await _wait_for_mount(mnt) yield mnt, basename diff --git a/tests/test_encoder_client.py b/tests/test_encoder_client.py deleted file mode 100644 index 6e2274e..0000000 --- a/tests/test_encoder_client.py +++ /dev/null @@ -1,532 +0,0 @@ -"""Tests for EncoderClient and StreamConnection. - -Most tests pair the trio client with a thread-based ``serve_forever`` -running on a real ``AF_UNIX`` listener under tmp_path. Real-subprocess -tests use the parametrised :data:`fx_spec` to exercise both PLINK and -BGEN through the spawn handshake. -""" - -import dataclasses -import errno -import multiprocessing as mp -import pathlib -import random -import socket -import threading - -import pytest -import trio -import vcztools - -from biofuse import encoder_client, encoder_server, formats - - -@pytest.fixture(params=[formats.PLINK_SPEC, formats.BGEN_SPEC], ids=["plink", "bgen"]) -def fx_spec(request): - return request.param - - -@pytest.fixture -def fx_opts(fx_spec): - if fx_spec.name == "plink": - return vcztools.ViewPlinkOptions() - return vcztools.ViewBgenOptions() - - -@pytest.fixture -def fx_reader(fx_small_vcz): - return vcztools.ViewPlinkOptions().make_reader(str(fx_small_vcz.path)) - - -@pytest.fixture -def fx_expected(fx_reader, fx_spec, fx_opts): - """Static-body / stream-size reference for the active spec. - - Built directly from the spec to avoid the BGEN / SQLite ``.bgi`` - non-determinism caveats documented in :mod:`test_encoder_server`. - """ - static = fx_spec.build_static_files(fx_reader, fx_opts) - with fx_spec.encoder_factory(fx_reader, fx_opts) as encoder: - stream_size = int(encoder.total_size) - return fx_spec, static, stream_size - - -class _ThreadServer: - """Drop-in stand-in for the ``multiprocessing.Process`` worker. - - Runs ``encoder_server.serve_forever`` on a thread bound to a real - ``AF_UNIX`` listener at ``socket_path``. Exposes the small subset - of the ``mp.Process`` API that ``EncoderClient.aclose`` needs. - """ - - def __init__( - self, - reader, - spec: formats.FormatSpec, - opts, - socket_path: pathlib.Path, - ) -> None: - self.exitcode: int | None = None - self.session = encoder_server._ServerSession(reader, spec, opts) - self._listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self._listener.bind(str(socket_path)) - self._listener.listen(16) - self._parent_stop, self._child_stop = socket.socketpair( - socket.AF_UNIX, socket.SOCK_STREAM - ) - self._thread = threading.Thread( - target=encoder_server.serve_forever, - args=(self._listener, self._child_stop, self.session), - daemon=True, - ) - self._thread.start() - - def is_alive(self) -> bool: - return self._thread.is_alive() - - def join(self, timeout: float | None = None) -> None: - self._thread.join(timeout=timeout) - if not self._thread.is_alive(): - self.exitcode = 0 - - def terminate(self) -> None: - try: - self._parent_stop.close() - except OSError: - pass - - def kill(self) -> None: - self.terminate() - - @property - def parent_stop(self) -> socket.socket: - return self._parent_stop - - -async def _client_with_thread_server( - reader, spec: formats.FormatSpec, opts, socket_path: pathlib.Path -) -> encoder_client.EncoderClient: - server = _ThreadServer(reader, spec, opts, socket_path) - self = encoder_client.EncoderClient.__new__(encoder_client.EncoderClient) - self.spec = spec - self.opts = opts - self.static_files = {} - self.stream_size = 0 - self._proc = server - self._socket_path = socket_path - self._stop_sock = server.parent_stop - self._closed = False - try: - await self._handshake() - except BaseException: - await self.aclose() - raise - return self - - -@pytest.fixture -async def fx_client(fx_reader, fx_spec, fx_opts, tmp_path): - socket_path = tmp_path / "encoder.sock" - client = await _client_with_thread_server(fx_reader, fx_spec, fx_opts, socket_path) - try: - yield client - finally: - await client.aclose() - - -class TestHandshake: - async def test_metadata_populated(self, fx_client, fx_expected): - spec, expected_static, expected_stream_size = fx_expected - assert fx_client.spec is spec - assert fx_client.stream_size == expected_stream_size - assert set(fx_client.static_files) == set(expected_static) - for suffix in expected_static: - assert len(fx_client.static_files[suffix]) == len(expected_static[suffix]) - - -class TestStreamConnection: - async def test_full_stream_read_matches_stream_size(self, fx_client): - conn = await fx_client.open_stream() - try: - data = await conn.read(0, fx_client.stream_size * 2) - assert len(data) == fx_client.stream_size - finally: - await conn.aclose() - - async def test_chunked_stream_read(self, fx_client): - conn = await fx_client.open_stream() - try: - chunks = [] - offset = 0 - while True: - data = await conn.read(offset, 4096) - if len(data) == 0: - break - chunks.append(data) - offset += len(data) - assert sum(len(c) for c in chunks) == fx_client.stream_size - finally: - await conn.aclose() - - async def test_random_pread_matches_full_read(self, fx_client): - """A random-offset pread on a fresh connection reads the same - bytes as the same window of a full sequential read on another - connection. This pins the encoder's random-access contract - without comparing against an on-disk golden.""" - full_conn = await fx_client.open_stream() - try: - full = await full_conn.read(0, fx_client.stream_size) - finally: - await full_conn.aclose() - - rng = random.Random(7) - conn = await fx_client.open_stream() - try: - for _ in range(20): - offset = rng.randrange(fx_client.stream_size) - size = rng.randrange(1, 256) - got = await conn.read(offset, size) - assert got == full[offset : offset + size] - finally: - await conn.aclose() - - async def test_two_connections_independent(self, fx_client): - full_conn = await fx_client.open_stream() - try: - expected = await full_conn.read(0, fx_client.stream_size) - finally: - await full_conn.aclose() - conn_a = await fx_client.open_stream() - conn_b = await fx_client.open_stream() - try: - half = len(expected) // 2 - a = await conn_a.read(0, half) - b = await conn_b.read(half, half) - assert a == expected[:half] - assert b == expected[half : 2 * half] - # Cross-encoder backward seeks: each connection has its own - # encoder, so a backward read on one doesn't disturb the - # other. - c = await conn_b.read(0, half) - d = await conn_a.read(half, half) - assert c == expected[:half] - assert d == expected[half : 2 * half] - finally: - await conn_a.aclose() - await conn_b.aclose() - - async def test_concurrent_connections_run_in_parallel(self, fx_client): - """Open four stream connections and run all four reads - concurrently in a trio nursery. Each one must see a byte-identical - copy of the stream, regardless of interleaving on the server.""" - full_conn = await fx_client.open_stream() - try: - expected = await full_conn.read(0, fx_client.stream_size) - finally: - await full_conn.aclose() - n = 4 - results: dict[int, bytes] = {} - - async def runner(idx: int) -> None: - conn = await fx_client.open_stream() - try: - results[idx] = await conn.read(0, fx_client.stream_size * 2) - finally: - await conn.aclose() - - async with trio.open_nursery() as nursery: - for i in range(n): - nursery.start_soon(runner, i) - assert len(results) == n - for body in results.values(): - assert body == expected - - async def test_read_after_close_raises(self, fx_client): - conn = await fx_client.open_stream() - await conn.aclose() - with pytest.raises(OSError, match="stream connection is closed") as excinfo: - await conn.read(0, 1) - assert excinfo.value.errno == errno.EIO - - async def test_aclose_is_idempotent(self, fx_client): - conn = await fx_client.open_stream() - await conn.aclose() - await conn.aclose() - - -class TestClientClose: - async def test_aclose_is_idempotent(self, fx_reader, fx_spec, fx_opts, tmp_path): - socket_path = tmp_path / "encoder.sock" - client = await _client_with_thread_server( - fx_reader, fx_spec, fx_opts, socket_path - ) - await client.aclose() - await client.aclose() - - async def test_aclose_terminates_server_thread( - self, fx_reader, fx_spec, fx_opts, tmp_path - ): - socket_path = tmp_path / "encoder.sock" - client = await _client_with_thread_server( - fx_reader, fx_spec, fx_opts, socket_path - ) - proc = client._proc - await client.aclose() - assert not proc.is_alive() - - -class TestTimeouts: - """The FUSE handler must never block forever on the worker. These - tests pin that property by pointing the client at a deliberately - unresponsive server and asserting that ``read`` and ``aclose`` - surface ``OSError(EIO)`` within a deadline.""" - - @staticmethod - def _bind_stall_listener(sock_path): - """Bind+listen a UNIX socket that will accept exactly one - connection inside the test's nursery and then go silent.""" - listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - listener.bind(str(sock_path)) - listener.listen(1) - listener.setblocking(False) - return listener - - async def _stall_server(self, listener): - trio_listener = trio.socket.from_stdlib_socket(listener) - conn, _ = await trio_listener.accept() - # Hold the connection open with no reads or writes. The test - # nursery cancels us at teardown. - try: - await trio.sleep_forever() - finally: - conn.close() - - async def _make_stalled_connection( - self, sock_path, nursery - ) -> encoder_client.StreamConnection: - listener = self._bind_stall_listener(sock_path) - nursery.start_soon(self._stall_server, listener) - stream = await trio.open_unix_socket(str(sock_path)) - return encoder_client.StreamConnection(stream) - - async def test_read_times_out_to_eio(self, monkeypatch, tmp_path): - monkeypatch.setattr(encoder_client, "_REQUEST_TIMEOUT_S", 0.2) - sock_path = tmp_path / "stall.sock" - async with trio.open_nursery() as nursery: - conn = await self._make_stalled_connection(sock_path, nursery) - t0 = trio.current_time() - with pytest.raises(OSError, match="encoder-server") as excinfo: - await conn.read(0, 1024) - elapsed = trio.current_time() - t0 - assert excinfo.value.errno == errno.EIO - assert elapsed < 1.0, f"read should fail fast, took {elapsed:.2f}s" - nursery.cancel_scope.cancel() - - async def test_read_after_timeout_is_immediate(self, monkeypatch, tmp_path): - monkeypatch.setattr(encoder_client, "_REQUEST_TIMEOUT_S", 0.2) - sock_path = tmp_path / "stall.sock" - async with trio.open_nursery() as nursery: - conn = await self._make_stalled_connection(sock_path, nursery) - with pytest.raises(OSError, match="encoder-server"): - await conn.read(0, 1024) - t0 = trio.current_time() - with pytest.raises(OSError, match="stream connection is closed") as excinfo: - await conn.read(0, 1024) - elapsed = trio.current_time() - t0 - assert excinfo.value.errno == errno.EIO - assert elapsed < 0.05, ( - f"second read should be immediate, took {elapsed:.3f}s" - ) - nursery.cancel_scope.cancel() - - async def test_aclose_does_not_hang_on_unresponsive_peer( - self, monkeypatch, tmp_path - ): - monkeypatch.setattr(encoder_client, "_ACLOSE_TIMEOUT_S", 0.2) - sock_path = tmp_path / "stall.sock" - async with trio.open_nursery() as nursery: - conn = await self._make_stalled_connection(sock_path, nursery) - t0 = trio.current_time() - await conn.aclose() - elapsed = trio.current_time() - t0 - assert elapsed < 1.0, f"aclose should not hang, took {elapsed:.2f}s" - nursery.cancel_scope.cancel() - - -class TestRealSubprocess: - """End-to-end tests against a real ``multiprocessing.Process`` worker.""" - - @pytest.mark.parametrize( - "spec", [formats.PLINK_SPEC, formats.BGEN_SPEC], ids=["plink", "bgen"] - ) - async def test_start_fast_fails_on_handshake_build_error( - self, fx_multiallelic_vcz, tmp_path, spec - ): - """``EncoderClient.start()`` must surface a clean ``OSError`` and - return well under the 10 s connect deadline when the subprocess - rejects the input during the metadata handshake. Static-file - build (here: multi-allelic VCZ) runs on demand inside - ``_handle_connection``; failures become errno replies on the - handshake socket.""" - socket_path = tmp_path / "encoder.sock" - t0 = trio.current_time() - with pytest.raises(OSError, match="encoder-server reported errno") as excinfo: - await encoder_client.EncoderClient.start( - str(fx_multiallelic_vcz.path), socket_path, spec - ) - elapsed = trio.current_time() - t0 - assert excinfo.value.errno == errno.EIO - assert elapsed < encoder_client._CONNECT_DEADLINE_S, ( - f"start() should fast-fail on handshake error, took {elapsed:.2f}s" - ) - - async def test_max_alleles_filter_through_start( - self, fx_multiallelic_vcz, tmp_path - ): - """``ViewPlinkOptions`` with ``--max-alleles 2`` flows through the - multiprocessing spawn into the worker's ``opts.make_reader`` and - drops multi-allelic variants, so the handshake succeeds and the - metadata reflects the filtered variant count.""" - # Sanity: the fixture really is multi-allelic, otherwise the - # filter has nothing to drop and the test isn't testing anything. - assert fx_multiallelic_vcz.num_biallelic_sites < ( - fx_multiallelic_vcz.num_variants - ) - socket_path = tmp_path / "encoder.sock" - default_opts = vcztools.ViewPlinkOptions() - selection = dataclasses.replace(default_opts.selection, max_alleles=2) - opts = dataclasses.replace(default_opts, selection=selection) - client = await encoder_client.EncoderClient.start( - str(fx_multiallelic_vcz.path), - socket_path, - formats.PLINK_SPEC, - opts=opts, - ) - try: - bim_lines = client.static_files[".bim"].decode("utf-8").splitlines() - assert len(bim_lines) == fx_multiallelic_vcz.num_biallelic_sites - fam_lines = client.static_files[".fam"].decode("utf-8").splitlines() - assert len(fam_lines) == fx_multiallelic_vcz.num_samples - bytes_per_variant = (fx_multiallelic_vcz.num_samples + 3) // 4 - expected_bed_size = ( - 3 + fx_multiallelic_vcz.num_biallelic_sites * bytes_per_variant - ) - assert client.stream_size == expected_bed_size - conn = await client.open_stream() - try: - data = await conn.read(0, expected_bed_size) - assert len(data) == expected_bed_size - finally: - await conn.aclose() - finally: - await client.aclose() - assert isinstance(client._proc, mp.process.BaseProcess) - assert not client._proc.is_alive() - assert client._proc.exitcode == 0 - - @pytest.mark.parametrize("no_header_samples", [True, False]) - async def test_bgen_no_header_samples_stable_across_opens( - self, fx_small_vcz, tmp_path, no_header_samples - ): - """``--no-header-samples`` flows into every per-connection - ``BgenEncoder`` consistently. - - Each ``open_stream`` spawns a fresh server-side thread that - constructs a new encoder via ``spec.encoder_factory(reader, opts)``. - Three sequential open/read/close cycles must all return the same - bytes — and those bytes must match an in-process reference - ``BgenEncoder`` constructed with the matching - ``embed_header_samples`` flag. - """ - opts = dataclasses.replace( - vcztools.ViewBgenOptions(), no_header_samples=no_header_samples - ) - ref_reader = opts.make_reader(str(fx_small_vcz.path)) - with vcztools.BgenEncoder( - ref_reader, embed_header_samples=not no_header_samples - ) as ref: - expected_size = int(ref.total_size) - expected = ref.read(0, expected_size) - - socket_path = tmp_path / "encoder.sock" - async with await encoder_client.EncoderClient.start( - str(fx_small_vcz.path), socket_path, formats.BGEN_SPEC, opts=opts - ) as client: - assert client.stream_size == expected_size - for cycle in range(3): - conn = await client.open_stream() - try: - data = await conn.read(0, client.stream_size) - finally: - await conn.aclose() - assert data == expected, f"cycle {cycle} differed from reference" - - @pytest.mark.parametrize( - ("no_sample_file", "no_bgi"), - [(True, False), (False, True), (True, True), (False, False)], - ) - async def test_bgen_sidecar_toggles_stable_across_starts( - self, fx_small_vcz, tmp_path, no_sample_file, no_bgi - ): - """The handshake's ``static_files`` honours ``--no-sample-file`` - / ``--no-bgi`` and yields the same suffix set on each - ``EncoderClient.start`` against the same options.""" - opts = dataclasses.replace( - vcztools.ViewBgenOptions(), - no_sample_file=no_sample_file, - no_bgi=no_bgi, - ) - expected_suffixes = formats.BGEN_SPEC.static_suffixes(opts) - first: dict[str, bytes] | None = None - for cycle in range(3): - socket_path = tmp_path / f"encoder-{cycle}.sock" - async with await encoder_client.EncoderClient.start( - str(fx_small_vcz.path), socket_path, formats.BGEN_SPEC, opts=opts - ) as client: - assert tuple(client.static_files) == expected_suffixes - # ``.bgen.bgi`` SQLite payloads have non-deterministic - # header bytes across independent writes; compare sizes - # only for that entry and the full body for ``.sample``. - if first is None: - first = dict(client.static_files) - continue - for suffix, body in client.static_files.items(): - if suffix == ".bgen.bgi": - assert len(body) == len(first[suffix]) - else: - assert body == first[suffix] - - @pytest.mark.parametrize( - "spec", [formats.PLINK_SPEC, formats.BGEN_SPEC], ids=["plink", "bgen"] - ) - async def test_spawn_handshake_open_read_close(self, fx_small_vcz, tmp_path, spec): - # The expected stream_size and static-body sizes are computed - # via a fresh in-process reader so the test does not depend on - # external goldens (write_plink / write_bgen). See the - # encoder-server test module for the rationale. - opts = ( - vcztools.ViewPlinkOptions() - if spec.name == "plink" - else vcztools.ViewBgenOptions() - ) - ref_reader = opts.make_reader(str(fx_small_vcz.path)) - with spec.encoder_factory(ref_reader, opts) as enc: - expected_stream_size = int(enc.total_size) - socket_path = tmp_path / "encoder.sock" - client = await encoder_client.EncoderClient.start( - str(fx_small_vcz.path), socket_path, spec - ) - try: - assert client.stream_size == expected_stream_size - assert set(client.static_files) == set(spec.static_suffixes(opts)) - conn = await client.open_stream() - try: - data = await conn.read(0, client.stream_size) - assert len(data) == client.stream_size - finally: - await conn.aclose() - finally: - await client.aclose() - assert isinstance(client._proc, mp.process.BaseProcess) - assert not client._proc.is_alive() - assert client._proc.exitcode == 0 diff --git a/tests/test_encoder_host.py b/tests/test_encoder_host.py new file mode 100644 index 0000000..2e4f117 --- /dev/null +++ b/tests/test_encoder_host.py @@ -0,0 +1,477 @@ +"""Tests for the in-process encoder host. + +Two layers: + +- ``StreamHandle`` exercised directly against a hand-built fake encoder + so the timeout / drain / leak paths can be triggered deterministically. +- ``EncoderHost`` exercised against the real spec factories (PLINK / BGEN) + on the ``fx_small_vcz`` fixture, parity-checked against an in-process + reference encoder built with the same ``(reader, opts)``. + +End-to-end FUSE behaviour against real ``plink`` / ``bgenix`` binaries +lives in ``test_plink_apps.py`` / ``test_bgen_apps.py``. +""" + +import dataclasses +import errno +import threading + +import pytest +import trio +import vcztools + +from biofuse import encoder_host, formats + + +@pytest.fixture(params=[formats.PLINK_SPEC, formats.BGEN_SPEC], ids=["plink", "bgen"]) +def fx_spec(request): + return request.param + + +@pytest.fixture +def fx_opts(fx_spec): + if fx_spec.name == "plink": + return vcztools.ViewPlinkOptions() + return vcztools.ViewBgenOptions() + + +@pytest.fixture +def fx_reference(fx_small_vcz, fx_spec, fx_opts): + """In-process reference: ``(static_files, stream_size, full_bytes)``. + + Built directly off the spec from a fresh reader so tests don't need + on-disk goldens. ``.bgen.bgi`` SQLite payloads have non-deterministic + header bytes across independent writes; tests using this fixture + therefore compare sizes / suffix sets for that entry rather than + full bodies. + """ + reader = fx_opts.make_reader(str(fx_small_vcz.path)) + try: + reader.materialise_variant_filter() + static_files = fx_spec.build_static_files(reader, fx_opts) + with fx_spec.encoder_factory(reader, fx_opts) as enc: + stream_size = int(enc.total_size) + full_bytes = enc.read(0, stream_size) + finally: + reader.__exit__(None, None, None) + return static_files, stream_size, full_bytes + + +# ----------------------------------------------------------------------------- +# Fake encoder for unit-testing StreamHandle in isolation. +# ----------------------------------------------------------------------------- + + +class _FakeEncoder: + """Hand-built encoder for StreamHandle timeout/drain tests. + + The ``read`` method blocks on a per-call ``threading.Event`` so a + test can release the worker thread at will. ``close`` records the + invocation. + """ + + def __init__(self) -> None: + self.release_read = threading.Event() + self.entered_read = threading.Event() + self.read_calls: list[tuple[int, int]] = [] + self.close_calls = 0 + self._payload = b"X" + + def set_payload(self, body: bytes) -> None: + self._payload = body + + def read(self, off: int, size: int) -> bytes: + self.read_calls.append((off, size)) + self.entered_read.set() + self.release_read.wait() + return self._payload + + def close(self) -> None: + self.close_calls += 1 + + +# ----------------------------------------------------------------------------- +# StreamHandle direct unit tests. +# ----------------------------------------------------------------------------- + + +class TestStreamHandleHappyPath: + async def test_read_returns_encoder_bytes(self): + encoder = _FakeEncoder() + encoder.set_payload(b"hello") + encoder.release_read.set() + handle = encoder_host.StreamHandle(encoder) + try: + got = await handle.read(0, 5) + assert got == b"hello" + assert encoder.read_calls == [(0, 5)] + finally: + await handle.aclose() + assert encoder.close_calls == 1 + + async def test_aclose_is_idempotent(self): + encoder = _FakeEncoder() + encoder.release_read.set() + handle = encoder_host.StreamHandle(encoder) + await handle.aclose() + await handle.aclose() + assert encoder.close_calls == 1 + + async def test_on_aclose_hook_fires_once(self): + encoder = _FakeEncoder() + encoder.release_read.set() + hook_calls: list[tuple[float, float]] = [] + handle = encoder_host.StreamHandle( + encoder, on_aclose=lambda t0, t1: hook_calls.append((t0, t1)) + ) + await handle.aclose() + await handle.aclose() # idempotent: hook only fires once. + assert len(hook_calls) == 1 + t0, t1 = hook_calls[0] + assert t1 >= t0 + + async def test_read_after_close_raises_eio(self): + encoder = _FakeEncoder() + encoder.release_read.set() + handle = encoder_host.StreamHandle(encoder) + await handle.aclose() + with pytest.raises(OSError, match="stream handle is closed") as excinfo: + await handle.read(0, 1) + assert excinfo.value.errno == errno.EIO + + +class TestStreamHandleSerialisation: + async def test_concurrent_reads_serialise_on_one_handle(self): + """Two concurrent ``handle.read`` calls must not enter the + encoder at the same time — ``FormatEncoder`` mutates iterator + state in place.""" + encoder = _FakeEncoder() + encoder.set_payload(b"A") + handle = encoder_host.StreamHandle(encoder) + in_flight = 0 + peak = 0 + lock = threading.Lock() + + original_read = encoder.read + + def watched_read(off: int, size: int) -> bytes: + nonlocal in_flight, peak + with lock: + in_flight += 1 + peak = max(peak, in_flight) + try: + return original_read(off, size) + finally: + with lock: + in_flight -= 1 + + encoder.read = watched_read + + try: + async with trio.open_nursery() as nursery: + + async def runner(): + # Release per-call so each read can complete. + encoder.release_read.set() + await handle.read(0, 1) + encoder.release_read.clear() + + # Sequentially start tasks that will serialise on the lock. + for _ in range(3): + encoder.release_read.set() + nursery.start_soon(handle.read, 0, 1) + await trio.sleep(0) + finally: + encoder.release_read.set() + await handle.aclose() + + assert peak == 1 + + +class TestStreamHandleTimeout: + async def test_read_times_out_to_eio(self, monkeypatch): + monkeypatch.setattr(encoder_host, "_REQUEST_TIMEOUT_S", 0.1) + encoder = _FakeEncoder() # release_read intentionally not set + handle = encoder_host.StreamHandle(encoder) + try: + t0 = trio.current_time() + with pytest.raises(OSError, match="encoder read timed out") as excinfo: + await handle.read(0, 1024) + elapsed = trio.current_time() - t0 + assert excinfo.value.errno == errno.EIO + assert elapsed < 1.0, f"read should fail fast, took {elapsed:.2f}s" + finally: + # Release the abandoned worker so aclose can drain. + encoder.release_read.set() + await handle.aclose() + + async def test_read_after_timeout_is_immediate(self, monkeypatch): + monkeypatch.setattr(encoder_host, "_REQUEST_TIMEOUT_S", 0.1) + encoder = _FakeEncoder() + handle = encoder_host.StreamHandle(encoder) + try: + with pytest.raises(OSError, match="encoder read timed out"): + await handle.read(0, 1024) + t0 = trio.current_time() + with pytest.raises(OSError, match="stream handle is closed") as excinfo: + await handle.read(0, 1024) + elapsed = trio.current_time() - t0 + assert excinfo.value.errno == errno.EIO + assert elapsed < 0.05, ( + f"second read should be immediate, took {elapsed:.3f}s" + ) + finally: + encoder.release_read.set() + await handle.aclose() + + async def test_aclose_drains_abandoned_thread_then_closes(self, monkeypatch): + """After a timeout the worker thread is still running. ``aclose`` + must wait for it to return before calling ``encoder.close`` — + the encoder's own thread pool cannot tolerate concurrent + ``read``/``close``.""" + monkeypatch.setattr(encoder_host, "_REQUEST_TIMEOUT_S", 0.1) + encoder = _FakeEncoder() + handle = encoder_host.StreamHandle(encoder) + with pytest.raises(OSError, match="encoder read timed out"): + await handle.read(0, 1024) + assert encoder.close_calls == 0 # not yet drained + + # Release the abandoned worker; aclose should then close the encoder. + encoder.release_read.set() + await handle.aclose() + assert encoder.close_calls == 1 + + +class TestStreamHandleAcloseLeak: + async def test_aclose_does_not_hang_on_wedged_encoder(self, monkeypatch): + """If the abandoned worker thread never returns, ``aclose`` must + log + leak rather than hang the unmount path. We pin the + elapsed-time bound here; the warning text is best-effort logging.""" + monkeypatch.setattr(encoder_host, "_REQUEST_TIMEOUT_S", 0.05) + monkeypatch.setattr(encoder_host, "_ACLOSE_TIMEOUT_S", 0.1) + encoder = _FakeEncoder() + handle = encoder_host.StreamHandle(encoder) + try: + with pytest.raises(OSError, match="encoder read timed out"): + await handle.read(0, 1024) + t0 = trio.current_time() + await handle.aclose() + elapsed = trio.current_time() - t0 + assert elapsed < 1.0, f"aclose should not hang, took {elapsed:.2f}s" + # encoder.close was not called because we never drained. + assert encoder.close_calls == 0 + finally: + encoder.release_read.set() + + +# ----------------------------------------------------------------------------- +# EncoderHost end-to-end tests against real spec factories. +# ----------------------------------------------------------------------------- + + +class TestEncoderHostStart: + async def test_static_files_keys_match_spec( + self, fx_small_vcz, fx_spec, fx_opts, fx_reference + ): + ref_static, ref_stream_size, _ = fx_reference + async with await encoder_host.EncoderHost.start( + str(fx_small_vcz.path), fx_spec, opts=fx_opts + ) as host: + assert tuple(host.static_files) == fx_spec.static_suffixes(fx_opts) + assert host.stream_size == ref_stream_size + for suffix in ref_static: + # ``.bgen.bgi`` SQLite headers vary across writes; size match + # is the strongest portable assertion. + if suffix == ".bgen.bgi": + assert len(host.static_files[suffix]) == len(ref_static[suffix]) + else: + assert host.static_files[suffix] == ref_static[suffix] + + async def test_start_failure_on_multiallelic_plink( + self, fx_multiallelic_vcz, tmp_path + ): + """PLINK static-file build refuses multi-allelic VCZ. The + failure must propagate cleanly from ``start`` rather than + leaving a half-initialised host behind.""" + with pytest.raises((ValueError, OSError)): + await encoder_host.EncoderHost.start( + str(fx_multiallelic_vcz.path), + formats.PLINK_SPEC, + opts=vcztools.ViewPlinkOptions(), + ) + + async def test_max_alleles_filter_drops_multiallelic_sites( + self, fx_multiallelic_vcz + ): + """``ViewPlinkOptions(max_alleles=2)`` must reach ``opts.make_reader`` + and drop multi-allelic variants so the static-file build succeeds.""" + # Sanity: the fixture really is multi-allelic. + assert fx_multiallelic_vcz.num_biallelic_sites < ( + fx_multiallelic_vcz.num_variants + ) + default_opts = vcztools.ViewPlinkOptions() + selection = dataclasses.replace(default_opts.selection, max_alleles=2) + opts = dataclasses.replace(default_opts, selection=selection) + async with await encoder_host.EncoderHost.start( + str(fx_multiallelic_vcz.path), formats.PLINK_SPEC, opts=opts + ) as host: + bim_lines = host.static_files[".bim"].decode("utf-8").splitlines() + assert len(bim_lines) == fx_multiallelic_vcz.num_biallelic_sites + fam_lines = host.static_files[".fam"].decode("utf-8").splitlines() + assert len(fam_lines) == fx_multiallelic_vcz.num_samples + + +class TestEncoderHostStreaming: + async def test_full_stream_read_matches_reference( + self, fx_small_vcz, fx_spec, fx_opts, fx_reference + ): + _, ref_stream_size, ref_bytes = fx_reference + async with await encoder_host.EncoderHost.start( + str(fx_small_vcz.path), fx_spec, opts=fx_opts + ) as host: + handle = await host.open_stream() + try: + got = await handle.read(0, ref_stream_size * 2) + assert got == ref_bytes + finally: + await handle.aclose() + + async def test_chunked_stream_read(self, fx_small_vcz, fx_spec, fx_opts): + async with await encoder_host.EncoderHost.start( + str(fx_small_vcz.path), fx_spec, opts=fx_opts + ) as host: + handle = await host.open_stream() + try: + chunks = [] + offset = 0 + while True: + data = await handle.read(offset, 4096) + if len(data) == 0: + break + chunks.append(data) + offset += len(data) + assert sum(len(c) for c in chunks) == host.stream_size + finally: + await handle.aclose() + + async def test_two_handles_independent( + self, fx_small_vcz, fx_spec, fx_opts, fx_reference + ): + _, _, ref_bytes = fx_reference + async with await encoder_host.EncoderHost.start( + str(fx_small_vcz.path), fx_spec, opts=fx_opts + ) as host: + handle_a = await host.open_stream() + handle_b = await host.open_stream() + try: + half = len(ref_bytes) // 2 + a = await handle_a.read(0, half) + b = await handle_b.read(half, half) + assert a == ref_bytes[:half] + assert b == ref_bytes[half : 2 * half] + # Each handle has its own encoder; a backward read on one + # does not disturb the other. + c = await handle_b.read(0, half) + d = await handle_a.read(half, half) + assert c == ref_bytes[:half] + assert d == ref_bytes[half : 2 * half] + finally: + await handle_a.aclose() + await handle_b.aclose() + + async def test_concurrent_handles_run_in_parallel( + self, fx_small_vcz, fx_spec, fx_opts, fx_reference + ): + _, _, ref_bytes = fx_reference + results: dict[int, bytes] = {} + + async with await encoder_host.EncoderHost.start( + str(fx_small_vcz.path), fx_spec, opts=fx_opts + ) as host: + + async def runner(idx: int) -> None: + handle = await host.open_stream() + try: + results[idx] = await handle.read(0, host.stream_size * 2) + finally: + await handle.aclose() + + async with trio.open_nursery() as nursery: + for i in range(4): + nursery.start_soon(runner, i) + + assert len(results) == 4 + for body in results.values(): + assert body == ref_bytes + + +class TestEncoderHostClose: + async def test_aclose_is_idempotent(self, fx_small_vcz, fx_spec, fx_opts): + host = await encoder_host.EncoderHost.start( + str(fx_small_vcz.path), fx_spec, opts=fx_opts + ) + await host.aclose() + await host.aclose() + + async def test_open_stream_after_close_raises(self, fx_small_vcz, fx_spec, fx_opts): + host = await encoder_host.EncoderHost.start( + str(fx_small_vcz.path), fx_spec, opts=fx_opts + ) + await host.aclose() + with pytest.raises(OSError, match="encoder host is closed") as excinfo: + await host.open_stream() + assert excinfo.value.errno == errno.EIO + + +class TestEncoderHostBgenOptions: + @pytest.mark.parametrize("no_header_samples", [True, False]) + async def test_no_header_samples_stable_across_opens( + self, fx_small_vcz, no_header_samples + ): + """``--no-header-samples`` flows into every per-handle + ``BgenEncoder``. Three sequential open/read/close cycles must + all return the same bytes — and those bytes must match an + in-process reference ``BgenEncoder`` with the matching + ``embed_header_samples`` flag.""" + opts = dataclasses.replace( + vcztools.ViewBgenOptions(), no_header_samples=no_header_samples + ) + ref_reader = opts.make_reader(str(fx_small_vcz.path)) + try: + with vcztools.BgenEncoder( + ref_reader, embed_header_samples=not no_header_samples + ) as ref: + expected_size = int(ref.total_size) + expected = ref.read(0, expected_size) + finally: + ref_reader.__exit__(None, None, None) + + async with await encoder_host.EncoderHost.start( + str(fx_small_vcz.path), formats.BGEN_SPEC, opts=opts + ) as host: + assert host.stream_size == expected_size + for cycle in range(3): + handle = await host.open_stream() + try: + data = await handle.read(0, host.stream_size) + finally: + await handle.aclose() + assert data == expected, f"cycle {cycle} differed from reference" + + @pytest.mark.parametrize( + ("no_sample_file", "no_bgi"), + [(True, False), (False, True), (True, True), (False, False)], + ) + async def test_sidecar_toggles_honour_spec( + self, fx_small_vcz, no_sample_file, no_bgi + ): + opts = dataclasses.replace( + vcztools.ViewBgenOptions(), + no_sample_file=no_sample_file, + no_bgi=no_bgi, + ) + expected_suffixes = formats.BGEN_SPEC.static_suffixes(opts) + async with await encoder_host.EncoderHost.start( + str(fx_small_vcz.path), formats.BGEN_SPEC, opts=opts + ) as host: + assert tuple(host.static_files) == expected_suffixes diff --git a/tests/test_encoder_ops.py b/tests/test_encoder_ops.py index fad3eca..5af45f5 100644 --- a/tests/test_encoder_ops.py +++ b/tests/test_encoder_ops.py @@ -1,11 +1,10 @@ """Unit tests for EncoderOps. Exercises the streaming Operations class via direct async-method calls -— no kernel mount, no subprocess. The vcztools/Zarr-side parity tests -live in ``test_encoder_server.py`` (against the server module directly) -and in ``test_encoder_client.py`` (against a real subprocess). End-to-end -FUSE behaviour against real plink / bgenix binaries lives in -``test_plink_apps.py`` / ``test_bgen_apps.py``. +— no kernel mount, no real encoder. The vcztools/Zarr-side parity +tests live in ``test_encoder_host.py``. End-to-end FUSE behaviour +against real plink / bgenix binaries lives in ``test_plink_apps.py`` +/ ``test_bgen_apps.py``. Tests parametrise over both :data:`biofuse.formats.PLINK_SPEC` and :data:`biofuse.formats.BGEN_SPEC` so the spec-driven inode table and @@ -32,9 +31,9 @@ def _default_opts(spec): return vcztools.ViewBgenOptions() -class _FakeStreamConnection: +class _FakeStreamHandle: """In-process stand-in for - :class:`biofuse.encoder_client.StreamConnection`. + :class:`biofuse.encoder_host.StreamHandle`. Records the call sequence so tests can assert EncoderOps dispatched correctly. Reads return deterministic bytes derived from @@ -74,11 +73,11 @@ async def aclose(self) -> None: self._on_aclose(t0, time.monotonic()) -class _FakeClient: - """In-process stand-in for :class:`biofuse.encoder_client.EncoderClient`. +class _FakeHost: + """In-process stand-in for :class:`biofuse.encoder_host.EncoderHost`. Holds canned ``static_files`` / ``stream_size`` and hands out - :class:`_FakeStreamConnection` instances on demand. + :class:`_FakeStreamHandle` instances on demand. """ def __init__( @@ -98,20 +97,18 @@ def __init__( self.calls: list[tuple] = [] self._next_open_error: OSError | None = None self._next_conn_id = 1 - self.connections: list[_FakeStreamConnection] = [] + self.connections: list[_FakeStreamHandle] = [] def raise_on_next_open(self, exc: OSError) -> None: self._next_open_error = exc - async def open_stream(self, *, on_aclose=None) -> _FakeStreamConnection: + async def open_stream(self, *, on_aclose=None) -> _FakeStreamHandle: self.calls.append(("open_stream",)) if self._next_open_error is not None: exc = self._next_open_error self._next_open_error = None raise exc - conn = _FakeStreamConnection( - self._next_conn_id, self.calls, on_aclose=on_aclose - ) + conn = _FakeStreamHandle(self._next_conn_id, self.calls, on_aclose=on_aclose) self._next_conn_id += 1 self.connections.append(conn) return conn @@ -128,13 +125,13 @@ def fx_static_suffixes(fx_spec): @pytest.fixture -def fx_client(fx_spec): - return _FakeClient(fx_spec) +def fx_host(fx_spec): + return _FakeHost(fx_spec) @pytest.fixture -def fx_ops(fx_client, fx_spec): - return encoder_ops.EncoderOps(fx_client, "small", fx_spec) +def fx_ops(fx_host, fx_spec): + return encoder_ops.EncoderOps(fx_host, "small", fx_spec) @pytest.fixture @@ -148,8 +145,8 @@ def fx_first_static_name(fx_static_suffixes): @pytest.fixture -def fx_first_static_bytes(fx_client, fx_static_suffixes): - return fx_client.static_files[fx_static_suffixes[0]] +def fx_first_static_bytes(fx_host, fx_static_suffixes): + return fx_host.static_files[fx_static_suffixes[0]] async def _expect_fuse_error(coro, expected_errno): @@ -164,21 +161,21 @@ def test_creates_one_inode_per_manifest_entry(self, fx_ops, fx_static_suffixes): assert len(fx_ops._name_to_inode) == n_expected def test_basename_propagates_to_all_files( - self, fx_client, fx_spec, fx_static_suffixes + self, fx_host, fx_spec, fx_static_suffixes ): - ops = encoder_ops.EncoderOps(fx_client, "alt", fx_spec) + ops = encoder_ops.EncoderOps(fx_host, "alt", fx_spec) expected = sorted( [f"alt{fx_spec.streaming_suffix}"] + [f"alt{suffix}" for suffix in fx_static_suffixes] ) assert sorted(ops._name_to_inode) == expected - def test_sizes_match_client_metadata(self, fx_client, fx_spec, fx_static_suffixes): - ops = encoder_ops.EncoderOps(fx_client, "small", fx_spec) + def test_sizes_match_client_metadata(self, fx_host, fx_spec, fx_static_suffixes): + ops = encoder_ops.EncoderOps(fx_host, "small", fx_spec) sizes = {ops._inode_to_name[i]: size for i, size in ops._inode_to_size.items()} - expected = {f"small{fx_spec.streaming_suffix}": fx_client.stream_size} + expected = {f"small{fx_spec.streaming_suffix}": fx_host.stream_size} for suffix in fx_static_suffixes: - expected[f"small{suffix}"] = len(fx_client.static_files[suffix]) + expected[f"small{suffix}"] = len(fx_host.static_files[suffix]) assert sizes == expected def test_inodes_assigned_in_sorted_order(self, fx_ops): @@ -193,20 +190,20 @@ async def test_root(self, fx_ops): attrs = await fx_ops.getattr(pyfuse3.ROOT_INODE) assert stat.S_ISDIR(attrs.st_mode) - async def test_streaming_file(self, fx_ops, fx_client, fx_stream_name): + async def test_streaming_file(self, fx_ops, fx_host, fx_stream_name): inode = fx_ops._name_to_inode[fx_stream_name] attrs = await fx_ops.getattr(inode) assert stat.S_ISREG(attrs.st_mode) - assert attrs.st_size == fx_client.stream_size + assert attrs.st_size == fx_host.stream_size async def test_unknown_inode(self, fx_ops): await _expect_fuse_error(fx_ops.getattr(9999), errno.ENOENT) class TestLookup: - async def test_known_name(self, fx_ops, fx_client, fx_stream_name): + async def test_known_name(self, fx_ops, fx_host, fx_stream_name): attrs = await fx_ops.lookup(pyfuse3.ROOT_INODE, fx_stream_name.encode("utf-8")) - assert attrs.st_size == fx_client.stream_size + assert attrs.st_size == fx_host.stream_size async def test_unknown_name(self, fx_ops): await _expect_fuse_error( @@ -273,66 +270,66 @@ async def test_open_unknown_inode(self, fx_ops): class TestOpenDispatch: async def test_stream_open_creates_connection( - self, fx_ops, fx_client, fx_spec, fx_stream_name + self, fx_ops, fx_host, fx_spec, fx_stream_name ): inode = fx_ops._name_to_inode[fx_stream_name] info = await fx_ops.open(inode, os.O_RDONLY) try: - assert fx_client.calls == [("open_stream",)] + assert fx_host.calls == [("open_stream",)] assert fx_ops._fh_to_kind[info.fh] == fx_spec.streaming_kind assert info.fh in fx_ops._fh_to_conn finally: await fx_ops.release(info.fh) async def test_static_open_does_not_call_client( - self, fx_ops, fx_client, fx_first_static_name + self, fx_ops, fx_host, fx_first_static_name ): inode = fx_ops._name_to_inode[fx_first_static_name] info = await fx_ops.open(inode, os.O_RDONLY) try: - assert fx_client.calls == [] + assert fx_host.calls == [] assert fx_ops._fh_to_kind[info.fh] == "static" assert info.fh not in fx_ops._fh_to_conn finally: await fx_ops.release(info.fh) async def test_each_stream_open_gets_distinct_fh_and_connection( - self, fx_ops, fx_client, fx_stream_name + self, fx_ops, fx_host, fx_stream_name ): inode = fx_ops._name_to_inode[fx_stream_name] info1 = await fx_ops.open(inode, os.O_RDONLY) info2 = await fx_ops.open(inode, os.O_RDONLY) try: assert info1.fh != info2.fh - assert len(fx_client.connections) == 2 - assert fx_client.connections[0] is not fx_client.connections[1] + assert len(fx_host.connections) == 2 + assert fx_host.connections[0] is not fx_host.connections[1] finally: await fx_ops.release(info1.fh) await fx_ops.release(info2.fh) async def test_stream_open_propagates_oserror_as_fuseerror( - self, fx_ops, fx_client, fx_stream_name + self, fx_ops, fx_host, fx_stream_name ): - fx_client.raise_on_next_open(OSError(errno.EACCES, "denied")) + fx_host.raise_on_next_open(OSError(errno.EACCES, "denied")) inode = fx_ops._name_to_inode[fx_stream_name] await _expect_fuse_error(fx_ops.open(inode, os.O_RDONLY), errno.EACCES) class TestRead: async def test_stream_read_dispatches_to_connection( - self, fx_ops, fx_client, fx_stream_name + self, fx_ops, fx_host, fx_stream_name ): inode = fx_ops._name_to_inode[fx_stream_name] info = await fx_ops.open(inode, os.O_RDONLY) try: data = await fx_ops.read(info.fh, 16, 8) - assert ("read", 1, 16, 8) in fx_client.calls + assert ("read", 1, 16, 8) in fx_host.calls assert data == bytes(((16 + i) & 0xFF) for i in range(8)) finally: await fx_ops.release(info.fh) async def test_static_read_serves_from_cached_bytes( - self, fx_ops, fx_client, fx_first_static_name, fx_first_static_bytes + self, fx_ops, fx_host, fx_first_static_name, fx_first_static_bytes ): inode = fx_ops._name_to_inode[fx_first_static_name] info = await fx_ops.open(inode, os.O_RDONLY) @@ -340,7 +337,7 @@ async def test_static_read_serves_from_cached_bytes( data = await fx_ops.read(info.fh, 0, len(fx_first_static_bytes)) assert data == fx_first_static_bytes # No client traffic for static reads. - assert all(c[0] != "read" for c in fx_client.calls) + assert all(c[0] != "read" for c in fx_host.calls) finally: await fx_ops.release(info.fh) @@ -371,12 +368,12 @@ async def test_read_unknown_fh_returns_ebadf(self, fx_ops): await _expect_fuse_error(fx_ops.read(9999, 0, 10), errno.EBADF) async def test_stream_read_propagates_oserror_as_fuseerror( - self, fx_ops, fx_client, fx_stream_name + self, fx_ops, fx_host, fx_stream_name ): inode = fx_ops._name_to_inode[fx_stream_name] info = await fx_ops.open(inode, os.O_RDONLY) try: - fx_client.connections[0].raise_on_next_read(OSError(errno.EIO, "boom")) + fx_host.connections[0].raise_on_next_read(OSError(errno.EIO, "boom")) await _expect_fuse_error(fx_ops.read(info.fh, 0, 10), errno.EIO) finally: await fx_ops.release(info.fh) @@ -384,21 +381,21 @@ async def test_stream_read_propagates_oserror_as_fuseerror( class TestRelease: async def test_stream_release_closes_connection( - self, fx_ops, fx_client, fx_stream_name + self, fx_ops, fx_host, fx_stream_name ): inode = fx_ops._name_to_inode[fx_stream_name] info = await fx_ops.open(inode, os.O_RDONLY) await fx_ops.release(info.fh) - assert ("aclose", 1) in fx_client.calls + assert ("aclose", 1) in fx_host.calls assert info.fh not in fx_ops._fh_to_conn async def test_static_release_does_not_call_client( - self, fx_ops, fx_client, fx_first_static_name + self, fx_ops, fx_host, fx_first_static_name ): inode = fx_ops._name_to_inode[fx_first_static_name] info = await fx_ops.open(inode, os.O_RDONLY) await fx_ops.release(info.fh) - assert all(c[0] != "aclose" for c in fx_client.calls) + assert all(c[0] != "aclose" for c in fx_host.calls) async def test_release_unknown_fh_silent(self, fx_ops): await fx_ops.release(9999) @@ -411,9 +408,9 @@ async def test_release_is_idempotent(self, fx_ops, fx_stream_name): class TestAccessLogger: - async def test_records_per_read(self, fx_client, fx_spec, fx_static_suffixes): + async def test_records_per_read(self, fx_host, fx_spec, fx_static_suffixes): log = access_log.AccessLogger() - ops = encoder_ops.EncoderOps(fx_client, "small", fx_spec, access_logger=log) + ops = encoder_ops.EncoderOps(fx_host, "small", fx_spec, access_logger=log) stream_inode = ops._name_to_inode[f"small{fx_spec.streaming_suffix}"] static_inode = ops._name_to_inode[f"small{fx_static_suffixes[0]}"] stream_info = await ops.open(stream_inode, os.O_RDONLY) @@ -442,8 +439,8 @@ class TestCapacityLimiter: streaming-file opens without blocking static reads. """ - async def test_stream_blocks_at_cap(self, fx_client, fx_spec): - ops = encoder_ops.EncoderOps(fx_client, "small", fx_spec, max_open_stream=2) + async def test_stream_blocks_at_cap(self, fx_host, fx_spec): + ops = encoder_ops.EncoderOps(fx_host, "small", fx_spec, max_open_stream=2) stream_inode = ops._name_to_inode[f"small{fx_spec.streaming_suffix}"] info1 = await ops.open(stream_inode, os.O_RDONLY) info2 = await ops.open(stream_inode, os.O_RDONLY) @@ -465,9 +462,9 @@ async def third_open(): await ops.release(third_info[0].fh) async def test_static_does_not_block_when_stream_at_cap( - self, fx_client, fx_spec, fx_static_suffixes + self, fx_host, fx_spec, fx_static_suffixes ): - ops = encoder_ops.EncoderOps(fx_client, "small", fx_spec, max_open_stream=1) + ops = encoder_ops.EncoderOps(fx_host, "small", fx_spec, max_open_stream=1) stream_inode = ops._name_to_inode[f"small{fx_spec.streaming_suffix}"] static_inode = ops._name_to_inode[f"small{fx_static_suffixes[0]}"] stream_info = await ops.open(stream_inode, os.O_RDONLY) @@ -475,16 +472,16 @@ async def test_static_does_not_block_when_stream_at_cap( await ops.release(static_info.fh) await ops.release(stream_info.fh) - async def test_failed_open_releases_slot(self, fx_client, fx_spec): - ops = encoder_ops.EncoderOps(fx_client, "small", fx_spec, max_open_stream=1) + async def test_failed_open_releases_slot(self, fx_host, fx_spec): + ops = encoder_ops.EncoderOps(fx_host, "small", fx_spec, max_open_stream=1) stream_inode = ops._name_to_inode[f"small{fx_spec.streaming_suffix}"] - fx_client.raise_on_next_open(OSError(errno.EACCES, "denied")) + fx_host.raise_on_next_open(OSError(errno.EACCES, "denied")) await _expect_fuse_error(ops.open(stream_inode, os.O_RDONLY), errno.EACCES) info = await ops.open(stream_inode, os.O_RDONLY) await ops.release(info.fh) async def test_open_returns_eagain_when_limiter_starved( - self, fx_client, fx_spec, monkeypatch + self, fx_host, fx_spec, monkeypatch ): """A leaked limiter slot must not pin FUSE_OPEN forever. @@ -492,7 +489,7 @@ async def test_open_returns_eagain_when_limiter_starved( released, a competing open must surface ``EAGAIN`` once the per-mount limiter deadline expires.""" monkeypatch.setattr(encoder_ops, "_LIMITER_TIMEOUT_S", 0.2) - ops = encoder_ops.EncoderOps(fx_client, "small", fx_spec, max_open_stream=1) + ops = encoder_ops.EncoderOps(fx_host, "small", fx_spec, max_open_stream=1) stream_inode = ops._name_to_inode[f"small{fx_spec.streaming_suffix}"] held = await ops.open(stream_inode, os.O_RDONLY) try: @@ -501,7 +498,7 @@ async def test_open_returns_eagain_when_limiter_starved( await ops.release(held.fh) async def test_limiter_timeout_records_access_event( - self, fx_client, fx_spec, monkeypatch + self, fx_host, fx_spec, monkeypatch ): """A timed-out FUSE_OPEN must leave a ``limiter_timeout`` event in the access log so post-hoc analysis can attribute an @@ -510,7 +507,7 @@ async def test_limiter_timeout_records_access_event( monkeypatch.setattr(encoder_ops, "_LIMITER_TIMEOUT_S", timeout_s) log = access_log.AccessLogger() ops = encoder_ops.EncoderOps( - fx_client, "small", fx_spec, max_open_stream=1, access_logger=log + fx_host, "small", fx_spec, max_open_stream=1, access_logger=log ) stream_inode = ops._name_to_inode[f"small{fx_spec.streaming_suffix}"] held = await ops.open(stream_inode, os.O_RDONLY) @@ -532,9 +529,9 @@ class TestLifecycleEvents: are emitted on the access logger so we can localise where time is spent in the lifecycle without changing the read trace.""" - async def test_stream_emits_full_lifecycle(self, fx_client, fx_spec): + async def test_stream_emits_full_lifecycle(self, fx_host, fx_spec): log = access_log.AccessLogger() - ops = encoder_ops.EncoderOps(fx_client, "small", fx_spec, access_logger=log) + ops = encoder_ops.EncoderOps(fx_host, "small", fx_spec, access_logger=log) stream_inode = ops._name_to_inode[f"small{fx_spec.streaming_suffix}"] info = await ops.open(stream_inode, os.O_RDONLY) await ops.read(info.fh, 0, 8) @@ -552,10 +549,10 @@ async def test_stream_emits_full_lifecycle(self, fx_client, fx_spec): assert acl.fh == info.fh async def test_static_emits_open_and_release_only( - self, fx_client, fx_spec, fx_static_suffixes + self, fx_host, fx_spec, fx_static_suffixes ): log = access_log.AccessLogger() - ops = encoder_ops.EncoderOps(fx_client, "small", fx_spec, access_logger=log) + ops = encoder_ops.EncoderOps(fx_host, "small", fx_spec, access_logger=log) static_inode = ops._name_to_inode[f"small{fx_static_suffixes[0]}"] info = await ops.open(static_inode, os.O_RDONLY) await ops.release(info.fh) @@ -597,11 +594,9 @@ async def test_returns_statvfs_data_with_expected_fields(self, fx_ops): out = await fx_ops.statfs() assert isinstance(out, pyfuse3.StatvfsData) - async def test_block_count_matches_sum_of_file_sizes(self, fx_ops, fx_client): + async def test_block_count_matches_sum_of_file_sizes(self, fx_ops, fx_host): out = await fx_ops.statfs() - total = fx_client.stream_size + sum( - len(b) for b in fx_client.static_files.values() - ) + total = fx_host.stream_size + sum(len(b) for b in fx_host.static_files.values()) assert out.f_bsize > 0 assert out.f_frsize == out.f_bsize assert out.f_blocks == (total + out.f_bsize - 1) // out.f_bsize diff --git a/tests/test_encoder_protocol.py b/tests/test_encoder_protocol.py deleted file mode 100644 index 3fba80c..0000000 --- a/tests/test_encoder_protocol.py +++ /dev/null @@ -1,124 +0,0 @@ -"""Pure unit tests for the encoder-server wire protocol. - -The protocol module has no I/O — these tests pin its byte layouts and -roundtrip semantics. Metadata-reply shape is variable-arity (any -number of static-file bodies in any order, plus one streaming-file -size); both 2-static layouts used in production (PLINK bim/fam, BGEN -sample/bgi) are exercised here. -""" - -import errno -import struct - -import pytest - -from biofuse import encoder_protocol - - -class TestRequestFraming: - def test_get_metadata_request_is_just_tag(self): - assert encoder_protocol.pack_get_metadata_request() == b"M" - - def test_read_request_layout(self): - buf = encoder_protocol.pack_read_request(4096, 8192) - assert buf[:1] == b"R" - off, size = struct.unpack(" bytes: - buf = bytearray() - while len(buf) < n: - chunk = sock.recv(n - len(buf)) - if len(chunk) == 0: - raise EOFError(f"socket closed after {len(buf)}/{n} bytes") - buf.extend(chunk) - return bytes(buf) - - -def _read_status_and_body(sock: socket.socket) -> tuple[int, bytes]: - status = encoder_protocol.parse_status( - _recv_exact(sock, encoder_protocol.REPLY_STATUS_SIZE) - ) - if status <= 0: - return status, b"" - return status, _recv_exact(sock, status) - - -def _spawn_handle_connection( - session: encoder_server._ServerSession, -) -> tuple[socket.socket, threading.Thread]: - parent_sock, child_sock = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) - thread = threading.Thread( - target=encoder_server._handle_connection, - args=(child_sock, session), - daemon=True, - ) - thread.start() - return parent_sock, thread - - -class _FakeEncoder: - """In-memory encoder stand-in for direct unit tests of the - per-tag reply helpers. ``read(off, size)`` returns deterministic - bytes so callers can assert exact payloads.""" - - total_size = 4096 - - def __init__(self): - self.calls: list[tuple[int, int]] = [] - - def read(self, off: int, size: int) -> bytes: - self.calls.append((off, size)) - return bytes((off + i) & 0xFF for i in range(size)) - - -def _spawn_serve_connection( - session: encoder_server._ServerSession, -) -> tuple[socket.socket, threading.Thread]: - """Run ``_serve_connection`` in a daemon thread with a fresh - encoder bound to the thread. Returns ``(parent_sock, thread)``; - caller closes ``parent_sock`` and joins ``thread``. The child - socket is closed by the target wrapper once the loop exits.""" - parent_sock, child_sock = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) - - def target(): - try: - with session.spec.encoder_factory(session.reader, session.opts) as encoder: - encoder_server._serve_connection( - child_sock, session, encoder, "test-thread" - ) - finally: - try: - child_sock.close() - except OSError: - pass - - thread = threading.Thread(target=target, daemon=True) - thread.start() - return parent_sock, thread - - -class TestMakeErrorReply: - def test_oserror_uses_exc_errno_without_logging(self, caplog): - exc = OSError(errno.ENOENT, "missing") - with caplog.at_level(logging.DEBUG, logger="biofuse.encoder_server"): - reply = encoder_server._make_error_reply(exc, "ctx") - assert encoder_protocol.parse_status(reply) == -errno.ENOENT - emitted = [r for r in caplog.records if r.name == "biofuse.encoder_server"] - assert emitted == [] - - def test_non_oserror_uses_eio_and_logs_error_plus_debug_traceback(self, caplog): - exc = ValueError("boom") - with caplog.at_level(logging.DEBUG, logger="biofuse.encoder_server"): - reply = encoder_server._make_error_reply(exc, "during dispatch") - assert encoder_protocol.parse_status(reply) == -errno.EIO - encoder_records = [ - r for r in caplog.records if r.name == "biofuse.encoder_server" - ] - error_records = [r for r in encoder_records if r.levelno == logging.ERROR] - assert len(error_records) == 1 - msg = error_records[0].getMessage() - assert "during dispatch" in msg - assert f"errno {errno.EIO}" in msg - assert "boom" in msg - debug_records = [ - r - for r in encoder_records - if r.levelno == logging.DEBUG - and "during dispatch traceback" in r.getMessage() - ] - assert len(debug_records) == 1 - assert debug_records[0].exc_info is not None - - -class TestMakeMetadataReply: - def test_happy_path_matches_expected_static(self, fx_session, fx_expected): - spec, expected_static, expected_stream_size = fx_expected - reply = encoder_server._make_metadata_reply(fx_session) - status = encoder_protocol.parse_status( - reply[: encoder_protocol.REPLY_STATUS_SIZE] - ) - body = reply[encoder_protocol.REPLY_STATUS_SIZE :] - assert status == len(body) - n_static, stream_size = encoder_protocol.parse_metadata_prefix( - body[: encoder_protocol.META_PREFIX_SIZE] - ) - assert n_static == len(spec.static_suffixes(fx_session.opts)) - assert stream_size == expected_stream_size - sizes_size = n_static * encoder_protocol.META_SIZE_ENTRY_SIZE - sizes = encoder_protocol.parse_static_sizes( - body[ - encoder_protocol.META_PREFIX_SIZE : encoder_protocol.META_PREFIX_SIZE - + sizes_size - ], - n_static, - ) - suffixes = spec.static_suffixes(fx_session.opts) - for suffix, size in zip(suffixes, sizes, strict=True): - assert size == len(expected_static[suffix]) - - def test_missing_suffix_raises_value_error(self, fx_reader, fx_spec, fx_opts): - bad_spec = dataclasses.replace( - fx_spec, build_static_files=lambda reader, opts: {} - ) - session = encoder_server._ServerSession(fx_reader, bad_spec, fx_opts) - with pytest.raises(ValueError, match="returned keys"): - encoder_server._make_metadata_reply(session) - - def test_extra_suffix_raises_value_error(self, fx_reader, fx_spec, fx_opts): - original_build = fx_spec.build_static_files - - def with_extra(r, o): - return {**original_build(r, o), ".unexpected": b"x"} - - bad_spec = dataclasses.replace(fx_spec, build_static_files=with_extra) - session = encoder_server._ServerSession(fx_reader, bad_spec, fx_opts) - with pytest.raises(ValueError, match="returned keys"): - encoder_server._make_metadata_reply(session) - - -class TestMakeReadReply: - def test_full_payload_returns_packed_reply(self): - # ``_make_read_reply`` is called after the tag has already been - # consumed from the wire — send only the payload bytes. - parent, child = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) - encoder = _FakeEncoder() - try: - parent.sendall(struct.pack(" tuple[socket.socket, pathlib.Path]: - sock_path = tmp_path / "encoder.sock" - listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - listener.bind(str(sock_path)) - listener.listen(8) - return listener, sock_path - - def _start_server_thread( - self, listener: socket.socket, session: encoder_server._ServerSession - ) -> tuple[socket.socket, threading.Thread]: - parent_stop, child_stop = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) - thread = threading.Thread( - target=encoder_server.serve_forever, - args=(listener, child_stop, session), - daemon=True, - ) - thread.start() - return parent_stop, thread - - def test_concurrent_connections_each_get_metadata(self, fx_session, tmp_path): - listener, sock_path = self._bind_listener(tmp_path) - parent_stop, server_thread = self._start_server_thread(listener, fx_session) - try: - replies: list[bytes] = [] - errors: list[BaseException] = [] - lock = threading.Lock() - - def client_worker(): - try: - s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - s.connect(str(sock_path)) - try: - s.sendall(encoder_protocol.pack_get_metadata_request()) - _, body = _read_status_and_body(s) - with lock: - replies.append(body) - finally: - s.close() - except BaseException as exc: # noqa: BLE001 - with lock: - errors.append(exc) - - threads = [threading.Thread(target=client_worker) for _ in range(4)] - for t in threads: - t.start() - for t in threads: - t.join(timeout=10) - assert errors == [] - assert len(replies) == 4 - assert all(r == replies[0] for r in replies) - finally: - parent_stop.close() - server_thread.join(timeout=5) - assert not server_thread.is_alive() - - def test_concurrent_stream_reads_independent_state( - self, fx_session, fx_expected, tmp_path - ): - """Each connection runs in its own server thread with its own - encoder; full reads on distinct connections see byte-identical - streams regardless of interleaving.""" - _, _, expected_stream_size = fx_expected - listener, sock_path = self._bind_listener(tmp_path) - parent_stop, server_thread = self._start_server_thread(listener, fx_session) - try: - n = 4 - results: dict[int, bytes] = {} - errors: list[BaseException] = [] - lock = threading.Lock() - barrier = threading.Barrier(n) - - def worker(tid): - try: - s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - s.connect(str(sock_path)) - try: - barrier.wait(timeout=5) - s.sendall( - encoder_protocol.pack_read_request(0, expected_stream_size) - ) - _, body = _read_status_and_body(s) - with lock: - results[tid] = body - finally: - s.close() - except BaseException as exc: # noqa: BLE001 - with lock: - errors.append(exc) - - threads = [threading.Thread(target=worker, args=(i,)) for i in range(n)] - for t in threads: - t.start() - for t in threads: - t.join(timeout=30) - assert errors == [] - assert len(results) == n - reference = results[0] - assert len(reference) == expected_stream_size - for body in results.values(): - assert body == reference - finally: - parent_stop.close() - server_thread.join(timeout=5) - assert not server_thread.is_alive() - - def test_stop_signal_exits_accept_loop(self, fx_session, tmp_path): - listener, sock_path = self._bind_listener(tmp_path) - parent_stop, server_thread = self._start_server_thread(listener, fx_session) - # Server is in select(); close parent's stop end → child wakes. - parent_stop.close() - server_thread.join(timeout=5) - assert not server_thread.is_alive() - # The listener socket should now be closed; new connects fail. - s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - s.settimeout(1) - with pytest.raises(ConnectionRefusedError): - s.connect(str(sock_path)) - - -class TestServerMainHandshakeFailure: - """Static-file build failures (e.g. multi-allelic input) surface at - handshake time as errno replies on the wire. The subprocess must - keep running through a failed handshake and exit cleanly when - stop-signalled.""" - - @pytest.mark.parametrize( - "spec", [formats.PLINK_SPEC, formats.BGEN_SPEC], ids=["plink", "bgen"] - ) - def test_multiallelic_handshake_returns_errno( - self, fx_multiallelic_vcz, tmp_path, spec - ): - sock_path = tmp_path / "encoder.sock" - listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - listener.bind(str(sock_path)) - listener.listen(8) - parent_stop, child_stop = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) - ctx = mp.get_context("spawn") - opts = ( - vcztools.ViewPlinkOptions() - if spec.name == "plink" - else vcztools.ViewBgenOptions() - ) - proc = ctx.Process( - target=encoder_server._server_main, - args=( - listener, - child_stop, - str(fx_multiallelic_vcz.path), - spec, - opts, - ), - ) - proc.start() - listener.close() - child_stop.close() - parent_stop_closed = False - try: - client_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - client_sock.settimeout(30) - deadline = time.monotonic() + 10.0 - while True: - try: - client_sock.connect(str(sock_path)) - break - except (FileNotFoundError, ConnectionRefusedError): - if time.monotonic() > deadline: - raise - time.sleep(0.05) - client_sock.sendall(encoder_protocol.pack_get_metadata_request()) - status_buf = _recv_exact(client_sock, encoder_protocol.REPLY_STATUS_SIZE) - status = encoder_protocol.parse_status(status_buf) - assert status < 0, f"expected errno reply, got status={status}" - assert -status == errno.EIO - client_sock.close() - parent_stop.close() - parent_stop_closed = True - proc.join(timeout=10) - assert not proc.is_alive(), "subprocess did not exit after stop signal" - assert proc.exitcode == 0, ( - f"subprocess exited with {proc.exitcode}; expected 0" - ) - finally: - if not parent_stop_closed: - parent_stop.close() - if proc.is_alive(): - proc.terminate() - proc.join(timeout=5) - try: - os.unlink(sock_path) - except OSError: - pass - - -class TestServerMainSmoke: - """End-to-end check that ``_server_main`` runs in a real subprocess.""" - - @pytest.mark.parametrize( - "spec", [formats.PLINK_SPEC, formats.BGEN_SPEC], ids=["plink", "bgen"] - ) - def test_spawn_metadata_handshake(self, fx_small_vcz, tmp_path, spec): - sock_path = tmp_path / "encoder.sock" - listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - listener.bind(str(sock_path)) - listener.listen(8) - parent_stop, child_stop = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) - ctx = mp.get_context("spawn") - opts = ( - vcztools.ViewPlinkOptions() - if spec.name == "plink" - else vcztools.ViewBgenOptions() - ) - proc = ctx.Process( - target=encoder_server._server_main, - args=( - listener, - child_stop, - str(fx_small_vcz.path), - spec, - opts, - ), - ) - proc.start() - listener.close() - child_stop.close() - try: - # Connect with a brief retry while the server's accept loop - # spins up. - deadline = time.monotonic() + 10 - client_sock = None - while time.monotonic() < deadline: - try: - client_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - client_sock.connect(str(sock_path)) - break - except OSError: - if client_sock is not None: - client_sock.close() - client_sock = None - time.sleep(0.05) - assert client_sock is not None, "could not connect to spawned server" - try: - client_sock.sendall(encoder_protocol.pack_get_metadata_request()) - status, body = _read_status_and_body(client_sock) - assert status > 0 - n_static, stream_size = encoder_protocol.parse_metadata_prefix( - body[: encoder_protocol.META_PREFIX_SIZE] - ) - assert n_static == len(spec.static_suffixes(opts)) - assert stream_size > 0 - finally: - client_sock.close() - finally: - parent_stop.close() - proc.join(timeout=10) - if proc.is_alive(): - proc.terminate() - proc.join(timeout=5) - assert not proc.is_alive() - assert proc.exitcode == 0 - try: - os.unlink(sock_path) - except OSError: - pass diff --git a/tests/test_formats.py b/tests/test_formats.py index 96ecb05..6beb876 100644 --- a/tests/test_formats.py +++ b/tests/test_formats.py @@ -1,6 +1,6 @@ """Tests for biofuse.formats — the per-format spec table. -Pins the duck-typed contract that ``encoder_server`` / ``encoder_ops`` +Pins the duck-typed contract that ``encoder_host`` / ``encoder_ops`` rely on: each spec produces the expected static sidecar bytes for the shared :class:`VczReader` under a given options dataclass, and each ``encoder_factory`` yields an encoder whose ``total_size`` reflects diff --git a/tests/test_plink_apps.py b/tests/test_plink_apps.py index 1317118..6475733 100644 --- a/tests/test_plink_apps.py +++ b/tests/test_plink_apps.py @@ -27,7 +27,7 @@ import vcztools from vcztools.plink import write_plink -from biofuse import access_log, encoder_client, encoder_ops, formats, fuse_adapter +from biofuse import access_log, encoder_host, encoder_ops, formats, fuse_adapter def _open_reader(path) -> object: @@ -74,12 +74,13 @@ async def fx_mounted_plink(tmp_path, fx_medium_vcz): write_plink(_open_reader(fx_medium_vcz.path), golden / "medium") log = access_log.AccessLogger() - sock_path = tmp_path / "plink.sock" - async with await encoder_client.EncoderClient.start( - str(fx_medium_vcz.path), sock_path, formats.PLINK_SPEC - ) as client: + async with await encoder_host.EncoderHost.start( + str(fx_medium_vcz.path), + formats.PLINK_SPEC, + opts=vcztools.ViewPlinkOptions(), + ) as host: ops = encoder_ops.EncoderOps( - client, "medium", formats.PLINK_SPEC, access_logger=log + host, "medium", formats.PLINK_SPEC, access_logger=log ) async with fuse_adapter.mount(ops, str(mnt)): await _wait_for_mount(mnt) @@ -102,11 +103,12 @@ async def _mount_plink(tmp_path, vcz): mnt = tmp_path / "mnt" mnt.mkdir() basename = vcz.path.stem - sock_path = tmp_path / "plink.sock" - async with await encoder_client.EncoderClient.start( - str(vcz.path), sock_path, formats.PLINK_SPEC - ) as client: - ops = encoder_ops.EncoderOps(client, basename, formats.PLINK_SPEC) + async with await encoder_host.EncoderHost.start( + str(vcz.path), + formats.PLINK_SPEC, + opts=vcztools.ViewPlinkOptions(), + ) as host: + ops = encoder_ops.EncoderOps(host, basename, formats.PLINK_SPEC) async with fuse_adapter.mount(ops, str(mnt)): await _wait_for_mount(mnt) yield mnt, basename From 540f11f349b4d7abd6b979497576f355d493ffd8 Mon Sep 17 00:00:00 2001 From: Jerome Kelleher Date: Sun, 17 May 2026 12:23:39 +0100 Subject: [PATCH 2/2] Add bulk-data runner cross-validating mount bytes vs. encoders For both plink and bgen, mount the fixture VCZ via biofuse and verify the first 100 MB of the streaming file matches the bytes produced by BedEncoder / BgenEncoder run directly in-process. --- fs_tests/README.md | 9 ++ fs_tests/harness/bulk_data_runner.py | 189 +++++++++++++++++++++++++++ fs_tests/harness/cli.py | 19 ++- fs_tests/harness/mount.py | 9 +- 4 files changed, 222 insertions(+), 4 deletions(-) create mode 100644 fs_tests/harness/bulk_data_runner.py diff --git a/fs_tests/README.md b/fs_tests/README.md index 966daa2..a50f40e 100644 --- a/fs_tests/README.md +++ b/fs_tests/README.md @@ -9,6 +9,7 @@ every PR. | Category | Runner | Approx runtime | |---|---|---| | POSIX syscall semantics | native Python | ~10 s | +| Bulk-data cross-validation | native Python | ~15 s | | pjdfstest curated subset | external (`pjdfstest`) | ~30–60 s | | Read-pattern stress | external (`fio`) | ~4 min | | Read cross-validation | native Python (fsx-style) | ~30 s | @@ -125,6 +126,14 @@ that mount-level operations and static-file reads stay responsive even while the streaming file is saturated. The streaming file itself is *not* probed: that is the load. +**Bulk-data cross-validation.** The `bulk-data` runner mounts the +fixture VCZ once as plink and once as BGEN, reads up to 100 MB from +each streaming file (`.bed` / `.bgen`) on the mount, and compares the +bytes against the same prefix produced by `vcztools.BedEncoder` / +`vcztools.BgenEncoder` run directly in-process. The cap keeps memory +bounded when the encoder's `total_size` exceeds 100 MB; smaller +streams are compared in full. + **fsx is read-only mode only.** Apple/LTP/xfstests fsx all assume a writable filesystem (they bootstrap the in-memory model by writing to the file under test). None of them run unmodified against a read-only diff --git a/fs_tests/harness/bulk_data_runner.py b/fs_tests/harness/bulk_data_runner.py new file mode 100644 index 0000000..38e0eec --- /dev/null +++ b/fs_tests/harness/bulk_data_runner.py @@ -0,0 +1,189 @@ +"""Bulk-data cross-validation: mount bytes vs. encoder-direct bytes. + +For both formats (plink, bgen), this runner: + +1. Builds the streaming encoder (``BedEncoder`` / ``BgenEncoder``) + directly against the fixture VCZ, and reads the first 100 MB as the + oracle bytes. +2. Mounts the same VCZ via ``biofuse mount-`` and reads the + first 100 MB of the streaming file from the mountpoint. +3. Asserts the byte streams are identical. + +If the encoder's ``total_size`` is below 100 MB, the read length is +clipped to ``total_size`` for both sides — only the leading +``min(total_size, 100 MB)`` of the oracle is materialised, keeping +memory bounded. +""" + +import logging +import pathlib +import time + +import vcztools + +from . import fixtures, tools +from . import mount as mount_mod + +logger = logging.getLogger(__name__) + +CAP_BYTES = 100 * 1024 * 1024 + + +def _build_encoder(format_name: str, vcz_path: pathlib.Path): + if format_name == "plink": + opts = vcztools.ViewPlinkOptions() + reader = opts.make_reader(str(vcz_path)) + return vcztools.BedEncoder(reader) + if format_name == "bgen": + opts = vcztools.ViewBgenOptions() + reader = opts.make_reader(str(vcz_path)) + return vcztools.BgenEncoder(reader) + raise ValueError(f"unknown format {format_name!r}") + + +def _read_mount_prefix(path: pathlib.Path, size: int) -> bytes: + """Read up to ``size`` bytes from ``path``, looping until EOF or + the cap is reached.""" + chunks: list[bytes] = [] + remaining = size + with open(path, "rb") as fh: + while remaining > 0: + chunk = fh.read(remaining) + if len(chunk) == 0: + break + chunks.append(chunk) + remaining -= len(chunk) + return b"".join(chunks) + + +def _check_one( + *, + format_name: str, + streaming_suffix: str, + vcz_path: pathlib.Path, + basename: str, + log_dir: pathlib.Path, +) -> tools.CheckResult: + started = time.monotonic() + log_lines: list[str] = [f"format={format_name} basename={basename}"] + + with _build_encoder(format_name, vcz_path) as encoder: + total_size = encoder.total_size + compare_size = min(total_size, CAP_BYTES) + log_lines.append( + f"encoder total_size={total_size} compare_size={compare_size} " + f"(cap={CAP_BYTES})" + ) + expected = encoder.read(0, compare_size) + + if len(expected) != compare_size: + log_lines.append( + f"encoder short read: got {len(expected)} expected {compare_size}" + ) + (log_dir / f"{format_name}.log").write_text("\n".join(log_lines) + "\n") + return tools.CheckResult( + name=f"bulk-data:{format_name}", + passed=False, + duration_s=time.monotonic() - started, + detail=f"encoder short read: {len(expected)}/{compare_size}", + ) + + mountpoint = log_dir / f"mnt-{format_name}" + with mount_mod.BiofuseMount( + str(vcz_path), + mountpoint, + format_name=format_name, + basename=basename, + log_path=log_dir / f"{format_name}-mount.log", + ) as mnt: + target = mnt / f"{basename}{streaming_suffix}" + if not target.exists(): + log_lines.append(f"target missing: {target}") + (log_dir / f"{format_name}.log").write_text("\n".join(log_lines) + "\n") + return tools.CheckResult( + name=f"bulk-data:{format_name}", + passed=False, + duration_s=time.monotonic() - started, + detail=f"target file missing: {target.name}", + ) + mount_size = target.stat().st_size + log_lines.append(f"mount file size={mount_size}") + if mount_size != total_size: + (log_dir / f"{format_name}.log").write_text("\n".join(log_lines) + "\n") + return tools.CheckResult( + name=f"bulk-data:{format_name}", + passed=False, + duration_s=time.monotonic() - started, + detail=f"size mismatch: encoder={total_size} mount={mount_size}", + ) + actual = _read_mount_prefix(target, compare_size) + + duration = time.monotonic() - started + + if len(actual) != compare_size: + log_lines.append(f"mount short read: {len(actual)}/{compare_size}") + (log_dir / f"{format_name}.log").write_text("\n".join(log_lines) + "\n") + return tools.CheckResult( + name=f"bulk-data:{format_name}", + passed=False, + duration_s=duration, + detail=f"mount short read: {len(actual)}/{compare_size}", + ) + + if actual != expected: + first_diff = next( + (i for i, (a, b) in enumerate(zip(actual, expected)) if a != b), + min(len(actual), len(expected)), + ) + log_lines.append(f"BYTE MISMATCH at offset 0x{first_diff:x}") + (log_dir / f"{format_name}.log").write_text("\n".join(log_lines) + "\n") + return tools.CheckResult( + name=f"bulk-data:{format_name}", + passed=False, + duration_s=duration, + detail=f"byte mismatch at offset 0x{first_diff:x}", + ) + + log_lines.append(f"OK: {compare_size} bytes match ({duration:.1f}s)") + (log_dir / f"{format_name}.log").write_text("\n".join(log_lines) + "\n") + logger.info( + "bulk-data %s: %d bytes match (%.1fs)", format_name, compare_size, duration + ) + return tools.CheckResult( + name=f"bulk-data:{format_name}", + passed=True, + duration_s=duration, + detail=f"compared {compare_size} bytes (encoder total_size={total_size})", + ) + + +def run(*, log_dir: pathlib.Path) -> tools.RunnerResult: + started = time.monotonic() + log_dir.mkdir(parents=True, exist_ok=True) + + spec = fixtures.MEDIUM + vcz_path = fixtures.get_or_build(spec) + + checks: list[tools.CheckResult] = [] + for format_name, streaming_suffix in (("plink", ".bed"), ("bgen", ".bgen")): + checks.append( + _check_one( + format_name=format_name, + streaming_suffix=streaming_suffix, + vcz_path=vcz_path, + basename=spec.name, + log_dir=log_dir, + ) + ) + + duration = time.monotonic() - started + return tools.RunnerResult( + runner="bulk-data", + passed=all(c.passed for c in checks), + duration_s=duration, + checks=checks, + summary=( + f"bulk-data cross-validation: encoder vs mount, " + f"cap={CAP_BYTES // (1024 * 1024)} MB" + ), + ) diff --git a/fs_tests/harness/cli.py b/fs_tests/harness/cli.py index 5268554..d29541e 100644 --- a/fs_tests/harness/cli.py +++ b/fs_tests/harness/cli.py @@ -9,6 +9,7 @@ import click from . import ( + bulk_data_runner, fio_runner, fsx_runner, lifecycle, @@ -227,6 +228,18 @@ def fsx_cmd(ctx: click.Context, ops: int, max_op_size: int) -> None: sys.exit(_emit(results_dir, [result])) +@main.command("bulk-data") +@click.pass_context +def bulk_data_cmd(ctx: click.Context) -> None: + """Cross-validate mount bytes against direct-encoder bytes (plink + bgen).""" + results_dir = ctx.obj["results_dir"] + log_dir = _runner_log_dir(results_dir, "bulk-data") + result = _run_with_banner( + "bulk-data", lambda: bulk_data_runner.run(log_dir=log_dir) + ) + sys.exit(_emit(results_dir, [result])) + + @main.command("lifecycle") @click.option("--iterations", type=int, default=50) @click.pass_context @@ -272,12 +285,16 @@ def all_cmd(ctx: click.Context) -> None: pjd_log_dir = _runner_log_dir(results_dir, "pjdfstest") fio_log_dir = _runner_log_dir(results_dir, "fio") fsx_log_dir = _runner_log_dir(results_dir, "fsx") + bulk_data_log_dir = _runner_log_dir(results_dir, "bulk-data") stress_log_dir = _runner_log_dir(results_dir, "stress-ng") lifecycle_log_dir = _runner_log_dir(results_dir, "lifecycle") liveness_log_dir = _runner_log_dir(results_dir, "active-under-stress") results: list[tools.RunnerResult] = [ - _run_with_banner("posix", lambda: posix.run(log_path=posix_log)) + _run_with_banner("posix", lambda: posix.run(log_path=posix_log)), + _run_with_banner( + "bulk-data", lambda: bulk_data_runner.run(log_dir=bulk_data_log_dir) + ), ] if not quick: results.append( diff --git a/fs_tests/harness/mount.py b/fs_tests/harness/mount.py index f01eb52..1652a85 100644 --- a/fs_tests/harness/mount.py +++ b/fs_tests/harness/mount.py @@ -47,11 +47,12 @@ def force_unmount(mountpoint: pathlib.Path) -> None: class BiofuseMount: - """Context manager that runs ``biofuse mount-plink`` in a subprocess. + """Context manager that runs ``biofuse mount-`` in a subprocess. Spawns the real CLI so the harness exercises the same code path a user hits. Each mount is its own subprocess, sidestepping pyfuse3's - one-mount-per-process limitation entirely. + one-mount-per-process limitation entirely. ``format_name`` selects + between ``mount-plink`` (default) and ``mount-bgen``. """ def __init__( @@ -59,6 +60,7 @@ def __init__( vcz_url: str, mountpoint: pathlib.Path, *, + format_name: str = "plink", basename: str | None = None, log_path: pathlib.Path | None = None, access_log_path: pathlib.Path | None = None, @@ -67,6 +69,7 @@ def __init__( ) -> None: self.vcz_url = vcz_url self.mountpoint = mountpoint + self.format_name = format_name self.basename = basename self.log_path = log_path self.access_log_path = access_log_path @@ -85,7 +88,7 @@ def _build_cmd(self) -> list[str]: ) cmd: list[str] = [ biofuse_bin, - "mount-plink", + f"mount-{self.format_name}", self.vcz_url, str(self.mountpoint), ]