
feature: configurable acquire timeout and burst capacity for QuerySessionPool #806

@vladkolotvin

Description

Problem

QuerySessionPool.acquire() blocks indefinitely when the pool is exhausted and no sessions are released back. In production this is a frequent source of silent, hard-to-diagnose deadlocks: any reentrant call that needs a second session while the caller already holds one hangs forever with no timeout, no error, and no observable signal.

Current implementation in ydb/aio/query/pool.py:

async def acquire(self) -> QuerySession:
    ...
    if session is None and self._current_size == self._size:
        queue_get = asyncio.ensure_future(self._queue.get())
    task_stop = asyncio.ensure_future(self._should_stop.wait())
        done, _ = await asyncio.wait((queue_get, task_stop), return_when=asyncio.FIRST_COMPLETED)
        # no timeout branch — waits forever on queue.get()

The same pattern exists in the sync pool.

Impact (real incident)

We hit a full-service deadlock on production. Pattern:

  • As many parallel request handlers as the pool size each hold an outer session via retry_tx_async.
  • Inside each handler, helper code opens another retry_tx_async on the same pool to read auxiliary data. That nested call checks out a second session while the outer one is still held.
  • The pool is already fully occupied by the outer holders, so every nested retry_tx_async parks in pool.acquire() → queue.get() forever. The outer holders cannot finish until their nested call returns, so no session is ever released.

Result: every coroutine parked in Queue.get (asyncio/queues.py) under pool.py's retry_tx_async, every session held by outer work, zero throughput, no timeouts, no errors. Diagnosis required pyrasite/pystack on a live pod.

An acquire timeout alone would have flipped this silent deadlock into a loud, retriable error within seconds.

Proposed changes

1. acquire_timeout on QuerySessionPool

Add a timeout for pool-level waits. Also expose it as a per-call override.

class QuerySessionPool:
    def __init__(
        self,
        driver,
        size: int = 100,
        *,
        acquire_timeout: Optional[float] = None,   # new
        ...
    ): ...

    async def acquire(self, *, timeout: Optional[float] = None) -> QuerySession:
        ...

Behavior: if the pool is at capacity and no session becomes available within timeout seconds, raise a dedicated exception (e.g. SessionPoolEmpty), distinct from QueryClientError so users can handle/retry it cleanly. When timeout is None, preserve the current infinite-wait behavior for backward compatibility.

This flows into retry_tx_async / retry_operation_async / execute_with_retries: the session checkout they perform respects the pool-level timeout, and RetrySettings should allow overriding it per call.

Note: acquire() already has a CancelledError branch that carefully cancels the queue_get future and releases a session if it was just obtained. Timeout handling must follow the same pattern — otherwise a timed-out acquire can race with a just-completed queue.get() and leak a session.
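A minimal sketch of the proposed timeout branch, using a toy stand-in pool and the hypothetical SessionPoolEmpty name (none of this is the actual SDK implementation). It follows the cancellation-safety requirement from the note above: a session obtained at the last instant is returned to the queue before the exception is raised.

```python
import asyncio
from typing import Optional


class SessionPoolEmpty(Exception):
    """Hypothetical exception name; raised when acquire() times out."""


class MiniPool:
    """Tiny stand-in for QuerySessionPool, just enough to show the branch."""

    def __init__(self, size: int) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        for i in range(size):
            self._queue.put_nowait(f"session-{i}")

    def release(self, session: str) -> None:
        self._queue.put_nowait(session)

    async def acquire(self, *, timeout: Optional[float] = None) -> str:
        queue_get = asyncio.ensure_future(self._queue.get())
        done, _ = await asyncio.wait({queue_get}, timeout=timeout)
        if queue_get in done:
            return queue_get.result()
        # Timed out. Cancel the waiter, but guard against the race where
        # queue.get() completes between the timeout and the cancellation:
        # a session obtained at the last instant must go back to the pool.
        queue_get.cancel()
        try:
            session = await queue_get
        except asyncio.CancelledError:
            pass
        else:
            self.release(session)
        raise SessionPoolEmpty(f"no session became available within {timeout}s")


async def demo() -> str:
    pool = MiniPool(size=1)
    held = await pool.acquire()          # take the only session
    try:
        await pool.acquire(timeout=0.1)  # pool exhausted: should time out
    except SessionPoolEmpty:
        outcome = "timed out"
    else:
        outcome = "acquired"
    pool.release(held)
    return outcome
```

The real implementation would keep the existing task_stop future in the asyncio.wait call; it is omitted here only to keep the race-handling visible.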

2. Burst capacity (max_size)

Optional parameters allowing temporary growth above size under pressure, with automatic shrink-back:

QuerySessionPool(
    driver,
    size=100,          # steady-state target
    max_size=200,      # new: hard cap; pool may grow up to this under load
    shrink_delay=60,   # new: seconds an over-capacity session sits idle before it's deleted
)

Semantics:

  • acquire() never blocks while self._current_size < max_size; a new session is created on demand.
  • On release(), if self._current_size > size and there are already idle sessions in the queue, the released session is deleted instead of re-queued, until self._current_size == size. This keeps the pool shrinking gracefully without killing sessions that are about to be reused.
  • Alternative (simpler): a background reaper periodically deletes sessions idle longer than shrink_delay while current_size > size.

This handles transient spikes (bursts of parallel requests) without permanently paying for an overprovisioned pool, and without forcing operators to pick a worst-case static size.
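The shrink-on-release rule can be captured as a small pure function (a sketch; the parameter names mirror the _current_size/_size attributes quoted earlier, but nothing here is the actual SDK API):

```python
def release_action(current_size: int, size: int, idle_in_queue: int) -> str:
    """Decide the fate of a released session under the proposed rule.

    A surplus session (current_size > size) is deleted only when other
    idle sessions are already queued; otherwise it is re-queued, since
    with no idle sessions waiting it is likely to be reused immediately.
    """
    if current_size > size and idle_in_queue > 0:
        return "delete"
    return "requeue"
```

For example, release_action(150, 100, 3) deletes the surplus session, while release_action(150, 100, 0) and release_action(100, 100, 5) both re-queue it.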

The two features are orthogonal; adding (1) alone already removes the deadlock class. (2) additionally removes a large category of "pool too small" latency spikes.

Minimal reproduction

Works against any YDB instance (e.g. a local single-node Docker container). No auth/proxy needed.

import asyncio
import ydb
import ydb.aio


ENDPOINT = "grpc://localhost:2136"
DATABASE = "/local"


async def main() -> None:
    async with ydb.aio.Driver(endpoint=ENDPOINT, database=DATABASE) as driver:
        await driver.wait(fail_fast=True, timeout=30)
        async with ydb.aio.QuerySessionPool(driver, size=1) as pool:

            async def inner(tx):
                await tx.execute("SELECT 2;")

            async def outer(tx):
                await tx.execute("SELECT 1;")
                # Holding the only session in the pool, we start another
                # retry_tx_async that wants a second one.
                await pool.retry_tx_async(inner)

            try:
                await asyncio.wait_for(pool.retry_tx_async(outer), timeout=10)
            except asyncio.TimeoutError:
                print("DEADLOCK: nested retry_tx_async waits forever for a session.")


if __name__ == "__main__":
    asyncio.run(main())

Expected output after ~10s:

DEADLOCK: nested retry_tx_async waits forever for a session.

The inner retry_tx_async is parked in pool.acquire() → self._queue.get() with no timeout. The only session is held by the outer transaction, which cannot complete until the inner call returns. The same pattern deadlocks at any size == N whenever N outer holders each need one additional session — the single-session example is just the smallest case.

With the proposed acquire_timeout=5, the inner acquire would raise a timeout exception, the outer retry_tx_async would bubble it up (or retry on a retriable class), and workers would fail loudly instead of hanging.

Alternatives considered

  • "Just size the pool correctly." Not enough: any reentrant pattern can deadlock regardless of absolute size (N parallel outer holders, each needing a second session, deadlocks at any size == N). Users cannot always know the maximum reentrancy depth statically.
  • User-space asyncio.wait_for(pool.acquire(), timeout). Technically possible but dangerous: the existing acquire() cancellation branch is non-trivial (it must release back any session it had already obtained from the queue). Forcing users to replicate this themselves invites correctness bugs; the primitive belongs in the SDK.
  • Caller responsibility to avoid reentrancy. Works in trivial cases, breaks as soon as there are layered services, ORMs, or helpers that do their own checkouts.
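A deterministic toy demonstration of why the user-space wrapper is dangerous (all names hypothetical, not SDK code): a pool whose acquire() lacks the cancellation branch loses a session for good when cancellation lands after queue.get() has already handed one over — exactly what a naive asyncio.wait_for timeout can do.

```python
import asyncio


class NaivePool:
    """A pool whose acquire() does NOT handle cancellation (hypothetical;
    the real acquire() has a careful CancelledError branch)."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def acquire(self) -> str:
        session = await self._queue.get()
        await asyncio.sleep(0.05)  # stands in for post-checkout awaits
        return session             # never reached if cancelled above

    def release(self, session: str) -> None:
        self._queue.put_nowait(session)


async def demo() -> int:
    pool = NaivePool()
    task = asyncio.create_task(pool.acquire())  # pool empty: parks in get()
    await asyncio.sleep(0.01)                   # let it start waiting
    pool.release("s1")                          # session handed to the waiter
    await asyncio.sleep(0.01)                   # waiter is now mid-acquire
    task.cancel()                               # a wait_for timeout fires here
    try:
        await task
    except asyncio.CancelledError:
        pass
    return pool._queue.qsize()                  # 0: "s1" never comes back
```

After demo() runs, the queue is empty: the released session vanished with the cancelled coroutine, permanently shrinking the pool.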

References

  • ydb/aio/query/pool.py, acquire() — the blocking queue.get() call site.
  • The same pattern exists in the sync pool and should be changed symmetrically.

Backward compatibility

  • acquire_timeout=None (default) keeps current behavior → strict opt-in, no breakage.
  • max_size=None (or max_size == size) keeps current behavior.
