Skip to content

Commit

Permalink
Merge pull request #1024 from stinovlas/1022-add-psycopg-pool-factory
Browse files Browse the repository at this point in the history
  • Loading branch information
ewjoachim committed Mar 25, 2024
2 parents 2e9a0ea + 58c1627 commit a3c9f0f
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 9 deletions.
23 changes: 23 additions & 0 deletions docs/howto/basics/connector.md
Expand Up @@ -91,6 +91,29 @@ parameters from the [psycopg_pool.AsyncConnectionPool()](https://www.psycopg.org
Similarly, the {py:class}`SyncPsycopgConnector` can handle all the parameters from the
[psycopg_pool.ConnectionPool()](https://www.psycopg.org/psycopg3/docs/api/pool.html#psycopg_pool.ConnectionPool) function.

### Custom connection pool

It's possible to use custom connection pool with {py:class}`PsycopgConnector`. It
accepts `poll_factory` keyword argument. You can pass any callable that returns
{py:class}`psycopg_pool.AsyncConnectionPool` instance:

```
import procrastinate
import psycopg_pool
app = procrastinate.App(
connector=procrastinate.PsycopgConnector(
pool_factory=psycopg_pool.AsyncNullConnectionPool,
conninfo="postgres://user:password@host:port/dbname",
)
)
```

In this case, {py:class}`AsyncNullConnectionPool` receives `conninfo` keyword argument
and creates null connection pool (which effectively disables pooling). This is useful
when you use [PgBouncer] or some other external pooler in order to resolve pooling outside
of your application.

[libpq connection string]: https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
[libpq environment variables]: https://www.postgresql.org/docs/current/libpq-envars.html
[psycopg connection arguments]: https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING-KEYWORD-VALUE
[PgBouncer]: https://www.pgbouncer.org/
28 changes: 19 additions & 9 deletions procrastinate/psycopg_connector.py
Expand Up @@ -50,19 +50,20 @@ def __init__(
*,
json_dumps: Callable | None = None,
json_loads: Callable | None = None,
pool_factory: Callable[
..., psycopg_pool.AsyncConnectionPool
] = psycopg_pool.AsyncConnectionPool,
**kwargs: Any,
):
"""
Create a PostgreSQL connector using psycopg. The connector uses an
``psycopg_pool.AsyncConnectionPool``, which is created internally, or
set into the connector by calling `App.open_async`.
set into the connector by calling `App.open_async`. You can also pass
custom callable which returns ``psycopg_pool.AsyncConnectionPool`` instance
as ``pool_factory`` kwarg.
Note that if you want to use a ``psycopg_pool.AsyncNullConnectionPool``,
you will need to initialize it yourself and pass it to the connector
through the ``App.open_async`` method.
All other arguments than ``json_dumps`` and ``json_loads`` are passed
to ``psycopg_pool.AsyncConnectionPool`` (see psycopg documentation__).
All other arguments than ``pool_factory``, ``json_dumps`` and ``json_loads`` are passed
to ``pool_factory`` callable (see psycopg documentation__).
``json_dumps`` and ``json_loads`` are used to configure new connections
created by the pool with ``psycopg.types.json.set_json_dumps`` and
Expand All @@ -81,8 +82,17 @@ def __init__(
A function to deserialize JSON objects from a string. If not
provided, JSON objects will be deserialized using psycopg's default
JSON deserializer.
pool_factory :
A callable which returns ``psycopg_pool.AsyncConnectionPool`` instance.
``kwargs`` will be passed to this callable as keyword arguments.
Default is ``psycopg_pool.AsyncConnectionPool``.
You can set this to ``psycopg_pool.AsyncNullConnectionPool`` to disable
pooling.
"""
self._async_pool: psycopg_pool.AsyncConnectionPool | None = None
self._pool_factory: Callable[..., psycopg_pool.AsyncConnectionPool] = (
pool_factory
)
self._pool_externally_set: bool = False
self._json_loads = json_loads
self._json_dumps = json_dumps
Expand Down Expand Up @@ -138,12 +148,12 @@ async def open_async(

await self._async_pool.open(wait=True) # type: ignore

@staticmethod
@wrap_exceptions()
async def _create_pool(
self,
pool_args: dict[str, Any],
) -> psycopg_pool.AsyncConnectionPool:
return psycopg_pool.AsyncConnectionPool(
return self._pool_factory(
**pool_args,
# Not specifying open=False raises a warning and will be deprecated.
# It makes sense, as we can't really make async I/Os in a constructor.
Expand Down
31 changes: 31 additions & 0 deletions tests/unit/test_psycopg_connector.py
Expand Up @@ -65,6 +65,37 @@ async def test_open_async_pool_argument_specified(mocker, connector):
assert connector._async_pool == pool


async def test_open_async_pool_factory(mocker):
pool = mocker.AsyncMock()

def pool_factory(**kwargs):
return pool

connector = psycopg_connector.PsycopgConnector(pool_factory=pool_factory)

await connector.open_async()

assert connector._async_pool is pool
assert connector._async_pool.open.await_count == 1


async def test_open_async_pool_factory_argument_specified(mocker):
pool = mocker.AsyncMock()

def pool_factory(**kwargs):
return pool

connector = psycopg_connector.PsycopgConnector(pool_factory=pool_factory)
mocker.patch.object(connector, "_create_pool")
another_pool = mocker.AsyncMock()

await connector.open_async(another_pool)

assert connector._pool_externally_set is True
assert connector._create_pool.call_count == 0
assert connector._async_pool is another_pool


def test_get_pool(connector):
with pytest.raises(exceptions.AppNotOpen):
_ = connector.pool
Expand Down

0 comments on commit a3c9f0f

Please sign in to comment.