From b7511e7dcbbb0d931e1af809e0a13374bd92d64e Mon Sep 17 00:00:00 2001 From: livepeer-tessa Date: Thu, 2 Apr 2026 06:47:34 +0000 Subject: [PATCH] Handle TransferEncodingError and ClientConnectorError as graceful network disconnects When an orchestrator truncates a trickle transfer mid-stream (HTTP 400 + incomplete transfer encoding), aiohttp raises ClientPayloadError (wrapping the underlying TransferEncodingError, which is not itself a ClientPayloadError subclass). Previously this propagated as an application error in both ChannelReader and JSONLReader, causing noisy ERROR-level logs and unclean session teardown. Changes: - channel_reader.py: Catch aiohttp.ClientPayloadError in both ChannelReader and JSONLReader before the generic Exception handler. Log at WARNING and return cleanly instead of wrapping as LivepeerGatewayError. This stops the error from bubbling up to livepeer_app.py's control channel handler. - trickle_publisher.py: Demote ClientConnectorError in _run_delete from ERROR to DEBUG. When the orchestrator is already down, connection-refused errors during trickle DELETE teardown are expected and not actionable. 
Fixes: daydreamlive/scope#805 Related: daydreamlive/scope#771 (similar but for EOFError on clean disconnect) Signed-off-by: livepeer-tessa --- src/livepeer_gateway/channel_reader.py | 17 +++++++++++++++++ src/livepeer_gateway/trickle_publisher.py | 5 ++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/src/livepeer_gateway/channel_reader.py b/src/livepeer_gateway/channel_reader.py index bc51b6b..4af657d 100644 --- a/src/livepeer_gateway/channel_reader.py +++ b/src/livepeer_gateway/channel_reader.py @@ -1,11 +1,16 @@ from __future__ import annotations import json +import logging from typing import Any, AsyncIterator +import aiohttp + from .errors import LivepeerGatewayError from .trickle_subscriber import TrickleSubscriber +_LOG = logging.getLogger(__name__) + class ChannelReader: def __init__(self, events_url: str) -> None: @@ -74,6 +79,12 @@ async def _iter() -> AsyncIterator[dict[str, Any]]: yield data except LivepeerGatewayError: raise + except aiohttp.ClientPayloadError as e: + # Orchestrator truncated the transfer mid-stream (e.g. TransferEncodingError 400) + # or went unreachable. Treat as a clean network disconnect — stop iterating + # rather than propagating as an application error. + _LOG.warning("Trickle events channel disconnected (network): %s: %s", e.__class__.__name__, e) + return except Exception as e: raise LivepeerGatewayError( f"Trickle events subscription error: {e.__class__.__name__}: {e}" @@ -167,6 +178,12 @@ async def _iter() -> AsyncIterator[dict[str, Any]]: await segment.close() except LivepeerGatewayError: raise + except aiohttp.ClientPayloadError as e: + # Orchestrator truncated the transfer mid-stream (e.g. TransferEncodingError 400) + # or went unreachable. Treat as a clean network disconnect — stop iterating + # rather than propagating as an application error. 
+ _LOG.warning("Trickle JSONL channel disconnected (network): %s: %s", e.__class__.__name__, e) + return except Exception as e: raise LivepeerGatewayError( f"Trickle JSONL subscription error: {e.__class__.__name__}: {e}" diff --git a/src/livepeer_gateway/trickle_publisher.py b/src/livepeer_gateway/trickle_publisher.py index 179de50..4c45d47 100644 --- a/src/livepeer_gateway/trickle_publisher.py +++ b/src/livepeer_gateway/trickle_publisher.py @@ -290,7 +290,10 @@ async def _run_delete(self) -> None: try: resp = await self._session.delete(self.url) resp.release() - # Suppress any shutdown-time exceptions, including cancellation. + except aiohttp.ClientConnectorError as exc: + # Orchestrator already unreachable — suppress, no need to log at ERROR. + _LOG.debug("Trickle DELETE: orchestrator unreachable (suppressed) url=%s: %s", self.url, exc) + # Suppress any other shutdown-time exceptions, including cancellation. except BaseException: _LOG.error("Trickle DELETE exception url=%s", self.url, exc_info=True)