diff --git a/packages/core/minos-microservice-aggregate/minos/aggregate/events/repositories/pg.py b/packages/core/minos-microservice-aggregate/minos/aggregate/events/repositories/pg.py index 43dff88d1..ef7be6b61 100644 --- a/packages/core/minos-microservice-aggregate/minos/aggregate/events/repositories/pg.py +++ b/packages/core/minos-microservice-aggregate/minos/aggregate/events/repositories/pg.py @@ -10,9 +10,6 @@ UUID, ) -from psycopg2 import ( - IntegrityError, -) from psycopg2.sql import ( SQL, Composable, @@ -24,6 +21,7 @@ NULL_UUID, Config, DatabaseMixin, + IntegrityException, ) from ...exceptions import ( @@ -42,7 +40,7 @@ class PostgreSqlEventRepository(DatabaseMixin, EventRepository): @classmethod def _from_config(cls, config: Config, **kwargs) -> Optional[EventRepository]: - return super()._from_config(config, **config.get_database_by_name("event"), **kwargs) + return super()._from_config(config, database_key=None, **kwargs) async def _setup(self): """Setup miscellaneous repository thing. @@ -65,7 +63,7 @@ async def _submit(self, entry: EventEntry, **kwargs) -> EventEntry: try: response = await self.submit_query_and_fetchone(query, params, lock=lock) - except IntegrityError: + except IntegrityException: raise EventRepositoryConflictException( f"{entry!r} could not be submitted due to a key (uuid, version, transaction) collision", await self.offset, diff --git a/packages/core/minos-microservice-aggregate/minos/aggregate/snapshots/pg/abc.py b/packages/core/minos-microservice-aggregate/minos/aggregate/snapshots/pg/abc.py index 6f835a15a..88246d393 100644 --- a/packages/core/minos-microservice-aggregate/minos/aggregate/snapshots/pg/abc.py +++ b/packages/core/minos-microservice-aggregate/minos/aggregate/snapshots/pg/abc.py @@ -18,7 +18,7 @@ class PostgreSqlSnapshotSetup(DatabaseMixin): @classmethod def _from_config(cls: Type[T], config: Config, **kwargs) -> T: - return cls(**config.get_database_by_name("snapshot"), **kwargs) + return cls(database_key=None, **kwargs) async def _setup(self) -> None: await self.submit_query('CREATE EXTENSION IF NOT EXISTS "uuid-ossp";', lock="uuid-ossp") diff --git a/packages/core/minos-microservice-aggregate/minos/aggregate/snapshots/pg/api.py b/packages/core/minos-microservice-aggregate/minos/aggregate/snapshots/pg/api.py index 9a6357171..2ed9c1c73 100644 --- a/packages/core/minos-microservice-aggregate/minos/aggregate/snapshots/pg/api.py +++ b/packages/core/minos-microservice-aggregate/minos/aggregate/snapshots/pg/api.py @@ -51,7 +51,7 @@ def _from_config(cls, config: Config, **kwargs) -> PostgreSqlSnapshotRepository: if "writer" not in kwargs: kwargs["writer"] = PostgreSqlSnapshotWriter.from_config(config, **kwargs) - return cls(**config.get_database_by_name("snapshot"), **kwargs) + return cls(database_key=None, **kwargs) async def _setup(self) -> None: await self.writer.setup() diff --git a/packages/core/minos-microservice-aggregate/minos/aggregate/snapshots/pg/readers.py b/packages/core/minos-microservice-aggregate/minos/aggregate/snapshots/pg/readers.py index 7ec0eff9d..290cfa111 100644 --- a/packages/core/minos-microservice-aggregate/minos/aggregate/snapshots/pg/readers.py +++ b/packages/core/minos-microservice-aggregate/minos/aggregate/snapshots/pg/readers.py @@ -128,15 +128,10 @@ async def find_entries( qb = PostgreSqlSnapshotQueryBuilder(name, condition, ordering, limit, transaction_uuids, exclude_deleted) query, parameters = qb.build() - async with self.cursor() as cursor: - # noinspection PyTypeChecker - await cursor.execute(query, parameters) - if streaming_mode: - async for row in cursor: - # noinspection PyArgumentList - yield SnapshotEntry(*row) - return - else: - rows = await cursor.fetchall() - for row in rows: - yield SnapshotEntry(*row) + async_iterable = self.submit_query_and_iter(query, parameters) + if streaming_mode: + async for row in async_iterable: + yield SnapshotEntry(*row) + else: + for row in [row async for row in async_iterable]: + yield SnapshotEntry(*row) diff --git a/packages/core/minos-microservice-aggregate/minos/aggregate/transactions/repositories/pg.py b/packages/core/minos-microservice-aggregate/minos/aggregate/transactions/repositories/pg.py index eee96b7fa..0a1281a39 100644 --- a/packages/core/minos-microservice-aggregate/minos/aggregate/transactions/repositories/pg.py +++ b/packages/core/minos-microservice-aggregate/minos/aggregate/transactions/repositories/pg.py @@ -34,7 +34,7 @@ class PostgreSqlTransactionRepository(DatabaseMixin, TransactionRepository): @classmethod def _from_config(cls, config: Config, **kwargs) -> Optional[PostgreSqlTransactionRepository]: - return super()._from_config(config, **config.get_database_by_name("transaction"), **kwargs) + return super()._from_config(config, database_key=None, **kwargs) async def _setup(self): await self.submit_query('CREATE EXTENSION IF NOT EXISTS "uuid-ossp";', lock="uuid-ossp") diff --git a/packages/core/minos-microservice-aggregate/tests/test_aggregate/test_events/test_repositories/test_pg.py b/packages/core/minos-microservice-aggregate/tests/test_aggregate/test_events/test_repositories/test_pg.py index ec71a3979..a215d5c73 100644 --- a/packages/core/minos-microservice-aggregate/tests/test_aggregate/test_events/test_repositories/test_pg.py +++ b/packages/core/minos-microservice-aggregate/tests/test_aggregate/test_events/test_repositories/test_pg.py @@ -1,12 +1,11 @@ import unittest -import aiopg - from minos.aggregate import ( EventRepository, PostgreSqlEventRepository, ) from minos.common import ( + AiopgDatabaseClient, DatabaseClientPool, ) from minos.common.testing import ( @@ -18,6 +17,7 @@ ) +# noinspection SqlNoDataSourceInspection class TestPostgreSqlEventRepositorySubmit(EventRepositorySubmitTestCase, PostgresAsyncTestCase): __test__ = True @@ -30,20 +30,18 @@ def test_constructor(self): pool = DatabaseClientPool.from_config(self.config) repository = PostgreSqlEventRepository(pool) self.assertIsInstance(repository, PostgreSqlEventRepository) - self.assertEqual(pool, repository.pool) + self.assertIsInstance(repository.pool, DatabaseClientPool) def test_from_config(self): repository = PostgreSqlEventRepository.from_config(self.config) - repository_config = self.config.get_database_by_name("event") - self.assertEqual(repository_config["database"], repository.pool.database) + self.assertIsInstance(repository.pool, DatabaseClientPool) async def test_setup(self): - async with aiopg.connect(**self.config.get_default_database()) as connection: - async with connection.cursor() as cursor: - await cursor.execute( - "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'aggregate_event');" - ) - response = (await cursor.fetchone())[0] + async with AiopgDatabaseClient(**self.config.get_default_database()) as client: + await client.execute( + "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'aggregate_event');" + ) + response = (await client.fetch_one())[0] self.assertTrue(response) diff --git a/packages/core/minos-microservice-aggregate/tests/test_aggregate/test_snapshots/test_pg/test_abc.py b/packages/core/minos-microservice-aggregate/tests/test_aggregate/test_snapshots/test_pg/test_abc.py index ace68416c..207afc7c1 100644 --- a/packages/core/minos-microservice-aggregate/tests/test_aggregate/test_snapshots/test_pg/test_abc.py +++ b/packages/core/minos-microservice-aggregate/tests/test_aggregate/test_snapshots/test_pg/test_abc.py @@ -1,10 +1,11 @@ import unittest -import aiopg - from minos.aggregate import ( PostgreSqlSnapshotSetup, ) +from minos.common import ( + AiopgDatabaseClient, +) from minos.common.testing import ( PostgresAsyncTestCase, ) @@ -13,27 +14,25 @@ ) +# noinspection SqlNoDataSourceInspection class TestPostgreSqlSnapshotSetup(AggregateTestCase, PostgresAsyncTestCase): async def test_setup_snapshot_table(self): async with PostgreSqlSnapshotSetup.from_config(self.config): - async with aiopg.connect(**self.config.get_default_database()) as connection: - async with connection.cursor() as cursor: - await cursor.execute( - "SELECT EXISTS (SELECT FROM pg_tables " - "WHERE schemaname = 'public' AND tablename = 'snapshot');" - ) - observed = (await cursor.fetchone())[0] + async with AiopgDatabaseClient(**self.config.get_default_database()) as client: + await client.execute( + "SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = 'public' AND tablename = 'snapshot');" + ) + observed = (await client.fetch_one())[0] self.assertEqual(True, observed) async def test_setup_snapshot_aux_offset_table(self): async with PostgreSqlSnapshotSetup.from_config(self.config): - async with aiopg.connect(**self.config.get_default_database()) as connection: - async with connection.cursor() as cursor: - await cursor.execute( - "SELECT EXISTS (SELECT FROM pg_tables WHERE " - "schemaname = 'public' AND tablename = 'snapshot_aux_offset');" - ) - observed = (await cursor.fetchone())[0] + async with AiopgDatabaseClient(**self.config.get_default_database()) as client: + await client.execute( + "SELECT EXISTS (SELECT FROM pg_tables WHERE " + "schemaname = 'public' AND tablename = 'snapshot_aux_offset');" + ) + observed = (await client.fetch_one())[0] self.assertEqual(True, observed) diff --git a/packages/core/minos-microservice-aggregate/tests/test_aggregate/test_snapshots/test_pg/test_queries.py b/packages/core/minos-microservice-aggregate/tests/test_aggregate/test_snapshots/test_pg/test_queries.py index d4583827d..467cdd3f3 100644 --- a/packages/core/minos-microservice-aggregate/tests/test_aggregate/test_snapshots/test_pg/test_queries.py +++ b/packages/core/minos-microservice-aggregate/tests/test_aggregate/test_snapshots/test_pg/test_queries.py @@ -7,7 +7,6 @@ uuid4, ) -import aiopg from psycopg2.extras import ( Json, ) @@ -29,6 +28,7 @@ ) from minos.common import ( NULL_UUID, + AiopgDatabaseClient, ) from minos.common.testing import ( PostgresAsyncTestCase, @@ -426,8 +426,8 @@ async def test_build_complex(self): self.assertEqual(self._flatten_parameters(expected_parameters), self._flatten_parameters(observed[1])) async def _flatten_query(self, query) -> str: - async with aiopg.connect(**self.config.get_default_database()) as connection: - return query.as_string(connection.raw) + async with AiopgDatabaseClient(**self.config.get_default_database()) as client: + return query.as_string(client.connection.raw) @staticmethod def _flatten_parameters(parameters) -> dict: diff --git a/packages/core/minos-microservice-aggregate/tests/test_aggregate/test_snapshots/test_pg/test_readers.py b/packages/core/minos-microservice-aggregate/tests/test_aggregate/test_snapshots/test_pg/test_readers.py index 800e058d8..bba2f9999 100644 --- a/packages/core/minos-microservice-aggregate/tests/test_aggregate/test_snapshots/test_pg/test_readers.py +++ b/packages/core/minos-microservice-aggregate/tests/test_aggregate/test_snapshots/test_pg/test_readers.py @@ -21,6 +21,9 @@ TransactionEntry, TransactionStatus, ) +from minos.common import ( + DatabaseClientPool, +) from minos.common.testing import ( PostgresAsyncTestCase, ) @@ -94,8 +97,7 @@ def test_type(self): def test_from_config(self): reader = PostgreSqlSnapshotReader.from_config(self.config) - snapshot_config = self.config.get_database_by_name("snapshot") - self.assertEqual(snapshot_config["database"], reader.pool.database) + self.assertIsInstance(reader.pool, DatabaseClientPool) async def test_find_by_uuid(self): condition = Condition.IN("uuid", [self.uuid_2, self.uuid_3]) diff --git a/packages/core/minos-microservice-aggregate/tests/test_aggregate/test_snapshots/test_pg/test_writers.py b/packages/core/minos-microservice-aggregate/tests/test_aggregate/test_snapshots/test_pg/test_writers.py index 4cd0f2de0..e393903b5 100644 --- a/packages/core/minos-microservice-aggregate/tests/test_aggregate/test_snapshots/test_pg/test_writers.py +++ b/packages/core/minos-microservice-aggregate/tests/test_aggregate/test_snapshots/test_pg/test_writers.py @@ -26,6 +26,7 @@ TransactionStatus, ) from minos.common import ( + DatabaseClientPool, NotProvidedException, current_datetime, ) @@ -101,8 +102,7 @@ def test_type(self): self.assertTrue(issubclass(PostgreSqlSnapshotWriter, PostgreSqlSnapshotSetup)) def test_from_config(self): - snapshot_config = self.config.get_database_by_name("snapshot") - self.assertEqual(snapshot_config["database"], self.writer.pool.database) + self.assertIsInstance(self.writer.pool, DatabaseClientPool) def test_from_config_raises(self): with self.assertRaises(NotProvidedException): diff --git a/packages/core/minos-microservice-aggregate/tests/test_aggregate/test_transactions/test_repositories/test_pg.py b/packages/core/minos-microservice-aggregate/tests/test_aggregate/test_transactions/test_repositories/test_pg.py index e66c76ded..e0bd9225d 100644 --- a/packages/core/minos-microservice-aggregate/tests/test_aggregate/test_transactions/test_repositories/test_pg.py +++ b/packages/core/minos-microservice-aggregate/tests/test_aggregate/test_transactions/test_repositories/test_pg.py @@ -3,8 +3,6 @@ uuid4, ) -import aiopg - from minos.aggregate import ( PostgreSqlTransactionRepository, TransactionEntry, @@ -13,6 +11,7 @@ TransactionStatus, ) from minos.common import ( + AiopgDatabaseClient, DatabaseClientPool, ) from minos.common.testing import ( @@ -23,6 +22,7 @@ ) +# noinspection SqlNoDataSourceInspection class TestPostgreSqlTransactionRepository(AggregateTestCase, PostgresAsyncTestCase): def setUp(self) -> None: super().setUp() @@ -50,16 +50,14 @@ def test_constructor(self): def test_from_config(self): repository = PostgreSqlTransactionRepository.from_config(self.config) - repository_config = self.config.get_database_by_name("event") - self.assertEqual(repository_config["database"], repository.pool.database) + self.assertIsInstance(repository.pool, DatabaseClientPool) async def test_setup(self): - async with aiopg.connect(**self.config.get_default_database()) as connection: - async with connection.cursor() as cursor: - await cursor.execute( - "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'aggregate_transaction');" - ) - response = (await cursor.fetchone())[0] + async with AiopgDatabaseClient(**self.config.get_default_database()) as client: + await client.execute( + "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'aggregate_transaction');" + ) + response = (await client.fetch_one())[0] self.assertTrue(response) async def test_submit(self): diff --git a/packages/core/minos-microservice-aggregate/tests/utils.py b/packages/core/minos-microservice-aggregate/tests/utils.py index 1c18b0b79..908121875 100644 --- a/packages/core/minos-microservice-aggregate/tests/utils.py +++ b/packages/core/minos-microservice-aggregate/tests/utils.py @@ -101,8 +101,11 @@ def __init__(self, key=None, *args, **kwargs): key = "fake" super().__init__(key, *args, **kwargs) - async def __aexit__(self, exc_type, exc_val, exc_tb): - return + async def acquire(self) -> None: + """For testing purposes.""" + + async def release(self): + """For testing purposes.""" class FakeLockPool(LockPool): diff --git a/packages/core/minos-microservice-common/minos/common/__init__.py b/packages/core/minos-microservice-common/minos/common/__init__.py index bc1e4fe63..a8f0b40ae 100644 --- a/packages/core/minos-microservice-common/minos/common/__init__.py +++ b/packages/core/minos-microservice-common/minos/common/__init__.py @@ -14,14 +14,20 @@ MinosConfig, ) from .database import ( + AiopgDatabaseClient, + DatabaseClient, + DatabaseClientBuilder, + DatabaseClientException, DatabaseClientPool, DatabaseLock, DatabaseLockPool, DatabaseMixin, + IntegrityException, PostgreSqlLock, PostgreSqlLockPool, PostgreSqlMinosDatabase, PostgreSqlPool, + UnableToConnectException, ) from .datetime import ( NULL_DATETIME, diff --git a/packages/core/minos-microservice-common/minos/common/config/v1.py b/packages/core/minos-microservice-common/minos/common/config/v1.py index fdac15c47..d61366029 100644 --- a/packages/core/minos-microservice-common/minos/common/config/v1.py +++ b/packages/core/minos-microservice-common/minos/common/config/v1.py @@ -282,6 +282,8 @@ def _get_database_by_name(self, prefix: str): data = self.get_by_key(prefix) data.pop("records", None) data.pop("retry", None) + if "client" in data: + data["client"] = import_module(data["client"]) return data def _get_discovery(self) -> dict[str, Any]: diff --git a/packages/core/minos-microservice-common/minos/common/config/v2.py b/packages/core/minos-microservice-common/minos/common/config/v2.py index 6a3fa4238..ff6c04a76 100644 --- a/packages/core/minos-microservice-common/minos/common/config/v2.py +++ b/packages/core/minos-microservice-common/minos/common/config/v2.py @@ -99,6 +99,9 @@ def _get_injections(self) -> list[Union[InjectableMixin, type[InjectableMixin]]] def _get_databases(self) -> dict[str, dict[str, Any]]: data = deepcopy(self.get_by_key("databases")) + for database in data.values(): + if "client" in database: + database["client"] = import_module(database["client"]) return data def _get_interfaces(self) -> dict[str, dict[str, Any]]: diff --git a/packages/core/minos-microservice-common/minos/common/database/__init__.py b/packages/core/minos-microservice-common/minos/common/database/__init__.py index f485369b7..da4b4d0ff 100644 --- a/packages/core/minos-microservice-common/minos/common/database/__init__.py +++ b/packages/core/minos-microservice-common/minos/common/database/__init__.py @@ -2,6 +2,14 @@ DatabaseMixin, PostgreSqlMinosDatabase, ) +from .clients import ( + AiopgDatabaseClient, + DatabaseClient, + DatabaseClientBuilder, + DatabaseClientException, + IntegrityException, + UnableToConnectException, +) from .locks import ( DatabaseLock, PostgreSqlLock, diff --git a/packages/core/minos-microservice-common/minos/common/database/abc.py b/packages/core/minos-microservice-common/minos/common/database/abc.py index 3db192334..9c06a070b 100644 --- a/packages/core/minos-microservice-common/minos/common/database/abc.py +++ b/packages/core/minos-microservice-common/minos/common/database/abc.py @@ -1,21 +1,9 @@ import warnings -from collections.abc import ( - Hashable, -) from typing import ( - Any, - AsyncContextManager, AsyncIterator, Optional, ) -from aiomisc.pool import ( - ContextManager, -) -from aiopg import ( - Cursor, -) - from ..exceptions import ( NotProvidedException, ) @@ -28,9 +16,6 @@ from ..setup import ( SetupMixin, ) -from .locks import ( - DatabaseLock, -) from .pools import ( DatabaseClientPool, PostgreSqlPool, @@ -45,13 +30,14 @@ def __init__( self, database_pool: Optional[DatabaseClientPool] = None, pool_factory: Optional[PoolFactory] = None, + database_key: Optional[str] = None, postgresql_pool: Optional[PostgreSqlPool] = None, *args, **kwargs, ): super().__init__(*args, **kwargs, pool_factory=pool_factory) if database_pool is None and pool_factory is not None: - database_pool = pool_factory.get_pool("database") + database_pool = pool_factory.get_pool(type_="database", identifier=database_key) if database_pool is None and postgresql_pool is not None: warnings.warn("'postgresql_pool' argument has been deprecated", DeprecationWarning) @@ -69,111 +55,33 @@ async def submit_query_and_fetchone(self, *args, **kwargs) -> tuple: :param kwargs: Additional named arguments. :return: This method does not return anything. """ - return await self.submit_query_and_iter(*args, **kwargs).__anext__() + async with self.pool.acquire() as client: + await client.execute(*args, **kwargs) + return await client.fetch_one(*args, **kwargs) # noinspection PyUnusedLocal - async def submit_query_and_iter( - self, - operation: Any, - parameters: Any = None, - *, - timeout: Optional[float] = None, - lock: Optional[int] = None, - streaming_mode: bool = False, - **kwargs, - ) -> AsyncIterator[tuple]: + async def submit_query_and_iter(self, *args, **kwargs) -> AsyncIterator[tuple]: """Submit a SQL query and return an asynchronous iterator. - :param operation: Query to be executed. - :param parameters: Parameters to be projected into the query. - :param timeout: An optional timeout. - :param lock: Optional key to perform the query with locking. If not set, the query is performed without any - lock. - :param streaming_mode: If ``True`` the data fetching is performed in streaming mode, that is iterating over the - cursor and yielding once a time (requires an opening connection to do that). Otherwise, all the data is - fetched and keep in memory before yielding it. + :param args: Additional positional arguments. :param kwargs: Additional named arguments. :return: This method does not return anything. """ - if lock is None: - context_manager = self.cursor() - else: - context_manager = self.locked_cursor(lock) - - async with context_manager as cursor: - await cursor.execute(operation=operation, parameters=parameters, timeout=timeout) - - if streaming_mode: - async for row in cursor: - yield row - return - - rows = await cursor.fetchall() - - for row in rows: - yield row + async with self.pool.acquire() as client: + await client.execute(*args, **kwargs) + async for value in client.fetch_all(*args, **kwargs): + yield value # noinspection PyUnusedLocal - async def submit_query( - self, operation: Any, parameters: Any = None, *, timeout: Optional[float] = None, lock: Any = None, **kwargs - ) -> None: + async def submit_query(self, *args, **kwargs) -> None: """Submit a SQL query. - :param operation: Query to be executed. - :param parameters: Parameters to be projected into the query. - :param timeout: An optional timeout. - :param lock: Optional key to perform the query with locking. If not set, the query is performed without any - lock. - :param kwargs: Additional named arguments. - :return: This method does not return anything. - """ - if lock is None: - context_manager = self.cursor() - else: - context_manager = self.locked_cursor(lock) - - async with context_manager as cursor: - await cursor.execute(operation=operation, parameters=parameters, timeout=timeout) - - def locked_cursor(self, key: Hashable, *args, **kwargs) -> AsyncContextManager[Cursor]: - """Get a new locked cursor. - - :param key: The key to be used for locking. :param args: Additional positional arguments. :param kwargs: Additional named arguments. - :return: A Cursor wrapped into an asynchronous context manager. - """ - lock = DatabaseLock(self.pool.acquire(), key, *args, **kwargs) - - async def _fn_enter(): - await lock.__aenter__() - return lock.cursor - - async def _fn_exit(_): - await lock.__aexit__(None, None, None) - - return ContextManager(_fn_enter, _fn_exit) - - def cursor(self, *args, **kwargs) -> AsyncContextManager[Cursor]: - """Get a new cursor. - - :param args: Additional positional arguments. - :param kwargs: Additional named arguments. - :return: A Cursor wrapped into an asynchronous context manager. + :return: This method does not return anything. """ - acquired = self.pool.acquire() - - async def _fn_enter(): - connection = await acquired.__aenter__() - cursor = await connection.cursor(*args, **kwargs).__aenter__() - return cursor - - async def _fn_exit(cursor: Cursor): - if not cursor.closed: - cursor.close() - await acquired.__aexit__(None, None, None) - - return ContextManager(_fn_enter, _fn_exit) + async with self.pool.acquire() as client: + return await client.execute(*args, **kwargs) @property def pool(self) -> DatabaseClientPool: diff --git a/packages/core/minos-microservice-common/minos/common/database/clients/__init__.py b/packages/core/minos-microservice-common/minos/common/database/clients/__init__.py new file mode 100644 index 000000000..a2884b3c4 --- /dev/null +++ b/packages/core/minos-microservice-common/minos/common/database/clients/__init__.py @@ -0,0 +1,12 @@ +from .abc import ( + DatabaseClient, + DatabaseClientBuilder, +) +from .aiopg import ( + AiopgDatabaseClient, +) +from .exceptions import ( + DatabaseClientException, + IntegrityException, + UnableToConnectException, +) diff --git a/packages/core/minos-microservice-common/minos/common/database/clients/abc.py b/packages/core/minos-microservice-common/minos/common/database/clients/abc.py new file mode 100644 index 000000000..afcab26fc --- /dev/null +++ b/packages/core/minos-microservice-common/minos/common/database/clients/abc.py @@ -0,0 +1,114 @@ +from __future__ import ( + annotations, +) + +from abc import ( + ABC, + abstractmethod, +) +from collections.abc import ( + AsyncIterator, +) +from typing import ( + Any, + Optional, +) + +from ...builders import ( + BuildableMixin, + Builder, +) +from ...config import ( + Config, +) + + +class DatabaseClient(ABC, BuildableMixin): + """Database Client base class.""" + + @classmethod + def _from_config(cls, config: Config, name: Optional[str] = None, **kwargs) -> DatabaseClient: + return super()._from_config(config, **config.get_database_by_name(name), **kwargs) + + async def is_valid(self, **kwargs) -> bool: + """Check if the instance is valid. + + :return: ``True`` if it is valid or ``False`` otherwise. + """ + return await self._is_valid(**kwargs) + + @abstractmethod + async def _is_valid(self, **kwargs) -> bool: + raise NotImplementedError + + async def reset(self, **kwargs) -> None: + """Reset the current instance status. + + :param kwargs: Additional named parameters. + :return: This method does not return anything. + """ + return await self._reset(**kwargs) + + @abstractmethod + async def _reset(self, **kwargs) -> None: + raise NotImplementedError + + async def execute(self, *args, **kwargs) -> None: + """Execute an operation. + + :param args: Additional positional arguments. + :param kwargs: Additional named arguments. + :return: This method does not return anything. + """ + await self._execute(*args, **kwargs) + + @abstractmethod + async def _execute(self, *args, **kwargs) -> None: + raise NotImplementedError + + async def fetch_one(self, *args, **kwargs) -> Any: + """Fetch one value. + + :param args: Additional positional arguments. + :param kwargs: Additional named arguments. + :return: This method does not return anything. + """ + return await self.fetch_all(*args, **kwargs).__anext__() + + def fetch_all(self, *args, **kwargs) -> AsyncIterator[Any]: + """Fetch all values with an asynchronous iterator. + + :param kwargs: Additional named arguments. + :return: This method does not return anything. + """ + return self._fetch_all(*args, **kwargs) + + @abstractmethod + def _fetch_all(self, *args, **kwargs) -> AsyncIterator[Any]: + raise NotImplementedError + + +class DatabaseClientBuilder(Builder[DatabaseClient]): + """Database Client Builder class.""" + + def with_name(self, name: str) -> DatabaseClientBuilder: + """Set name. + + :param name: The name to be added. + :return: This method return the builder instance. + """ + self.kwargs["name"] = name + return self + + def with_config(self, config: Config) -> DatabaseClientBuilder: + """Set config. + + :param config: The config to be set. + :return: This method return the builder instance. + """ + database_config = config.get_database_by_name(self.kwargs.get("name")) + self.kwargs |= database_config + return self + + +DatabaseClient.set_builder(DatabaseClientBuilder) diff --git a/packages/core/minos-microservice-common/minos/common/database/clients/aiopg.py b/packages/core/minos-microservice-common/minos/common/database/clients/aiopg.py new file mode 100644 index 000000000..7a3e4a470 --- /dev/null +++ b/packages/core/minos-microservice-common/minos/common/database/clients/aiopg.py @@ -0,0 +1,240 @@ +from __future__ import ( + annotations, +) + +import logging +from collections.abc import ( + AsyncIterator, + Hashable, +) +from typing import ( + TYPE_CHECKING, + Any, + Optional, +) + +import aiopg +from aiopg import ( + Connection, + Cursor, +) +from psycopg2 import ( + IntegrityError, + OperationalError, +) + +from .abc import ( + DatabaseClient, +) +from .exceptions import ( + IntegrityException, + UnableToConnectException, +) + +if TYPE_CHECKING: + from ..locks import ( + DatabaseLock, + ) + +logger = logging.getLogger(__name__) + + +class AiopgDatabaseClient(DatabaseClient): + """Aiopg Database Client class.""" + + _connection: Optional[Connection] + _cursor: Optional[Cursor] + _lock: Optional[DatabaseLock] + + def __init__( + self, + database: str, + host: Optional[str] = None, + port: Optional[int] = None, + user: Optional[str] = None, + password: Optional[str] = None, + *args, + **kwargs, + ): + super().__init__(*args, **kwargs) + + if host is None: + host = "localhost" + if port is None: + port = 5432 + if user is None: + user = "postgres" + if password is None: + password = "" + + self._database = database + self._host = host + self._port = port + self._user = user + self._password = password + + self._connection = None + + self._lock = None + self._cursor = None + + async def _setup(self) -> None: + await super()._setup() + await self._create_connection() + + async def _destroy(self) -> None: + await self.reset() + await self._close_connection() + await super()._destroy() + + async def _create_connection(self): + try: + self._connection = await aiopg.connect( + host=self.host, port=self.port, dbname=self.database, user=self.user, password=self.password + ) + except OperationalError as exc: + msg = f"There was an {exc!r} while trying to get a database connection." + logger.warning(msg) + raise UnableToConnectException(msg) + + logger.debug(f"Created {self.database!r} database connection identified by {id(self._connection)}!") + + async def _close_connection(self): + if self._connection is not None and not self._connection.closed: + await self._connection.close() + self._connection = None + logger.debug(f"Destroyed {self.database!r} database connection identified by {id(self._connection)}!") + + async def _is_valid(self) -> bool: + if self._connection is None: + return False + + try: + # This operation connects to the database and raises an exception if something goes wrong. + self._connection.isolation_level + except OperationalError: + return False + + return not self._connection.closed + + async def _reset(self, **kwargs) -> None: + await self._destroy_cursor(**kwargs) + + # noinspection PyUnusedLocal + async def _fetch_all( + self, + *args, + timeout: Optional[float] = None, + lock: Optional[int] = None, + **kwargs, + ) -> AsyncIterator[tuple]: + await self._create_cursor() + + async for row in self._cursor: + yield row + + # noinspection PyUnusedLocal + async def _execute( + self, operation: Any, parameters: Any = None, *, timeout: Optional[float] = None, lock: Any = None, **kwargs + ) -> None: + await self._create_cursor(lock=lock) + try: + await self._cursor.execute(operation=operation, parameters=parameters, timeout=timeout) + except IntegrityError as exc: + raise IntegrityException(f"The requested operation raised a integrity error: {exc!r}") + + async def _create_cursor(self, *args, lock: Optional[Hashable] = None, **kwargs): + if self._cursor is None: + self._cursor = await self._connection.cursor(*args, **kwargs) + + if lock is not None: + await self._create_lock(lock) + + async def _destroy_cursor(self, **kwargs): + await self._destroy_lock() + if self._cursor is not None: + if not self._cursor.closed: + self._cursor.close() + self._cursor = None + + async def _create_lock(self, lock: Hashable, *args, **kwargs): + if self._lock is not None and self._lock.key == lock: + return + await self._destroy_lock() + + from ..locks import ( + DatabaseLock, + ) + + self._lock = DatabaseLock(self, lock, *args, **kwargs) + await self._lock.acquire() + + async def _destroy_lock(self): + if self._lock is not None: + logger.debug(f"Destroying {self.lock!r}...") + await self._lock.release() + self._lock = None + + @property + def lock(self) -> Optional[DatabaseLock]: + """Get the lock. + + :return: A ``DatabaseLock`` instance. + """ + return self._lock + + @property + def cursor(self) -> Optional[Cursor]: + """Get the cursor. + + :return: A ``Cursor`` instance. + """ + return self._cursor + + @property + def connection(self) -> Optional[Connection]: + """Get the connection. + + :return: A ``Connection`` instance. + """ + return self._connection + + @property + def database(self) -> str: + """Get the database's database. + + :return: A ``str`` value. + """ + return self._database + + @property + def host(self) -> str: + """Get the database's host. + + :return: A ``str`` value. + """ + return self._host + + @property + def port(self) -> int: + """Get the database's port. + + :return: An ``int`` value. + """ + return self._port + + @property + def user(self) -> str: + """Get the database's user. + + :return: A ``str`` value. + """ + return self._user + + @property + def password(self) -> str: + """Get the database's password. + + :return: A ``str`` value. + """ + return self._password diff --git a/packages/core/minos-microservice-common/minos/common/database/clients/exceptions.py b/packages/core/minos-microservice-common/minos/common/database/clients/exceptions.py new file mode 100644 index 000000000..d638a738c --- /dev/null +++ b/packages/core/minos-microservice-common/minos/common/database/clients/exceptions.py @@ -0,0 +1,15 @@ +from ...exceptions import ( + MinosException, +) + + +class DatabaseClientException(MinosException): + """Base exception for database client.""" + + +class UnableToConnectException(DatabaseClientException): + """Exception to be raised when database client is not able to connect to the database.""" + + +class IntegrityException(DatabaseClientException): + """Exception to be raised when an integrity check is not satisfied.""" diff --git a/packages/core/minos-microservice-common/minos/common/database/locks.py b/packages/core/minos-microservice-common/minos/common/database/locks.py index 842798c5b..1a0a8ae18 100644 --- a/packages/core/minos-microservice-common/minos/common/database/locks.py +++ b/packages/core/minos-microservice-common/minos/common/database/locks.py @@ -2,49 +2,36 @@ from collections.abc import ( Hashable, ) -from typing import ( - AsyncContextManager, - Optional, -) - -from aiopg import ( - Connection, - Cursor, -) from ..locks import ( Lock, ) +from .clients import ( + DatabaseClient, +) class DatabaseLock(Lock): """Database Lock class.""" - cursor: Optional[Cursor] - - def __init__(self, wrapped_connection: AsyncContextManager[Connection], key: Hashable, *args, **kwargs): + def __init__(self, client: DatabaseClient, key: Hashable, *args, **kwargs): super().__init__(key, *args, **kwargs) - self.wrapped_connection = wrapped_connection - self.cursor = None + self.client = client - self._args = args - self._kwargs = kwargs + async def acquire(self) -> None: + """Acquire the lock. - async def __aenter__(self): - connection = await self.wrapped_connection.__aenter__() - cursor = await connection.cursor(*self._args, **self._kwargs).__aenter__() + :return: This method does not return anything. + """ + await self.client.execute("select pg_advisory_lock(%(hashed_key)s)", {"hashed_key": self.hashed_key}) - self.cursor = cursor - await self.cursor.execute("select pg_advisory_lock(%(hashed_key)s)", {"hashed_key": self.hashed_key}) - return self + async def release(self) -> None: + """Release the lock. - async def __aexit__(self, exc_type, exc_val, exc_tb): - await self.cursor.execute("select pg_advisory_unlock(%(hashed_key)s)", {"hashed_key": self.hashed_key}) - if not self.cursor.closed: - self.cursor.close() - self.cursor = None - await self.wrapped_connection.__aexit__(exc_type, exc_val, exc_tb) + :return: This method does not return anything. + """ + await self.client.execute("select pg_advisory_unlock(%(hashed_key)s)", {"hashed_key": self.hashed_key}) class PostgreSqlLock(DatabaseLock): diff --git a/packages/core/minos-microservice-common/minos/common/database/pools.py b/packages/core/minos-microservice-common/minos/common/database/pools.py index 9a6d4d41e..926fca84c 100644 --- a/packages/core/minos-microservice-common/minos/common/database/pools.py +++ b/packages/core/minos-microservice-common/minos/common/database/pools.py @@ -10,17 +10,13 @@ Optional, ) -import aiopg from aiomisc.pool import ( ContextManager, ) -from aiopg import ( - Connection, -) -from psycopg2 import ( - OperationalError, -) +from ..config import ( + Config, +) from ..injections import ( Injectable, ) @@ -30,6 +26,12 @@ from ..pools import ( Pool, ) +from .clients import ( + AiopgDatabaseClient, + DatabaseClient, + DatabaseClientBuilder, + UnableToConnectException, +) from .locks import ( DatabaseLock, ) @@ -37,69 +39,56 @@ logger = logging.getLogger(__name__) -class DatabaseClientPool(Pool[ContextManager]): +class DatabaseClientPool(Pool[DatabaseClient]): """Database Client Pool class.""" - def __init__( - self, - database: str, - host: Optional[str] = None, - port: Optional[int] = None, - user: Optional[str] = None, - password: Optional[str] = None, - *args, - **kwargs, - ): + def __init__(self, client_builder: DatabaseClientBuilder, *args, **kwargs): super().__init__(*args, **kwargs) - if host is None: - host = "localhost" - if port is None: - port = 5432 - if user is None: - user = "postgres" - if password is None: - password = "" - - self.database = database - self.host = host - self.port = port - self.user = user - self.password = password + self._client_builder = client_builder @classmethod - def _from_config(cls, *args, config, **kwargs): - return cls(*args, **config.get_default_database(), **kwargs) + def _from_config(cls, config: Config, identifier: Optional[str] = None, **kwargs): + client_cls = config.get_database_by_name(identifier).get("client", AiopgDatabaseClient) + # noinspection PyTypeChecker + base_builder: DatabaseClientBuilder = client_cls.get_builder() + client_builder = base_builder.with_name(identifier).with_config(config) + + return cls(client_builder=client_builder, **kwargs) + + async def _create_instance(self) -> Optional[DatabaseClient]: + instance = self._client_builder.build() - async def _create_instance(self) -> Optional[Connection]: try: - connection = await aiopg.connect( - host=self.host, port=self.port, dbname=self.database, user=self.user, password=self.password - ) - except OperationalError as exc: - logger.warning(f"There was an {exc!r} while trying to get a database connection.") - await sleep(1) + await instance.setup() + except UnableToConnectException: + await sleep(0.1) return None - logger.info(f"Created {self.database!r} database connection identified by {id(connection)}!") - return connection + logger.info(f"Created {instance!r}!") + return instance - async def _destroy_instance(self, instance: Connection): - if not instance.closed: - await instance.close() - logger.info(f"Destroyed {self.database!r} database connection identified by {id(instance)}!") + async def _destroy_instance(self, instance: DatabaseClient): + if instance is None: + return + logger.info(f"Destroyed {instance!r}!") + await instance.destroy() - async def _check_instance(self, instance: Optional[Connection]) -> bool: + async def _check_instance(self, instance: Optional[DatabaseClient]) -> bool: if instance is None: return False + return await instance.is_valid() - try: - # This operation connects to the database and raises an exception if something goes wrong. - instance.isolation_level - except OperationalError: - return False + async def _release_instance(self, instance: DatabaseClient) -> None: + await instance.reset() - return not instance.closed + @property + def client_builder(self) -> DatabaseClientBuilder: + """Get the client builder class. + + :return: A ``DatabaseClientBuilder`` instance. + """ + return self._client_builder @Injectable("postgresql_pool") @@ -111,7 +100,7 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) -class DatabaseLockPool(LockPool, DatabaseClientPool): +class DatabaseLockPool(DatabaseClientPool, LockPool): """Database Lock Pool class.""" def acquire(self, key: Hashable, *args, **kwargs) -> DatabaseLock: @@ -120,7 +109,18 @@ def acquire(self, key: Hashable, *args, **kwargs) -> DatabaseLock: :param key: The key to be used for locking. :return: A ``PostgreSqlLock`` instance. """ - return DatabaseLock(super().acquire(), key, *args, **kwargs) + acquired = super().acquire() + + async def _fn_enter(): + client = await acquired.__aenter__() + return await DatabaseLock(client, key, *args, **kwargs).__aenter__() + + async def _fn_exit(lock: DatabaseLock): + await lock.__aexit__(None, None, None) + await acquired.__aexit__(None, None, None) + + # noinspection PyTypeChecker + return ContextManager(_fn_enter, _fn_exit) class PostgreSqlLockPool(DatabaseLockPool): diff --git a/packages/core/minos-microservice-common/minos/common/locks.py b/packages/core/minos-microservice-common/minos/common/locks.py index d52a602dd..1366370e2 100644 --- a/packages/core/minos-microservice-common/minos/common/locks.py +++ b/packages/core/minos-microservice-common/minos/common/locks.py @@ -4,13 +4,11 @@ from abc import ( ABC, + abstractmethod, ) from collections.abc import ( Hashable, ) -from contextlib import ( - AbstractAsyncContextManager, -) from cached_property import ( cached_property, @@ -24,7 +22,7 @@ ) -class Lock(AbstractAsyncContextManager): +class Lock(ABC): """Lock base class.""" key: Hashable @@ -35,6 +33,27 @@ def __init__(self, key: Hashable, *args, **kwargs): self.key = key + async def __aenter__(self) -> Lock: + await self.acquire() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.release() + + @abstractmethod + async def acquire(self) -> None: + """Acquire the lock. + + :return: This method does not return anything. + """ + + @abstractmethod + async def release(self): + """Release the lock. + + :return: This method does not return anything. + """ + @cached_property def hashed_key(self) -> int: """Get the hashed key. diff --git a/packages/core/minos-microservice-common/minos/common/pools.py b/packages/core/minos-microservice-common/minos/common/pools.py index d95ddca67..c78ee6eed 100644 --- a/packages/core/minos-microservice-common/minos/common/pools.py +++ b/packages/core/minos-microservice-common/minos/common/pools.py @@ -13,6 +13,7 @@ ) from typing import ( Any, + AsyncContextManager, Generic, Optional, TypeVar, @@ -44,7 +45,7 @@ class PoolFactory(SetupMixin): """Pool Factory class.""" - _pools: dict[str, Pool] + _pools: dict[tuple[str, ...], Pool] def __init__(self, config: Config, default_classes: dict[str, type[Pool]] = None, *args, **kwargs): super().__init__(*args, **kwargs) @@ -69,18 +70,18 @@ async def _destroy_pools(self): await gather(*futures) logger.debug("Destroyed pools!") - def get_pool(self, type_: str, key: Optional[str] = None, **kwargs) -> Pool: + def get_pool(self, type_: str, identifier: Optional[str] = None, **kwargs) -> Pool: """Get a pool from the factory. :param type_: The type of the pool. - :param key: An optional key that identifies the pool. + :param identifier: An optional key that identifies the pool. :param kwargs: Additional named arguments. :return: A ``Pool`` instance. """ - if key is None: - key = type_ + key = (type_, identifier) if key not in self._pools: - self._pools[key] = self._create_pool(type_, key=key, **kwargs) + logger.debug(f"Creating the {key!r} pool...") + self._pools[key] = self._create_pool(type_, identifier=identifier, **kwargs) return self._pools[key] def _create_pool(self, type_: str, **kwargs) -> Pool: @@ -107,7 +108,8 @@ class Pool(SetupMixin, PoolBase, Generic[P], ABC): """Base class for Pool implementations in minos""" def __init__(self, *args, maxsize: int = 10, recycle: Optional[int] = 300, already_setup: bool = True, **kwargs): - super().__init__(*args, maxsize=maxsize, recycle=recycle, already_setup=already_setup, **kwargs) + SetupMixin.__init__(self, *args, already_setup=already_setup, **kwargs) + PoolBase.__init__(self, maxsize=maxsize, recycle=recycle) # noinspection PyUnresolvedReferences async def __acquire(self) -> Any: # pragma: no cover @@ -133,10 +135,11 @@ async def __acquire(self) -> Any: # pragma: no cover # noinspection PyUnresolvedReferences async def __release(self, instance: Any) -> Any: # pragma: no cover + await self._release_instance(instance) await self._PoolBase__release(instance) logger.debug(f"Released instance: {instance!r}") - def acquire(self, *args, **kwargs) -> P: + def acquire(self, *args, **kwargs) -> AsyncContextManager[P]: """Acquire a new instance wrapped on an asynchronous context manager. :param args: Additional positional arguments. @@ -158,6 +161,9 @@ async def _destroy(self) -> None: async def _check_instance(self, instance: P) -> bool: return True + async def _release_instance(self, instance: P) -> None: + pass + class MinosPool(Pool, Generic[P], ABC): """MinosPool class.""" diff --git a/packages/core/minos-microservice-common/minos/common/setup.py b/packages/core/minos-microservice-common/minos/common/setup.py index 0832a3f59..87ae933f0 100644 --- a/packages/core/minos-microservice-common/minos/common/setup.py +++ b/packages/core/minos-microservice-common/minos/common/setup.py @@ -94,7 +94,7 @@ async def setup(self) -> None: :return: This method does not return anything. """ if not self._already_setup: - logger.info(f"Setting up a {type(self).__name__!r} instance...") + logger.debug(f"Setting up a {type(self).__name__!r} instance...") await self._setup() self._already_setup = True @@ -110,7 +110,7 @@ async def destroy(self) -> None: :return: This method does not return anything. """ if self._already_setup: - logger.info(f"Destroying a {type(self).__name__!r} instance...") + logger.debug(f"Destroying a {type(self).__name__!r} instance...") await self._destroy() self._already_setup = False diff --git a/packages/core/minos-microservice-common/minos/common/testing.py b/packages/core/minos-microservice-common/minos/common/testing.py index 77512146e..6bbdba801 100644 --- a/packages/core/minos-microservice-common/minos/common/testing.py +++ b/packages/core/minos-microservice-common/minos/common/testing.py @@ -1,5 +1,4 @@ import unittest -import warnings from abc import ( ABC, ) @@ -17,12 +16,11 @@ uuid4, ) -import aiopg - from .config import ( Config, ) from .database import ( + AiopgDatabaseClient, DatabaseClientPool, ) from .injections import ( @@ -77,24 +75,9 @@ class PostgresAsyncTestCase(MinosTestCase, ABC): def setUp(self): self.base_config = Config(self.get_config_file_path()) self._uuid = uuid4() - self._test_db = {"database": f"test_db_{self._uuid.hex}", "user": f"test_user_{self._uuid.hex}"} + self._test_db = {"database": f"test_db_{self._uuid.hex}"} super().setUp() - @property - def repository_db(self) -> dict[str, Any]: - warnings.warn("'repository_db' attribute has been deprecated.", DeprecationWarning) - return self.config.get_database_by_name("aggregate") | self._test_db - - @property - def broker_queue_db(self) -> dict[str, Any]: - warnings.warn("'broker_queue_db' attribute has been deprecated.", DeprecationWarning) - return self.config.get_database_by_name("broker") | self._test_db - - @property - def snapshot_db(self) -> dict[str, Any]: - warnings.warn("'snapshot_db' attribute has been deprecated.", DeprecationWarning) - return self.config.get_database_by_name("aggregate") | self._test_db - def get_config(self) -> Config: config = Config(self.get_config_file_path()) @@ -120,20 +103,12 @@ async def asyncTearDown(self): async def _create_database(self, meta: dict[str, Any], test: dict[str, Any]) -> None: await self._drop_database(meta, test) - async with aiopg.connect(**meta) as connection: - async with connection.cursor() as cursor: - template = "CREATE ROLE {user} WITH SUPERUSER CREATEDB LOGIN ENCRYPTED PASSWORD {password!r};" - await cursor.execute(template.format(**(meta | test))) - - template = "CREATE DATABASE {database} WITH OWNER = {user};" - await cursor.execute(template.format(**(meta | test))) + async with AiopgDatabaseClient(**meta) as client: + template = "CREATE DATABASE {database} WITH OWNER = {user};" + await client.execute(template.format(**(meta | test))) @staticmethod async def _drop_database(meta: dict[str, Any], test: dict[str, Any]) -> None: - async with aiopg.connect(**meta) as connection: - async with connection.cursor() as cursor: - template = "DROP DATABASE IF EXISTS {database}" - await cursor.execute(template.format(**(meta | test))) - - template = "DROP ROLE IF EXISTS {user};" - await cursor.execute(template.format(**(meta | test))) + async with AiopgDatabaseClient(**meta) as client: + template = "DROP DATABASE IF EXISTS {database}" + await client.execute(template.format(**(meta | test))) diff --git a/packages/core/minos-microservice-common/tests/config/v1.yml b/packages/core/minos-microservice-common/tests/config/v1.yml index cf45e9211..0b124e10c 100644 --- a/packages/core/minos-microservice-common/tests/config/v1.yml +++ b/packages/core/minos-microservice-common/tests/config/v1.yml @@ -19,6 +19,7 @@ service: - tests.utils.FakeSagaManager - tests.utils.FakeCustomInjection repository: + client: minos.common.AiopgDatabaseClient database: order_db user: minos password: min0s diff --git a/packages/core/minos-microservice-common/tests/config/v2.yml b/packages/core/minos-microservice-common/tests/config/v2.yml index 8c7771bd0..af4e8dfc9 100644 --- a/packages/core/minos-microservice-common/tests/config/v2.yml +++ b/packages/core/minos-microservice-common/tests/config/v2.yml @@ -6,6 +6,7 @@ serializer: client: tests.utils.FakeSerializer databases: default: + client: minos.common.AiopgDatabaseClient database: order_db user: minos password: min0s diff --git a/packages/core/minos-microservice-common/tests/test_common/test_database/test_abc.py b/packages/core/minos-microservice-common/tests/test_common/test_database/test_abc.py index 13fa4af99..5570d154a 100644 --- a/packages/core/minos-microservice-common/tests/test_common/test_database/test_abc.py +++ b/packages/core/minos-microservice-common/tests/test_common/test_database/test_abc.py @@ -1,9 +1,8 @@ import unittest import warnings -import aiopg - from minos.common import ( + AiopgDatabaseClient, DatabaseClientPool, DatabaseMixin, NotProvidedException, @@ -19,7 +18,7 @@ ) -# noinspection SqlNoDataSourceInspection +# noinspection SqlNoDataSourceInspection,SqlResolve class TestDatabaseMixin(CommonTestCase, PostgresAsyncTestCase): def test_constructor(self): pool = DatabaseClientPool.from_config(self.config) @@ -55,19 +54,17 @@ async def test_submit_query(self): async with DatabaseMixin() as database: await database.submit_query("CREATE TABLE foo (id INT NOT NULL);") - async with aiopg.connect(**self.config.get_default_database()) as connection: - async with connection.cursor() as cursor: - await cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'foo');") - self.assertTrue((await cursor.fetchone())[0]) + async with AiopgDatabaseClient(**self.config.get_default_database()) as client: + await client.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'foo');") + self.assertTrue((await client.fetch_one())[0]) async def test_submit_query_locked(self): async with DatabaseMixin() as database: await database.submit_query("CREATE TABLE foo (id INT NOT NULL);", lock=1234) - async with aiopg.connect(**self.config.get_default_database()) as connection: - async with connection.cursor() as cursor: - await cursor.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'foo');") - self.assertTrue((await cursor.fetchone())[0]) + async with AiopgDatabaseClient(**self.config.get_default_database()) as client: + await client.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'foo');") + self.assertTrue((await client.fetch_one())[0]) async def test_submit_query_and_fetchone(self): async with DatabaseMixin() as database: diff --git a/packages/core/minos-microservice-common/tests/test_common/test_database/test_clients/__init__.py b/packages/core/minos-microservice-common/tests/test_common/test_database/test_clients/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/packages/core/minos-microservice-common/tests/test_common/test_database/test_clients/test_abc.py b/packages/core/minos-microservice-common/tests/test_common/test_database/test_clients/test_abc.py new file mode 100644 index 000000000..69f948785 --- /dev/null +++ b/packages/core/minos-microservice-common/tests/test_common/test_database/test_clients/test_abc.py @@ -0,0 +1,118 @@ +import unittest +from abc import ( + ABC, +) +from typing import ( + Any, + AsyncIterator, +) +from unittest.mock import ( + AsyncMock, + MagicMock, + call, +) + +from minos.common import ( + AiopgDatabaseClient, + BuildableMixin, + DatabaseClient, + DatabaseClientBuilder, +) +from tests.utils import ( + CommonTestCase, + FakeAsyncIterator, +) + + +class _DatabaseClient(DatabaseClient): + """For testing purposes.""" + + async def _is_valid(self, **kwargs) -> bool: + """For testing purposes.""" + + async def _reset(self, **kwargs) -> None: + """For testing purposes.""" + + async def _execute(self, *args, **kwargs) -> None: + """For testing purposes.""" + + def _fetch_all(self, *args, **kwargs) -> AsyncIterator[Any]: + """For testing purposes.""" + + +class TestDatabaseClient(unittest.IsolatedAsyncioTestCase): + def test_abstract(self): + self.assertTrue(issubclass(DatabaseClient, (ABC, BuildableMixin))) + expected = {"_is_valid", "_execute", "_fetch_all", "_reset"} + # noinspection PyUnresolvedReferences + self.assertEqual(expected, DatabaseClient.__abstractmethods__) + + def test_get_builder(self): + self.assertIsInstance(DatabaseClient.get_builder(), DatabaseClientBuilder) + + async def test_is_valid(self): + mock = AsyncMock(side_effect=[True, False]) + client = _DatabaseClient() + client._is_valid = mock + + self.assertEqual(True, await client.is_valid()) + self.assertEqual(False, await client.is_valid()) + + self.assertEqual([call(), call()], mock.call_args_list) + + async def test_reset(self): + mock = AsyncMock() + client = _DatabaseClient() + client._reset = mock + + await client.reset() + + self.assertEqual([call()], mock.call_args_list) + + async def test_execute(self): + mock = AsyncMock() + client = _DatabaseClient() + client._execute = mock + + await client.execute("foo") + + self.assertEqual([call("foo")], mock.call_args_list) + + async def test_fetch_all(self): + mock = MagicMock(return_value=FakeAsyncIterator(["one", "two"])) + client = _DatabaseClient() + client._fetch_all = mock + + self.assertEqual(["one", "two"], [v async for v in client.fetch_all()]) + + self.assertEqual([call()], mock.call_args_list) + + async def test_fetch_one(self): + mock = MagicMock(return_value=FakeAsyncIterator(["one", "two"])) + client = _DatabaseClient() + client._fetch_all = mock + + self.assertEqual("one", await client.fetch_one()) + + self.assertEqual([call()], mock.call_args_list) + + +class TestDatabaseClientBuilder(CommonTestCase): + def test_with_name(self): + builder = DatabaseClientBuilder(AiopgDatabaseClient).with_name("query") + self.assertEqual({"name": "query"}, builder.kwargs) + + def test_with_config(self): + builder = DatabaseClientBuilder(AiopgDatabaseClient).with_name("query").with_config(self.config) + self.assertEqual({"name": "query"} | self.config.get_database_by_name("query"), builder.kwargs) + + def test_build(self): + builder = DatabaseClientBuilder(AiopgDatabaseClient).with_name("query").with_config(self.config) + client = builder.build() + + self.assertIsInstance(client, AiopgDatabaseClient) + self.assertEqual(self.config.get_database_by_name("query")["database"], client.database) + + +if __name__ == "__main__": + unittest.main() diff --git a/packages/core/minos-microservice-common/tests/test_common/test_database/test_clients/test_aiopg.py b/packages/core/minos-microservice-common/tests/test_common/test_database/test_clients/test_aiopg.py new file mode 100644 index 000000000..1dc7f261e --- /dev/null +++ b/packages/core/minos-microservice-common/tests/test_common/test_database/test_clients/test_aiopg.py @@ -0,0 +1,180 @@ +import unittest +from unittest.mock import ( + PropertyMock, + call, + patch, +) + +import aiopg +from aiopg import ( + Connection, + Cursor, +) +from psycopg2 import ( + IntegrityError, + OperationalError, +) + +from minos.common import ( + AiopgDatabaseClient, + DatabaseLock, + IntegrityException, + UnableToConnectException, +) +from minos.common.testing import ( + PostgresAsyncTestCase, +) +from tests.utils import ( + CommonTestCase, +) + + +# noinspection SqlNoDataSourceInspection +class TestAiopgDatabaseClient(CommonTestCase, PostgresAsyncTestCase): + def setUp(self): + super().setUp() + self.sql = "SELECT * FROM information_schema.tables" + + def test_constructor(self): + client = AiopgDatabaseClient("foo") + self.assertEqual("foo", client.database) + self.assertEqual("postgres", client.user) + self.assertEqual("", client.password) + self.assertEqual("localhost", client.host) + self.assertEqual(5432, client.port) + + def test_from_config(self): + default_database = self.config.get_default_database() + client = AiopgDatabaseClient.from_config(self.config) + self.assertEqual(default_database["database"], client.database) + self.assertEqual(default_database["user"], client.user) + self.assertEqual(default_database["password"], client.password) + self.assertEqual(default_database["host"], client.host) + self.assertEqual(default_database["port"], client.port) + + async def test_is_valid_true(self): + async with AiopgDatabaseClient.from_config(self.config) as client: + self.assertTrue(await client.is_valid()) + + async def test_is_valid_false_not_setup(self): + client = AiopgDatabaseClient.from_config(self.config) + self.assertFalse(await client.is_valid()) + + async def test_is_valid_false_operational_error(self): + async with AiopgDatabaseClient.from_config(self.config) as client: + with patch.object(Connection, "isolation_level", new_callable=PropertyMock, side_effect=OperationalError): + self.assertFalse(await client.is_valid()) + + async def test_is_valid_false_closed(self): + async with AiopgDatabaseClient.from_config(self.config) as client: + with patch.object(Connection, "closed", new_callable=PropertyMock, return_valud=False): + self.assertFalse(await client.is_valid()) + + async def test_connection(self): + client = AiopgDatabaseClient.from_config(self.config) + self.assertIsNone(client.connection) + async with client: + self.assertIsInstance(client.connection, Connection) + self.assertIsNone(client.connection) + + async def test_connection_raises(self): + with patch.object(aiopg, "connect", new_callable=PropertyMock, side_effect=OperationalError): + with self.assertRaises(UnableToConnectException): + async with AiopgDatabaseClient.from_config(self.config): + pass + + async def test_cursor(self): + client = AiopgDatabaseClient.from_config(self.config) + self.assertIsNone(client.cursor) + async with client: + self.assertIsNone(client.cursor) + await client.execute("SELECT * FROM information_schema.tables") + self.assertIsInstance(client.cursor, Cursor) + + self.assertIsNone(client.cursor) + + async def test_cursor_reset(self): + client = AiopgDatabaseClient.from_config(self.config) + async with client: + await client.execute("SELECT * FROM information_schema.tables") + self.assertIsInstance(client.cursor, Cursor) + await client.reset() + self.assertIsNone(client.cursor) + + async def test_lock(self): + client = AiopgDatabaseClient.from_config(self.config) + self.assertIsNone(client.lock) + async with client: + self.assertIsNone(client.lock) + await client.execute(self.sql, lock="foo") + self.assertIsInstance(client.lock, DatabaseLock) + + self.assertIsNone(client.lock) + + async def test_lock_reset(self): + async with AiopgDatabaseClient.from_config(self.config) as client: + await client.execute(self.sql, lock="foo") + self.assertIsInstance(client.lock, DatabaseLock) + await client.reset() + self.assertIsNone(client.lock) + + async def test_execute(self): + async with AiopgDatabaseClient.from_config(self.config) as client: + with patch.object(Cursor, "execute") as execute_mock: + await client.execute(self.sql) + self.assertEqual( + [call(operation=self.sql, parameters=None, timeout=None)], + execute_mock.call_args_list, + ) + + async def test_execute_with_lock(self): + with patch.object(DatabaseLock, "acquire") as enter_lock_mock: + with patch.object(DatabaseLock, "release") as exit_lock_mock: + async with AiopgDatabaseClient.from_config(self.config) as client: + await client.execute(self.sql, lock="foo") + self.assertEqual(1, enter_lock_mock.call_count) + self.assertEqual(0, exit_lock_mock.call_count) + enter_lock_mock.reset_mock() + exit_lock_mock.reset_mock() + self.assertEqual(0, enter_lock_mock.call_count) + self.assertEqual(1, exit_lock_mock.call_count) + + async def test_execute_with_lock_multiple(self): + async with AiopgDatabaseClient.from_config(self.config) as client: + self.assertIsNone(client.lock) + + await client.execute(self.sql, lock="foo") + foo_lock = client.lock + self.assertIsInstance(foo_lock, DatabaseLock) + + await client.execute(self.sql, lock="foo") + self.assertEqual(foo_lock, client.lock) + + await client.execute(self.sql, lock="bar") + self.assertNotEqual(foo_lock, client.lock) + self.assertIsInstance(client.lock, DatabaseLock) + + async def test_execute_raises_integrity(self): + async with AiopgDatabaseClient.from_config(self.config) as client: + with patch.object(Cursor, "execute", side_effect=IntegrityError): + with self.assertRaises(IntegrityException): + await client.execute(self.sql) + + async def test_fetch_one(self): + async with AiopgDatabaseClient.from_config(self.config) as client: + await client.execute(self.sql) + observed = await client.fetch_one() + self.assertIsInstance(observed, tuple) + + async def test_fetch_all(self): + async with AiopgDatabaseClient.from_config(self.config) as client: + await client.execute(self.sql) + observed = [value async for value in client.fetch_all()] + + self.assertGreater(len(observed), 0) + for obs in observed: + self.assertIsInstance(obs, tuple) + + +if __name__ == "__main__": + unittest.main() diff --git a/packages/core/minos-microservice-common/tests/test_common/test_database/test_locks.py b/packages/core/minos-microservice-common/tests/test_common/test_database/test_locks.py index d03a33418..19ff08541 100644 --- a/packages/core/minos-microservice-common/tests/test_common/test_database/test_locks.py +++ b/packages/core/minos-microservice-common/tests/test_common/test_database/test_locks.py @@ -1,12 +1,8 @@ import unittest import warnings -import aiopg -from aiopg import ( - Cursor, -) - from minos.common import ( + AiopgDatabaseClient, DatabaseLock, Lock, PostgreSqlLock, @@ -23,42 +19,37 @@ class TestDatabaseLock(CommonTestCase, PostgresAsyncTestCase): def test_base(self): self.assertTrue(issubclass(DatabaseLock, Lock)) - async def test_wrapped_connection(self): - wrapped_connection = aiopg.connect(**self.config.get_default_database()) - lock = DatabaseLock(wrapped_connection, "foo") - self.assertEqual(wrapped_connection, lock.wrapped_connection) + async def test_client(self): + client = AiopgDatabaseClient(**self.config.get_default_database()) + lock = DatabaseLock(client, "foo") + self.assertEqual(client, lock.client) async def test_key(self): - wrapped_connection = aiopg.connect(**self.config.get_default_database()) - lock = DatabaseLock(wrapped_connection, "foo") + client = AiopgDatabaseClient(**self.config.get_default_database()) + lock = DatabaseLock(client, "foo") self.assertEqual("foo", lock.key) async def test_key_raises(self): - wrapped_connection = aiopg.connect(**self.config.get_default_database()) + client = AiopgDatabaseClient(**self.config.get_default_database()) with self.assertRaises(ValueError): - DatabaseLock(wrapped_connection, []) + DatabaseLock(client, []) async def test_hashed_key(self): - wrapped_connection = aiopg.connect(**self.config.get_default_database()) - lock = DatabaseLock(wrapped_connection, "foo") + client = AiopgDatabaseClient(**self.config.get_default_database()) + lock = DatabaseLock(client, "foo") self.assertEqual(hash("foo"), lock.hashed_key) - async def test_cursor(self): - wrapped_connection = aiopg.connect(**self.config.get_default_database()) - async with DatabaseLock(wrapped_connection, "foo") as lock: - self.assertIsInstance(lock.cursor, Cursor) - class TestPostgreSqlLock(CommonTestCase, PostgresAsyncTestCase): def test_is_subclass(self): self.assertTrue(issubclass(PostgreSqlLock, DatabaseLock)) async def test_warnings(self): - wrapped_connection = aiopg.connect(**self.config.get_default_database()) + client = AiopgDatabaseClient(**self.config.get_default_database()) with warnings.catch_warnings(): warnings.simplefilter("ignore", DeprecationWarning) - lock = PostgreSqlLock(wrapped_connection, "foo") + lock = PostgreSqlLock(client, "foo") self.assertIsInstance(lock, DatabaseLock) diff --git a/packages/core/minos-microservice-common/tests/test_common/test_database/test_pools.py b/packages/core/minos-microservice-common/tests/test_common/test_database/test_pools.py index 061f1c118..fe7ff66ce 100644 --- a/packages/core/minos-microservice-common/tests/test_common/test_database/test_pools.py +++ b/packages/core/minos-microservice-common/tests/test_common/test_database/test_pools.py @@ -1,24 +1,19 @@ import unittest import warnings from unittest.mock import ( - PropertyMock, patch, ) -import aiopg -from aiopg import ( - Connection, -) -from psycopg2 import ( - OperationalError, -) - from minos.common import ( + AiopgDatabaseClient, + DatabaseClient, + DatabaseClientBuilder, DatabaseClientPool, DatabaseLock, DatabaseLockPool, PostgreSqlLockPool, PostgreSqlPool, + UnableToConnectException, ) from minos.common.testing import ( PostgresAsyncTestCase, @@ -34,12 +29,9 @@ def setUp(self) -> None: self.pool = DatabaseClientPool.from_config(self.config) def test_constructor(self): - pool = DatabaseClientPool("foo") - self.assertEqual("foo", pool.database) - self.assertEqual("postgres", pool.user) - self.assertEqual("", pool.password) - self.assertEqual("localhost", pool.host) - self.assertEqual(5432, pool.port) + builder = DatabaseClientBuilder() + pool = DatabaseClientPool(builder) + self.assertEqual(builder, pool.client_builder) async def asyncSetUp(self): await super().asyncSetUp() @@ -50,37 +42,36 @@ async def asyncTearDown(self): await super().asyncTearDown() def test_from_config(self): - repository_config = self.config.get_database_by_name("event") - self.assertEqual(repository_config["database"], self.pool.database) - self.assertEqual(repository_config["user"], self.pool.user) - self.assertEqual(repository_config["password"], self.pool.password) - self.assertEqual(repository_config["host"], self.pool.host) - self.assertEqual(repository_config["port"], self.pool.port) + pool = DatabaseClientPool.from_config(self.config, key="event") + self.assertIsInstance(pool.client_builder, DatabaseClientBuilder) + self.assertEqual(AiopgDatabaseClient, pool.client_builder.instance_cls) - async def test_acquire(self): + async def test_acquire_once(self): async with self.pool.acquire() as c1: - self.assertIsInstance(c1, Connection) + self.assertIsInstance(c1, DatabaseClient) + + async def test_acquire_multiple_recycle(self): + async with self.pool.acquire() as c1: + pass async with self.pool.acquire() as c2: self.assertEqual(c1, c2) - async def test_acquire_with_error(self): - with patch("aiopg.Connection.isolation_level", new_callable=PropertyMock, side_effect=(OperationalError, None)): - async with self.pool.acquire() as connection: - self.assertIsInstance(connection, Connection) + async def test_acquire_multiple_same_time(self): + async with self.pool.acquire() as c1: + async with self.pool.acquire() as c2: + self.assertNotEqual(c1, c2) + + async def test_acquire_with_reset(self): + with patch.object(AiopgDatabaseClient, "reset") as reset_mock: + async with self.pool.acquire(): + self.assertEqual(0, reset_mock.call_count) + self.assertEqual(1, reset_mock.call_count) async def test_acquire_with_connection_error(self): - executed = [False] - original = aiopg.connect - - def _side_effect(*args, **kwargs): - if not executed[0]: - executed[0] = True - raise OperationalError - return original(*args, **kwargs) - - with patch("aiopg.connect", side_effect=_side_effect): - async with self.pool.acquire() as connection: - self.assertIsInstance(connection, Connection) + with patch.object(AiopgDatabaseClient, "_create_connection", side_effect=(UnableToConnectException(""), None)): + with patch.object(AiopgDatabaseClient, "is_valid", return_value=True): + async with self.pool.acquire() as client: + self.assertIsInstance(client, AiopgDatabaseClient) class TestPostgreSqlPool(CommonTestCase, PostgresAsyncTestCase): diff --git a/packages/core/minos-microservice-common/tests/test_common/test_testing.py b/packages/core/minos-microservice-common/tests/test_common/test_testing.py index 25ba78584..bfb899baf 100644 --- a/packages/core/minos-microservice-common/tests/test_common/test_testing.py +++ b/packages/core/minos-microservice-common/tests/test_common/test_testing.py @@ -1,5 +1,4 @@ import unittest -import warnings from minos.common import ( Config, @@ -31,71 +30,7 @@ class MyMinosTestCase(MinosTestCase): class TestPostgresAsyncTestCase(unittest.IsolatedAsyncioTestCase): - def test_repository_db(self): - test_case = MyPostgresAsyncTestCase() - test_case.setUp() - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - self.assertEqual( - { - k: v - for k, v in test_case.base_config.get_database_by_name("aggregate").items() - if k not in {"database", "user"} - }, - {k: v for k, v in test_case.repository_db.items() if k not in {"database", "user"}}, - ) - self.assertNotEqual( - test_case.base_config.get_database_by_name("aggregate")["database"], - test_case.repository_db["database"], - ) - self.assertNotEqual( - test_case.base_config.get_database_by_name("aggregate")["user"], - test_case.repository_db["user"], - ) - - def test_broker_queue_db(self): - test_case = MyPostgresAsyncTestCase() - test_case.setUp() - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - self.assertEqual( - { - k: v - for k, v in test_case.base_config.get_database_by_name("broker").items() - if k not in {"database", "user"} - }, - {k: v for k, v in test_case.broker_queue_db.items() if k not in {"database", "user"}}, - ) - self.assertNotEqual( - test_case.base_config.get_database_by_name("broker")["database"], - test_case.broker_queue_db["database"], - ) - self.assertNotEqual( - test_case.base_config.get_database_by_name("broker")["user"], - test_case.broker_queue_db["user"], - ) - - def test_snapshot_db(self): - test_case = MyPostgresAsyncTestCase() - test_case.setUp() - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - self.assertEqual( - { - k: v - for k, v in test_case.base_config.get_database_by_name("aggregate").items() - if k not in {"database", "user"} - }, - {k: v for k, v in test_case.snapshot_db.items() if k not in {"database", "user"}}, - ) - self.assertNotEqual( - test_case.base_config.get_database_by_name("aggregate")["database"], - test_case.broker_queue_db["database"], - ) - self.assertNotEqual( - test_case.base_config.get_database_by_name("aggregate")["user"], - test_case.broker_queue_db["user"], - ) + pass class MyPostgresAsyncTestCase(PostgresAsyncTestCase): diff --git a/packages/core/minos-microservice-common/tests/utils.py b/packages/core/minos-microservice-common/tests/utils.py index fb4f622a1..d54835fe6 100644 --- a/packages/core/minos-microservice-common/tests/utils.py +++ b/packages/core/minos-microservice-common/tests/utils.py @@ -73,8 +73,11 @@ def __init__(self, key=None, *args, **kwargs): key = "fake" super().__init__(key, *args, **kwargs) - async def __aexit__(self, exc_type, exc_val, exc_tb): - return + async def acquire(self) -> None: + """For testing purposes.""" + + async def release(self): + """For testing purposes.""" class FakeLockPool(LockPool): diff --git a/packages/core/minos-microservice-networks/minos/networks/brokers/collections/queues/pg.py b/packages/core/minos-microservice-networks/minos/networks/brokers/collections/queues/pg.py index dc53280bd..69d44cf9a 100644 --- a/packages/core/minos-microservice-networks/minos/networks/brokers/collections/queues/pg.py +++ b/packages/core/minos-microservice-networks/minos/networks/brokers/collections/queues/pg.py @@ -24,9 +24,6 @@ Optional, ) -from aiopg import ( - Cursor, -) from cached_property import ( cached_property, ) @@ -37,6 +34,7 @@ from minos.common import ( Builder, Config, + DatabaseClient, DatabaseMixin, ) @@ -109,7 +107,7 @@ def query_factory(self) -> PostgreSqlBrokerQueueQueryFactory: def _from_config(cls, config: Config, **kwargs) -> PostgreSqlBrokerQueue: broker_interface = config.get_interface_by_name("broker") queue_config = broker_interface.get("common", dict()).get("queue", dict()) - database_config = config.get_database_by_name("broker") + database_config = {"database_key": None} return cls(**(kwargs | database_config | queue_config)) @@ -175,59 +173,59 @@ async def _dequeue(self) -> BrokerMessage: self._queue.task_done() async def _run(self, max_wait: Optional[float] = 60.0) -> NoReturn: - async with self.cursor() as cursor: - await self._listen_entries(cursor) + async with self.pool.acquire() as client: + await self._listen_entries(client) try: while self._run_task is not None: - await self._wait_for_entries(cursor, max_wait) - await self._dequeue_batch(cursor) + await self._wait_for_entries(client, max_wait) + await self._dequeue_batch(client) finally: - await self._unlisten_entries(cursor) + await self._unlisten_entries(client) - async def _listen_entries(self, cursor: Cursor) -> None: + async def _listen_entries(self, client: DatabaseClient) -> None: # noinspection PyTypeChecker - await cursor.execute(self._query_factory.build_listen()) + await client.execute(self._query_factory.build_listen()) - async def _unlisten_entries(self, cursor: Cursor) -> None: - if not cursor.closed: + async def _unlisten_entries(self, client: DatabaseClient) -> None: + if not client.already_destroyed: # noinspection PyTypeChecker - await cursor.execute(self._query_factory.build_unlisten()) + await client.execute(self._query_factory.build_unlisten()) - async def _wait_for_entries(self, cursor: Cursor, max_wait: Optional[float]) -> None: + async def _wait_for_entries(self, client: DatabaseClient, max_wait: Optional[float]) -> None: while True: - if await self._get_count(cursor): + if await self._get_count(client): return with suppress(TimeoutError): - return await wait_for(consume_queue(cursor.connection.notifies, self._records), max_wait) + return await wait_for(consume_queue(client.connection.notifies, self._records), max_wait) - async def _get_count(self, cursor: Cursor) -> int: + async def _get_count(self, client: DatabaseClient) -> int: # noinspection PyTypeChecker - await cursor.execute(self._query_factory.build_count_not_processed(), (self._retry,)) - count = (await cursor.fetchone())[0] + await client.execute(self._query_factory.build_count_not_processed(), (self._retry,)) + count = (await client.fetch_one())[0] return count - async def _dequeue_batch(self, cursor: Cursor) -> None: - async with cursor.begin(): - rows = await self._dequeue_rows(cursor) + async def _dequeue_batch(self, client: DatabaseClient) -> None: + rows = await self._dequeue_rows(client) - if not len(rows): - return + if not len(rows): + return - entries = [_Entry(*row) for row in rows] + entries = [_Entry(*row) for row in rows] - # noinspection PyTypeChecker - await cursor.execute(self._query_factory.build_mark_processing(), (tuple(entry.id_ for entry in entries),)) + # noinspection PyTypeChecker + await client.execute(self._query_factory.build_mark_processing(), (tuple(entry.id_ for entry in entries),)) - for entry in entries: - await self._queue.put(entry) + for entry in entries: + await self._queue.put(entry) - async def _dequeue_rows(self, cursor: Cursor) -> list[Any]: + async def _dequeue_rows(self, client: DatabaseClient) -> list[Any]: # noinspection PyTypeChecker - await cursor.execute(self._query_factory.build_select_not_processed(), (self._retry, self._records)) - return await cursor.fetchall() + await client.execute(self._query_factory.build_select_not_processed(), (self._retry, self._records)) + return [row async for row in client.fetch_all()] +# noinspection SqlResolve,SqlNoDataSourceInspection class PostgreSqlBrokerQueueQueryFactory(ABC): """PostgreSql Broker Queue Query Factory class.""" @@ -363,7 +361,7 @@ def with_config(self, config: Config): :param config: The config to be set. :return: This method return the builder instance. """ - self.kwargs |= config.get_database_by_name("broker") + self.kwargs |= {"database_key": None} self.kwargs |= config.get_interface_by_name("broker").get("common", dict()).get("queue", dict()) return super().with_config(config) diff --git a/packages/core/minos-microservice-networks/minos/networks/brokers/handlers/impl.py b/packages/core/minos-microservice-networks/minos/networks/brokers/handlers/impl.py index b26cb5831..17340e926 100644 --- a/packages/core/minos-microservice-networks/minos/networks/brokers/handlers/impl.py +++ b/packages/core/minos-microservice-networks/minos/networks/brokers/handlers/impl.py @@ -38,7 +38,7 @@ class BrokerHandler(SetupMixin): """Broker Handler class.""" def __init__( - self, dispatcher: BrokerDispatcher, subscriber: BrokerSubscriber, concurrency: int = 15, *args, **kwargs + self, dispatcher: BrokerDispatcher, subscriber: BrokerSubscriber, concurrency: int = 5, *args, **kwargs ): super().__init__(*args, **kwargs) diff --git a/packages/core/minos-microservice-networks/minos/networks/brokers/subscribers/filtered/validators/duplicates/pg.py b/packages/core/minos-microservice-networks/minos/networks/brokers/subscribers/filtered/validators/duplicates/pg.py index e1a2394af..557092370 100644 --- a/packages/core/minos-microservice-networks/minos/networks/brokers/subscribers/filtered/validators/duplicates/pg.py +++ b/packages/core/minos-microservice-networks/minos/networks/brokers/subscribers/filtered/validators/duplicates/pg.py @@ -9,9 +9,6 @@ UUID, ) -from psycopg2 import ( - IntegrityError, -) from psycopg2.sql import ( SQL, ) @@ -20,6 +17,7 @@ Builder, Config, DatabaseMixin, + IntegrityException, ) from .abc import ( @@ -64,7 +62,7 @@ async def _is_unique(self, topic: str, uuid: UUID) -> bool: try: await self.submit_query(self._query_factory.build_insert_row(), {"topic": topic, "uuid": uuid}) return True - except IntegrityError: + except IntegrityException: return False @@ -77,13 +75,14 @@ def with_config(self, config: Config): :param config: The config to be set. :return: This method return the builder instance. """ - self.kwargs |= config.get_database_by_name("broker") + self.kwargs |= {"database_key": None} return super().with_config(config) PostgreSqlBrokerSubscriberDuplicateValidator.set_builder(PostgreSqlBrokerSubscriberDuplicateValidatorBuilder) +# noinspection SqlNoDataSourceInspection class PostgreSqlBrokerSubscriberDuplicateValidatorQueryFactory: """PostgreSql Broker Subscriber Duplicate Detector Query Factory class.""" diff --git a/packages/core/minos-microservice-networks/minos/networks/brokers/subscribers/queued/queues/pg.py b/packages/core/minos-microservice-networks/minos/networks/brokers/subscribers/queued/queues/pg.py index 1cd1cdeec..3a3804622 100644 --- a/packages/core/minos-microservice-networks/minos/networks/brokers/subscribers/queued/queues/pg.py +++ b/packages/core/minos-microservice-networks/minos/networks/brokers/subscribers/queued/queues/pg.py @@ -8,14 +8,15 @@ Optional, ) -from aiopg import ( - Cursor, -) from psycopg2.sql import ( SQL, Identifier, ) +from minos.common import ( + DatabaseClient, +) + from ....collections import ( PostgreSqlBrokerQueue, PostgreSqlBrokerQueueBuilder, @@ -45,29 +46,30 @@ def __init__( async def _notify_enqueued(self, message: BrokerMessage) -> None: await self.submit_query(self._query_factory.build_notify().format(Identifier(message.topic))) - async def _listen_entries(self, cursor: Cursor) -> None: + async def _listen_entries(self, client: DatabaseClient) -> None: for topic in self.topics: - await cursor.execute(self._query_factory.build_listen().format(Identifier(topic))) + await client.execute(self._query_factory.build_listen().format(Identifier(topic))) - async def _unlisten_entries(self, cursor: Cursor) -> None: - if not cursor.closed: + async def _unlisten_entries(self, client: DatabaseClient) -> None: + if not client.already_destroyed: for topic in self.topics: - await cursor.execute(self._query_factory.build_unlisten().format(Identifier(topic))) + await client.execute(self._query_factory.build_unlisten().format(Identifier(topic))) - async def _get_count(self, cursor: Cursor) -> int: + async def _get_count(self, client: DatabaseClient) -> int: # noinspection PyTypeChecker - await cursor.execute(self._query_factory.build_count_not_processed(), (self._retry, tuple(self.topics))) - count = (await cursor.fetchone())[0] + await client.execute(self._query_factory.build_count_not_processed(), (self._retry, tuple(self.topics))) + count = (await client.fetch_one())[0] return count - async def _dequeue_rows(self, cursor: Cursor) -> list[Any]: + async def _dequeue_rows(self, client: DatabaseClient) -> list[Any]: # noinspection PyTypeChecker - await cursor.execute( + await client.execute( self._query_factory.build_select_not_processed(), (self._retry, tuple(self.topics), self._records) ) - return await cursor.fetchall() + return [row async for row in client.fetch_all()] +# noinspection SqlNoDataSourceInspection,SqlResolve class PostgreSqlBrokerSubscriberQueueQueryFactory(PostgreSqlBrokerQueueQueryFactory): """PostgreSql Broker Subscriber Queue Query Factory class.""" diff --git a/packages/core/minos-microservice-networks/minos/networks/http/connectors.py b/packages/core/minos-microservice-networks/minos/networks/http/connectors.py index 5163d1571..fc9bd6f4a 100644 --- a/packages/core/minos-microservice-networks/minos/networks/http/connectors.py +++ b/packages/core/minos-microservice-networks/minos/networks/http/connectors.py @@ -8,6 +8,9 @@ ABC, abstractmethod, ) +from asyncio import ( + Semaphore, +) from collections.abc import ( Callable, ) @@ -56,7 +59,14 @@ class HttpConnector(ABC, SetupMixin, Generic[RawRequest, RawResponse]): """Http Application base class.""" - def __init__(self, adapter: HttpAdapter, host: Optional[str] = None, port: Optional[int] = None, **kwargs): + def __init__( + self, + adapter: HttpAdapter, + host: Optional[str] = None, + port: Optional[int] = None, + max_connections: int = 5, + **kwargs, + ): super().__init__(**kwargs) if host is None: host = "0.0.0.0" @@ -67,6 +77,8 @@ def __init__(self, adapter: HttpAdapter, host: Optional[str] = None, port: Optio self._host = host self._port = port + self._semaphore = Semaphore(max_connections) + @classmethod def _from_config(cls, config: Config, **kwargs) -> HttpConnector: http_config = config.get_interface_by_name("http") @@ -138,29 +150,30 @@ def adapt_callback( @wraps(callback) async def _wrapper(raw: RawRequest) -> RawResponse: - logger.info(f"Dispatching '{raw!s}'...") - - request = await self._build_request(raw) - token = REQUEST_USER_CONTEXT_VAR.set(request.user) - - # noinspection PyBroadException - try: - response = callback(request) - if isawaitable(response): - response = await response - - return await self._build_response(response) - - except ResponseException as exc: - tb = traceback.format_exc() - logger.error(f"Raised an application exception:\n {tb}") - return await self._build_error_response(tb, exc.status) - except Exception: - tb = traceback.format_exc() - logger.exception(f"Raised a system exception:\n {tb}") - return await self._build_error_response(tb, 500) - finally: - REQUEST_USER_CONTEXT_VAR.reset(token) + async with self._semaphore: + logger.info(f"Dispatching '{raw!s}'...") + + request = await self._build_request(raw) + token = REQUEST_USER_CONTEXT_VAR.set(request.user) + + # noinspection PyBroadException + try: + response = callback(request) + if isawaitable(response): + response = await response + + return await self._build_response(response) + + except ResponseException as exc: + tb = traceback.format_exc() + logger.error(f"Raised an application exception:\n {tb}") + return await self._build_error_response(tb, exc.status) + except Exception: + tb = traceback.format_exc() + logger.exception(f"Raised a system exception:\n {tb}") + return await self._build_error_response(tb, 500) + finally: + REQUEST_USER_CONTEXT_VAR.reset(token) return _wrapper diff --git a/packages/core/minos-microservice-networks/tests/test_networks/test_brokers/test_collections/test_queues/test_pg.py b/packages/core/minos-microservice-networks/tests/test_networks/test_brokers/test_collections/test_queues/test_pg.py index 52735d889..dd4c065ab 100644 --- a/packages/core/minos-microservice-networks/tests/test_networks/test_brokers/test_collections/test_queues/test_pg.py +++ b/packages/core/minos-microservice-networks/tests/test_networks/test_brokers/test_collections/test_queues/test_pg.py @@ -8,6 +8,7 @@ ) from minos.common import ( + AiopgDatabaseClient, DatabaseMixin, ) from minos.common.testing import ( @@ -23,6 +24,7 @@ PostgreSqlBrokerQueueQueryFactory, ) from tests.utils import ( + FakeAsyncIterator, NetworksTestCase, ) @@ -85,9 +87,10 @@ async def test_dequeue_with_count(self): BrokerMessageV1("bar", BrokerMessageV1Payload("foo")), ] - with patch( - "aiopg.Cursor.fetchall", - return_value=[[1, messages[0].avro_bytes], [2, bytes()], [3, messages[1].avro_bytes]], + with patch.object( + AiopgDatabaseClient, + "fetch_all", + return_value=FakeAsyncIterator([[1, messages[0].avro_bytes], [2, bytes()], [3, messages[1].avro_bytes]]), ): async with PostgreSqlBrokerQueue.from_config(self.config, query_factory=self.query_factory) as queue: queue._get_count = AsyncMock(side_effect=[3, 0]) diff --git a/packages/core/minos-microservice-networks/tests/test_networks/test_brokers/test_subscribers/test_queued/test_queues/test_pg.py b/packages/core/minos-microservice-networks/tests/test_networks/test_brokers/test_subscribers/test_queued/test_queues/test_pg.py index 922467aec..d7446dd47 100644 --- a/packages/core/minos-microservice-networks/tests/test_networks/test_brokers/test_subscribers/test_queued/test_queues/test_pg.py +++ b/packages/core/minos-microservice-networks/tests/test_networks/test_brokers/test_subscribers/test_queued/test_queues/test_pg.py @@ -7,6 +7,9 @@ patch, ) +from minos.common import ( + AiopgDatabaseClient, +) from minos.common.testing import ( PostgresAsyncTestCase, ) @@ -20,6 +23,7 @@ PostgreSqlBrokerSubscriberQueueQueryFactory, ) from tests.utils import ( + FakeAsyncIterator, NetworksTestCase, ) @@ -46,9 +50,10 @@ async def test_dequeue_with_count(self): BrokerMessageV1("bar", BrokerMessageV1Payload("foo")), ] - with patch( - "aiopg.Cursor.fetchall", - return_value=[[1, messages[0].avro_bytes], [2, bytes()], [3, messages[1].avro_bytes]], + with patch.object( + AiopgDatabaseClient, + "fetch_all", + return_value=FakeAsyncIterator([[1, messages[0].avro_bytes], [2, bytes()], [3, messages[1].avro_bytes]]), ): async with PostgreSqlBrokerSubscriberQueue.from_config(self.config, topics={"foo", "bar"}) as queue: queue._get_count = AsyncMock(side_effect=[3, 0]) diff --git a/packages/core/minos-microservice-saga/minos/saga/executions/saga.py b/packages/core/minos-microservice-saga/minos/saga/executions/saga.py index 3ae1ed5df..6c0c6548e 100644 --- a/packages/core/minos-microservice-saga/minos/saga/executions/saga.py +++ b/packages/core/minos-microservice-saga/minos/saga/executions/saga.py @@ -4,6 +4,9 @@ import logging import warnings +from asyncio import ( + shield, +) from contextlib import ( suppress, ) @@ -262,7 +265,7 @@ async def commit(self, *args, **kwargs) -> None: committer = TransactionCommitter(self.uuid, self.executed_steps, *args, **kwargs) try: - await committer.commit() + await shield(committer.commit()) except Exception as exc: # FIXME: Exception is too broad logger.warning(f"There was an exception on {TransactionCommitter.__name__!r} commit: {exc!r}") with suppress(SagaRollbackExecutionException): @@ -279,7 +282,7 @@ async def reject(self, *args, **kwargs) -> None: """ committer = TransactionCommitter(self.uuid, self.executed_steps, *args, **kwargs) try: - await committer.reject() + await shield(committer.reject()) except Exception as exc: logger.warning(f"There was an exception on {TransactionCommitter.__name__!r} rejection: {exc!r}") raise SagaFailedCommitCallbackException(exc) diff --git a/packages/core/minos-microservice-saga/minos/saga/manager.py b/packages/core/minos-microservice-saga/minos/saga/manager.py index fb56e6cf1..590b46e9f 100644 --- a/packages/core/minos-microservice-saga/minos/saga/manager.py +++ b/packages/core/minos-microservice-saga/minos/saga/manager.py @@ -181,7 +181,7 @@ async def _run( self.storage.store(execution) if raise_on_error: raise exc - logger.warning(f"The execution identified by {execution.uuid!s} failed: {exc.exception!r}") + logger.exception(f"The execution identified by {execution.uuid!s} failed") finally: if (headers := REQUEST_HEADERS_CONTEXT_VAR.get()) is not None: related_services = reduce(or_, (s.related_services for s in execution.executed_steps), set()) diff --git a/packages/core/minos-microservice-saga/tests/utils.py b/packages/core/minos-microservice-saga/tests/utils.py index 44ba22a74..f8234e5b6 100644 --- a/packages/core/minos-microservice-saga/tests/utils.py +++ b/packages/core/minos-microservice-saga/tests/utils.py @@ -85,8 +85,11 @@ def __init__(self, key=None, *args, **kwargs): key = "fake" super().__init__(key, *args, **kwargs) - async def __aexit__(self, exc_type, exc_val, exc_tb): - return + async def acquire(self) -> None: + """For testing purposes.""" + + async def release(self): + """For testing purposes.""" class FakeLockPool(LockPool):