From 6527dbb9944705331bedd33f8f245e063b9905d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Garc=C3=ADa=20Prado?= Date: Tue, 19 Apr 2022 11:40:18 +0200 Subject: [PATCH 1/3] ISSUE #43 * Implement circuit breaker for `aiopg` connection failures. --- .../minos/plugins/aiopg/clients.py | 58 ++++++++++++++----- .../tests/test_aiopg/test_clients.py | 37 ++++++++++-- 2 files changed, 76 insertions(+), 19 deletions(-) diff --git a/packages/plugins/minos-database-aiopg/minos/plugins/aiopg/clients.py b/packages/plugins/minos-database-aiopg/minos/plugins/aiopg/clients.py index 825c6b364..05d2b1c76 100644 --- a/packages/plugins/minos-database-aiopg/minos/plugins/aiopg/clients.py +++ b/packages/plugins/minos-database-aiopg/minos/plugins/aiopg/clients.py @@ -3,8 +3,12 @@ ) import logging +from asyncio import ( + TimeoutError, +) from collections.abc import ( AsyncIterator, + Iterable, ) from typing import ( Optional, @@ -22,6 +26,7 @@ ) from minos.common import ( + CircuitBreakerMixin, ConnectionException, DatabaseClient, IntegrityException, @@ -35,7 +40,7 @@ logger = logging.getLogger(__name__) -class AiopgDatabaseClient(DatabaseClient): +class AiopgDatabaseClient(DatabaseClient, CircuitBreakerMixin): """Aiopg Database Client class.""" _connection: Optional[Connection] @@ -48,10 +53,17 @@ def __init__( port: Optional[int] = None, user: Optional[str] = None, password: Optional[str] = None, + circuit_breaker_exceptions: Iterable[type] = tuple(), + connection_timeout: Optional[float] = None, + cursor_timeout: Optional[float] = None, *args, **kwargs, ): - super().__init__(*args, **kwargs) + super().__init__( + *args, + **kwargs, + circuit_breaker_exceptions=(OperationalError, TimeoutError, *circuit_breaker_exceptions), + ) if host is None: host = "localhost" @@ -61,6 +73,10 @@ def __init__( user = "postgres" if password is None: password = "" + if connection_timeout is None: + connection_timeout = 1 + if cursor_timeout is None: + cursor_timeout = 60 self._database = database self._host = host @@ -68,6 +84,9 @@ def __init__( self._user = user self._password = password + self._connection_timeout = connection_timeout + self._cursor_timeout = cursor_timeout + self._connection = None self._cursor = None @@ -80,19 +99,22 @@ async def _destroy(self) -> None: await self._close_connection() async def _create_connection(self): - try: - self._connection = await aiopg.connect( - host=self.host, port=self.port, dbname=self.database, user=self.user, password=self.password - ) - except OperationalError as exc: - msg = f"There was an {exc!r} while trying to get a database connection." - logger.warning(msg) - raise ConnectionException(msg) + self._connection = await self.with_circuit_breaker(self._connect) logger.debug(f"Created {self.database!r} database connection identified by {id(self._connection)}!") + async def _connect(self) -> Connection: + return await aiopg.connect( + timeout=self._connection_timeout, + host=self.host, + port=self.port, + dbname=self.database, + user=self.user, + password=self.password, + ) + async def _close_connection(self): - if self._connection is not None and not self._connection.closed: + if await self.is_valid(): await self._connection.close() self._connection = None logger.debug(f"Destroyed {self.database!r} database connection identified by {id(self._connection)}!") @@ -114,12 +136,18 @@ async def _reset(self, **kwargs) -> None: # noinspection PyUnusedLocal async def _fetch_all(self) -> AsyncIterator[tuple]: - await self._create_cursor() + if self._cursor is None: + raise ProgrammingException("An operation must be executed before fetching any value.") + try: async for row in self._cursor: yield row except ProgrammingError as exc: raise ProgrammingException(str(exc)) + except OperationalError as exc: + msg = f"There was an {exc!r} while trying to connect to the database." + logger.warning(msg) + raise ConnectionException(msg) # noinspection PyUnusedLocal async def _execute(self, operation: AiopgDatabaseOperation) -> None: @@ -131,10 +159,14 @@ async def _execute(self, operation: AiopgDatabaseOperation) -> None: await self._cursor.execute(operation=operation.query, parameters=operation.parameters) except IntegrityError as exc: raise IntegrityException(f"The requested operation raised a integrity error: {exc!r}") + except OperationalError as exc: + msg = f"There was an {exc!r} while trying to connect to the database." + logger.warning(msg) + raise ConnectionException(msg) async def _create_cursor(self): if self._cursor is None: - self._cursor = await self._connection.cursor() + self._cursor = await self._connection.cursor(timeout=self._cursor_timeout) async def _destroy_cursor(self, **kwargs): if self._cursor is not None: diff --git a/packages/plugins/minos-database-aiopg/tests/test_aiopg/test_clients.py b/packages/plugins/minos-database-aiopg/tests/test_aiopg/test_clients.py index efddd6269..e980edbf2 100644 --- a/packages/plugins/minos-database-aiopg/tests/test_aiopg/test_clients.py +++ b/packages/plugins/minos-database-aiopg/tests/test_aiopg/test_clients.py @@ -13,6 +13,7 @@ from psycopg2 import ( IntegrityError, OperationalError, + ProgrammingError, ) from minos.common import ( @@ -30,7 +31,7 @@ ) -# noinspection SqlNoDataSourceInspection +# noinspection SqlNoDataSourceInspection,SqlDialectInspection class TestAiopgDatabaseClient(AiopgTestCase): def setUp(self): super().setUp() @@ -79,10 +80,14 @@ async def test_connection(self): self.assertIsNone(client.connection) async def test_connection_raises(self): - with patch.object(aiopg, "connect", new_callable=PropertyMock, side_effect=OperationalError): - with self.assertRaises(ConnectionException): - async with AiopgDatabaseClient.from_config(self.config): - pass + async with AiopgDatabaseClient.from_config(self.config) as c1: + + async def _fn(): + return c1.connection + + with patch.object(aiopg, "connect", new_callable=PropertyMock, side_effect=(OperationalError, _fn())): + async with AiopgDatabaseClient.from_config(self.config) as c2: + self.assertEqual(c1.connection, c2.connection) async def test_cursor(self): client = AiopgDatabaseClient.from_config(self.config) @@ -125,17 +130,37 @@ async def test_execute_raises_integrity(self): with self.assertRaises(IntegrityException): await client.execute(self.operation) + async def test_execute_raises_operational(self): + async with AiopgDatabaseClient.from_config(self.config) as client: + with patch.object(Cursor, "execute", side_effect=OperationalError): + with self.assertRaises(ConnectionException): + await client.execute(self.operation) + async def test_fetch_one(self): async with AiopgDatabaseClient.from_config(self.config) as client: await client.execute(self.operation) observed = await client.fetch_one() self.assertIsInstance(observed, tuple) - async def test_fetch_one_raises(self): + async def test_fetch_one_raises_programming_empty(self): async with AiopgDatabaseClient.from_config(self.config) as client: with self.assertRaises(ProgrammingException): await client.fetch_one() + async def test_fetch_one_raises_programming(self): + async with AiopgDatabaseClient.from_config(self.config) as client: + await client.execute(self.operation) + with patch.object(Cursor, "fetchone", side_effect=ProgrammingError): + with self.assertRaises(ProgrammingException): + await client.fetch_one() + + async def test_fetch_one_raises_operational(self): + async with AiopgDatabaseClient.from_config(self.config) as client: + await client.execute(self.operation) + with patch.object(Cursor, "fetchone", side_effect=OperationalError): + with self.assertRaises(ConnectionException): + await client.fetch_one() + async def test_fetch_all(self): async with AiopgDatabaseClient.from_config(self.config) as client: await client.execute(self.operation) From b90ddfe6bc63d12aaa91e72c42b363593816b235 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Garc=C3=ADa=20Prado?= Date: Tue, 19 Apr 2022 13:32:08 +0200 Subject: [PATCH 2/3] ISSUE #43 * Improve `AiopgDatabaseClient` adding support for connection recreation during breakages. --- .../minos/common/database/clients/abc.py | 3 +- .../minos/common/pools.py | 6 +-- .../minos/common/testing/database/clients.py | 4 -- .../test_database/test_clients/test_abc.py | 5 +- .../minos/networks/brokers/pools.py | 6 +-- .../minos/plugins/aiopg/clients.py | 53 ++++++++++++------- .../tests/test_aiopg/test_clients.py | 53 +++++++++++++++---- .../minos/plugins/lmdb/clients.py | 3 -- 8 files changed, 84 insertions(+), 49 deletions(-) diff --git a/packages/core/minos-microservice-common/minos/common/database/clients/abc.py b/packages/core/minos-microservice-common/minos/common/database/clients/abc.py index dc1f9e010..b878b301a 100644 --- a/packages/core/minos-microservice-common/minos/common/database/clients/abc.py +++ b/packages/core/minos-microservice-common/minos/common/database/clients/abc.py @@ -66,9 +66,8 @@ async def is_valid(self, **kwargs) -> bool: """ return await self._is_valid(**kwargs) - @abstractmethod async def _is_valid(self, **kwargs) -> bool: - raise NotImplementedError + return True async def _destroy(self) -> None: await self.reset() diff --git a/packages/core/minos-microservice-common/minos/common/pools.py b/packages/core/minos-microservice-common/minos/common/pools.py index c16c57176..3432db575 100644 --- a/packages/core/minos-microservice-common/minos/common/pools.py +++ b/packages/core/minos-microservice-common/minos/common/pools.py @@ -112,15 +112,15 @@ def _get_pool_cls(self, type_: str) -> type[Pool]: class _PoolBase(PoolBase, ABC): - def __init__(self, *args, maxsize: int = 10, recycle: Optional[int] = 300, **kwargs): + def __init__(self, *args, maxsize: int = 10, recycle: Optional[int] = None, **kwargs): super().__init__(maxsize=maxsize, recycle=recycle) class Pool(SetupMixin, _PoolBase, Generic[P], ABC): """Base class for Pool implementations in minos""" - def __init__(self, *args, maxsize: int = 10, recycle: Optional[int] = 300, already_setup: bool = True, **kwargs): - super().__init__(*args, maxsize=maxsize, recycle=recycle, already_setup=already_setup, **kwargs) + def __init__(self, *args, already_setup: bool = True, **kwargs): + super().__init__(*args, already_setup=already_setup, **kwargs) # noinspection PyUnresolvedReferences async def __acquire(self) -> Any: # pragma: no cover diff --git a/packages/core/minos-microservice-common/minos/common/testing/database/clients.py b/packages/core/minos-microservice-common/minos/common/testing/database/clients.py index cc6ec8aae..90b1b4d3a 100644 --- a/packages/core/minos-microservice-common/minos/common/testing/database/clients.py +++ b/packages/core/minos-microservice-common/minos/common/testing/database/clients.py @@ -21,10 +21,6 @@ def __init__(self, *args, **kwargs): self.kwargs = kwargs self._response = tuple() - async def _is_valid(self, **kwargs) -> bool: - """For testing purposes""" - return True - async def _reset(self, **kwargs) -> None: """For testing purposes""" self._response = tuple() diff --git a/packages/core/minos-microservice-common/tests/test_common/test_database/test_clients/test_abc.py b/packages/core/minos-microservice-common/tests/test_common/test_database/test_clients/test_abc.py index e7ee8f393..fbedcca2c 100644 --- a/packages/core/minos-microservice-common/tests/test_common/test_database/test_clients/test_abc.py +++ b/packages/core/minos-microservice-common/tests/test_common/test_database/test_clients/test_abc.py @@ -33,9 +33,6 @@ class _DatabaseClient(DatabaseClient): """For testing purposes.""" - async def _is_valid(self, **kwargs) -> bool: - """For testing purposes.""" - async def _reset(self, **kwargs) -> None: """For testing purposes.""" @@ -76,7 +73,7 @@ def build_release(self, hashed_key: int) -> DatabaseOperation: class TestDatabaseClient(CommonTestCase): def test_abstract(self): self.assertTrue(issubclass(DatabaseClient, (ABC, BuildableMixin))) - expected = {"_is_valid", "_execute", "_fetch_all", "_reset"} + expected = {"_execute", "_fetch_all", "_reset"} # noinspection PyUnresolvedReferences self.assertEqual(expected, DatabaseClient.__abstractmethods__) diff --git a/packages/core/minos-microservice-networks/minos/networks/brokers/pools.py b/packages/core/minos-microservice-networks/minos/networks/brokers/pools.py index ecb298245..fecf1929c 100644 --- a/packages/core/minos-microservice-networks/minos/networks/brokers/pools.py +++ b/packages/core/minos-microservice-networks/minos/networks/brokers/pools.py @@ -30,10 +30,8 @@ class BrokerClientPool(Pool): """Broker Client Pool class.""" - def __init__( - self, instance_kwargs: dict[str, Any], maxsize: int = 5, recycle: Optional[int] = 3600, *args, **kwargs - ): - super().__init__(maxsize=maxsize, recycle=recycle, *args, **kwargs) + def __init__(self, instance_kwargs: dict[str, Any], maxsize: int = 5, *args, **kwargs): + super().__init__(maxsize=maxsize, *args, **kwargs) self._instance_kwargs = instance_kwargs @classmethod diff --git a/packages/plugins/minos-database-aiopg/minos/plugins/aiopg/clients.py b/packages/plugins/minos-database-aiopg/minos/plugins/aiopg/clients.py index 05d2b1c76..32ef122e6 100644 --- a/packages/plugins/minos-database-aiopg/minos/plugins/aiopg/clients.py +++ b/packages/plugins/minos-database-aiopg/minos/plugins/aiopg/clients.py @@ -10,6 +10,9 @@ AsyncIterator, Iterable, ) +from functools import ( + partial, +) from typing import ( Optional, ) @@ -92,15 +95,20 @@ def __init__( async def _setup(self) -> None: await super()._setup() - await self._create_connection() + await self.recreate() async def _destroy(self) -> None: await super()._destroy() - await self._close_connection() + await self.close() - async def _create_connection(self): - self._connection = await self.with_circuit_breaker(self._connect) + async def recreate(self) -> None: + """Recreate the database connection. + :return: This method does not return anything. + """ + await self.close() + + self._connection = await self.with_circuit_breaker(self._connect) logger.debug(f"Created {self.database!r} database connection identified by {id(self._connection)}!") async def _connect(self) -> Connection: @@ -113,13 +121,23 @@ async def _connect(self) -> Connection: password=self.password, ) - async def _close_connection(self): - if await self.is_valid(): + async def close(self) -> None: + """Close database connection. + + :return: This method does not return anything. + """ + if await self.is_connected(): await self._connection.close() - self._connection = None - logger.debug(f"Destroyed {self.database!r} database connection identified by {id(self._connection)}!") - async def _is_valid(self) -> bool: + if self._connection is not None: + logger.debug(f"Destroyed {self.database!r} database connection identified by {id(self._connection)}!") + self._connection = None + + async def is_connected(self) -> bool: + """Check if the client is connected. + + :return: ``True`` if it is connected or ``False`` otherwise. + """ if self._connection is None: return False @@ -154,19 +172,18 @@ async def _execute(self, operation: AiopgDatabaseOperation) -> None: if not isinstance(operation, AiopgDatabaseOperation): raise ValueError(f"The operation must be a {AiopgDatabaseOperation!r} instance. Obtained: {operation!r}") - await self._create_cursor() + fn = partial(self._execute_cursor, operation=operation.query, parameters=operation.parameters) try: - await self._cursor.execute(operation=operation.query, parameters=operation.parameters) + await self.with_circuit_breaker(fn) except IntegrityError as exc: raise IntegrityException(f"The requested operation raised a integrity error: {exc!r}") - except OperationalError as exc: - msg = f"There was an {exc!r} while trying to connect to the database." - logger.warning(msg) - raise ConnectionException(msg) - async def _create_cursor(self): - if self._cursor is None: - self._cursor = await self._connection.cursor(timeout=self._cursor_timeout) + async def _execute_cursor(self, operation: str, parameters: dict): + if not await self.is_connected(): + await self.recreate() + + self._cursor = await self._connection.cursor(timeout=self._cursor_timeout) + await self._cursor.execute(operation=operation, parameters=parameters) async def _destroy_cursor(self, **kwargs): if self._cursor is not None: diff --git a/packages/plugins/minos-database-aiopg/tests/test_aiopg/test_clients.py b/packages/plugins/minos-database-aiopg/tests/test_aiopg/test_clients.py index e980edbf2..8c1d935cc 100644 --- a/packages/plugins/minos-database-aiopg/tests/test_aiopg/test_clients.py +++ b/packages/plugins/minos-database-aiopg/tests/test_aiopg/test_clients.py @@ -54,23 +54,27 @@ def test_from_config(self): self.assertEqual(default_database["host"], client.host) self.assertEqual(default_database["port"], client.port) - async def test_is_valid_true(self): + async def test_is_valid(self): async with AiopgDatabaseClient.from_config(self.config) as client: self.assertTrue(await client.is_valid()) - async def test_is_valid_false_not_setup(self): + async def test_is_connected_true(self): + async with AiopgDatabaseClient.from_config(self.config) as client: + self.assertTrue(await client.is_connected()) + + async def test_is_connected_false_not_setup(self): client = AiopgDatabaseClient.from_config(self.config) - self.assertFalse(await client.is_valid()) + self.assertFalse(await client.is_connected()) - async def test_is_valid_false_operational_error(self): + async def test_is_connected_false_operational_error(self): async with AiopgDatabaseClient.from_config(self.config) as client: with patch.object(Connection, "isolation_level", new_callable=PropertyMock, side_effect=OperationalError): - self.assertFalse(await client.is_valid()) + self.assertFalse(await client.is_connected()) - async def test_is_valid_false_closed(self): + async def test_is_connected_false_closed(self): async with AiopgDatabaseClient.from_config(self.config) as client: with patch.object(Connection, "closed", new_callable=PropertyMock, return_valud=False): - self.assertFalse(await client.is_valid()) + self.assertFalse(await client.is_connected()) async def test_connection(self): client = AiopgDatabaseClient.from_config(self.config) @@ -79,7 +83,7 @@ async def test_connection(self): self.assertIsInstance(client.connection, Connection) self.assertIsNone(client.connection) - async def test_connection_raises(self): + async def test_connection_with_circuit_breaker(self): async with AiopgDatabaseClient.from_config(self.config) as c1: async def _fn(): @@ -89,6 +93,18 @@ async def _fn(): async with AiopgDatabaseClient.from_config(self.config) as c2: self.assertEqual(c1.connection, c2.connection) + async def test_connection_recreate(self): + async with AiopgDatabaseClient.from_config(self.config) as client: + c1 = client.connection + self.assertIsInstance(c1, Connection) + + await client.recreate() + + c2 = client.connection + self.assertIsInstance(c2, Connection) + + self.assertNotEqual(c1, c2) + async def test_cursor(self): client = AiopgDatabaseClient.from_config(self.config) self.assertIsNone(client.cursor) @@ -116,6 +132,14 @@ async def test_execute(self): execute_mock.call_args_list, ) + async def test_execute_disconnected(self): + async with AiopgDatabaseClient.from_config(self.config) as client: + await client.close() + self.assertFalse(await client.is_connected()) + + await client.execute(self.operation) + self.assertTrue(await client.is_connected()) + async def test_execute_raises_unsupported(self): class _DatabaseOperation(DatabaseOperation): """For testing purposes.""" @@ -132,9 +156,16 @@ async def test_execute_raises_integrity(self): async def test_execute_raises_operational(self): async with AiopgDatabaseClient.from_config(self.config) as client: - with patch.object(Cursor, "execute", side_effect=OperationalError): - with self.assertRaises(ConnectionException): - await client.execute(self.operation) + with patch.object(Cursor, "execute", side_effect=(OperationalError, None)) as mock: + await client.execute(self.operation) + + self.assertEqual( + [ + call(operation=self.operation.query, parameters=self.operation.parameters), + call(operation=self.operation.query, parameters=self.operation.parameters), + ], + mock.call_args_list, + ) async def test_fetch_one(self): async with AiopgDatabaseClient.from_config(self.config) as client: diff --git a/packages/plugins/minos-database-lmdb/minos/plugins/lmdb/clients.py b/packages/plugins/minos-database-lmdb/minos/plugins/lmdb/clients.py index 317728701..851457a34 100644 --- a/packages/plugins/minos-database-lmdb/minos/plugins/lmdb/clients.py +++ b/packages/plugins/minos-database-lmdb/minos/plugins/lmdb/clients.py @@ -73,9 +73,6 @@ def _close_environment(self) -> None: if self._environment is not None: self._environment.close() - async def _is_valid(self, **kwargs) -> bool: - return True - async def _reset(self, **kwargs) -> None: self._prefetched = None self._environment.sync() From a5d79af356dafb7b81e483ce1a7d5b4d79543a35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Garc=C3=ADa=20Prado?= Date: Wed, 20 Apr 2022 09:09:48 +0200 Subject: [PATCH 3/3] ISSUE #43 * Improve exception handling. --- .../minos/plugins/aiopg/clients.py | 37 ++++++++++--------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/packages/plugins/minos-database-aiopg/minos/plugins/aiopg/clients.py b/packages/plugins/minos-database-aiopg/minos/plugins/aiopg/clients.py index 32ef122e6..f49610012 100644 --- a/packages/plugins/minos-database-aiopg/minos/plugins/aiopg/clients.py +++ b/packages/plugins/minos-database-aiopg/minos/plugins/aiopg/clients.py @@ -65,7 +65,7 @@ def __init__( super().__init__( *args, **kwargs, - circuit_breaker_exceptions=(OperationalError, TimeoutError, *circuit_breaker_exceptions), + circuit_breaker_exceptions=(ConnectionException, *circuit_breaker_exceptions), ) if host is None: @@ -112,14 +112,17 @@ async def recreate(self) -> None: logger.debug(f"Created {self.database!r} database connection identified by {id(self._connection)}!") async def _connect(self) -> Connection: - return await aiopg.connect( - timeout=self._connection_timeout, - host=self.host, - port=self.port, - dbname=self.database, - user=self.user, - password=self.password, - ) + try: + return await aiopg.connect( + timeout=self._connection_timeout, + host=self.host, + port=self.port, + dbname=self.database, + user=self.user, + password=self.password, + ) + except (OperationalError, TimeoutError) as exc: + raise ConnectionException(f"There was not possible to connect to the database: {exc!r}") async def close(self) -> None: """Close database connection. @@ -163,9 +166,7 @@ async def _fetch_all(self) -> AsyncIterator[tuple]: except ProgrammingError as exc: raise ProgrammingException(str(exc)) except OperationalError as exc: - msg = f"There was an {exc!r} while trying to connect to the database." - logger.warning(msg) - raise ConnectionException(msg) + raise ConnectionException(f"There was not possible to connect to the database: {exc!r}") # noinspection PyUnusedLocal async def _execute(self, operation: AiopgDatabaseOperation) -> None: @@ -173,17 +174,19 @@ async def _execute(self, operation: AiopgDatabaseOperation) -> None: raise ValueError(f"The operation must be a {AiopgDatabaseOperation!r} instance. Obtained: {operation!r}") fn = partial(self._execute_cursor, operation=operation.query, parameters=operation.parameters) - try: - await self.with_circuit_breaker(fn) - except IntegrityError as exc: - raise IntegrityException(f"The requested operation raised a integrity error: {exc!r}") + await self.with_circuit_breaker(fn) async def _execute_cursor(self, operation: str, parameters: dict): if not await self.is_connected(): await self.recreate() self._cursor = await self._connection.cursor(timeout=self._cursor_timeout) - await self._cursor.execute(operation=operation, parameters=parameters) + try: + await self._cursor.execute(operation=operation, parameters=parameters) + except OperationalError as exc: + raise ConnectionException(f"There was not possible to connect to the database: {exc!r}") + except IntegrityError as exc: + raise IntegrityException(f"The requested operation raised a integrity error: {exc!r}") async def _destroy_cursor(self, **kwargs): if self._cursor is not None: