Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions python/lib/sift_client/_internal/sync_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from __future__ import annotations

import asyncio
import concurrent.futures
import inspect
import sys
from functools import wraps
Expand Down Expand Up @@ -67,16 +66,10 @@ def _run(self, coro):
coro.close()
raise RuntimeError("Sift client is closed; cannot make synchronous API calls.")

timeout = getattr(client, "sync_call_timeout", None)
future = asyncio.run_coroutine_threadsafe(coro, loop)
try:
return future.result(timeout=timeout)
except concurrent.futures.TimeoutError as exc:
future.cancel()
raise TimeoutError(
f"Sift synchronous API call exceeded its {timeout}s deadline; the server "
"did not respond in time and the request was cancelled."
) from exc
# No wall-clock cap here: stalled calls are bounded at the transport layer
# (GrpcConfig/RestConfig request_timeout), and waiting on the whole coroutine
# lets methods like wait_until_complete honor their own timeout_secs.
return asyncio.run_coroutine_threadsafe(coro, loop).result()

namespace = {
"__module__": module,
Expand Down
28 changes: 5 additions & 23 deletions python/lib/sift_client/_tests/_internal/test_sync_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@
class MockClient:
"""Mock client that simulates the SiftClient with an event loop."""

def __init__(self, sync_call_timeout: float | None = None):
def __init__(self):
"""Initialize the mock client."""
self._default_loop = asyncio.new_event_loop()
self._loop_running = True
self.sync_call_timeout = sync_call_timeout
atexit.register(self.close_sync)

def _run_default_loop():
Expand Down Expand Up @@ -90,12 +89,6 @@ async def async_method_with_exception(self) -> None:
await asyncio.sleep(0.01)
raise ValueError("Test exception")

async def slow_async_method(self, delay: float = 1.0) -> str:
"""Test async method that sleeps, to exercise the sync-call deadline."""
self._record_call("slow_async_method")
await asyncio.sleep(delay)
return "slow_result"

async def async_method_with_executor(self) -> str:
"""Test async method that uses run_in_executor, like wait_and_download."""
self._record_call("async_method_with_executor")
Expand Down Expand Up @@ -246,11 +239,11 @@ async def caller():
loop.close()


class TestSyncWrapperTimeoutAndShutdown:
"""Tests for the sync-call deadline backstop and the stopped-loop guard."""
class TestSyncWrapperShutdown:
"""Tests for the stopped-loop fail-fast guard."""

def _make_sync_resource(self, sync_call_timeout: float | None = None):
mock_client = MockClient(sync_call_timeout=sync_call_timeout)
def _make_sync_resource(self):
mock_client = MockClient()
MockResource = generate_sync_api(MockResourceAsync, "MockResource") # noqa: N806
return MockResource(mock_client, value="testVal")

Expand All @@ -260,14 +253,3 @@ def test_call_after_close_raises_runtime_error(self):
resource._async_impl.client.close_sync()
with pytest.raises(RuntimeError, match="Sift client is closed"):
resource.async_method("arg", 1)

def test_slow_call_times_out(self):
"""A call that outlives the sync deadline raises TimeoutError, not a hang."""
resource = self._make_sync_resource(sync_call_timeout=0.1)
with pytest.raises(TimeoutError, match="did not respond in time"):
resource.slow_async_method(delay=5.0)

def test_fast_call_within_timeout_succeeds(self):
"""A call that finishes within the deadline returns normally."""
resource = self._make_sync_resource(sync_call_timeout=5.0)
assert resource.slow_async_method(delay=0.01) == "slow_result"
43 changes: 42 additions & 1 deletion python/lib/sift_client/_tests/_internal/test_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pytest

from sift_client.transport.grpc_transport import GrpcClient, GrpcConfig
from sift_client.transport.rest_transport import RestConfig
from sift_client.transport.rest_transport import DEFAULT_REST_TIMEOUT, RestClient, RestConfig


class TestGrpcConfigUrl:
Expand Down Expand Up @@ -109,3 +109,44 @@ def test_url_keeps_https(self):
def test_url_keeps_http(self):
config = RestConfig(base_url="http://rest.sift.com", api_key="api", use_ssl=False)
assert config.base_url == "http://rest.sift.com"


class TestRestRequestTimeout:
"""The REST client applies a default per-request timeout so a stalled socket
fails fast instead of blocking the calling thread forever.
"""

@staticmethod
def _client(**config_kwargs) -> RestClient:
return RestClient(
RestConfig(base_url="https://rest.sift.com", api_key="api", **config_kwargs)
)

@staticmethod
def _capture_request_kwargs(client: RestClient) -> dict:
captured: dict = {}

def fake_request(method, url, **kwargs):
captured.update(kwargs)
return object()

client._client._session.request = fake_request # type: ignore[assignment]
return captured

def test_default_timeout_applied(self):
client = self._client()
captured = self._capture_request_kwargs(client)
client.get("/v1/ping")
assert captured["timeout"] == DEFAULT_REST_TIMEOUT

def test_per_call_timeout_overrides_default(self):
client = self._client()
captured = self._capture_request_kwargs(client)
client.get("/v1/ping", timeout=3.0)
assert captured["timeout"] == 3.0

def test_timeout_disabled_when_config_none(self):
client = self._client(request_timeout=None)
captured = self._capture_request_kwargs(client)
client.get("/v1/ping")
assert "timeout" not in captured
5 changes: 0 additions & 5 deletions python/lib/sift_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,6 @@ def is_loop_running(self) -> bool:
"""Whether the background event loop is still accepting synchronous API work."""
return self._grpc_client.is_loop_running

@property
def sync_call_timeout(self) -> float | None:
"""Deadline in seconds for a blocking synchronous API call, or None if disabled."""
return self._grpc_client.sync_call_timeout

@property
def rest_client(self) -> RestClient:
"""The REST client used by the SiftClient for making REST API calls."""
Expand Down
18 changes: 0 additions & 18 deletions python/lib/sift_client/transport/grpc_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@
# Configure logging
logger = logging.getLogger(__name__)

# How far the blocking sync deadline sits above the per-RPC deadline. The gRPC
# deadline should fire first and cancel the request; the sync backstop only trips
# if that never happens.
_SYNC_CALL_TIMEOUT_MARGIN_SECONDS = 15.0


def _suppress_blocking_io(loop, context):
"""Suppress benign BlockingIOError from gRPC's PollerCompletionQueue.
Expand Down Expand Up @@ -164,19 +159,6 @@ def is_loop_running(self) -> bool:
"""
return self._loop_running and self._default_loop.is_running()

@property
def sync_call_timeout(self) -> float | None:
"""Deadline in seconds for a blocking sync API call, or None if disabled.

Sits above the per-RPC deadline by a margin so the gRPC deadline fires
first and cancels the in-flight request; this is only a backstop for the
case where the RPC deadline never trips.
"""
request_timeout = self._config.request_timeout
if request_timeout is None:
return None
return request_timeout + _SYNC_CALL_TIMEOUT_MARGIN_SECONDS

def get_stub(self, stub_class: type[Any]) -> Any:
"""Get an async stub bound to the current event loop.
Creates a channel and stub for this loop if needed.
Expand Down
15 changes: 15 additions & 0 deletions python/lib/sift_client/transport/rest_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
# Configure logging
logger = logging.getLogger(__name__)

# Default timeout (seconds) for REST requests that don't set their own; applies to
# connect and read. The read leg is the gap between bytes, not the whole transfer, so a
# stalled socket fails fast while a healthy long download is not cut off.
DEFAULT_REST_TIMEOUT: float = 60.0


class RestConfig:
"""Configuration for REST API clients."""
Expand All @@ -30,6 +35,7 @@ def __init__(
use_ssl: bool = True,
cert_via_openssl: bool = False,
retry: Retry = _DEFAULT_REST_RETRY,
request_timeout: float | tuple[float, float] | None = DEFAULT_REST_TIMEOUT,
):
"""Initialize the REST configuration.

Expand All @@ -39,6 +45,9 @@ def __init__(
use_ssl: Whether to use HTTPS.
cert_via_openssl: Whether to use OpenSSL for SSL/TLS.
retry: The retry configuration for requests.
request_timeout: Default timeout in seconds for requests that don't set their
own; applies to connect and read. Pass a (connect, read) tuple to split
them. Defaults to DEFAULT_REST_TIMEOUT; set to None to disable.
"""
if not base_url.startswith("http"):
# urljoin (used when executing requests) requires URL starting with http or https
Expand All @@ -48,6 +57,7 @@ def __init__(
self.use_ssl = use_ssl
self.cert_via_openssl = cert_via_openssl
self.retry = retry
self.request_timeout = request_timeout

def _to_sift_rest_config(self) -> SiftRestConfig:
"""Convert to a SiftRestConfig for backwards compatibility. Will be removed in the future.
Expand Down Expand Up @@ -79,6 +89,7 @@ def __init__(self, config: RestConfig):
"""
self._base_url = config.base_url
self._config = config
self._request_timeout = config.request_timeout
self._client = self._create_client()

def _create_client(self) -> _RestService:
Expand Down Expand Up @@ -119,6 +130,10 @@ def _execute(
**kwargs,
) -> requests.Response:
full_url = urljoin(self.base_url, endpoint)
# Apply the default timeout unless the caller set one, so a stalled socket
# fails instead of blocking forever.
if "timeout" not in kwargs and self._request_timeout is not None:
kwargs["timeout"] = self._request_timeout
return self._client._session.request(method, full_url, headers=headers, data=data, **kwargs)

def get(self, endpoint: str, headers: dict | None = None, **kwargs) -> requests.Response:
Expand Down
Loading