diff --git a/packages/core/minos-microservice-common/minos/common/database/clients/abc.py b/packages/core/minos-microservice-common/minos/common/database/clients/abc.py index dc1f9e010..b878b301a 100644 --- a/packages/core/minos-microservice-common/minos/common/database/clients/abc.py +++ b/packages/core/minos-microservice-common/minos/common/database/clients/abc.py @@ -66,9 +66,8 @@ async def is_valid(self, **kwargs) -> bool: """ return await self._is_valid(**kwargs) - @abstractmethod async def _is_valid(self, **kwargs) -> bool: - raise NotImplementedError + return True async def _destroy(self) -> None: await self.reset() diff --git a/packages/core/minos-microservice-common/minos/common/pools.py b/packages/core/minos-microservice-common/minos/common/pools.py index c16c57176..3432db575 100644 --- a/packages/core/minos-microservice-common/minos/common/pools.py +++ b/packages/core/minos-microservice-common/minos/common/pools.py @@ -112,15 +112,15 @@ def _get_pool_cls(self, type_: str) -> type[Pool]: class _PoolBase(PoolBase, ABC): - def __init__(self, *args, maxsize: int = 10, recycle: Optional[int] = 300, **kwargs): + def __init__(self, *args, maxsize: int = 10, recycle: Optional[int] = None, **kwargs): super().__init__(maxsize=maxsize, recycle=recycle) class Pool(SetupMixin, _PoolBase, Generic[P], ABC): """Base class for Pool implementations in minos""" - def __init__(self, *args, maxsize: int = 10, recycle: Optional[int] = 300, already_setup: bool = True, **kwargs): - super().__init__(*args, maxsize=maxsize, recycle=recycle, already_setup=already_setup, **kwargs) + def __init__(self, *args, already_setup: bool = True, **kwargs): + super().__init__(*args, already_setup=already_setup, **kwargs) # noinspection PyUnresolvedReferences async def __acquire(self) -> Any: # pragma: no cover diff --git a/packages/core/minos-microservice-common/minos/common/testing/database/clients.py b/packages/core/minos-microservice-common/minos/common/testing/database/clients.py index cc6ec8aae..90b1b4d3a 100644 --- a/packages/core/minos-microservice-common/minos/common/testing/database/clients.py +++ b/packages/core/minos-microservice-common/minos/common/testing/database/clients.py @@ -21,10 +21,6 @@ def __init__(self, *args, **kwargs): self.kwargs = kwargs self._response = tuple() - async def _is_valid(self, **kwargs) -> bool: - """For testing purposes""" - return True - async def _reset(self, **kwargs) -> None: """For testing purposes""" self._response = tuple() diff --git a/packages/core/minos-microservice-common/tests/test_common/test_database/test_clients/test_abc.py b/packages/core/minos-microservice-common/tests/test_common/test_database/test_clients/test_abc.py index e7ee8f393..fbedcca2c 100644 --- a/packages/core/minos-microservice-common/tests/test_common/test_database/test_clients/test_abc.py +++ b/packages/core/minos-microservice-common/tests/test_common/test_database/test_clients/test_abc.py @@ -33,9 +33,6 @@ class _DatabaseClient(DatabaseClient): """For testing purposes.""" - async def _is_valid(self, **kwargs) -> bool: - """For testing purposes.""" - async def _reset(self, **kwargs) -> None: """For testing purposes.""" @@ -76,7 +73,7 @@ def build_release(self, hashed_key: int) -> DatabaseOperation: class TestDatabaseClient(CommonTestCase): def test_abstract(self): self.assertTrue(issubclass(DatabaseClient, (ABC, BuildableMixin))) - expected = {"_is_valid", "_execute", "_fetch_all", "_reset"} + expected = {"_execute", "_fetch_all", "_reset"} # noinspection PyUnresolvedReferences self.assertEqual(expected, DatabaseClient.__abstractmethods__) diff --git a/packages/core/minos-microservice-networks/minos/networks/brokers/pools.py b/packages/core/minos-microservice-networks/minos/networks/brokers/pools.py index ecb298245..fecf1929c 100644 --- a/packages/core/minos-microservice-networks/minos/networks/brokers/pools.py +++ b/packages/core/minos-microservice-networks/minos/networks/brokers/pools.py @@ -30,10 +30,8 @@ class BrokerClientPool(Pool): """Broker Client Pool class.""" - def __init__( - self, instance_kwargs: dict[str, Any], maxsize: int = 5, recycle: Optional[int] = 3600, *args, **kwargs - ): - super().__init__(maxsize=maxsize, recycle=recycle, *args, **kwargs) + def __init__(self, instance_kwargs: dict[str, Any], maxsize: int = 5, *args, **kwargs): + super().__init__(maxsize=maxsize, *args, **kwargs) self._instance_kwargs = instance_kwargs @classmethod diff --git a/packages/plugins/minos-database-aiopg/minos/plugins/aiopg/clients.py b/packages/plugins/minos-database-aiopg/minos/plugins/aiopg/clients.py index 825c6b364..f49610012 100644 --- a/packages/plugins/minos-database-aiopg/minos/plugins/aiopg/clients.py +++ b/packages/plugins/minos-database-aiopg/minos/plugins/aiopg/clients.py @@ -3,8 +3,15 @@ ) import logging +from asyncio import ( + TimeoutError, +) from collections.abc import ( AsyncIterator, + Iterable, +) +from functools import ( + partial, ) from typing import ( Optional, @@ -22,6 +29,7 @@ ) from minos.common import ( + CircuitBreakerMixin, ConnectionException, DatabaseClient, IntegrityException, @@ -35,7 +43,7 @@ logger = logging.getLogger(__name__) -class AiopgDatabaseClient(DatabaseClient): +class AiopgDatabaseClient(DatabaseClient, CircuitBreakerMixin): """Aiopg Database Client class.""" _connection: Optional[Connection] @@ -48,10 +56,17 @@ def __init__( port: Optional[int] = None, user: Optional[str] = None, password: Optional[str] = None, + circuit_breaker_exceptions: Iterable[type] = tuple(), + connection_timeout: Optional[float] = None, + cursor_timeout: Optional[float] = None, *args, **kwargs, ): - super().__init__(*args, **kwargs) + super().__init__( + *args, + **kwargs, + circuit_breaker_exceptions=(ConnectionException, *circuit_breaker_exceptions), + ) if host is None: host = "localhost" @@ -61,6 +76,10 @@ def __init__( user = "postgres" if password is None: password = "" + if connection_timeout is None: + connection_timeout = 1 + if cursor_timeout is None: + cursor_timeout = 60 self._database = database self._host = host @@ -68,36 +87,60 @@ def __init__( self._user = user self._password = password + self._connection_timeout = connection_timeout + self._cursor_timeout = cursor_timeout + self._connection = None self._cursor = None async def _setup(self) -> None: await super()._setup() - await self._create_connection() + await self.recreate() async def _destroy(self) -> None: await super()._destroy() - await self._close_connection() + await self.close() + + async def recreate(self) -> None: + """Recreate the database connection. + + :return: This method does not return anything. + """ + await self.close() - async def _create_connection(self): + self._connection = await self.with_circuit_breaker(self._connect) + logger.debug(f"Created {self.database!r} database connection identified by {id(self._connection)}!") + + async def _connect(self) -> Connection: try: - self._connection = await aiopg.connect( - host=self.host, port=self.port, dbname=self.database, user=self.user, password=self.password + return await aiopg.connect( + timeout=self._connection_timeout, + host=self.host, + port=self.port, + dbname=self.database, + user=self.user, + password=self.password, ) - except OperationalError as exc: - msg = f"There was an {exc!r} while trying to get a database connection." - logger.warning(msg) - raise ConnectionException(msg) + except (OperationalError, TimeoutError) as exc: + raise ConnectionException(f"There was not possible to connect to the database: {exc!r}") - logger.debug(f"Created {self.database!r} database connection identified by {id(self._connection)}!") + async def close(self) -> None: + """Close database connection. - async def _close_connection(self): - if self._connection is not None and not self._connection.closed: + :return: This method does not return anything. + """ + if await self.is_connected(): await self._connection.close() - self._connection = None - logger.debug(f"Destroyed {self.database!r} database connection identified by {id(self._connection)}!") - async def _is_valid(self) -> bool: + if self._connection is not None: + logger.debug(f"Destroyed {self.database!r} database connection identified by {id(self._connection)}!") + self._connection = None + + async def is_connected(self) -> bool: + """Check if the client is connected. + + :return: ``True`` if it is connected or ``False`` otherwise. + """ if self._connection is None: return False @@ -114,28 +157,37 @@ async def _reset(self, **kwargs) -> None: # noinspection PyUnusedLocal async def _fetch_all(self) -> AsyncIterator[tuple]: - await self._create_cursor() + if self._cursor is None: + raise ProgrammingException("An operation must be executed before fetching any value.") + try: async for row in self._cursor: yield row except ProgrammingError as exc: raise ProgrammingException(str(exc)) + except OperationalError as exc: + raise ConnectionException(f"There was not possible to connect to the database: {exc!r}") # noinspection PyUnusedLocal async def _execute(self, operation: AiopgDatabaseOperation) -> None: if not isinstance(operation, AiopgDatabaseOperation): raise ValueError(f"The operation must be a {AiopgDatabaseOperation!r} instance. Obtained: {operation!r}") - await self._create_cursor() + fn = partial(self._execute_cursor, operation=operation.query, parameters=operation.parameters) + await self.with_circuit_breaker(fn) + + async def _execute_cursor(self, operation: str, parameters: dict): + if not await self.is_connected(): + await self.recreate() + + self._cursor = await self._connection.cursor(timeout=self._cursor_timeout) try: - await self._cursor.execute(operation=operation.query, parameters=operation.parameters) + await self._cursor.execute(operation=operation, parameters=parameters) + except OperationalError as exc: + raise ConnectionException(f"There was not possible to connect to the database: {exc!r}") except IntegrityError as exc: raise IntegrityException(f"The requested operation raised a integrity error: {exc!r}") - async def _create_cursor(self): - if self._cursor is None: - self._cursor = await self._connection.cursor() - async def _destroy_cursor(self, **kwargs): if self._cursor is not None: if not self._cursor.closed: diff --git a/packages/plugins/minos-database-aiopg/tests/test_aiopg/test_clients.py b/packages/plugins/minos-database-aiopg/tests/test_aiopg/test_clients.py index efddd6269..8c1d935cc 100644 --- a/packages/plugins/minos-database-aiopg/tests/test_aiopg/test_clients.py +++ b/packages/plugins/minos-database-aiopg/tests/test_aiopg/test_clients.py @@ -13,6 +13,7 @@ from psycopg2 import ( IntegrityError, OperationalError, + ProgrammingError, ) from minos.common import ( @@ -30,7 +31,7 @@ ) -# noinspection SqlNoDataSourceInspection +# noinspection SqlNoDataSourceInspection,SqlDialectInspection class TestAiopgDatabaseClient(AiopgTestCase): def setUp(self): super().setUp() @@ -53,23 +54,27 @@ def test_from_config(self): self.assertEqual(default_database["host"], client.host) self.assertEqual(default_database["port"], client.port) - async def test_is_valid_true(self): + async def test_is_valid(self): async with AiopgDatabaseClient.from_config(self.config) as client: self.assertTrue(await client.is_valid()) - async def test_is_valid_false_not_setup(self): + async def test_is_connected_true(self): + async with AiopgDatabaseClient.from_config(self.config) as client: + self.assertTrue(await client.is_connected()) + + async def test_is_connected_false_not_setup(self): client = AiopgDatabaseClient.from_config(self.config) - self.assertFalse(await client.is_valid()) + self.assertFalse(await client.is_connected()) - async def test_is_valid_false_operational_error(self): + async def test_is_connected_false_operational_error(self): async with AiopgDatabaseClient.from_config(self.config) as client: with patch.object(Connection, "isolation_level", new_callable=PropertyMock, side_effect=OperationalError): - self.assertFalse(await client.is_valid()) + self.assertFalse(await client.is_connected()) - async def test_is_valid_false_closed(self): + async def test_is_connected_false_closed(self): async with AiopgDatabaseClient.from_config(self.config) as client: with patch.object(Connection, "closed", new_callable=PropertyMock, return_valud=False): - self.assertFalse(await client.is_valid()) + self.assertFalse(await client.is_connected()) async def test_connection(self): client = AiopgDatabaseClient.from_config(self.config) @@ -78,11 +83,27 @@ async def test_connection(self): self.assertIsInstance(client.connection, Connection) self.assertIsNone(client.connection) - async def test_connection_raises(self): - with patch.object(aiopg, "connect", new_callable=PropertyMock, side_effect=OperationalError): - with self.assertRaises(ConnectionException): - async with AiopgDatabaseClient.from_config(self.config): - pass + async def test_connection_with_circuit_breaker(self): + async with AiopgDatabaseClient.from_config(self.config) as c1: + + async def _fn(): + return c1.connection + + with patch.object(aiopg, "connect", new_callable=PropertyMock, side_effect=(OperationalError, _fn())): + async with AiopgDatabaseClient.from_config(self.config) as c2: + self.assertEqual(c1.connection, c2.connection) + + async def test_connection_recreate(self): + async with AiopgDatabaseClient.from_config(self.config) as client: + c1 = client.connection + self.assertIsInstance(c1, Connection) + + await client.recreate() + + c2 = client.connection + self.assertIsInstance(c2, Connection) + + self.assertNotEqual(c1, c2) async def test_cursor(self): client = AiopgDatabaseClient.from_config(self.config) @@ -111,6 +132,14 @@ async def test_execute(self): execute_mock.call_args_list, ) + async def test_execute_disconnected(self): + async with AiopgDatabaseClient.from_config(self.config) as client: + await client.close() + self.assertFalse(await client.is_connected()) + + await client.execute(self.operation) + self.assertTrue(await client.is_connected()) + async def test_execute_raises_unsupported(self): class _DatabaseOperation(DatabaseOperation): """For testing purposes.""" @@ -125,17 +154,44 @@ async def test_execute_raises_integrity(self): with self.assertRaises(IntegrityException): await client.execute(self.operation) + async def test_execute_raises_operational(self): + async with AiopgDatabaseClient.from_config(self.config) as client: + with patch.object(Cursor, "execute", side_effect=(OperationalError, None)) as mock: + await client.execute(self.operation) + + self.assertEqual( + [ + call(operation=self.operation.query, parameters=self.operation.parameters), + call(operation=self.operation.query, parameters=self.operation.parameters), + ], + mock.call_args_list, + ) + async def test_fetch_one(self): async with AiopgDatabaseClient.from_config(self.config) as client: await client.execute(self.operation) observed = await client.fetch_one() self.assertIsInstance(observed, tuple) - async def test_fetch_one_raises(self): + async def test_fetch_one_raises_programming_empty(self): async with AiopgDatabaseClient.from_config(self.config) as client: with self.assertRaises(ProgrammingException): await client.fetch_one() + async def test_fetch_one_raises_programming(self): + async with AiopgDatabaseClient.from_config(self.config) as client: + await client.execute(self.operation) + with patch.object(Cursor, "fetchone", side_effect=ProgrammingError): + with self.assertRaises(ProgrammingException): + await client.fetch_one() + + async def test_fetch_one_raises_operational(self): + async with AiopgDatabaseClient.from_config(self.config) as client: + await client.execute(self.operation) + with patch.object(Cursor, "fetchone", side_effect=OperationalError): + with self.assertRaises(ConnectionException): + await client.fetch_one() + async def test_fetch_all(self): async with AiopgDatabaseClient.from_config(self.config) as client: await client.execute(self.operation) diff --git a/packages/plugins/minos-database-lmdb/minos/plugins/lmdb/clients.py b/packages/plugins/minos-database-lmdb/minos/plugins/lmdb/clients.py index 317728701..851457a34 100644 --- a/packages/plugins/minos-database-lmdb/minos/plugins/lmdb/clients.py +++ b/packages/plugins/minos-database-lmdb/minos/plugins/lmdb/clients.py @@ -73,9 +73,6 @@ def _close_environment(self) -> None: if self._environment is not None: self._environment.close() - async def _is_valid(self, **kwargs) -> bool: - return True - async def _reset(self, **kwargs) -> None: self._prefetched = None self._environment.sync()