diff --git a/biofuse/cli.py b/biofuse/cli.py index 80f7bb8..ccbdde9 100644 --- a/biofuse/cli.py +++ b/biofuse/cli.py @@ -4,14 +4,13 @@ import pathlib import signal import sys -import tempfile from functools import wraps import click import trio import vcztools -from biofuse import access_log, encoder_client, encoder_ops, formats, fuse_adapter +from biofuse import access_log, encoder_host, encoder_ops, formats, fuse_adapter logger = logging.getLogger(__name__) @@ -76,11 +75,12 @@ def biofuse_main(): def mount_plink(vcz_url, mount_dir, basename, access_log_path, **kwargs): """Mount a PLINK 1.9 view of VCZ_URL at MOUNT_DIR. - Spawns a plink-server subprocess that owns the ``VczReader`` and - serves ``.bed`` reads over an ``AF_UNIX`` socket. ``.bim`` and - ``.fam`` are precomputed once at mount time and held in the FUSE - process's memory; only ``.bed`` reads cross the wire. ``--no-bim`` - and ``--no-fam`` suppress the corresponding sidecar from the mount. + The FUSE handler owns the ``VczReader`` and one fresh + :class:`vcztools.BedEncoder` per open ``.bed`` fh; ``encoder.read`` + runs on worker threads so the pyfuse3 main loop stays responsive. + ``.bim`` and ``.fam`` are precomputed once at mount time and held + in memory; only ``.bed`` reads invoke the encoder. ``--no-bim`` and + ``--no-fam`` suppress the corresponding sidecar from the mount. The bcftools-view-style filter / backend / log options are inherited from ``vcztools view-plink``; see ``vcztools view-plink --help`` for the @@ -105,10 +105,11 @@ def mount_plink(vcz_url, mount_dir, basename, access_log_path, **kwargs): def mount_bgen(vcz_url, mount_dir, basename, access_log_path, **kwargs): """Mount an Oxford BGEN view of VCZ_URL at MOUNT_DIR. - Spawns a bgen-server subprocess that owns the ``VczReader`` and - serves ``.bgen`` reads over an ``AF_UNIX`` socket. ``.sample`` and - ``.bgen.bgi`` are precomputed once at mount time and held in the - FUSE process's memory; only ``.bgen`` reads cross the wire. + The FUSE handler owns the ``VczReader`` and one fresh + :class:`vcztools.BgenEncoder` per open ``.bgen`` fh; ``encoder.read`` + runs on worker threads so the pyfuse3 main loop stays responsive. + ``.sample`` and ``.bgen.bgi`` are precomputed once at mount time + and held in memory; only ``.bgen`` reads invoke the encoder. ``--no-sample-file`` and ``--no-bgi`` suppress the corresponding sidecar; ``--no-header-samples`` drops the sample identifiers from the ``.bgen`` header block. ``--unphased`` ignores the input's @@ -144,18 +145,15 @@ def _run_mount( log_path = pathlib.Path(access_log_path) if access_log_path is not None else None resolved_basename = basename if basename is not None else _default_basename(vcz_url) - with tempfile.TemporaryDirectory(prefix="biofuse-") as sock_dir: - sock_path = pathlib.Path(sock_dir) / f"{spec.name}.sock" - trio.run( - _amount, - spec, - vcz_url, - str(mount_dir_path), - resolved_basename, - opts, - log_path, - sock_path, - ) + trio.run( + _amount, + spec, + vcz_url, + str(mount_dir_path), + resolved_basename, + opts, + log_path, + ) async def _amount( @@ -165,17 +163,15 @@ async def _amount( basename: str, opts, log_path: pathlib.Path | None, - sock_path: pathlib.Path, ) -> None: - async with await encoder_client.EncoderClient.start( + async with await encoder_host.EncoderHost.start( vcz_url, - sock_path, spec, opts=opts, - ) as client: + ) as host: with access_log.AccessLogger(log_path) as access_logger: ops = encoder_ops.EncoderOps( - client, basename, spec, access_logger=access_logger + host, basename, spec, access_logger=access_logger ) async with fuse_adapter.mount(ops, mount_dir): click.echo(f"mounted at {mount_dir}", err=True) diff --git a/biofuse/encoder_client.py b/biofuse/encoder_client.py deleted file mode 100644 index 27d39a5..0000000 --- a/biofuse/encoder_client.py +++ /dev/null @@ -1,391 +0,0 @@ -"""Parent-side async client for the encoder-server subprocess. - -:class:`EncoderClient` connects briefly at startup to fetch the -precomputed static-sidecar bytes (in the format spec's declared -order) and the streaming-file size, then for each streaming-file -``open()`` from the FUSE layer it spins up a fresh -:class:`StreamConnection` over a new ``AF_UNIX`` socket. Each -:class:`StreamConnection` is its own conversation with its own -server-side thread and encoder. -""" - -import errno -import logging -import multiprocessing as mp -import pathlib -import socket -import time -from collections.abc import Callable - -import trio -import vcztools - -from biofuse import encoder_protocol, encoder_server, formats - -logger = logging.getLogger(__name__) - - -_SHUTDOWN_GRACE_SECONDS = 5.0 -_CONNECT_RETRY_SLEEP_S = 0.05 -_CONNECT_DEADLINE_S = 10.0 - -# Per-operation deadlines for the parent → server protocol. The FUSE -# handler must never await indefinitely on the worker; on expiry we -# surface an ``OSError`` to the FUSE layer so the kernel sees a real -# I/O error and unblocks the consumer's syscall instead of pinning it -# in uninterruptible sleep. -_REQUEST_TIMEOUT_S = 30.0 -_OPEN_TIMEOUT_S = 5.0 -_ACLOSE_TIMEOUT_S = 2.0 - - -class StreamConnection: - """One streaming-file reader: a dedicated socket to the encoder-server. - - Reads on the same connection are serialised via an internal - ``trio.Lock`` because the wire protocol is request/reply - synchronous on a single socket. Different :class:`StreamConnection` - instances are fully independent — they live in different server - threads and do not contend with each other. - """ - - def __init__( - self, - stream: trio.SocketStream, - *, - on_aclose: Callable[[float, float], None] | None = None, - ) -> None: - self._stream = stream - self._lock = trio.Lock() - self._closed = False - self._on_aclose = on_aclose - - async def read(self, off: int, size: int) -> bytes: - if self._closed: - raise OSError(errno.EIO, "stream connection is closed") - request = encoder_protocol.pack_read_request(off, size) - with trio.move_on_after(_REQUEST_TIMEOUT_S) as cs: - async with self._lock: - if self._closed: - raise OSError(errno.EIO, "stream connection is closed") - await self._stream.send_all(request) - status_buf = await _recv_exact( - self._stream, encoder_protocol.REPLY_STATUS_SIZE - ) - status = encoder_protocol.parse_status(status_buf) - if status < 0: - raise encoder_protocol.status_to_error(status) - if status == 0: - return b"" - return await _recv_exact(self._stream, status) - # Reached only if ``move_on_after`` caught a Cancelled — the - # inner block always returns or raises through. Mark the - # connection dead so other tasks queued on ``self._lock`` wake - # to an immediate EIO instead of repeating the wait against a - # known-broken socket. - if not cs.cancelled_caught: # pragma: no cover - defensive - raise RuntimeError("encoder-server read fall-through") - self._closed = True - with trio.CancelScope(shield=True): - with trio.move_on_after(_ACLOSE_TIMEOUT_S): - try: - await self._stream.aclose() - except (trio.BrokenResourceError, OSError) as exc: - logger.debug("aclose after timeout raised: %s", exc) - raise OSError(errno.EIO, "encoder-server request timed out") - - async def aclose(self) -> None: - if self._closed: - return - self._closed = True - t_start = time.monotonic() - with trio.CancelScope(shield=True): - with trio.move_on_after(_ACLOSE_TIMEOUT_S) as cs: - try: - await self._stream.send_eof() - except ( - trio.ClosedResourceError, - trio.BrokenResourceError, - OSError, - ) as exc: - logger.debug("send_eof on stream connection raised: %s", exc) - await self._stream.aclose() - if cs.cancelled_caught: - logger.debug( - "stream connection aclose timed out after %.1fs", - _ACLOSE_TIMEOUT_S, - ) - if self._on_aclose is not None: - try: - self._on_aclose(t_start, time.monotonic()) - except Exception as exc: # noqa: BLE001 - never let logging blow up cleanup - logger.debug("on_aclose hook raised: %s", exc) - - -class EncoderClient: - """Parent-side client for one mounted encoder-server subprocess. - - Construct via :meth:`EncoderClient.start` (an async classmethod - that spawns the server, runs the metadata handshake, and returns - the ready client). The instance is also an async context manager; - ``aclose()`` signals the server to stop, joins the subprocess, - and unlinks the listener socket file. - - Attributes populated by the handshake: - - - ``static_files``: dict mapping each suffix returned by - ``spec.static_suffixes(opts)`` to its precomputed bytes. - - ``stream_size``: total byte size of the streaming file. - """ - - def __init__(self, spec: formats.FormatSpec, opts) -> None: - self.spec = spec - self.opts = opts - self.static_files: dict[str, bytes] = {} - self.stream_size: int = 0 - self._proc: mp.process.BaseProcess | None = None - self._socket_path: pathlib.Path | None = None - self._stop_sock: socket.socket | None = None - self._closed = False - - @classmethod - async def start( - cls, - vcz_url: str, - socket_path: pathlib.Path, - spec: formats.FormatSpec, - *, - opts=None, - ) -> "EncoderClient": - """Spawn the server, run the metadata handshake, return client. - - The parent creates the listener and the stop-signal socketpair - itself, then hands both to the child. Multiprocessing's socket - reduction dups the fds across the spawn boundary; the parent - closes its own copies once the child has started. - - ``opts`` is the ``vcztools.ViewPlinkOptions`` (for - ``spec.name == "plink"``) or ``vcztools.ViewBgenOptions`` - dataclass parsed from the CLI. It carries the bcftools-style - filtering options forwarded to ``opts.make_reader`` in the - worker, plus the per-mount feature toggles (``no_bim``, - ``no_bgi``, …) and the log-level / log-file settings the - worker applies on startup. Defaults to a fresh instance of the - format's options dataclass (every field at its dataclass - default) when ``None``. - """ - if opts is None: - if spec.name == "plink": - opts = vcztools.ViewPlinkOptions() - else: - opts = vcztools.ViewBgenOptions() - socket_path = pathlib.Path(socket_path) - socket_path.parent.mkdir(parents=True, exist_ok=True) - if socket_path.exists(): - socket_path.unlink() - listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - listener.bind(str(socket_path)) - listener.listen(64) - parent_stop, child_stop = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) - ctx = mp.get_context("spawn") - proc: mp.process.BaseProcess = ctx.Process( - target=encoder_server._server_main, - args=(listener, child_stop, vcz_url, spec, opts), - name=f"biofuse-{spec.name}-server", - ) - try: - proc.start() - finally: - listener.close() - child_stop.close() - - self = cls(spec, opts) - self._proc = proc - self._socket_path = socket_path - self._stop_sock = parent_stop - try: - await self._handshake() - except BaseException as exc: - # If the subprocess has exited on its own during startup - # the handshake failure is a downstream symptom (refused - # connect, RST mid frame, premature EOF). Surface a single - # clean OSError so the CLI prints one error line and - # points the user at the server's own log. Check - # ``is_alive`` before ``aclose`` — the latter signals stop - # and joins the child, after which ``exitcode`` is set - # regardless of why the child exited. - child_died_on_own = not proc.is_alive() - await self.aclose() - if child_died_on_own and not isinstance(exc, KeyboardInterrupt): - raise OSError( - errno.EIO, - f"{spec.name}-server exited during startup; " - "see log above for details", - ) from exc - raise - return self - - async def __aenter__(self) -> "EncoderClient": - return self - - async def __aexit__(self, exc_type, exc, tb) -> None: - await self.aclose() - - async def open_stream( - self, - *, - on_aclose: Callable[[float, float], None] | None = None, - ) -> StreamConnection: - """Open a new dedicated socket for one streaming-file reader.""" - stream = await self._connect_stream() - return StreamConnection(stream, on_aclose=on_aclose) - - async def aclose(self) -> None: - """Tear down the server. Idempotent. - - Closes the parent end of the stop-signal socketpair so the - server's accept loop wakes via ``select`` and exits, joins - the subprocess (escalating to SIGTERM / SIGKILL after - ``_SHUTDOWN_GRACE_SECONDS``), and unlinks the listener path. - """ - if self._closed: - return - self._closed = True - if self._stop_sock is not None: - try: - self._stop_sock.close() - except OSError: - pass - self._stop_sock = None - if self._proc is not None: - await trio.to_thread.run_sync(self._sync_join_proc) - if self._socket_path is not None and self._socket_path.exists(): - try: - self._socket_path.unlink() - except OSError as exc: - logger.debug("unlink socket path raised: %s", exc) - - # -- internals ------------------------------------------------------ - - async def _handshake(self) -> None: - stream = await self._connect_stream(retry_until_listening=True) - try: - await stream.send_all(encoder_protocol.pack_get_metadata_request()) - status_buf = await _recv_exact(stream, encoder_protocol.REPLY_STATUS_SIZE) - status = encoder_protocol.parse_status(status_buf) - if status < 0: - raise encoder_protocol.status_to_error(status) - prefix = await _recv_exact(stream, encoder_protocol.META_PREFIX_SIZE) - n_static, stream_size = encoder_protocol.parse_metadata_prefix(prefix) - expected_suffixes = self.spec.static_suffixes(self.opts) - if n_static != len(expected_suffixes): - raise OSError( - errno.EIO, - f"{self.spec.name}-server reported {n_static} static files; " - f"client expected {len(expected_suffixes)}", - ) - sizes_buf = await _recv_exact( - stream, n_static * encoder_protocol.META_SIZE_ENTRY_SIZE - ) - sizes = encoder_protocol.parse_static_sizes(sizes_buf, n_static) - self.static_files = { - suffix: await _recv_exact(stream, size) - for suffix, size in zip(expected_suffixes, sizes, strict=True) - } - self.stream_size = stream_size - finally: - try: - await stream.send_eof() - except ( - trio.ClosedResourceError, - trio.BrokenResourceError, - OSError, - ): - pass - await stream.aclose() - - async def _connect_stream( - self, *, retry_until_listening: bool = False - ) -> trio.SocketStream: - """Open a fresh ``AF_UNIX`` connection to the server. - - With ``retry_until_listening=True`` the call retries briefly - while the child is still bringing up its accept loop, bounded - by ``_CONNECT_DEADLINE_S``. If the child has already exited - (typically due to a startup failure that ``_server_main`` - caught and logged) the retry loop bails out immediately - instead of waiting out the deadline. - """ - assert self._socket_path is not None - path = str(self._socket_path) - deadline = trio.current_time() + _CONNECT_DEADLINE_S - last_exc: BaseException | None = None - while True: - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - sock.setblocking(False) - trio_sock = trio.socket.from_stdlib_socket(sock) - try: - with trio.fail_after(_OPEN_TIMEOUT_S): - await trio_sock.connect(path) - return trio.SocketStream(trio_sock) - except trio.TooSlowError as exc: - trio_sock.close() - raise OSError( - errno.EIO, - f"encoder-server connect timed out after {_OPEN_TIMEOUT_S:.1f}s", - ) from exc - except (FileNotFoundError, ConnectionRefusedError, OSError) as exc: - trio_sock.close() - last_exc = exc - if not retry_until_listening: - raise - if self._proc is not None and not self._proc.is_alive(): - raise OSError( - errno.EIO, - f"{self.spec.name}-server exited during startup; " - "see log above for details", - ) from last_exc - if trio.current_time() > deadline: - raise OSError( - errno.EIO, - f"{self.spec.name}-server not listening at {path}: {exc}", - ) from last_exc - await trio.sleep(_CONNECT_RETRY_SLEEP_S) - - def _sync_join_proc(self) -> None: - proc = self._proc - if proc is None: - return - if proc.is_alive(): - proc.join(timeout=_SHUTDOWN_GRACE_SECONDS) - if proc.is_alive(): - logger.warning("%s-server did not exit; sending SIGTERM", self.spec.name) - proc.terminate() - proc.join(timeout=_SHUTDOWN_GRACE_SECONDS) - if proc.is_alive(): - logger.warning( - "%s-server still alive after SIGTERM; killing", self.spec.name - ) - proc.kill() - proc.join(timeout=_SHUTDOWN_GRACE_SECONDS) - - -async def _recv_exact(stream: trio.SocketStream, n: int) -> bytes: - """Read exactly ``n`` bytes off ``stream``, with a guaranteed checkpoint. - - Raises :class:`OSError(EIO)` on EOF, which the caller surfaces to - the FUSE layer as ``FUSEError(EIO)``. - """ - if n == 0: - await trio.lowlevel.checkpoint() - return b"" - buf = bytearray(n) - view = memoryview(buf) - got = 0 - while got < n: - chunk = await stream.receive_some(n - got) - if len(chunk) == 0: - raise OSError(errno.EIO, "encoder-server closed socket") - view[got : got + len(chunk)] = chunk - got += len(chunk) - return bytes(buf) diff --git a/biofuse/encoder_host.py b/biofuse/encoder_host.py new file mode 100644 index 0000000..e9f5f13 --- /dev/null +++ b/biofuse/encoder_host.py @@ -0,0 +1,262 @@ +"""In-process host for the per-format encoders. + +Replaces the previous ``encoder_client`` / ``encoder_server`` / +``encoder_protocol`` stack: the ``VczReader`` and the per-fh +:class:`~vcztools.format_encoder.FormatEncoder` instances live in the +FUSE handler process. Heavy blocking work (encoder construction, +``encoder.read``, ``encoder.close``, and reader teardown) is dispatched +to worker threads via :func:`trio.to_thread.run_sync` so the pyfuse3 +trio task is free to schedule other FUSE requests. + +:class:`EncoderHost` mirrors the public surface the FUSE Operations +layer expects (``static_files``, ``stream_size``, ``open_stream`` -> +:class:`StreamHandle`) so :class:`biofuse.encoder_ops.EncoderOps` is +agnostic to whether the encoder lives in-process or out-of-process. + +Per-read timeout handling matters because the trio thread cannot kill +a worker thread. The read path uses ``abandon_on_cancel=True`` so the +trio task wakes immediately on timeout, but the worker thread keeps +running ``encoder.read`` to completion. :class:`StreamHandle` tracks +the thread via a ``threading.Event`` that the worker itself sets in a +``finally`` block, and :meth:`StreamHandle.aclose` drains that event +before calling ``encoder.close`` — concurrent close+read on the same +encoder is unsafe. +""" + +import errno +import logging +import threading +import time +from collections.abc import Callable + +import trio + +from biofuse import formats + +logger = logging.getLogger(__name__) + + +# Per-request deadline for ``encoder.read``. Slow reads under high I/O +# load do legitimately happen; surfacing EIO to the FUSE syscall is +# better than pinning the consumer in uninterruptible sleep. +_REQUEST_TIMEOUT_S = 30.0 + +# Outer deadline for ``StreamHandle.aclose`` — covers draining any +# abandoned worker thread plus the ``encoder.close`` call. If the +# abandoned thread is permanently wedged we log a warning and leak the +# encoder rather than hang unmount indefinitely. +_ACLOSE_TIMEOUT_S = 2.0 + + +class StreamHandle: + """One streaming-file reader. + + Owns one :class:`~vcztools.format_encoder.FormatEncoder`. Reads + serialise on an internal ``trio.Lock`` — :class:`FormatEncoder` + mutates iterator state in place, so at most one worker thread may + enter ``encoder.read`` at a time. ``read`` and ``aclose`` + off-thread the blocking calls via :func:`trio.to_thread.run_sync`. + + On a per-read timeout the handle is marked closed and subsequent + reads return ``OSError(EIO)`` immediately; the abandoned worker + thread eventually returns, at which point :meth:`aclose` may close + the encoder. + """ + + def __init__( + self, + encoder, + *, + on_aclose: Callable[[float, float], None] | None = None, + ) -> None: + self._encoder = encoder + self._lock = trio.Lock() + self._closed = False + self._aclose_called = False + self._on_aclose = on_aclose + # Cleared on read entry, set by the worker thread on return. + # Used by ``aclose`` to drain any abandoned worker before + # closing the encoder. Initial state = idle. + self._thread_done = threading.Event() + self._thread_done.set() + + async def read(self, off: int, size: int) -> bytes: + if self._closed: + raise OSError(errno.EIO, "stream handle is closed") + async with self._lock: + if self._closed: + raise OSError(errno.EIO, "stream handle is closed") + self._thread_done.clear() + encoder = self._encoder + thread_done = self._thread_done + + def call() -> bytes: + try: + return encoder.read(off, size) + finally: + thread_done.set() + + with trio.move_on_after(_REQUEST_TIMEOUT_S) as cs: + try: + return await trio.to_thread.run_sync(call, abandon_on_cancel=True) + except OSError: + raise + except Exception as exc: + # Non-OSError encoder failures (``ValueError`` for + # haploid PLINK, ``NotImplementedError`` for mixed + # ploidy BGEN, …) become ``OSError(EIO)`` so the + # FUSE layer surfaces a real I/O error to the + # kernel rather than crashing the trio task. + logger.error("encoder.read raised; converting to EIO: %s", exc) + logger.debug("encoder.read traceback", exc_info=True) + raise OSError(errno.EIO, f"encoder.read failed: {exc}") from exc + if cs.cancelled_caught: + # Mark dead so subsequent reads on this fh return EIO + # immediately rather than queueing behind a known-stuck + # worker. The abandoned thread will eventually set + # ``_thread_done``; ``aclose`` waits for it. + self._closed = True + raise OSError(errno.EIO, "encoder read timed out") + raise RuntimeError("encoder read fall-through") # pragma: no cover + + async def aclose(self) -> None: + if self._aclose_called: + return + self._aclose_called = True + self._closed = True + t_start = time.monotonic() + # Wait for any abandoned worker thread to finish before + # calling ``encoder.close``. The encoder's own + # ``ThreadPoolExecutor`` is shut down inside ``close``; + # in-flight tasks must drain first. ``Event.wait`` carries + # its own timeout because cancelling ``to_thread.run_sync`` + # from outside cannot interrupt a thread blocked on a + # ``threading.Event``. ``abandon_on_cancel=True`` shields the + # trio task from a still-running drain thread on shutdown. + drained = await trio.to_thread.run_sync( + self._thread_done.wait, + _ACLOSE_TIMEOUT_S, + abandon_on_cancel=True, + ) + if drained: + await trio.to_thread.run_sync(self._encoder.close, abandon_on_cancel=True) + else: + logger.warning( + "encoder did not finish draining within %.1fs; " + "leaking encoder and its thread pool", + _ACLOSE_TIMEOUT_S, + ) + if self._on_aclose is not None: + try: + self._on_aclose(t_start, time.monotonic()) + except Exception as exc: # noqa: BLE001 - never let logging blow up cleanup + logger.debug("on_aclose hook raised: %s", exc) + + +class EncoderHost: + """Parent-process host for one mounted view. + + Construct via :meth:`EncoderHost.start` — an async classmethod + that opens the reader, materialises the variant filter, and builds + the static-sidecar bytes on a worker thread, returning a ready + host. The instance is also an async context manager; ``aclose`` + closes the reader. + + Attributes populated by ``start``: + + - ``static_files``: dict mapping each suffix returned by + ``spec.static_suffixes(opts)`` to its precomputed bytes. + - ``stream_size``: total byte size of the streaming file. + """ + + def __init__(self, spec: formats.FormatSpec, opts) -> None: + self.spec = spec + self.opts = opts + self.static_files: dict[str, bytes] = {} + self.stream_size: int = 0 + self._reader = None + self._closed = False + + @classmethod + async def start( + cls, + vcz_url: str, + spec: formats.FormatSpec, + *, + opts, + ) -> "EncoderHost": + """Open the reader and build the static-sidecar bytes.""" + self = cls(spec, opts) + try: + await trio.to_thread.run_sync(self._sync_start, vcz_url) + except BaseException: + await self.aclose() + raise + return self + + def _sync_start(self, vcz_url: str) -> None: + reader = self.opts.make_reader(vcz_url) + try: + reader.materialise_variant_filter() + expected_suffixes = self.spec.static_suffixes(self.opts) + static_files = self.spec.build_static_files(reader, self.opts) + missing = set(expected_suffixes) - set(static_files) + extra = set(static_files) - set(expected_suffixes) + if missing or extra: + raise ValueError( + f"{self.spec.name}: build_static_files returned keys " + f"{sorted(static_files)}; expected {list(expected_suffixes)}" + ) + with self.spec.encoder_factory(reader, self.opts) as encoder: + stream_size = int(encoder.total_size) + except BaseException: + reader.__exit__(None, None, None) + raise + # Keep ordering in the static_files dict equal to the spec's + # declared order — the FUSE adapter does not depend on this + # but it keeps logs / diagnostics stable. + self.static_files = { + suffix: static_files[suffix] for suffix in expected_suffixes + } + self.stream_size = stream_size + self._reader = reader + + async def __aenter__(self) -> "EncoderHost": + return self + + async def __aexit__(self, exc_type, exc, tb) -> None: + await self.aclose() + + async def open_stream( + self, + *, + on_aclose: Callable[[float, float], None] | None = None, + ) -> StreamHandle: + """Construct a fresh encoder and wrap it in a :class:`StreamHandle`. + + Encoder construction runs in a worker thread because it + touches the reader's metadata caches and spins up a + ``ThreadPoolExecutor``. + """ + if self._reader is None: + raise OSError(errno.EIO, "encoder host is closed") + encoder = await trio.to_thread.run_sync(self._sync_open_encoder) + return StreamHandle(encoder, on_aclose=on_aclose) + + def _sync_open_encoder(self): + cm = self.spec.encoder_factory(self._reader, self.opts) + encoder = cm.__enter__() + # We rely on ``encoder.close`` (called by ``StreamHandle.aclose``) + # to release the encoder's resources; the context-manager + # protocol on ``FormatEncoder`` is a thin wrapper around it. + return encoder + + async def aclose(self) -> None: + """Close the reader. Idempotent.""" + if self._closed: + return + self._closed = True + reader = self._reader + self._reader = None + if reader is not None: + await trio.to_thread.run_sync(reader.__exit__, None, None, None) diff --git a/biofuse/encoder_ops.py b/biofuse/encoder_ops.py index c45eb3d..0e617a0 100644 --- a/biofuse/encoder_ops.py +++ b/biofuse/encoder_ops.py @@ -6,21 +6,18 @@ - the streaming file suffix (``.bed`` / ``.bgen``) and its ``streaming_kind`` dispatch key; - the static-sidecar suffixes (``.bim``/``.fam`` for PLINK, - ``.sample``/``.bgen.bgi`` for BGEN), served from cached bytes the - client fetched once at mount time. - -The static-sidecar bytes are fetched once at mount time and held in -memory by the FUSE adapter; reads against them are served directly -from the cached bytes without crossing process boundaries. - -Each streaming-file ``open()`` from the kernel allocates a fresh -:class:`biofuse.encoder_client.StreamConnection` — a dedicated socket -to the encoder-server subprocess, where one server thread with one -encoder runs synchronously for the lifetime of the connection. -``read()`` on that fh forwards straight to the per-fh socket; the + ``.sample``/``.bgen.bgi`` for BGEN), built once at mount time and + held in the host's memory. + +Static-sidecar reads are served directly from the cached bytes. Each +streaming-file ``open()`` from the kernel allocates a fresh +:class:`biofuse.encoder_host.StreamHandle` — a dedicated +:class:`~vcztools.format_encoder.FormatEncoder` whose blocking work +runs on worker threads via :func:`trio.to_thread.run_sync`, leaving +the pyfuse3 trio task free for other FUSE requests. ``read()`` on +that fh dispatches one ``encoder.read`` call per request; the kernel's parallel readahead requests on a single fh serialise on the -``StreamConnection``'s internal lock. ``release()`` closes the socket; -the server thread sees EOF and exits. +``StreamHandle``'s internal lock. ``release()`` closes the encoder. """ import errno @@ -53,23 +50,23 @@ _STATIC_KIND = "static" -class _StreamConnectionProto(Protocol): +class _StreamHandleProto(Protocol): async def read(self, off: int, size: int) -> bytes: ... async def aclose(self) -> None: ... -class EncoderClientProto(Protocol): - """The slice of :class:`biofuse.encoder_client.EncoderClient` that +class EncoderHostProto(Protocol): + """The slice of :class:`biofuse.encoder_host.EncoderHost` that :class:`EncoderOps` depends on. - Defined as a Protocol so tests can inject a fake without spawning - a subprocess. + Defined as a Protocol so tests can inject a fake without opening + a real reader. """ static_files: dict[str, bytes] stream_size: int - async def open_stream(self) -> _StreamConnectionProto: ... + async def open_stream(self) -> _StreamHandleProto: ... class EncoderOps(pyfuse3.Operations): @@ -77,20 +74,20 @@ class EncoderOps(pyfuse3.Operations): Parameters ---------- - client - An :class:`EncoderClient` already past its metadata handshake - (or any object satisfying :class:`EncoderClientProto`). The - caller owns the client's lifetime. + host + An :class:`~biofuse.encoder_host.EncoderHost` whose ``start`` + has populated ``static_files`` and ``stream_size`` (or any + object satisfying :class:`EncoderHostProto`). The caller owns + the host's lifetime. basename Stem used for the exposed files: ``{basename}{spec.streaming_suffix}`` - plus one ``{basename}{suffix}`` per entry of the static-files - dict the client received during the metadata handshake. + plus one ``{basename}{suffix}`` per entry of ``host.static_files``. spec The active :class:`~biofuse.formats.FormatSpec`. Its ``streaming_suffix`` is exposed as the streaming filename; ``streaming_kind`` is the dispatch key for read routing. - The set of static suffixes is read off ``client.static_files``, - which is the post-options filtered set the server actually + The set of static suffixes is read off ``host.static_files``, + which is the post-options filtered set the host actually produced. max_open_stream Maximum number of concurrent open streaming-file fhs. New @@ -106,7 +103,7 @@ class EncoderOps(pyfuse3.Operations): def __init__( self, - client: EncoderClientProto, + host: EncoderHostProto, basename: str, spec: formats.FormatSpec, *, @@ -114,7 +111,7 @@ def __init__( access_logger: access_log_mod.AccessLogger | None = None, ) -> None: super().__init__() - self._client = client + self._host = host self._basename = basename self._spec = spec self._access_logger = access_logger @@ -127,13 +124,13 @@ def __init__( # the streaming file's kind is the spec's ``streaming_kind``; # all static files share the same ``_STATIC_KIND`` dispatch key. # ``_name_to_suffix`` maps each static filename to its suffix - # for lookups into ``client.static_files``. + # for lookups into ``host.static_files``. stream_name = f"{basename}{spec.streaming_suffix}" manifest: list[tuple[str, str, int]] = [ - (stream_name, spec.streaming_kind, client.stream_size) + (stream_name, spec.streaming_kind, host.stream_size) ] self._name_to_suffix: dict[str, str] = {} - for suffix, body in client.static_files.items(): + for suffix, body in host.static_files.items(): name = f"{basename}{suffix}" manifest.append((name, _STATIC_KIND, len(body))) self._name_to_suffix[name] = suffix @@ -153,7 +150,7 @@ def __init__( self._next_fh = 1 self._fh_to_kind: dict[int, str] = {} self._fh_to_name: dict[int, str] = {} - self._fh_to_conn: dict[int, _StreamConnectionProto] = {} + self._fh_to_conn: dict[int, _StreamHandleProto] = {} def _build_attrs(self, inode: int) -> pyfuse3.EntryAttributes: attrs = pyfuse3.EntryAttributes() @@ -234,7 +231,7 @@ async def open(self, inode, flags, ctx=None): self._record_event("limiter_wait", name, fh, t_limiter_start, t_limiter_end) try: on_aclose = self._make_aclose_recorder(name, fh) - conn = await self._client.open_stream(on_aclose=on_aclose) + conn = await self._host.open_stream(on_aclose=on_aclose) except OSError as exc: self._stream_limiter.release_on_behalf_of(fh) raise pyfuse3.FUSEError(exc.errno or errno.EIO) from exc @@ -272,7 +269,7 @@ async def read(self, fh, off, size): t_start = time.monotonic() if kind == _STATIC_KIND: suffix = self._name_to_suffix[name] - data = self._read_static(self._client.static_files[suffix], off, size) + data = self._read_static(self._host.static_files[suffix], off, size) elif kind == self._spec.streaming_kind: assert conn is not None try: diff --git a/biofuse/encoder_protocol.py b/biofuse/encoder_protocol.py deleted file mode 100644 index a6fd763..0000000 --- a/biofuse/encoder_protocol.py +++ /dev/null @@ -1,132 +0,0 @@ -"""Wire protocol for the encoder-server subprocess. - -A minimal request/reply protocol exchanged over a connected -``AF_UNIX`` ``SOCK_STREAM`` socket. Each socket carries one -synchronous conversation: the parent sends a request, awaits the -reply, then sends the next request. There is no ``seq`` id and no -``fh`` field — the *socket is the channel*. - -Two request shapes ------------------- - -``M`` (1 byte) — get metadata. No payload. - Reply body: ```` then - ``n_static`` × ```` then the concatenated static-file - bodies in the order declared by the format spec. ``status`` on - success is the total body length in bytes. - -``R off:Q size:Q`` (17 bytes) — read ``size`` bytes at ``off`` from - the connection's streaming file. Reply body is the data bytes; - ``status`` is the body length. - -Reply layout (common) ---------------------- - -```` followed by ``status`` bytes of body when -``status >= 0``. ``status < 0`` is an error reply: ``-status`` is a -POSIX errno and no body follows. -""" - -import errno -import struct - -TAG_GET_METADATA = b"M" -TAG_READ = b"R" - -_REQ_READ = struct.Struct("`` entries, then the concatenated static bodies. -_META_PREFIX = struct.Struct(" bytes: - return TAG_GET_METADATA - - -def pack_read_request(off: int, size: int) -> bytes: - return TAG_READ + _REQ_READ.pack(off, size) - - -def parse_read_payload(buf: bytes) -> tuple[int, int]: - """Decode the 16-byte READ payload into ``(off, size)``.""" - return _REQ_READ.unpack(buf) - - -# -- reply encoding ----------------------------------------------------- - - -def pack_metadata_reply(static_bodies: list[bytes], stream_size: int) -> bytes: - """Pack a variable-arity metadata reply. - - ``static_bodies`` is the per-static-file payload, in the spec's - declared order. The reply body is the prefix + per-static sizes + - the concatenated bodies; ``status`` is the body length. - """ - n_static = len(static_bodies) - prefix = _META_PREFIX.pack(n_static, stream_size) - sizes = b"".join(_META_SIZE_ENTRY.pack(len(b)) for b in static_bodies) - body = prefix + sizes + b"".join(static_bodies) - return _REPLY_STATUS.pack(len(body)) + body - - -def pack_read_reply(data: bytes) -> bytes: - return _REPLY_STATUS.pack(len(data)) + data - - -def pack_error_reply(err: int) -> bytes: - if err <= 0: - raise ValueError(f"err must be a positive errno (got {err})") - return _REPLY_STATUS.pack(-err) - - -# -- reply decoding ----------------------------------------------------- - - -def parse_status(buf: bytes) -> int: - (status,) = _REPLY_STATUS.unpack(buf) - return status - - -def parse_metadata_prefix(buf: bytes) -> tuple[int, int]: - """Decode the ```` prefix of a metadata reply.""" - return _META_PREFIX.unpack(buf) - - -def parse_static_sizes(buf: bytes, n_static: int) -> tuple[int, ...]: - """Decode the per-static-file size array following the prefix.""" - return tuple( - _META_SIZE_ENTRY.unpack_from(buf, offset=i * META_SIZE_ENTRY_SIZE)[0] - for i in range(n_static) - ) - - -# -- shared helpers ----------------------------------------------------- - - -def status_to_error(status: int) -> OSError: - """Build an ``OSError`` from a negative reply status.""" - if status >= 0: - raise ValueError(f"status must be negative (got {status})") - err = -status - return OSError(err, f"encoder-server reported errno {err}") - - -def errno_for_exception(exc: BaseException) -> int: - """Pick a positive errno for any exception raised in the server. - - ``OSError`` is honoured directly; everything else becomes ``EIO``. - """ - if isinstance(exc, OSError) and exc.errno is not None: - return exc.errno - return errno.EIO diff --git a/biofuse/encoder_server.py b/biofuse/encoder_server.py deleted file mode 100644 index 6c7c5cb..0000000 --- a/biofuse/encoder_server.py +++ /dev/null @@ -1,337 +0,0 @@ -"""Encoder-server subprocess. - -A standalone server that owns one ``VczReader`` and serves byte-range -requests for one output format (PLINK 1 binary / Oxford BGEN) over an -``AF_UNIX`` listening socket. The owning :class:`~biofuse.formats.FormatSpec` -selects: - -- which static sidecar files to build on demand for the metadata - handshake (``.bim``/``.fam`` for PLINK; ``.sample``/``.bgen.bgi`` - for BGEN), via :meth:`FormatSpec.build_static_files`; -- which encoder class to construct per accepted connection, via - :meth:`FormatSpec.encoder_factory`. - -Each accepted connection runs in its own daemon thread with its own -encoder, so different consumers of the same streaming file get -independent state and run concurrently. - -Wire protocol is :mod:`biofuse.encoder_protocol` — synchronous, -per-socket, no seq ids: the socket *is* the channel. Server threads -serve one request at a time on their socket and exit cleanly when the -parent half-closes. -""" - -import logging -import select -import socket -import threading -import time - -from biofuse import encoder_protocol, formats - -logger = logging.getLogger(__name__) - - -class _ServerSession: - """Server-side state shared across all connection threads. - - Holds the ``VczReader``, the active format spec, and the options - dataclass that selected this mount's feature flags - (``--no-bim``/``--no-bgi``/``--no-header-samples``/…). The static - sidecar bytes and the streaming-file size are built on demand - inside the metadata handshake (see ``_make_metadata_reply``) - rather than cached here — the parent fuse handler is the canonical - owner of that metadata after the handshake completes. Immutable - after construction; safe to read concurrently from any thread - without locking. - """ - - def __init__( - self, - reader, - spec: formats.FormatSpec, - opts, - ) -> None: - self.reader = reader - self.spec = spec - self.opts = opts - - -def _recv_exact_sync(sock: socket.socket, n: int) -> bytes: - """Read exactly ``n`` bytes off ``sock``. Returns ``b""`` on clean EOF. - - Raises :class:`EOFError` if EOF arrives mid-frame. - """ - if n == 0: - return b"" - buf = bytearray(n) - view = memoryview(buf) - got = 0 - while got < n: - chunk = sock.recv_into(view[got:], n - got) - if chunk == 0: - if got == 0: - return b"" - raise EOFError(f"socket closed mid-frame after {got}/{n} bytes") - got += chunk - return bytes(buf) - - -def _make_error_reply(exc: BaseException, context: str) -> bytes: - """Pack an errno reply for ``exc`` and log the cause. - - Non-``OSError`` causes log a one-line ERROR with the errno and - the exception text, plus a DEBUG-only traceback. Used by both - the encoder-construction failure path and the per-dispatch - exception handler. - """ - err = encoder_protocol.errno_for_exception(exc) - if not isinstance(exc, OSError): - logger.error("encoder-server %s; replying with errno %d: %s", context, err, exc) - logger.debug("encoder-server %s traceback", context, exc_info=True) - return encoder_protocol.pack_error_reply(err) - - -def _make_metadata_reply(session: "_ServerSession") -> bytes: - """Build the reply for a ``TAG_GET_METADATA`` request. - - The static sidecars and the streaming-file size are built on - demand from the format spec, serialised into the reply, then - dropped: the parent fuse handler owns the canonical copy after - the handshake. Encoder construction is I/O-free so reading - ``total_size`` here is cheap. - """ - expected_suffixes = session.spec.static_suffixes(session.opts) - static_files = session.spec.build_static_files(session.reader, session.opts) - missing = set(expected_suffixes) - set(static_files) - extra = set(static_files) - set(expected_suffixes) - if missing or extra: - raise ValueError( - f"{session.spec.name}: build_static_files returned keys " - f"{sorted(static_files)}; expected {list(expected_suffixes)}" - ) - with session.spec.encoder_factory(session.reader, session.opts) as encoder: - stream_size = int(encoder.total_size) - ordered_bodies = [static_files[suffix] for suffix in expected_suffixes] - return encoder_protocol.pack_metadata_reply(ordered_bodies, stream_size) - - -def _make_read_reply(conn_sock: socket.socket, encoder, tname: str) -> bytes | None: - """Build the reply for a ``TAG_READ`` request, or ``None`` on - truncated payload (caller terminates the connection).""" - payload = _recv_exact_sync(conn_sock, encoder_protocol.REQ_READ_PAYLOAD_SIZE) - if len(payload) < encoder_protocol.REQ_READ_PAYLOAD_SIZE: - return None - off, size = encoder_protocol.parse_read_payload(payload) - t_read = time.monotonic() - data = encoder.read(off, size) - logger.debug( - "%s: encoder.read off=%d size=%d in %.3fs", - tname, - off, - size, - time.monotonic() - t_read, - ) - return encoder_protocol.pack_read_reply(data) - - -def _send_reply(conn_sock: socket.socket, reply: bytes) -> bool: - """Send ``reply`` on the socket; return ``True`` on success and - ``False`` if the send raised ``OSError`` (already logged).""" - try: - conn_sock.sendall(reply) - return True - except OSError as exc: - logger.warning("encoder-server send failed: %s", exc) - return False - - -def _serve_connection( - conn_sock: socket.socket, - session: "_ServerSession", - encoder, - tname: str, -) -> None: - """Run the tag-dispatch loop for one connected client. - - Exits cleanly on EOF, unknown tag, truncated payload, or a send - failure. Exceptions raised inside a dispatch case are converted - to errno replies and the loop continues to the next request. - """ - while True: - try: - tag_buf = _recv_exact_sync(conn_sock, 1) - except EOFError as exc: - logger.warning("encoder-server frame error: %s", exc) - return - if len(tag_buf) == 0: - return - tag = bytes(tag_buf) - try: - if tag == encoder_protocol.TAG_GET_METADATA: - reply = _make_metadata_reply(session) - elif tag == encoder_protocol.TAG_READ: - reply = _make_read_reply(conn_sock, encoder, tname) - else: - logger.warning( - "encoder-server: unknown tag %r; closing connection", tag - ) - return - except Exception as exc: # noqa: BLE001 - any error becomes errno reply - reply = _make_error_reply(exc, "dispatch raised") - if reply is None: - return - if not _send_reply(conn_sock, reply): - return - - -def _handle_connection(conn_sock: socket.socket, session: _ServerSession) -> None: - """Run one client connection synchronously. Returns on EOF. - - A fresh encoder (PLINK ``BedEncoder`` / BGEN ``BgenEncoder``, - selected by the session's :class:`~biofuse.formats.FormatSpec`) is - constructed up front for every connection and its lifetime is bound - to the connection via a ``with`` block. If construction fails, an - errno reply is written to the socket so the client surfaces a real - ``OSError`` rather than an unexplained EOF. - """ - tname = threading.current_thread().name - logger.debug("%s: conn accepted", tname) - with conn_sock: - try: - t_enc = time.monotonic() - encoder_cm = session.spec.encoder_factory(session.reader, session.opts) - except Exception as exc: # noqa: BLE001 - any error becomes errno reply - _send_reply( - conn_sock, _make_error_reply(exc, "encoder construction failed") - ) - return - with encoder_cm as encoder: - logger.debug( - "%s: encoder created in %.3fs", tname, time.monotonic() - t_enc - ) - _serve_connection(conn_sock, session, encoder, tname) - logger.debug("%s: conn thread exit", tname) - - -def serve_forever( - listener_sock: socket.socket, - stop_sock: socket.socket, - session: _ServerSession, -) -> None: - """Accept loop. Spawns one daemon thread per connection. - - Returns when ``stop_sock`` becomes readable (the parent closed - its end of the socketpair). Closes ``listener_sock`` on the way - out. Connection threads are daemon threads; they exit on their - own once their client closes the socket. We do not join them on - shutdown — the process exit reaps them. - """ - try: - while True: - try: - readable, _, _ = select.select([listener_sock, stop_sock], [], []) - except OSError as exc: - logger.warning("encoder-server select failed: %s", exc) - return - if stop_sock in readable: - return - try: - conn_sock, _ = listener_sock.accept() - except OSError as exc: - logger.warning("encoder-server accept failed: %s", exc) - return - t = threading.Thread( - target=_handle_connection, - args=(conn_sock, session), - name=f"{session.spec.name}-conn", - daemon=True, - ) - try: - t.start() - except RuntimeError as exc: - # Per-process thread budget exhausted (e.g. cgroup pids - # limit). Decline this connection cleanly so the server - # stays alive; the client sees an EOF on the half-open - # socket and surfaces it as ``OSError`` to the FUSE - # layer. - logger.warning( - "encoder-server: thread.start() failed (%s); active=%d", - exc, - threading.active_count(), - ) - try: - conn_sock.close() - except OSError: - pass - continue - finally: - try: - listener_sock.close() - except OSError: - pass - - -def _server_main( - listener_sock: socket.socket, - stop_sock: socket.socket, - vcz_url: str, - spec: formats.FormatSpec, - opts, -) -> None: - """Subprocess entry point invoked via ``multiprocessing.Process``. - - The two sockets are handle-passed by ``multiprocessing`` (the - reduction machinery dups the fds into the child). The reader is - used as a context manager so its shared ``ThreadPoolExecutor`` - (one pool per reader, drawn on by every encoder / - ``ReadaheadPipeline``) is drained on the way out. - - ``opts`` is the ``vcztools.ViewPlinkOptions`` / ``vcztools.ViewBgenOptions`` - dataclass parsed from the CLI. Its nested ``log`` field configures - logging in the subprocess so its own ``logger.debug`` / ``logger.info`` - output reaches the same sink as the parent, and its ``make_reader`` - method opens ``vcz_url`` with the bcftools-style filtering options - (regions, samples, …) and the storage / readahead settings. - - Any exception raised before ``serve_forever`` starts (reader - construction, ``_ServerSession`` construction) is caught here so - multiprocessing's default handler does not print a traceback. The - cause is logged at ERROR (visible at default verbosity); the - traceback only surfaces at DEBUG. Static-sidecar build errors - (e.g. multi-allelic input rejection) now surface inside - ``_handle_connection`` on the first metadata request rather than - at session construction; the handler converts them to errno - replies so the parent sees a clean ``OSError`` from - ``_handshake``. - """ - opts.log.apply() - try: - with opts.make_reader(vcz_url) as reader: - # Bcftools-style filters (--max-alleles, --types, --include, - # …) configure a per-variant predicate via - # ``reader.set_variant_filter``; the format encoders refuse - # readers in that state. Resolve the predicate now into a - # fixed surviving-variant chunk plan so each connection's - # encoder sees a plain reader. No-op when no variant filter - # is configured. - reader.materialise_variant_filter() - session = _ServerSession(reader, spec, opts) - try: - serve_forever(listener_sock, stop_sock, session) - finally: - try: - stop_sock.close() - except OSError: - pass - except Exception as exc: # noqa: BLE001 - cleanly surface any startup failure - logger.error("%s-server startup failed: %s", spec.name, exc) - logger.debug("%s-server startup traceback", spec.name, exc_info=True) - try: - listener_sock.close() - except OSError: - pass - try: - stop_sock.close() - except OSError: - pass diff --git a/biofuse/formats.py b/biofuse/formats.py index f547761..00ebbd8 100644 --- a/biofuse/formats.py +++ b/biofuse/formats.py @@ -1,16 +1,16 @@ -"""Format specs for the encoder-server stack. +"""Format specs for the encoder-host stack. A :class:`FormatSpec` bundles everything the format-agnostic ``encoder_*`` modules need to serve one output format: - the suffix of the streaming file (``.bed`` / ``.bgen``), -- a callable returning the static-sidecar suffixes the parent should - expect for a given options dataclass, +- a callable returning the static-sidecar suffixes the host should + produce for a given options dataclass, - a builder that produces those static bytes from a ``VczReader`` plus options, - a factory that constructs one :class:`vcztools.BedEncoder` / - :class:`vcztools.BgenEncoder` per server-side connection, also - parameterised by the options dataclass. + :class:`vcztools.BgenEncoder` per streaming fh, also parameterised + by the options dataclass. Both :class:`vcztools.BedEncoder` and :class:`vcztools.BgenEncoder` extend :class:`vcztools.format_encoder.FormatEncoder`, so they share @@ -23,13 +23,13 @@ ``--no-fam`` suppress the corresponding PLINK sidecars, and ``--no-sample-file`` / ``--no-bgi`` suppress the BGEN sidecars. ``--no-header-samples`` flips ``embed_header_samples=False`` on the -``BgenEncoder`` so the per-connection ``.bgen`` stream omits the -sample identifiers from its header block. +``BgenEncoder`` so the per-fh ``.bgen`` stream omits the sample +identifiers from its header block. For BGEN the ``.bgen.bgi`` sidecar is a SQLite database that :func:`vcztools.write_bgi` writes to a filesystem path. We materialise -it to a tempfile at session-init time, read the bytes back, and hold -them in the server process's memory alongside the ``.sample`` text. +it to a tempfile at host startup, read the bytes back, and hold them +in the host's memory alongside the ``.sample`` text. """ import dataclasses @@ -43,7 +43,7 @@ @dataclasses.dataclass(frozen=True) class FormatSpec: - """One output format the encoder-server stack can serve.""" + """One output format the encoder-host stack can serve.""" name: str """Short identifier used in CLI / log lines (``"plink"`` / ``"bgen"``).""" @@ -69,7 +69,7 @@ class FormatSpec: encoder_factory: Callable """``(reader, opts) -> FormatEncoder``: construct one fresh encoder - for one server connection, parameterised by ``opts``.""" + for one streaming fh, parameterised by ``opts``.""" def _plink_static_suffixes(opts) -> tuple[str, ...]: diff --git a/fs_tests/README.md b/fs_tests/README.md index 966daa2..a50f40e 100644 --- a/fs_tests/README.md +++ b/fs_tests/README.md @@ -9,6 +9,7 @@ every PR. | Category | Runner | Approx runtime | |---|---|---| | POSIX syscall semantics | native Python | ~10 s | +| Bulk-data cross-validation | native Python | ~15 s | | pjdfstest curated subset | external (`pjdfstest`) | ~30–60 s | | Read-pattern stress | external (`fio`) | ~4 min | | Read cross-validation | native Python (fsx-style) | ~30 s | @@ -125,6 +126,14 @@ that mount-level operations and static-file reads stay responsive even while the streaming file is saturated. The streaming file itself is *not* probed: that is the load. +**Bulk-data cross-validation.** The `bulk-data` runner mounts the +fixture VCZ once as plink and once as BGEN, reads up to 100 MB from +each streaming file (`.bed` / `.bgen`) on the mount, and compares the +bytes against the same prefix produced by `vcztools.BedEncoder` / +`vcztools.BgenEncoder` run directly in-process. The cap keeps memory +bounded when the encoder's `total_size` exceeds 100 MB; smaller +streams are compared in full. + **fsx is read-only mode only.** Apple/LTP/xfstests fsx all assume a writable filesystem (they bootstrap the in-memory model by writing to the file under test). None of them run unmodified against a read-only diff --git a/fs_tests/harness/bulk_data_runner.py b/fs_tests/harness/bulk_data_runner.py new file mode 100644 index 0000000..38e0eec --- /dev/null +++ b/fs_tests/harness/bulk_data_runner.py @@ -0,0 +1,189 @@ +"""Bulk-data cross-validation: mount bytes vs. encoder-direct bytes. + +For both formats (plink, bgen), this runner: + +1. Builds the streaming encoder (``BedEncoder`` / ``BgenEncoder``) + directly against the fixture VCZ, and reads the first 100 MB as the + oracle bytes. +2. Mounts the same VCZ via ``biofuse mount-`` and reads the + first 100 MB of the streaming file from the mountpoint. +3. Asserts the byte streams are identical. + +If the encoder's ``total_size`` is below 100 MB, the read length is +clipped to ``total_size`` for both sides — only the leading +``min(total_size, 100 MB)`` of the oracle is materialised, keeping +memory bounded. +""" + +import logging +import pathlib +import time + +import vcztools + +from . import fixtures, tools +from . import mount as mount_mod + +logger = logging.getLogger(__name__) + +CAP_BYTES = 100 * 1024 * 1024 + + +def _build_encoder(format_name: str, vcz_path: pathlib.Path): + if format_name == "plink": + opts = vcztools.ViewPlinkOptions() + reader = opts.make_reader(str(vcz_path)) + return vcztools.BedEncoder(reader) + if format_name == "bgen": + opts = vcztools.ViewBgenOptions() + reader = opts.make_reader(str(vcz_path)) + return vcztools.BgenEncoder(reader) + raise ValueError(f"unknown format {format_name!r}") + + +def _read_mount_prefix(path: pathlib.Path, size: int) -> bytes: + """Read up to ``size`` bytes from ``path``, looping until EOF or + the cap is reached.""" + chunks: list[bytes] = [] + remaining = size + with open(path, "rb") as fh: + while remaining > 0: + chunk = fh.read(remaining) + if len(chunk) == 0: + break + chunks.append(chunk) + remaining -= len(chunk) + return b"".join(chunks) + + +def _check_one( + *, + format_name: str, + streaming_suffix: str, + vcz_path: pathlib.Path, + basename: str, + log_dir: pathlib.Path, +) -> tools.CheckResult: + started = time.monotonic() + log_lines: list[str] = [f"format={format_name} basename={basename}"] + + with _build_encoder(format_name, vcz_path) as encoder: + total_size = encoder.total_size + compare_size = min(total_size, CAP_BYTES) + log_lines.append( + f"encoder total_size={total_size} compare_size={compare_size} " + f"(cap={CAP_BYTES})" + ) + expected = encoder.read(0, compare_size) + + if len(expected) != compare_size: + log_lines.append( + f"encoder short read: got {len(expected)} expected {compare_size}" + ) + (log_dir / f"{format_name}.log").write_text("\n".join(log_lines) + "\n") + return tools.CheckResult( + name=f"bulk-data:{format_name}", + passed=False, + duration_s=time.monotonic() - started, + detail=f"encoder short read: {len(expected)}/{compare_size}", + ) + + mountpoint = log_dir / f"mnt-{format_name}" + with mount_mod.BiofuseMount( + str(vcz_path), + mountpoint, + format_name=format_name, + basename=basename, + log_path=log_dir / f"{format_name}-mount.log", + ) as mnt: + target = mnt / f"{basename}{streaming_suffix}" + if not target.exists(): + log_lines.append(f"target missing: {target}") + (log_dir / f"{format_name}.log").write_text("\n".join(log_lines) + "\n") + return tools.CheckResult( + name=f"bulk-data:{format_name}", + passed=False, + duration_s=time.monotonic() - started, + detail=f"target file missing: {target.name}", + ) + mount_size = target.stat().st_size + log_lines.append(f"mount file size={mount_size}") + if mount_size != total_size: + (log_dir / f"{format_name}.log").write_text("\n".join(log_lines) + "\n") + return tools.CheckResult( + name=f"bulk-data:{format_name}", + passed=False, + duration_s=time.monotonic() - started, + detail=f"size mismatch: encoder={total_size} mount={mount_size}", + ) + actual = _read_mount_prefix(target, compare_size) + + duration = time.monotonic() - started + + if len(actual) != compare_size: + log_lines.append(f"mount short read: {len(actual)}/{compare_size}") + (log_dir / f"{format_name}.log").write_text("\n".join(log_lines) + "\n") + return tools.CheckResult( + name=f"bulk-data:{format_name}", + passed=False, + duration_s=duration, + detail=f"mount short read: {len(actual)}/{compare_size}", + ) + + if actual != expected: + first_diff = next( + (i for i, (a, b) in enumerate(zip(actual, expected)) if a != b), + min(len(actual), len(expected)), + ) + log_lines.append(f"BYTE MISMATCH at offset 0x{first_diff:x}") + (log_dir / f"{format_name}.log").write_text("\n".join(log_lines) + "\n") + return tools.CheckResult( + name=f"bulk-data:{format_name}", + passed=False, + duration_s=duration, + detail=f"byte mismatch at offset 0x{first_diff:x}", + ) + + log_lines.append(f"OK: {compare_size} bytes match ({duration:.1f}s)") + (log_dir / f"{format_name}.log").write_text("\n".join(log_lines) + "\n") + logger.info( + "bulk-data %s: %d bytes match (%.1fs)", format_name, compare_size, duration + ) + return tools.CheckResult( + name=f"bulk-data:{format_name}", + passed=True, + duration_s=duration, + detail=f"compared {compare_size} bytes (encoder total_size={total_size})", + ) + + +def run(*, log_dir: pathlib.Path) -> tools.RunnerResult: + started = time.monotonic() + log_dir.mkdir(parents=True, exist_ok=True) + + spec = fixtures.MEDIUM + vcz_path = fixtures.get_or_build(spec) + + checks: list[tools.CheckResult] = [] + for format_name, streaming_suffix in (("plink", ".bed"), ("bgen", ".bgen")): + checks.append( + _check_one( + format_name=format_name, + streaming_suffix=streaming_suffix, + vcz_path=vcz_path, + basename=spec.name, + log_dir=log_dir, + ) + ) + + duration = time.monotonic() - started + return tools.RunnerResult( + runner="bulk-data", + passed=all(c.passed for c in checks), + duration_s=duration, + checks=checks, + summary=( + f"bulk-data cross-validation: encoder vs mount, " + f"cap={CAP_BYTES // (1024 * 1024)} MB" + ), + ) diff --git a/fs_tests/harness/cli.py b/fs_tests/harness/cli.py index 5268554..d29541e 100644 --- a/fs_tests/harness/cli.py +++ b/fs_tests/harness/cli.py @@ -9,6 +9,7 @@ import click from . import ( + bulk_data_runner, fio_runner, fsx_runner, lifecycle, @@ -227,6 +228,18 @@ def fsx_cmd(ctx: click.Context, ops: int, max_op_size: int) -> None: sys.exit(_emit(results_dir, [result])) +@main.command("bulk-data") +@click.pass_context +def bulk_data_cmd(ctx: click.Context) -> None: + """Cross-validate mount bytes against direct-encoder bytes (plink + bgen).""" + results_dir = ctx.obj["results_dir"] + log_dir = _runner_log_dir(results_dir, "bulk-data") + result = _run_with_banner( + "bulk-data", lambda: bulk_data_runner.run(log_dir=log_dir) + ) + sys.exit(_emit(results_dir, [result])) + + @main.command("lifecycle") @click.option("--iterations", type=int, default=50) @click.pass_context @@ -272,12 +285,16 @@ def all_cmd(ctx: click.Context) -> None: pjd_log_dir = _runner_log_dir(results_dir, "pjdfstest") fio_log_dir = _runner_log_dir(results_dir, "fio") fsx_log_dir = _runner_log_dir(results_dir, "fsx") + bulk_data_log_dir = _runner_log_dir(results_dir, "bulk-data") stress_log_dir = _runner_log_dir(results_dir, "stress-ng") lifecycle_log_dir = _runner_log_dir(results_dir, "lifecycle") liveness_log_dir = _runner_log_dir(results_dir, "active-under-stress") results: list[tools.RunnerResult] = [ - _run_with_banner("posix", lambda: posix.run(log_path=posix_log)) + _run_with_banner("posix", lambda: posix.run(log_path=posix_log)), + _run_with_banner( + "bulk-data", lambda: bulk_data_runner.run(log_dir=bulk_data_log_dir) + ), ] if not quick: results.append( diff --git a/fs_tests/harness/mount.py b/fs_tests/harness/mount.py index f01eb52..1652a85 100644 --- a/fs_tests/harness/mount.py +++ b/fs_tests/harness/mount.py @@ -47,11 +47,12 @@ def force_unmount(mountpoint: pathlib.Path) -> None: class BiofuseMount: - """Context manager that runs ``biofuse mount-plink`` in a subprocess. + """Context manager that runs ``biofuse mount-`` in a subprocess. Spawns the real CLI so the harness exercises the same code path a user hits. Each mount is its own subprocess, sidestepping pyfuse3's - one-mount-per-process limitation entirely. + one-mount-per-process limitation entirely. ``format_name`` selects + between ``mount-plink`` (default) and ``mount-bgen``. """ def __init__( @@ -59,6 +60,7 @@ def __init__( vcz_url: str, mountpoint: pathlib.Path, *, + format_name: str = "plink", basename: str | None = None, log_path: pathlib.Path | None = None, access_log_path: pathlib.Path | None = None, @@ -67,6 +69,7 @@ def __init__( ) -> None: self.vcz_url = vcz_url self.mountpoint = mountpoint + self.format_name = format_name self.basename = basename self.log_path = log_path self.access_log_path = access_log_path @@ -85,7 +88,7 @@ def _build_cmd(self) -> list[str]: ) cmd: list[str] = [ biofuse_bin, - "mount-plink", + f"mount-{self.format_name}", self.vcz_url, str(self.mountpoint), ] diff --git a/tests/test_bgen_apps.py b/tests/test_bgen_apps.py index c74660b..7990880 100644 --- a/tests/test_bgen_apps.py +++ b/tests/test_bgen_apps.py @@ -1,10 +1,9 @@ """End-to-end tests for BGEN mounts. -Mirrors :mod:`test_plink_apps`: mount a VCZ as a BGEN fileset via the -real encoder-server subprocess, then read the mounted files. Where -``plink2`` is on PATH, ``TestPlinkTwo`` also exercises the mount as a -BGEN reader and compares its output to a side-by-side copy of the -encoder's bytes. +Mirrors :mod:`test_plink_apps`: mount a VCZ as a BGEN fileset, then +read the mounted files. Where ``plink2`` is on PATH, ``TestPlinkTwo`` +also exercises the mount as a BGEN reader and compares its output to +a side-by-side copy of the encoder's bytes. """ import contextlib @@ -19,7 +18,7 @@ import trio import vcztools -from biofuse import access_log, encoder_client, encoder_ops, formats, fuse_adapter +from biofuse import access_log, encoder_host, encoder_ops, formats, fuse_adapter def _open_reader(path) -> object: @@ -66,12 +65,13 @@ async def fx_mounted_bgen(tmp_path, fx_medium_vcz): expected = _encoder_bytes(fx_medium_vcz.path) log = access_log.AccessLogger() - sock_path = tmp_path / "bgen.sock" - async with await encoder_client.EncoderClient.start( - str(fx_medium_vcz.path), sock_path, formats.BGEN_SPEC - ) as client: + async with await encoder_host.EncoderHost.start( + str(fx_medium_vcz.path), + formats.BGEN_SPEC, + opts=vcztools.ViewBgenOptions(), + ) as host: ops = encoder_ops.EncoderOps( - client, "medium", formats.BGEN_SPEC, access_logger=log + host, "medium", formats.BGEN_SPEC, access_logger=log ) async with fuse_adapter.mount(ops, str(mnt)): await _wait_for_mount(mnt) @@ -92,18 +92,21 @@ async def _arun(cmd) -> None: async def _mount_bgen(tmp_path, vcz, opts=None): """Mount ``vcz`` as a BGEN fileset; yield ``(mnt, basename)``. - ``opts`` is the ``vcztools.ViewBgenOptions`` dataclass the - encoder-server runs under; defaults to a fresh ``ViewBgenOptions()`` - (every field at its dataclass default). + ``opts`` is the ``vcztools.ViewBgenOptions`` dataclass the host + runs under; defaults to a fresh ``ViewBgenOptions()`` (every field + at its dataclass default). """ mnt = tmp_path / "mnt" mnt.mkdir() basename = vcz.path.stem - sock_path = tmp_path / "bgen.sock" - async with await encoder_client.EncoderClient.start( - str(vcz.path), sock_path, formats.BGEN_SPEC, opts=opts - ) as client: - ops = encoder_ops.EncoderOps(client, basename, formats.BGEN_SPEC) + if opts is None: + opts = vcztools.ViewBgenOptions() + async with await encoder_host.EncoderHost.start( + str(vcz.path), + formats.BGEN_SPEC, + opts=opts, + ) as host: + ops = encoder_ops.EncoderOps(host, basename, formats.BGEN_SPEC) async with fuse_adapter.mount(ops, str(mnt)): await _wait_for_mount(mnt) yield mnt, basename diff --git a/tests/test_encoder_client.py b/tests/test_encoder_client.py deleted file mode 100644 index 6e2274e..0000000 --- a/tests/test_encoder_client.py +++ /dev/null @@ -1,532 +0,0 @@ -"""Tests for EncoderClient and StreamConnection. - -Most tests pair the trio client with a thread-based ``serve_forever`` -running on a real ``AF_UNIX`` listener under tmp_path. Real-subprocess -tests use the parametrised :data:`fx_spec` to exercise both PLINK and -BGEN through the spawn handshake. -""" - -import dataclasses -import errno -import multiprocessing as mp -import pathlib -import random -import socket -import threading - -import pytest -import trio -import vcztools - -from biofuse import encoder_client, encoder_server, formats - - -@pytest.fixture(params=[formats.PLINK_SPEC, formats.BGEN_SPEC], ids=["plink", "bgen"]) -def fx_spec(request): - return request.param - - -@pytest.fixture -def fx_opts(fx_spec): - if fx_spec.name == "plink": - return vcztools.ViewPlinkOptions() - return vcztools.ViewBgenOptions() - - -@pytest.fixture -def fx_reader(fx_small_vcz): - return vcztools.ViewPlinkOptions().make_reader(str(fx_small_vcz.path)) - - -@pytest.fixture -def fx_expected(fx_reader, fx_spec, fx_opts): - """Static-body / stream-size reference for the active spec. - - Built directly from the spec to avoid the BGEN / SQLite ``.bgi`` - non-determinism caveats documented in :mod:`test_encoder_server`. - """ - static = fx_spec.build_static_files(fx_reader, fx_opts) - with fx_spec.encoder_factory(fx_reader, fx_opts) as encoder: - stream_size = int(encoder.total_size) - return fx_spec, static, stream_size - - -class _ThreadServer: - """Drop-in stand-in for the ``multiprocessing.Process`` worker. - - Runs ``encoder_server.serve_forever`` on a thread bound to a real - ``AF_UNIX`` listener at ``socket_path``. Exposes the small subset - of the ``mp.Process`` API that ``EncoderClient.aclose`` needs. - """ - - def __init__( - self, - reader, - spec: formats.FormatSpec, - opts, - socket_path: pathlib.Path, - ) -> None: - self.exitcode: int | None = None - self.session = encoder_server._ServerSession(reader, spec, opts) - self._listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self._listener.bind(str(socket_path)) - self._listener.listen(16) - self._parent_stop, self._child_stop = socket.socketpair( - socket.AF_UNIX, socket.SOCK_STREAM - ) - self._thread = threading.Thread( - target=encoder_server.serve_forever, - args=(self._listener, self._child_stop, self.session), - daemon=True, - ) - self._thread.start() - - def is_alive(self) -> bool: - return self._thread.is_alive() - - def join(self, timeout: float | None = None) -> None: - self._thread.join(timeout=timeout) - if not self._thread.is_alive(): - self.exitcode = 0 - - def terminate(self) -> None: - try: - self._parent_stop.close() - except OSError: - pass - - def kill(self) -> None: - self.terminate() - - @property - def parent_stop(self) -> socket.socket: - return self._parent_stop - - -async def _client_with_thread_server( - reader, spec: formats.FormatSpec, opts, socket_path: pathlib.Path -) -> encoder_client.EncoderClient: - server = _ThreadServer(reader, spec, opts, socket_path) - self = encoder_client.EncoderClient.__new__(encoder_client.EncoderClient) - self.spec = spec - self.opts = opts - self.static_files = {} - self.stream_size = 0 - self._proc = server - self._socket_path = socket_path - self._stop_sock = server.parent_stop - self._closed = False - try: - await self._handshake() - except BaseException: - await self.aclose() - raise - return self - - -@pytest.fixture -async def fx_client(fx_reader, fx_spec, fx_opts, tmp_path): - socket_path = tmp_path / "encoder.sock" - client = await _client_with_thread_server(fx_reader, fx_spec, fx_opts, socket_path) - try: - yield client - finally: - await client.aclose() - - -class TestHandshake: - async def test_metadata_populated(self, fx_client, fx_expected): - spec, expected_static, expected_stream_size = fx_expected - assert fx_client.spec is spec - assert fx_client.stream_size == expected_stream_size - assert set(fx_client.static_files) == set(expected_static) - for suffix in expected_static: - assert len(fx_client.static_files[suffix]) == len(expected_static[suffix]) - - -class TestStreamConnection: - async def test_full_stream_read_matches_stream_size(self, fx_client): - conn = await fx_client.open_stream() - try: - data = await conn.read(0, fx_client.stream_size * 2) - assert len(data) == fx_client.stream_size - finally: - await conn.aclose() - - async def test_chunked_stream_read(self, fx_client): - conn = await fx_client.open_stream() - try: - chunks = [] - offset = 0 - while True: - data = await conn.read(offset, 4096) - if len(data) == 0: - break - chunks.append(data) - offset += len(data) - assert sum(len(c) for c in chunks) == fx_client.stream_size - finally: - await conn.aclose() - - async def test_random_pread_matches_full_read(self, fx_client): - """A random-offset pread on a fresh connection reads the same - bytes as the same window of a full sequential read on another - connection. This pins the encoder's random-access contract - without comparing against an on-disk golden.""" - full_conn = await fx_client.open_stream() - try: - full = await full_conn.read(0, fx_client.stream_size) - finally: - await full_conn.aclose() - - rng = random.Random(7) - conn = await fx_client.open_stream() - try: - for _ in range(20): - offset = rng.randrange(fx_client.stream_size) - size = rng.randrange(1, 256) - got = await conn.read(offset, size) - assert got == full[offset : offset + size] - finally: - await conn.aclose() - - async def test_two_connections_independent(self, fx_client): - full_conn = await fx_client.open_stream() - try: - expected = await full_conn.read(0, fx_client.stream_size) - finally: - await full_conn.aclose() - conn_a = await fx_client.open_stream() - conn_b = await fx_client.open_stream() - try: - half = len(expected) // 2 - a = await conn_a.read(0, half) - b = await conn_b.read(half, half) - assert a == expected[:half] - assert b == expected[half : 2 * half] - # Cross-encoder backward seeks: each connection has its own - # encoder, so a backward read on one doesn't disturb the - # other. - c = await conn_b.read(0, half) - d = await conn_a.read(half, half) - assert c == expected[:half] - assert d == expected[half : 2 * half] - finally: - await conn_a.aclose() - await conn_b.aclose() - - async def test_concurrent_connections_run_in_parallel(self, fx_client): - """Open four stream connections and run all four reads - concurrently in a trio nursery. Each one must see a byte-identical - copy of the stream, regardless of interleaving on the server.""" - full_conn = await fx_client.open_stream() - try: - expected = await full_conn.read(0, fx_client.stream_size) - finally: - await full_conn.aclose() - n = 4 - results: dict[int, bytes] = {} - - async def runner(idx: int) -> None: - conn = await fx_client.open_stream() - try: - results[idx] = await conn.read(0, fx_client.stream_size * 2) - finally: - await conn.aclose() - - async with trio.open_nursery() as nursery: - for i in range(n): - nursery.start_soon(runner, i) - assert len(results) == n - for body in results.values(): - assert body == expected - - async def test_read_after_close_raises(self, fx_client): - conn = await fx_client.open_stream() - await conn.aclose() - with pytest.raises(OSError, match="stream connection is closed") as excinfo: - await conn.read(0, 1) - assert excinfo.value.errno == errno.EIO - - async def test_aclose_is_idempotent(self, fx_client): - conn = await fx_client.open_stream() - await conn.aclose() - await conn.aclose() - - -class TestClientClose: - async def test_aclose_is_idempotent(self, fx_reader, fx_spec, fx_opts, tmp_path): - socket_path = tmp_path / "encoder.sock" - client = await _client_with_thread_server( - fx_reader, fx_spec, fx_opts, socket_path - ) - await client.aclose() - await client.aclose() - - async def test_aclose_terminates_server_thread( - self, fx_reader, fx_spec, fx_opts, tmp_path - ): - socket_path = tmp_path / "encoder.sock" - client = await _client_with_thread_server( - fx_reader, fx_spec, fx_opts, socket_path - ) - proc = client._proc - await client.aclose() - assert not proc.is_alive() - - -class TestTimeouts: - """The FUSE handler must never block forever on the worker. These - tests pin that property by pointing the client at a deliberately - unresponsive server and asserting that ``read`` and ``aclose`` - surface ``OSError(EIO)`` within a deadline.""" - - @staticmethod - def _bind_stall_listener(sock_path): - """Bind+listen a UNIX socket that will accept exactly one - connection inside the test's nursery and then go silent.""" - listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - listener.bind(str(sock_path)) - listener.listen(1) - listener.setblocking(False) - return listener - - async def _stall_server(self, listener): - trio_listener = trio.socket.from_stdlib_socket(listener) - conn, _ = await trio_listener.accept() - # Hold the connection open with no reads or writes. The test - # nursery cancels us at teardown. - try: - await trio.sleep_forever() - finally: - conn.close() - - async def _make_stalled_connection( - self, sock_path, nursery - ) -> encoder_client.StreamConnection: - listener = self._bind_stall_listener(sock_path) - nursery.start_soon(self._stall_server, listener) - stream = await trio.open_unix_socket(str(sock_path)) - return encoder_client.StreamConnection(stream) - - async def test_read_times_out_to_eio(self, monkeypatch, tmp_path): - monkeypatch.setattr(encoder_client, "_REQUEST_TIMEOUT_S", 0.2) - sock_path = tmp_path / "stall.sock" - async with trio.open_nursery() as nursery: - conn = await self._make_stalled_connection(sock_path, nursery) - t0 = trio.current_time() - with pytest.raises(OSError, match="encoder-server") as excinfo: - await conn.read(0, 1024) - elapsed = trio.current_time() - t0 - assert excinfo.value.errno == errno.EIO - assert elapsed < 1.0, f"read should fail fast, took {elapsed:.2f}s" - nursery.cancel_scope.cancel() - - async def test_read_after_timeout_is_immediate(self, monkeypatch, tmp_path): - monkeypatch.setattr(encoder_client, "_REQUEST_TIMEOUT_S", 0.2) - sock_path = tmp_path / "stall.sock" - async with trio.open_nursery() as nursery: - conn = await self._make_stalled_connection(sock_path, nursery) - with pytest.raises(OSError, match="encoder-server"): - await conn.read(0, 1024) - t0 = trio.current_time() - with pytest.raises(OSError, match="stream connection is closed") as excinfo: - await conn.read(0, 1024) - elapsed = trio.current_time() - t0 - assert excinfo.value.errno == errno.EIO - assert elapsed < 0.05, ( - f"second read should be immediate, took {elapsed:.3f}s" - ) - nursery.cancel_scope.cancel() - - async def test_aclose_does_not_hang_on_unresponsive_peer( - self, monkeypatch, tmp_path - ): - monkeypatch.setattr(encoder_client, "_ACLOSE_TIMEOUT_S", 0.2) - sock_path = tmp_path / "stall.sock" - async with trio.open_nursery() as nursery: - conn = await self._make_stalled_connection(sock_path, nursery) - t0 = trio.current_time() - await conn.aclose() - elapsed = trio.current_time() - t0 - assert elapsed < 1.0, f"aclose should not hang, took {elapsed:.2f}s" - nursery.cancel_scope.cancel() - - -class TestRealSubprocess: - """End-to-end tests against a real ``multiprocessing.Process`` worker.""" - - @pytest.mark.parametrize( - "spec", [formats.PLINK_SPEC, formats.BGEN_SPEC], ids=["plink", "bgen"] - ) - async def test_start_fast_fails_on_handshake_build_error( - self, fx_multiallelic_vcz, tmp_path, spec - ): - """``EncoderClient.start()`` must surface a clean ``OSError`` and - return well under the 10 s connect deadline when the subprocess - rejects the input during the metadata handshake. Static-file - build (here: multi-allelic VCZ) runs on demand inside - ``_handle_connection``; failures become errno replies on the - handshake socket.""" - socket_path = tmp_path / "encoder.sock" - t0 = trio.current_time() - with pytest.raises(OSError, match="encoder-server reported errno") as excinfo: - await encoder_client.EncoderClient.start( - str(fx_multiallelic_vcz.path), socket_path, spec - ) - elapsed = trio.current_time() - t0 - assert excinfo.value.errno == errno.EIO - assert elapsed < encoder_client._CONNECT_DEADLINE_S, ( - f"start() should fast-fail on handshake error, took {elapsed:.2f}s" - ) - - async def test_max_alleles_filter_through_start( - self, fx_multiallelic_vcz, tmp_path - ): - """``ViewPlinkOptions`` with ``--max-alleles 2`` flows through the - multiprocessing spawn into the worker's ``opts.make_reader`` and - drops multi-allelic variants, so the handshake succeeds and the - metadata reflects the filtered variant count.""" - # Sanity: the fixture really is multi-allelic, otherwise the - # filter has nothing to drop and the test isn't testing anything. - assert fx_multiallelic_vcz.num_biallelic_sites < ( - fx_multiallelic_vcz.num_variants - ) - socket_path = tmp_path / "encoder.sock" - default_opts = vcztools.ViewPlinkOptions() - selection = dataclasses.replace(default_opts.selection, max_alleles=2) - opts = dataclasses.replace(default_opts, selection=selection) - client = await encoder_client.EncoderClient.start( - str(fx_multiallelic_vcz.path), - socket_path, - formats.PLINK_SPEC, - opts=opts, - ) - try: - bim_lines = client.static_files[".bim"].decode("utf-8").splitlines() - assert len(bim_lines) == fx_multiallelic_vcz.num_biallelic_sites - fam_lines = client.static_files[".fam"].decode("utf-8").splitlines() - assert len(fam_lines) == fx_multiallelic_vcz.num_samples - bytes_per_variant = (fx_multiallelic_vcz.num_samples + 3) // 4 - expected_bed_size = ( - 3 + fx_multiallelic_vcz.num_biallelic_sites * bytes_per_variant - ) - assert client.stream_size == expected_bed_size - conn = await client.open_stream() - try: - data = await conn.read(0, expected_bed_size) - assert len(data) == expected_bed_size - finally: - await conn.aclose() - finally: - await client.aclose() - assert isinstance(client._proc, mp.process.BaseProcess) - assert not client._proc.is_alive() - assert client._proc.exitcode == 0 - - @pytest.mark.parametrize("no_header_samples", [True, False]) - async def test_bgen_no_header_samples_stable_across_opens( - self, fx_small_vcz, tmp_path, no_header_samples - ): - """``--no-header-samples`` flows into every per-connection - ``BgenEncoder`` consistently. - - Each ``open_stream`` spawns a fresh server-side thread that - constructs a new encoder via ``spec.encoder_factory(reader, opts)``. - Three sequential open/read/close cycles must all return the same - bytes — and those bytes must match an in-process reference - ``BgenEncoder`` constructed with the matching - ``embed_header_samples`` flag. - """ - opts = dataclasses.replace( - vcztools.ViewBgenOptions(), no_header_samples=no_header_samples - ) - ref_reader = opts.make_reader(str(fx_small_vcz.path)) - with vcztools.BgenEncoder( - ref_reader, embed_header_samples=not no_header_samples - ) as ref: - expected_size = int(ref.total_size) - expected = ref.read(0, expected_size) - - socket_path = tmp_path / "encoder.sock" - async with await encoder_client.EncoderClient.start( - str(fx_small_vcz.path), socket_path, formats.BGEN_SPEC, opts=opts - ) as client: - assert client.stream_size == expected_size - for cycle in range(3): - conn = await client.open_stream() - try: - data = await conn.read(0, client.stream_size) - finally: - await conn.aclose() - assert data == expected, f"cycle {cycle} differed from reference" - - @pytest.mark.parametrize( - ("no_sample_file", "no_bgi"), - [(True, False), (False, True), (True, True), (False, False)], - ) - async def test_bgen_sidecar_toggles_stable_across_starts( - self, fx_small_vcz, tmp_path, no_sample_file, no_bgi - ): - """The handshake's ``static_files`` honours ``--no-sample-file`` - / ``--no-bgi`` and yields the same suffix set on each - ``EncoderClient.start`` against the same options.""" - opts = dataclasses.replace( - vcztools.ViewBgenOptions(), - no_sample_file=no_sample_file, - no_bgi=no_bgi, - ) - expected_suffixes = formats.BGEN_SPEC.static_suffixes(opts) - first: dict[str, bytes] | None = None - for cycle in range(3): - socket_path = tmp_path / f"encoder-{cycle}.sock" - async with await encoder_client.EncoderClient.start( - str(fx_small_vcz.path), socket_path, formats.BGEN_SPEC, opts=opts - ) as client: - assert tuple(client.static_files) == expected_suffixes - # ``.bgen.bgi`` SQLite payloads have non-deterministic - # header bytes across independent writes; compare sizes - # only for that entry and the full body for ``.sample``. - if first is None: - first = dict(client.static_files) - continue - for suffix, body in client.static_files.items(): - if suffix == ".bgen.bgi": - assert len(body) == len(first[suffix]) - else: - assert body == first[suffix] - - @pytest.mark.parametrize( - "spec", [formats.PLINK_SPEC, formats.BGEN_SPEC], ids=["plink", "bgen"] - ) - async def test_spawn_handshake_open_read_close(self, fx_small_vcz, tmp_path, spec): - # The expected stream_size and static-body sizes are computed - # via a fresh in-process reader so the test does not depend on - # external goldens (write_plink / write_bgen). See the - # encoder-server test module for the rationale. - opts = ( - vcztools.ViewPlinkOptions() - if spec.name == "plink" - else vcztools.ViewBgenOptions() - ) - ref_reader = opts.make_reader(str(fx_small_vcz.path)) - with spec.encoder_factory(ref_reader, opts) as enc: - expected_stream_size = int(enc.total_size) - socket_path = tmp_path / "encoder.sock" - client = await encoder_client.EncoderClient.start( - str(fx_small_vcz.path), socket_path, spec - ) - try: - assert client.stream_size == expected_stream_size - assert set(client.static_files) == set(spec.static_suffixes(opts)) - conn = await client.open_stream() - try: - data = await conn.read(0, client.stream_size) - assert len(data) == client.stream_size - finally: - await conn.aclose() - finally: - await client.aclose() - assert isinstance(client._proc, mp.process.BaseProcess) - assert not client._proc.is_alive() - assert client._proc.exitcode == 0 diff --git a/tests/test_encoder_host.py b/tests/test_encoder_host.py new file mode 100644 index 0000000..2e4f117 --- /dev/null +++ b/tests/test_encoder_host.py @@ -0,0 +1,477 @@ +"""Tests for the in-process encoder host. + +Two layers: + +- ``StreamHandle`` exercised directly against a hand-built fake encoder + so the timeout / drain / leak paths can be triggered deterministically. +- ``EncoderHost`` exercised against the real spec factories (PLINK / BGEN) + on the ``fx_small_vcz`` fixture, parity-checked against an in-process + reference encoder built with the same ``(reader, opts)``. + +End-to-end FUSE behaviour against real ``plink`` / ``bgenix`` binaries +lives in ``test_plink_apps.py`` / ``test_bgen_apps.py``. +""" + +import dataclasses +import errno +import threading + +import pytest +import trio +import vcztools + +from biofuse import encoder_host, formats + + +@pytest.fixture(params=[formats.PLINK_SPEC, formats.BGEN_SPEC], ids=["plink", "bgen"]) +def fx_spec(request): + return request.param + + +@pytest.fixture +def fx_opts(fx_spec): + if fx_spec.name == "plink": + return vcztools.ViewPlinkOptions() + return vcztools.ViewBgenOptions() + + +@pytest.fixture +def fx_reference(fx_small_vcz, fx_spec, fx_opts): + """In-process reference: ``(static_files, stream_size, full_bytes)``. + + Built directly off the spec from a fresh reader so tests don't need + on-disk goldens. ``.bgen.bgi`` SQLite payloads have non-deterministic + header bytes across independent writes; tests using this fixture + therefore compare sizes / suffix sets for that entry rather than + full bodies. + """ + reader = fx_opts.make_reader(str(fx_small_vcz.path)) + try: + reader.materialise_variant_filter() + static_files = fx_spec.build_static_files(reader, fx_opts) + with fx_spec.encoder_factory(reader, fx_opts) as enc: + stream_size = int(enc.total_size) + full_bytes = enc.read(0, stream_size) + finally: + reader.__exit__(None, None, None) + return static_files, stream_size, full_bytes + + +# ----------------------------------------------------------------------------- +# Fake encoder for unit-testing StreamHandle in isolation. +# ----------------------------------------------------------------------------- + + +class _FakeEncoder: + """Hand-built encoder for StreamHandle timeout/drain tests. + + The ``read`` method blocks on a per-call ``threading.Event`` so a + test can release the worker thread at will. ``close`` records the + invocation. + """ + + def __init__(self) -> None: + self.release_read = threading.Event() + self.entered_read = threading.Event() + self.read_calls: list[tuple[int, int]] = [] + self.close_calls = 0 + self._payload = b"X" + + def set_payload(self, body: bytes) -> None: + self._payload = body + + def read(self, off: int, size: int) -> bytes: + self.read_calls.append((off, size)) + self.entered_read.set() + self.release_read.wait() + return self._payload + + def close(self) -> None: + self.close_calls += 1 + + +# ----------------------------------------------------------------------------- +# StreamHandle direct unit tests. +# ----------------------------------------------------------------------------- + + +class TestStreamHandleHappyPath: + async def test_read_returns_encoder_bytes(self): + encoder = _FakeEncoder() + encoder.set_payload(b"hello") + encoder.release_read.set() + handle = encoder_host.StreamHandle(encoder) + try: + got = await handle.read(0, 5) + assert got == b"hello" + assert encoder.read_calls == [(0, 5)] + finally: + await handle.aclose() + assert encoder.close_calls == 1 + + async def test_aclose_is_idempotent(self): + encoder = _FakeEncoder() + encoder.release_read.set() + handle = encoder_host.StreamHandle(encoder) + await handle.aclose() + await handle.aclose() + assert encoder.close_calls == 1 + + async def test_on_aclose_hook_fires_once(self): + encoder = _FakeEncoder() + encoder.release_read.set() + hook_calls: list[tuple[float, float]] = [] + handle = encoder_host.StreamHandle( + encoder, on_aclose=lambda t0, t1: hook_calls.append((t0, t1)) + ) + await handle.aclose() + await handle.aclose() # idempotent: hook only fires once. + assert len(hook_calls) == 1 + t0, t1 = hook_calls[0] + assert t1 >= t0 + + async def test_read_after_close_raises_eio(self): + encoder = _FakeEncoder() + encoder.release_read.set() + handle = encoder_host.StreamHandle(encoder) + await handle.aclose() + with pytest.raises(OSError, match="stream handle is closed") as excinfo: + await handle.read(0, 1) + assert excinfo.value.errno == errno.EIO + + +class TestStreamHandleSerialisation: + async def test_concurrent_reads_serialise_on_one_handle(self): + """Two concurrent ``handle.read`` calls must not enter the + encoder at the same time — ``FormatEncoder`` mutates iterator + state in place.""" + encoder = _FakeEncoder() + encoder.set_payload(b"A") + handle = encoder_host.StreamHandle(encoder) + in_flight = 0 + peak = 0 + lock = threading.Lock() + + original_read = encoder.read + + def watched_read(off: int, size: int) -> bytes: + nonlocal in_flight, peak + with lock: + in_flight += 1 + peak = max(peak, in_flight) + try: + return original_read(off, size) + finally: + with lock: + in_flight -= 1 + + encoder.read = watched_read + + try: + async with trio.open_nursery() as nursery: + + async def runner(): + # Release per-call so each read can complete. + encoder.release_read.set() + await handle.read(0, 1) + encoder.release_read.clear() + + # Sequentially start tasks that will serialise on the lock. + for _ in range(3): + encoder.release_read.set() + nursery.start_soon(handle.read, 0, 1) + await trio.sleep(0) + finally: + encoder.release_read.set() + await handle.aclose() + + assert peak == 1 + + +class TestStreamHandleTimeout: + async def test_read_times_out_to_eio(self, monkeypatch): + monkeypatch.setattr(encoder_host, "_REQUEST_TIMEOUT_S", 0.1) + encoder = _FakeEncoder() # release_read intentionally not set + handle = encoder_host.StreamHandle(encoder) + try: + t0 = trio.current_time() + with pytest.raises(OSError, match="encoder read timed out") as excinfo: + await handle.read(0, 1024) + elapsed = trio.current_time() - t0 + assert excinfo.value.errno == errno.EIO + assert elapsed < 1.0, f"read should fail fast, took {elapsed:.2f}s" + finally: + # Release the abandoned worker so aclose can drain. + encoder.release_read.set() + await handle.aclose() + + async def test_read_after_timeout_is_immediate(self, monkeypatch): + monkeypatch.setattr(encoder_host, "_REQUEST_TIMEOUT_S", 0.1) + encoder = _FakeEncoder() + handle = encoder_host.StreamHandle(encoder) + try: + with pytest.raises(OSError, match="encoder read timed out"): + await handle.read(0, 1024) + t0 = trio.current_time() + with pytest.raises(OSError, match="stream handle is closed") as excinfo: + await handle.read(0, 1024) + elapsed = trio.current_time() - t0 + assert excinfo.value.errno == errno.EIO + assert elapsed < 0.05, ( + f"second read should be immediate, took {elapsed:.3f}s" + ) + finally: + encoder.release_read.set() + await handle.aclose() + + async def test_aclose_drains_abandoned_thread_then_closes(self, monkeypatch): + """After a timeout the worker thread is still running. ``aclose`` + must wait for it to return before calling ``encoder.close`` — + the encoder's own thread pool cannot tolerate concurrent + ``read``/``close``.""" + monkeypatch.setattr(encoder_host, "_REQUEST_TIMEOUT_S", 0.1) + encoder = _FakeEncoder() + handle = encoder_host.StreamHandle(encoder) + with pytest.raises(OSError, match="encoder read timed out"): + await handle.read(0, 1024) + assert encoder.close_calls == 0 # not yet drained + + # Release the abandoned worker; aclose should then close the encoder. + encoder.release_read.set() + await handle.aclose() + assert encoder.close_calls == 1 + + +class TestStreamHandleAcloseLeak: + async def test_aclose_does_not_hang_on_wedged_encoder(self, monkeypatch): + """If the abandoned worker thread never returns, ``aclose`` must + log + leak rather than hang the unmount path. We pin the + elapsed-time bound here; the warning text is best-effort logging.""" + monkeypatch.setattr(encoder_host, "_REQUEST_TIMEOUT_S", 0.05) + monkeypatch.setattr(encoder_host, "_ACLOSE_TIMEOUT_S", 0.1) + encoder = _FakeEncoder() + handle = encoder_host.StreamHandle(encoder) + try: + with pytest.raises(OSError, match="encoder read timed out"): + await handle.read(0, 1024) + t0 = trio.current_time() + await handle.aclose() + elapsed = trio.current_time() - t0 + assert elapsed < 1.0, f"aclose should not hang, took {elapsed:.2f}s" + # encoder.close was not called because we never drained. + assert encoder.close_calls == 0 + finally: + encoder.release_read.set() + + +# ----------------------------------------------------------------------------- +# EncoderHost end-to-end tests against real spec factories. +# ----------------------------------------------------------------------------- + + +class TestEncoderHostStart: + async def test_static_files_keys_match_spec( + self, fx_small_vcz, fx_spec, fx_opts, fx_reference + ): + ref_static, ref_stream_size, _ = fx_reference + async with await encoder_host.EncoderHost.start( + str(fx_small_vcz.path), fx_spec, opts=fx_opts + ) as host: + assert tuple(host.static_files) == fx_spec.static_suffixes(fx_opts) + assert host.stream_size == ref_stream_size + for suffix in ref_static: + # ``.bgen.bgi`` SQLite headers vary across writes; size match + # is the strongest portable assertion. + if suffix == ".bgen.bgi": + assert len(host.static_files[suffix]) == len(ref_static[suffix]) + else: + assert host.static_files[suffix] == ref_static[suffix] + + async def test_start_failure_on_multiallelic_plink( + self, fx_multiallelic_vcz, tmp_path + ): + """PLINK static-file build refuses multi-allelic VCZ. The + failure must propagate cleanly from ``start`` rather than + leaving a half-initialised host behind.""" + with pytest.raises((ValueError, OSError)): + await encoder_host.EncoderHost.start( + str(fx_multiallelic_vcz.path), + formats.PLINK_SPEC, + opts=vcztools.ViewPlinkOptions(), + ) + + async def test_max_alleles_filter_drops_multiallelic_sites( + self, fx_multiallelic_vcz + ): + """``ViewPlinkOptions(max_alleles=2)`` must reach ``opts.make_reader`` + and drop multi-allelic variants so the static-file build succeeds.""" + # Sanity: the fixture really is multi-allelic. + assert fx_multiallelic_vcz.num_biallelic_sites < ( + fx_multiallelic_vcz.num_variants + ) + default_opts = vcztools.ViewPlinkOptions() + selection = dataclasses.replace(default_opts.selection, max_alleles=2) + opts = dataclasses.replace(default_opts, selection=selection) + async with await encoder_host.EncoderHost.start( + str(fx_multiallelic_vcz.path), formats.PLINK_SPEC, opts=opts + ) as host: + bim_lines = host.static_files[".bim"].decode("utf-8").splitlines() + assert len(bim_lines) == fx_multiallelic_vcz.num_biallelic_sites + fam_lines = host.static_files[".fam"].decode("utf-8").splitlines() + assert len(fam_lines) == fx_multiallelic_vcz.num_samples + + +class TestEncoderHostStreaming: + async def test_full_stream_read_matches_reference( + self, fx_small_vcz, fx_spec, fx_opts, fx_reference + ): + _, ref_stream_size, ref_bytes = fx_reference + async with await encoder_host.EncoderHost.start( + str(fx_small_vcz.path), fx_spec, opts=fx_opts + ) as host: + handle = await host.open_stream() + try: + got = await handle.read(0, ref_stream_size * 2) + assert got == ref_bytes + finally: + await handle.aclose() + + async def test_chunked_stream_read(self, fx_small_vcz, fx_spec, fx_opts): + async with await encoder_host.EncoderHost.start( + str(fx_small_vcz.path), fx_spec, opts=fx_opts + ) as host: + handle = await host.open_stream() + try: + chunks = [] + offset = 0 + while True: + data = await handle.read(offset, 4096) + if len(data) == 0: + break + chunks.append(data) + offset += len(data) + assert sum(len(c) for c in chunks) == host.stream_size + finally: + await handle.aclose() + + async def test_two_handles_independent( + self, fx_small_vcz, fx_spec, fx_opts, fx_reference + ): + _, _, ref_bytes = fx_reference + async with await encoder_host.EncoderHost.start( + str(fx_small_vcz.path), fx_spec, opts=fx_opts + ) as host: + handle_a = await host.open_stream() + handle_b = await host.open_stream() + try: + half = len(ref_bytes) // 2 + a = await handle_a.read(0, half) + b = await handle_b.read(half, half) + assert a == ref_bytes[:half] + assert b == ref_bytes[half : 2 * half] + # Each handle has its own encoder; a backward read on one + # does not disturb the other. + c = await handle_b.read(0, half) + d = await handle_a.read(half, half) + assert c == ref_bytes[:half] + assert d == ref_bytes[half : 2 * half] + finally: + await handle_a.aclose() + await handle_b.aclose() + + async def test_concurrent_handles_run_in_parallel( + self, fx_small_vcz, fx_spec, fx_opts, fx_reference + ): + _, _, ref_bytes = fx_reference + results: dict[int, bytes] = {} + + async with await encoder_host.EncoderHost.start( + str(fx_small_vcz.path), fx_spec, opts=fx_opts + ) as host: + + async def runner(idx: int) -> None: + handle = await host.open_stream() + try: + results[idx] = await handle.read(0, host.stream_size * 2) + finally: + await handle.aclose() + + async with trio.open_nursery() as nursery: + for i in range(4): + nursery.start_soon(runner, i) + + assert len(results) == 4 + for body in results.values(): + assert body == ref_bytes + + +class TestEncoderHostClose: + async def test_aclose_is_idempotent(self, fx_small_vcz, fx_spec, fx_opts): + host = await encoder_host.EncoderHost.start( + str(fx_small_vcz.path), fx_spec, opts=fx_opts + ) + await host.aclose() + await host.aclose() + + async def test_open_stream_after_close_raises(self, fx_small_vcz, fx_spec, fx_opts): + host = await encoder_host.EncoderHost.start( + str(fx_small_vcz.path), fx_spec, opts=fx_opts + ) + await host.aclose() + with pytest.raises(OSError, match="encoder host is closed") as excinfo: + await host.open_stream() + assert excinfo.value.errno == errno.EIO + + +class TestEncoderHostBgenOptions: + @pytest.mark.parametrize("no_header_samples", [True, False]) + async def test_no_header_samples_stable_across_opens( + self, fx_small_vcz, no_header_samples + ): + """``--no-header-samples`` flows into every per-handle + ``BgenEncoder``. Three sequential open/read/close cycles must + all return the same bytes — and those bytes must match an + in-process reference ``BgenEncoder`` with the matching + ``embed_header_samples`` flag.""" + opts = dataclasses.replace( + vcztools.ViewBgenOptions(), no_header_samples=no_header_samples + ) + ref_reader = opts.make_reader(str(fx_small_vcz.path)) + try: + with vcztools.BgenEncoder( + ref_reader, embed_header_samples=not no_header_samples + ) as ref: + expected_size = int(ref.total_size) + expected = ref.read(0, expected_size) + finally: + ref_reader.__exit__(None, None, None) + + async with await encoder_host.EncoderHost.start( + str(fx_small_vcz.path), formats.BGEN_SPEC, opts=opts + ) as host: + assert host.stream_size == expected_size + for cycle in range(3): + handle = await host.open_stream() + try: + data = await handle.read(0, host.stream_size) + finally: + await handle.aclose() + assert data == expected, f"cycle {cycle} differed from reference" + + @pytest.mark.parametrize( + ("no_sample_file", "no_bgi"), + [(True, False), (False, True), (True, True), (False, False)], + ) + async def test_sidecar_toggles_honour_spec( + self, fx_small_vcz, no_sample_file, no_bgi + ): + opts = dataclasses.replace( + vcztools.ViewBgenOptions(), + no_sample_file=no_sample_file, + no_bgi=no_bgi, + ) + expected_suffixes = formats.BGEN_SPEC.static_suffixes(opts) + async with await encoder_host.EncoderHost.start( + str(fx_small_vcz.path), formats.BGEN_SPEC, opts=opts + ) as host: + assert tuple(host.static_files) == expected_suffixes diff --git a/tests/test_encoder_ops.py b/tests/test_encoder_ops.py index fad3eca..5af45f5 100644 --- a/tests/test_encoder_ops.py +++ b/tests/test_encoder_ops.py @@ -1,11 +1,10 @@ """Unit tests for EncoderOps. Exercises the streaming Operations class via direct async-method calls -— no kernel mount, no subprocess. The vcztools/Zarr-side parity tests -live in ``test_encoder_server.py`` (against the server module directly) -and in ``test_encoder_client.py`` (against a real subprocess). End-to-end -FUSE behaviour against real plink / bgenix binaries lives in -``test_plink_apps.py`` / ``test_bgen_apps.py``. +— no kernel mount, no real encoder. The vcztools/Zarr-side parity +tests live in ``test_encoder_host.py``. End-to-end FUSE behaviour +against real plink / bgenix binaries lives in ``test_plink_apps.py`` +/ ``test_bgen_apps.py``. Tests parametrise over both :data:`biofuse.formats.PLINK_SPEC` and :data:`biofuse.formats.BGEN_SPEC` so the spec-driven inode table and @@ -32,9 +31,9 @@ def _default_opts(spec): return vcztools.ViewBgenOptions() -class _FakeStreamConnection: +class _FakeStreamHandle: """In-process stand-in for - :class:`biofuse.encoder_client.StreamConnection`. + :class:`biofuse.encoder_host.StreamHandle`. Records the call sequence so tests can assert EncoderOps dispatched correctly. Reads return deterministic bytes derived from @@ -74,11 +73,11 @@ async def aclose(self) -> None: self._on_aclose(t0, time.monotonic()) -class _FakeClient: - """In-process stand-in for :class:`biofuse.encoder_client.EncoderClient`. +class _FakeHost: + """In-process stand-in for :class:`biofuse.encoder_host.EncoderHost`. Holds canned ``static_files`` / ``stream_size`` and hands out - :class:`_FakeStreamConnection` instances on demand. + :class:`_FakeStreamHandle` instances on demand. """ def __init__( @@ -98,20 +97,18 @@ def __init__( self.calls: list[tuple] = [] self._next_open_error: OSError | None = None self._next_conn_id = 1 - self.connections: list[_FakeStreamConnection] = [] + self.connections: list[_FakeStreamHandle] = [] def raise_on_next_open(self, exc: OSError) -> None: self._next_open_error = exc - async def open_stream(self, *, on_aclose=None) -> _FakeStreamConnection: + async def open_stream(self, *, on_aclose=None) -> _FakeStreamHandle: self.calls.append(("open_stream",)) if self._next_open_error is not None: exc = self._next_open_error self._next_open_error = None raise exc - conn = _FakeStreamConnection( - self._next_conn_id, self.calls, on_aclose=on_aclose - ) + conn = _FakeStreamHandle(self._next_conn_id, self.calls, on_aclose=on_aclose) self._next_conn_id += 1 self.connections.append(conn) return conn @@ -128,13 +125,13 @@ def fx_static_suffixes(fx_spec): @pytest.fixture -def fx_client(fx_spec): - return _FakeClient(fx_spec) +def fx_host(fx_spec): + return _FakeHost(fx_spec) @pytest.fixture -def fx_ops(fx_client, fx_spec): - return encoder_ops.EncoderOps(fx_client, "small", fx_spec) +def fx_ops(fx_host, fx_spec): + return encoder_ops.EncoderOps(fx_host, "small", fx_spec) @pytest.fixture @@ -148,8 +145,8 @@ def fx_first_static_name(fx_static_suffixes): @pytest.fixture -def fx_first_static_bytes(fx_client, fx_static_suffixes): - return fx_client.static_files[fx_static_suffixes[0]] +def fx_first_static_bytes(fx_host, fx_static_suffixes): + return fx_host.static_files[fx_static_suffixes[0]] async def _expect_fuse_error(coro, expected_errno): @@ -164,21 +161,21 @@ def test_creates_one_inode_per_manifest_entry(self, fx_ops, fx_static_suffixes): assert len(fx_ops._name_to_inode) == n_expected def test_basename_propagates_to_all_files( - self, fx_client, fx_spec, fx_static_suffixes + self, fx_host, fx_spec, fx_static_suffixes ): - ops = encoder_ops.EncoderOps(fx_client, "alt", fx_spec) + ops = encoder_ops.EncoderOps(fx_host, "alt", fx_spec) expected = sorted( [f"alt{fx_spec.streaming_suffix}"] + [f"alt{suffix}" for suffix in fx_static_suffixes] ) assert sorted(ops._name_to_inode) == expected - def test_sizes_match_client_metadata(self, fx_client, fx_spec, fx_static_suffixes): - ops = encoder_ops.EncoderOps(fx_client, "small", fx_spec) + def test_sizes_match_client_metadata(self, fx_host, fx_spec, fx_static_suffixes): + ops = encoder_ops.EncoderOps(fx_host, "small", fx_spec) sizes = {ops._inode_to_name[i]: size for i, size in ops._inode_to_size.items()} - expected = {f"small{fx_spec.streaming_suffix}": fx_client.stream_size} + expected = {f"small{fx_spec.streaming_suffix}": fx_host.stream_size} for suffix in fx_static_suffixes: - expected[f"small{suffix}"] = len(fx_client.static_files[suffix]) + expected[f"small{suffix}"] = len(fx_host.static_files[suffix]) assert sizes == expected def test_inodes_assigned_in_sorted_order(self, fx_ops): @@ -193,20 +190,20 @@ async def test_root(self, fx_ops): attrs = await fx_ops.getattr(pyfuse3.ROOT_INODE) assert stat.S_ISDIR(attrs.st_mode) - async def test_streaming_file(self, fx_ops, fx_client, fx_stream_name): + async def test_streaming_file(self, fx_ops, fx_host, fx_stream_name): inode = fx_ops._name_to_inode[fx_stream_name] attrs = await fx_ops.getattr(inode) assert stat.S_ISREG(attrs.st_mode) - assert attrs.st_size == fx_client.stream_size + assert attrs.st_size == fx_host.stream_size async def test_unknown_inode(self, fx_ops): await _expect_fuse_error(fx_ops.getattr(9999), errno.ENOENT) class TestLookup: - async def test_known_name(self, fx_ops, fx_client, fx_stream_name): + async def test_known_name(self, fx_ops, fx_host, fx_stream_name): attrs = await fx_ops.lookup(pyfuse3.ROOT_INODE, fx_stream_name.encode("utf-8")) - assert attrs.st_size == fx_client.stream_size + assert attrs.st_size == fx_host.stream_size async def test_unknown_name(self, fx_ops): await _expect_fuse_error( @@ -273,66 +270,66 @@ async def test_open_unknown_inode(self, fx_ops): class TestOpenDispatch: async def test_stream_open_creates_connection( - self, fx_ops, fx_client, fx_spec, fx_stream_name + self, fx_ops, fx_host, fx_spec, fx_stream_name ): inode = fx_ops._name_to_inode[fx_stream_name] info = await fx_ops.open(inode, os.O_RDONLY) try: - assert fx_client.calls == [("open_stream",)] + assert fx_host.calls == [("open_stream",)] assert fx_ops._fh_to_kind[info.fh] == fx_spec.streaming_kind assert info.fh in fx_ops._fh_to_conn finally: await fx_ops.release(info.fh) async def test_static_open_does_not_call_client( - self, fx_ops, fx_client, fx_first_static_name + self, fx_ops, fx_host, fx_first_static_name ): inode = fx_ops._name_to_inode[fx_first_static_name] info = await fx_ops.open(inode, os.O_RDONLY) try: - assert fx_client.calls == [] + assert fx_host.calls == [] assert fx_ops._fh_to_kind[info.fh] == "static" assert info.fh not in fx_ops._fh_to_conn finally: await fx_ops.release(info.fh) async def test_each_stream_open_gets_distinct_fh_and_connection( - self, fx_ops, fx_client, fx_stream_name + self, fx_ops, fx_host, fx_stream_name ): inode = fx_ops._name_to_inode[fx_stream_name] info1 = await fx_ops.open(inode, os.O_RDONLY) info2 = await fx_ops.open(inode, os.O_RDONLY) try: assert info1.fh != info2.fh - assert len(fx_client.connections) == 2 - assert fx_client.connections[0] is not fx_client.connections[1] + assert len(fx_host.connections) == 2 + assert fx_host.connections[0] is not fx_host.connections[1] finally: await fx_ops.release(info1.fh) await fx_ops.release(info2.fh) async def test_stream_open_propagates_oserror_as_fuseerror( - self, fx_ops, fx_client, fx_stream_name + self, fx_ops, fx_host, fx_stream_name ): - fx_client.raise_on_next_open(OSError(errno.EACCES, "denied")) + fx_host.raise_on_next_open(OSError(errno.EACCES, "denied")) inode = fx_ops._name_to_inode[fx_stream_name] await _expect_fuse_error(fx_ops.open(inode, os.O_RDONLY), errno.EACCES) class TestRead: async def test_stream_read_dispatches_to_connection( - self, fx_ops, fx_client, fx_stream_name + self, fx_ops, fx_host, fx_stream_name ): inode = fx_ops._name_to_inode[fx_stream_name] info = await fx_ops.open(inode, os.O_RDONLY) try: data = await fx_ops.read(info.fh, 16, 8) - assert ("read", 1, 16, 8) in fx_client.calls + assert ("read", 1, 16, 8) in fx_host.calls assert data == bytes(((16 + i) & 0xFF) for i in range(8)) finally: await fx_ops.release(info.fh) async def test_static_read_serves_from_cached_bytes( - self, fx_ops, fx_client, fx_first_static_name, fx_first_static_bytes + self, fx_ops, fx_host, fx_first_static_name, fx_first_static_bytes ): inode = fx_ops._name_to_inode[fx_first_static_name] info = await fx_ops.open(inode, os.O_RDONLY) @@ -340,7 +337,7 @@ async def test_static_read_serves_from_cached_bytes( data = await fx_ops.read(info.fh, 0, len(fx_first_static_bytes)) assert data == fx_first_static_bytes # No client traffic for static reads. - assert all(c[0] != "read" for c in fx_client.calls) + assert all(c[0] != "read" for c in fx_host.calls) finally: await fx_ops.release(info.fh) @@ -371,12 +368,12 @@ async def test_read_unknown_fh_returns_ebadf(self, fx_ops): await _expect_fuse_error(fx_ops.read(9999, 0, 10), errno.EBADF) async def test_stream_read_propagates_oserror_as_fuseerror( - self, fx_ops, fx_client, fx_stream_name + self, fx_ops, fx_host, fx_stream_name ): inode = fx_ops._name_to_inode[fx_stream_name] info = await fx_ops.open(inode, os.O_RDONLY) try: - fx_client.connections[0].raise_on_next_read(OSError(errno.EIO, "boom")) + fx_host.connections[0].raise_on_next_read(OSError(errno.EIO, "boom")) await _expect_fuse_error(fx_ops.read(info.fh, 0, 10), errno.EIO) finally: await fx_ops.release(info.fh) @@ -384,21 +381,21 @@ async def test_stream_read_propagates_oserror_as_fuseerror( class TestRelease: async def test_stream_release_closes_connection( - self, fx_ops, fx_client, fx_stream_name + self, fx_ops, fx_host, fx_stream_name ): inode = fx_ops._name_to_inode[fx_stream_name] info = await fx_ops.open(inode, os.O_RDONLY) await fx_ops.release(info.fh) - assert ("aclose", 1) in fx_client.calls + assert ("aclose", 1) in fx_host.calls assert info.fh not in fx_ops._fh_to_conn async def test_static_release_does_not_call_client( - self, fx_ops, fx_client, fx_first_static_name + self, fx_ops, fx_host, fx_first_static_name ): inode = fx_ops._name_to_inode[fx_first_static_name] info = await fx_ops.open(inode, os.O_RDONLY) await fx_ops.release(info.fh) - assert all(c[0] != "aclose" for c in fx_client.calls) + assert all(c[0] != "aclose" for c in fx_host.calls) async def test_release_unknown_fh_silent(self, fx_ops): await fx_ops.release(9999) @@ -411,9 +408,9 @@ async def test_release_is_idempotent(self, fx_ops, fx_stream_name): class TestAccessLogger: - async def test_records_per_read(self, fx_client, fx_spec, fx_static_suffixes): + async def test_records_per_read(self, fx_host, fx_spec, fx_static_suffixes): log = access_log.AccessLogger() - ops = encoder_ops.EncoderOps(fx_client, "small", fx_spec, access_logger=log) + ops = encoder_ops.EncoderOps(fx_host, "small", fx_spec, access_logger=log) stream_inode = ops._name_to_inode[f"small{fx_spec.streaming_suffix}"] static_inode = ops._name_to_inode[f"small{fx_static_suffixes[0]}"] stream_info = await ops.open(stream_inode, os.O_RDONLY) @@ -442,8 +439,8 @@ class TestCapacityLimiter: streaming-file opens without blocking static reads. """ - async def test_stream_blocks_at_cap(self, fx_client, fx_spec): - ops = encoder_ops.EncoderOps(fx_client, "small", fx_spec, max_open_stream=2) + async def test_stream_blocks_at_cap(self, fx_host, fx_spec): + ops = encoder_ops.EncoderOps(fx_host, "small", fx_spec, max_open_stream=2) stream_inode = ops._name_to_inode[f"small{fx_spec.streaming_suffix}"] info1 = await ops.open(stream_inode, os.O_RDONLY) info2 = await ops.open(stream_inode, os.O_RDONLY) @@ -465,9 +462,9 @@ async def third_open(): await ops.release(third_info[0].fh) async def test_static_does_not_block_when_stream_at_cap( - self, fx_client, fx_spec, fx_static_suffixes + self, fx_host, fx_spec, fx_static_suffixes ): - ops = encoder_ops.EncoderOps(fx_client, "small", fx_spec, max_open_stream=1) + ops = encoder_ops.EncoderOps(fx_host, "small", fx_spec, max_open_stream=1) stream_inode = ops._name_to_inode[f"small{fx_spec.streaming_suffix}"] static_inode = ops._name_to_inode[f"small{fx_static_suffixes[0]}"] stream_info = await ops.open(stream_inode, os.O_RDONLY) @@ -475,16 +472,16 @@ async def test_static_does_not_block_when_stream_at_cap( await ops.release(static_info.fh) await ops.release(stream_info.fh) - async def test_failed_open_releases_slot(self, fx_client, fx_spec): - ops = encoder_ops.EncoderOps(fx_client, "small", fx_spec, max_open_stream=1) + async def test_failed_open_releases_slot(self, fx_host, fx_spec): + ops = encoder_ops.EncoderOps(fx_host, "small", fx_spec, max_open_stream=1) stream_inode = ops._name_to_inode[f"small{fx_spec.streaming_suffix}"] - fx_client.raise_on_next_open(OSError(errno.EACCES, "denied")) + fx_host.raise_on_next_open(OSError(errno.EACCES, "denied")) await _expect_fuse_error(ops.open(stream_inode, os.O_RDONLY), errno.EACCES) info = await ops.open(stream_inode, os.O_RDONLY) await ops.release(info.fh) async def test_open_returns_eagain_when_limiter_starved( - self, fx_client, fx_spec, monkeypatch + self, fx_host, fx_spec, monkeypatch ): """A leaked limiter slot must not pin FUSE_OPEN forever. @@ -492,7 +489,7 @@ async def test_open_returns_eagain_when_limiter_starved( released, a competing open must surface ``EAGAIN`` once the per-mount limiter deadline expires.""" monkeypatch.setattr(encoder_ops, "_LIMITER_TIMEOUT_S", 0.2) - ops = encoder_ops.EncoderOps(fx_client, "small", fx_spec, max_open_stream=1) + ops = encoder_ops.EncoderOps(fx_host, "small", fx_spec, max_open_stream=1) stream_inode = ops._name_to_inode[f"small{fx_spec.streaming_suffix}"] held = await ops.open(stream_inode, os.O_RDONLY) try: @@ -501,7 +498,7 @@ async def test_open_returns_eagain_when_limiter_starved( await ops.release(held.fh) async def test_limiter_timeout_records_access_event( - self, fx_client, fx_spec, monkeypatch + self, fx_host, fx_spec, monkeypatch ): """A timed-out FUSE_OPEN must leave a ``limiter_timeout`` event in the access log so post-hoc analysis can attribute an @@ -510,7 +507,7 @@ async def test_limiter_timeout_records_access_event( monkeypatch.setattr(encoder_ops, "_LIMITER_TIMEOUT_S", timeout_s) log = access_log.AccessLogger() ops = encoder_ops.EncoderOps( - fx_client, "small", fx_spec, max_open_stream=1, access_logger=log + fx_host, "small", fx_spec, max_open_stream=1, access_logger=log ) stream_inode = ops._name_to_inode[f"small{fx_spec.streaming_suffix}"] held = await ops.open(stream_inode, os.O_RDONLY) @@ -532,9 +529,9 @@ class TestLifecycleEvents: are emitted on the access logger so we can localise where time is spent in the lifecycle without changing the read trace.""" - async def test_stream_emits_full_lifecycle(self, fx_client, fx_spec): + async def test_stream_emits_full_lifecycle(self, fx_host, fx_spec): log = access_log.AccessLogger() - ops = encoder_ops.EncoderOps(fx_client, "small", fx_spec, access_logger=log) + ops = encoder_ops.EncoderOps(fx_host, "small", fx_spec, access_logger=log) stream_inode = ops._name_to_inode[f"small{fx_spec.streaming_suffix}"] info = await ops.open(stream_inode, os.O_RDONLY) await ops.read(info.fh, 0, 8) @@ -552,10 +549,10 @@ async def test_stream_emits_full_lifecycle(self, fx_client, fx_spec): assert acl.fh == info.fh async def test_static_emits_open_and_release_only( - self, fx_client, fx_spec, fx_static_suffixes + self, fx_host, fx_spec, fx_static_suffixes ): log = access_log.AccessLogger() - ops = encoder_ops.EncoderOps(fx_client, "small", fx_spec, access_logger=log) + ops = encoder_ops.EncoderOps(fx_host, "small", fx_spec, access_logger=log) static_inode = ops._name_to_inode[f"small{fx_static_suffixes[0]}"] info = await ops.open(static_inode, os.O_RDONLY) await ops.release(info.fh) @@ -597,11 +594,9 @@ async def test_returns_statvfs_data_with_expected_fields(self, fx_ops): out = await fx_ops.statfs() assert isinstance(out, pyfuse3.StatvfsData) - async def test_block_count_matches_sum_of_file_sizes(self, fx_ops, fx_client): + async def test_block_count_matches_sum_of_file_sizes(self, fx_ops, fx_host): out = await fx_ops.statfs() - total = fx_client.stream_size + sum( - len(b) for b in fx_client.static_files.values() - ) + total = fx_host.stream_size + sum(len(b) for b in fx_host.static_files.values()) assert out.f_bsize > 0 assert out.f_frsize == out.f_bsize assert out.f_blocks == (total + out.f_bsize - 1) // out.f_bsize diff --git a/tests/test_encoder_protocol.py b/tests/test_encoder_protocol.py deleted file mode 100644 index 3fba80c..0000000 --- a/tests/test_encoder_protocol.py +++ /dev/null @@ -1,124 +0,0 @@ -"""Pure unit tests for the encoder-server wire protocol. - -The protocol module has no I/O — these tests pin its byte layouts and -roundtrip semantics. Metadata-reply shape is variable-arity (any -number of static-file bodies in any order, plus one streaming-file -size); both 2-static layouts used in production (PLINK bim/fam, BGEN -sample/bgi) are exercised here. -""" - -import errno -import struct - -import pytest - -from biofuse import encoder_protocol - - -class TestRequestFraming: - def test_get_metadata_request_is_just_tag(self): - assert encoder_protocol.pack_get_metadata_request() == b"M" - - def test_read_request_layout(self): - buf = encoder_protocol.pack_read_request(4096, 8192) - assert buf[:1] == b"R" - off, size = struct.unpack(" bytes: - buf = bytearray() - while len(buf) < n: - chunk = sock.recv(n - len(buf)) - if len(chunk) == 0: - raise EOFError(f"socket closed after {len(buf)}/{n} bytes") - buf.extend(chunk) - return bytes(buf) - - -def _read_status_and_body(sock: socket.socket) -> tuple[int, bytes]: - status = encoder_protocol.parse_status( - _recv_exact(sock, encoder_protocol.REPLY_STATUS_SIZE) - ) - if status <= 0: - return status, b"" - return status, _recv_exact(sock, status) - - -def _spawn_handle_connection( - session: encoder_server._ServerSession, -) -> tuple[socket.socket, threading.Thread]: - parent_sock, child_sock = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) - thread = threading.Thread( - target=encoder_server._handle_connection, - args=(child_sock, session), - daemon=True, - ) - thread.start() - return parent_sock, thread - - -class _FakeEncoder: - """In-memory encoder stand-in for direct unit tests of the - per-tag reply helpers. ``read(off, size)`` returns deterministic - bytes so callers can assert exact payloads.""" - - total_size = 4096 - - def __init__(self): - self.calls: list[tuple[int, int]] = [] - - def read(self, off: int, size: int) -> bytes: - self.calls.append((off, size)) - return bytes((off + i) & 0xFF for i in range(size)) - - -def _spawn_serve_connection( - session: encoder_server._ServerSession, -) -> tuple[socket.socket, threading.Thread]: - """Run ``_serve_connection`` in a daemon thread with a fresh - encoder bound to the thread. Returns ``(parent_sock, thread)``; - caller closes ``parent_sock`` and joins ``thread``. The child - socket is closed by the target wrapper once the loop exits.""" - parent_sock, child_sock = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) - - def target(): - try: - with session.spec.encoder_factory(session.reader, session.opts) as encoder: - encoder_server._serve_connection( - child_sock, session, encoder, "test-thread" - ) - finally: - try: - child_sock.close() - except OSError: - pass - - thread = threading.Thread(target=target, daemon=True) - thread.start() - return parent_sock, thread - - -class TestMakeErrorReply: - def test_oserror_uses_exc_errno_without_logging(self, caplog): - exc = OSError(errno.ENOENT, "missing") - with caplog.at_level(logging.DEBUG, logger="biofuse.encoder_server"): - reply = encoder_server._make_error_reply(exc, "ctx") - assert encoder_protocol.parse_status(reply) == -errno.ENOENT - emitted = [r for r in caplog.records if r.name == "biofuse.encoder_server"] - assert emitted == [] - - def test_non_oserror_uses_eio_and_logs_error_plus_debug_traceback(self, caplog): - exc = ValueError("boom") - with caplog.at_level(logging.DEBUG, logger="biofuse.encoder_server"): - reply = encoder_server._make_error_reply(exc, "during dispatch") - assert encoder_protocol.parse_status(reply) == -errno.EIO - encoder_records = [ - r for r in caplog.records if r.name == "biofuse.encoder_server" - ] - error_records = [r for r in encoder_records if r.levelno == logging.ERROR] - assert len(error_records) == 1 - msg = error_records[0].getMessage() - assert "during dispatch" in msg - assert f"errno {errno.EIO}" in msg - assert "boom" in msg - debug_records = [ - r - for r in encoder_records - if r.levelno == logging.DEBUG - and "during dispatch traceback" in r.getMessage() - ] - assert len(debug_records) == 1 - assert debug_records[0].exc_info is not None - - -class TestMakeMetadataReply: - def test_happy_path_matches_expected_static(self, fx_session, fx_expected): - spec, expected_static, expected_stream_size = fx_expected - reply = encoder_server._make_metadata_reply(fx_session) - status = encoder_protocol.parse_status( - reply[: encoder_protocol.REPLY_STATUS_SIZE] - ) - body = reply[encoder_protocol.REPLY_STATUS_SIZE :] - assert status == len(body) - n_static, stream_size = encoder_protocol.parse_metadata_prefix( - body[: encoder_protocol.META_PREFIX_SIZE] - ) - assert n_static == len(spec.static_suffixes(fx_session.opts)) - assert stream_size == expected_stream_size - sizes_size = n_static * encoder_protocol.META_SIZE_ENTRY_SIZE - sizes = encoder_protocol.parse_static_sizes( - body[ - encoder_protocol.META_PREFIX_SIZE : encoder_protocol.META_PREFIX_SIZE - + sizes_size - ], - n_static, - ) - suffixes = spec.static_suffixes(fx_session.opts) - for suffix, size in zip(suffixes, sizes, strict=True): - assert size == len(expected_static[suffix]) - - def test_missing_suffix_raises_value_error(self, fx_reader, fx_spec, fx_opts): - bad_spec = dataclasses.replace( - fx_spec, build_static_files=lambda reader, opts: {} - ) - session = encoder_server._ServerSession(fx_reader, bad_spec, fx_opts) - with pytest.raises(ValueError, match="returned keys"): - encoder_server._make_metadata_reply(session) - - def test_extra_suffix_raises_value_error(self, fx_reader, fx_spec, fx_opts): - original_build = fx_spec.build_static_files - - def with_extra(r, o): - return {**original_build(r, o), ".unexpected": b"x"} - - bad_spec = dataclasses.replace(fx_spec, build_static_files=with_extra) - session = encoder_server._ServerSession(fx_reader, bad_spec, fx_opts) - with pytest.raises(ValueError, match="returned keys"): - encoder_server._make_metadata_reply(session) - - -class TestMakeReadReply: - def test_full_payload_returns_packed_reply(self): - # ``_make_read_reply`` is called after the tag has already been - # consumed from the wire — send only the payload bytes. - parent, child = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) - encoder = _FakeEncoder() - try: - parent.sendall(struct.pack(" tuple[socket.socket, pathlib.Path]: - sock_path = tmp_path / "encoder.sock" - listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - listener.bind(str(sock_path)) - listener.listen(8) - return listener, sock_path - - def _start_server_thread( - self, listener: socket.socket, session: encoder_server._ServerSession - ) -> tuple[socket.socket, threading.Thread]: - parent_stop, child_stop = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) - thread = threading.Thread( - target=encoder_server.serve_forever, - args=(listener, child_stop, session), - daemon=True, - ) - thread.start() - return parent_stop, thread - - def test_concurrent_connections_each_get_metadata(self, fx_session, tmp_path): - listener, sock_path = self._bind_listener(tmp_path) - parent_stop, server_thread = self._start_server_thread(listener, fx_session) - try: - replies: list[bytes] = [] - errors: list[BaseException] = [] - lock = threading.Lock() - - def client_worker(): - try: - s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - s.connect(str(sock_path)) - try: - s.sendall(encoder_protocol.pack_get_metadata_request()) - _, body = _read_status_and_body(s) - with lock: - replies.append(body) - finally: - s.close() - except BaseException as exc: # noqa: BLE001 - with lock: - errors.append(exc) - - threads = [threading.Thread(target=client_worker) for _ in range(4)] - for t in threads: - t.start() - for t in threads: - t.join(timeout=10) - assert errors == [] - assert len(replies) == 4 - assert all(r == replies[0] for r in replies) - finally: - parent_stop.close() - server_thread.join(timeout=5) - assert not server_thread.is_alive() - - def test_concurrent_stream_reads_independent_state( - self, fx_session, fx_expected, tmp_path - ): - """Each connection runs in its own server thread with its own - encoder; full reads on distinct connections see byte-identical - streams regardless of interleaving.""" - _, _, expected_stream_size = fx_expected - listener, sock_path = self._bind_listener(tmp_path) - parent_stop, server_thread = self._start_server_thread(listener, fx_session) - try: - n = 4 - results: dict[int, bytes] = {} - errors: list[BaseException] = [] - lock = threading.Lock() - barrier = threading.Barrier(n) - - def worker(tid): - try: - s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - s.connect(str(sock_path)) - try: - barrier.wait(timeout=5) - s.sendall( - encoder_protocol.pack_read_request(0, expected_stream_size) - ) - _, body = _read_status_and_body(s) - with lock: - results[tid] = body - finally: - s.close() - except BaseException as exc: # noqa: BLE001 - with lock: - errors.append(exc) - - threads = [threading.Thread(target=worker, args=(i,)) for i in range(n)] - for t in threads: - t.start() - for t in threads: - t.join(timeout=30) - assert errors == [] - assert len(results) == n - reference = results[0] - assert len(reference) == expected_stream_size - for body in results.values(): - assert body == reference - finally: - parent_stop.close() - server_thread.join(timeout=5) - assert not server_thread.is_alive() - - def test_stop_signal_exits_accept_loop(self, fx_session, tmp_path): - listener, sock_path = self._bind_listener(tmp_path) - parent_stop, server_thread = self._start_server_thread(listener, fx_session) - # Server is in select(); close parent's stop end → child wakes. - parent_stop.close() - server_thread.join(timeout=5) - assert not server_thread.is_alive() - # The listener socket should now be closed; new connects fail. - s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - s.settimeout(1) - with pytest.raises(ConnectionRefusedError): - s.connect(str(sock_path)) - - -class TestServerMainHandshakeFailure: - """Static-file build failures (e.g. multi-allelic input) surface at - handshake time as errno replies on the wire. The subprocess must - keep running through a failed handshake and exit cleanly when - stop-signalled.""" - - @pytest.mark.parametrize( - "spec", [formats.PLINK_SPEC, formats.BGEN_SPEC], ids=["plink", "bgen"] - ) - def test_multiallelic_handshake_returns_errno( - self, fx_multiallelic_vcz, tmp_path, spec - ): - sock_path = tmp_path / "encoder.sock" - listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - listener.bind(str(sock_path)) - listener.listen(8) - parent_stop, child_stop = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) - ctx = mp.get_context("spawn") - opts = ( - vcztools.ViewPlinkOptions() - if spec.name == "plink" - else vcztools.ViewBgenOptions() - ) - proc = ctx.Process( - target=encoder_server._server_main, - args=( - listener, - child_stop, - str(fx_multiallelic_vcz.path), - spec, - opts, - ), - ) - proc.start() - listener.close() - child_stop.close() - parent_stop_closed = False - try: - client_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - client_sock.settimeout(30) - deadline = time.monotonic() + 10.0 - while True: - try: - client_sock.connect(str(sock_path)) - break - except (FileNotFoundError, ConnectionRefusedError): - if time.monotonic() > deadline: - raise - time.sleep(0.05) - client_sock.sendall(encoder_protocol.pack_get_metadata_request()) - status_buf = _recv_exact(client_sock, encoder_protocol.REPLY_STATUS_SIZE) - status = encoder_protocol.parse_status(status_buf) - assert status < 0, f"expected errno reply, got status={status}" - assert -status == errno.EIO - client_sock.close() - parent_stop.close() - parent_stop_closed = True - proc.join(timeout=10) - assert not proc.is_alive(), "subprocess did not exit after stop signal" - assert proc.exitcode == 0, ( - f"subprocess exited with {proc.exitcode}; expected 0" - ) - finally: - if not parent_stop_closed: - parent_stop.close() - if proc.is_alive(): - proc.terminate() - proc.join(timeout=5) - try: - os.unlink(sock_path) - except OSError: - pass - - -class TestServerMainSmoke: - """End-to-end check that ``_server_main`` runs in a real subprocess.""" - - @pytest.mark.parametrize( - "spec", [formats.PLINK_SPEC, formats.BGEN_SPEC], ids=["plink", "bgen"] - ) - def test_spawn_metadata_handshake(self, fx_small_vcz, tmp_path, spec): - sock_path = tmp_path / "encoder.sock" - listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - listener.bind(str(sock_path)) - listener.listen(8) - parent_stop, child_stop = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) - ctx = mp.get_context("spawn") - opts = ( - vcztools.ViewPlinkOptions() - if spec.name == "plink" - else vcztools.ViewBgenOptions() - ) - proc = ctx.Process( - target=encoder_server._server_main, - args=( - listener, - child_stop, - str(fx_small_vcz.path), - spec, - opts, - ), - ) - proc.start() - listener.close() - child_stop.close() - try: - # Connect with a brief retry while the server's accept loop - # spins up. - deadline = time.monotonic() + 10 - client_sock = None - while time.monotonic() < deadline: - try: - client_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - client_sock.connect(str(sock_path)) - break - except OSError: - if client_sock is not None: - client_sock.close() - client_sock = None - time.sleep(0.05) - assert client_sock is not None, "could not connect to spawned server" - try: - client_sock.sendall(encoder_protocol.pack_get_metadata_request()) - status, body = _read_status_and_body(client_sock) - assert status > 0 - n_static, stream_size = encoder_protocol.parse_metadata_prefix( - body[: encoder_protocol.META_PREFIX_SIZE] - ) - assert n_static == len(spec.static_suffixes(opts)) - assert stream_size > 0 - finally: - client_sock.close() - finally: - parent_stop.close() - proc.join(timeout=10) - if proc.is_alive(): - proc.terminate() - proc.join(timeout=5) - assert not proc.is_alive() - assert proc.exitcode == 0 - try: - os.unlink(sock_path) - except OSError: - pass diff --git a/tests/test_formats.py b/tests/test_formats.py index 96ecb05..6beb876 100644 --- a/tests/test_formats.py +++ b/tests/test_formats.py @@ -1,6 +1,6 @@ """Tests for biofuse.formats — the per-format spec table. -Pins the duck-typed contract that ``encoder_server`` / ``encoder_ops`` +Pins the duck-typed contract that ``encoder_host`` / ``encoder_ops`` rely on: each spec produces the expected static sidecar bytes for the shared :class:`VczReader` under a given options dataclass, and each ``encoder_factory`` yields an encoder whose ``total_size`` reflects diff --git a/tests/test_plink_apps.py b/tests/test_plink_apps.py index 1317118..6475733 100644 --- a/tests/test_plink_apps.py +++ b/tests/test_plink_apps.py @@ -27,7 +27,7 @@ import vcztools from vcztools.plink import write_plink -from biofuse import access_log, encoder_client, encoder_ops, formats, fuse_adapter +from biofuse import access_log, encoder_host, encoder_ops, formats, fuse_adapter def _open_reader(path) -> object: @@ -74,12 +74,13 @@ async def fx_mounted_plink(tmp_path, fx_medium_vcz): write_plink(_open_reader(fx_medium_vcz.path), golden / "medium") log = access_log.AccessLogger() - sock_path = tmp_path / "plink.sock" - async with await encoder_client.EncoderClient.start( - str(fx_medium_vcz.path), sock_path, formats.PLINK_SPEC - ) as client: + async with await encoder_host.EncoderHost.start( + str(fx_medium_vcz.path), + formats.PLINK_SPEC, + opts=vcztools.ViewPlinkOptions(), + ) as host: ops = encoder_ops.EncoderOps( - client, "medium", formats.PLINK_SPEC, access_logger=log + host, "medium", formats.PLINK_SPEC, access_logger=log ) async with fuse_adapter.mount(ops, str(mnt)): await _wait_for_mount(mnt) @@ -102,11 +103,12 @@ async def _mount_plink(tmp_path, vcz): mnt = tmp_path / "mnt" mnt.mkdir() basename = vcz.path.stem - sock_path = tmp_path / "plink.sock" - async with await encoder_client.EncoderClient.start( - str(vcz.path), sock_path, formats.PLINK_SPEC - ) as client: - ops = encoder_ops.EncoderOps(client, basename, formats.PLINK_SPEC) + async with await encoder_host.EncoderHost.start( + str(vcz.path), + formats.PLINK_SPEC, + opts=vcztools.ViewPlinkOptions(), + ) as host: + ops = encoder_ops.EncoderOps(host, basename, formats.PLINK_SPEC) async with fuse_adapter.mount(ops, str(mnt)): await _wait_for_mount(mnt) yield mnt, basename