Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,8 @@ async def is_valid(self, **kwargs) -> bool:
"""
return await self._is_valid(**kwargs)

@abstractmethod
async def _is_valid(self, **kwargs) -> bool:
raise NotImplementedError
return True

async def _destroy(self) -> None:
await self.reset()
Expand Down
6 changes: 3 additions & 3 deletions packages/core/minos-microservice-common/minos/common/pools.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,15 @@ def _get_pool_cls(self, type_: str) -> type[Pool]:


class _PoolBase(PoolBase, ABC):
def __init__(self, *args, maxsize: int = 10, recycle: Optional[int] = 300, **kwargs):
def __init__(self, *args, maxsize: int = 10, recycle: Optional[int] = None, **kwargs):
super().__init__(maxsize=maxsize, recycle=recycle)


class Pool(SetupMixin, _PoolBase, Generic[P], ABC):
"""Base class for Pool implementations in minos"""

def __init__(self, *args, maxsize: int = 10, recycle: Optional[int] = 300, already_setup: bool = True, **kwargs):
super().__init__(*args, maxsize=maxsize, recycle=recycle, already_setup=already_setup, **kwargs)
def __init__(self, *args, already_setup: bool = True, **kwargs):
super().__init__(*args, already_setup=already_setup, **kwargs)

# noinspection PyUnresolvedReferences
async def __acquire(self) -> Any: # pragma: no cover
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ def __init__(self, *args, **kwargs):
self.kwargs = kwargs
self._response = tuple()

async def _is_valid(self, **kwargs) -> bool:
"""For testing purposes"""
return True

async def _reset(self, **kwargs) -> None:
"""For testing purposes"""
self._response = tuple()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@
class _DatabaseClient(DatabaseClient):
"""For testing purposes."""

async def _is_valid(self, **kwargs) -> bool:
"""For testing purposes."""

async def _reset(self, **kwargs) -> None:
"""For testing purposes."""

Expand Down Expand Up @@ -76,7 +73,7 @@ def build_release(self, hashed_key: int) -> DatabaseOperation:
class TestDatabaseClient(CommonTestCase):
def test_abstract(self):
self.assertTrue(issubclass(DatabaseClient, (ABC, BuildableMixin)))
expected = {"_is_valid", "_execute", "_fetch_all", "_reset"}
expected = {"_execute", "_fetch_all", "_reset"}
# noinspection PyUnresolvedReferences
self.assertEqual(expected, DatabaseClient.__abstractmethods__)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,8 @@
class BrokerClientPool(Pool):
"""Broker Client Pool class."""

def __init__(
self, instance_kwargs: dict[str, Any], maxsize: int = 5, recycle: Optional[int] = 3600, *args, **kwargs
):
super().__init__(maxsize=maxsize, recycle=recycle, *args, **kwargs)
def __init__(self, instance_kwargs: dict[str, Any], maxsize: int = 5, *args, **kwargs):
super().__init__(maxsize=maxsize, *args, **kwargs)
self._instance_kwargs = instance_kwargs

@classmethod
Expand Down
100 changes: 76 additions & 24 deletions packages/plugins/minos-database-aiopg/minos/plugins/aiopg/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,15 @@
)

import logging
from asyncio import (
TimeoutError,
)
from collections.abc import (
AsyncIterator,
Iterable,
)
from functools import (
partial,
)
from typing import (
Optional,
Expand All @@ -22,6 +29,7 @@
)

from minos.common import (
CircuitBreakerMixin,
ConnectionException,
DatabaseClient,
IntegrityException,
Expand All @@ -35,7 +43,7 @@
logger = logging.getLogger(__name__)


class AiopgDatabaseClient(DatabaseClient):
class AiopgDatabaseClient(DatabaseClient, CircuitBreakerMixin):
"""Aiopg Database Client class."""

_connection: Optional[Connection]
Expand All @@ -48,10 +56,17 @@ def __init__(
port: Optional[int] = None,
user: Optional[str] = None,
password: Optional[str] = None,
circuit_breaker_exceptions: Iterable[type] = tuple(),
connection_timeout: Optional[float] = None,
cursor_timeout: Optional[float] = None,
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
super().__init__(
*args,
**kwargs,
circuit_breaker_exceptions=(ConnectionException, *circuit_breaker_exceptions),
)

if host is None:
host = "localhost"
Expand All @@ -61,43 +76,71 @@ def __init__(
user = "postgres"
if password is None:
password = ""
if connection_timeout is None:
connection_timeout = 1
if cursor_timeout is None:
cursor_timeout = 60

self._database = database
self._host = host
self._port = port
self._user = user
self._password = password

self._connection_timeout = connection_timeout
self._cursor_timeout = cursor_timeout

self._connection = None
self._cursor = None

async def _setup(self) -> None:
await super()._setup()
await self._create_connection()
await self.recreate()

async def _destroy(self) -> None:
await super()._destroy()
await self._close_connection()
await self.close()

async def recreate(self) -> None:
"""Recreate the database connection.

:return: This method does not return anything.
"""
await self.close()

async def _create_connection(self):
self._connection = await self.with_circuit_breaker(self._connect)
logger.debug(f"Created {self.database!r} database connection identified by {id(self._connection)}!")

async def _connect(self) -> Connection:
try:
self._connection = await aiopg.connect(
host=self.host, port=self.port, dbname=self.database, user=self.user, password=self.password
return await aiopg.connect(
timeout=self._connection_timeout,
host=self.host,
port=self.port,
dbname=self.database,
user=self.user,
password=self.password,
)
except OperationalError as exc:
msg = f"There was an {exc!r} while trying to get a database connection."
logger.warning(msg)
raise ConnectionException(msg)
except (OperationalError, TimeoutError) as exc:
raise ConnectionException(f"There was not possible to connect to the database: {exc!r}")

logger.debug(f"Created {self.database!r} database connection identified by {id(self._connection)}!")
async def close(self) -> None:
"""Close database connection.

async def _close_connection(self):
if self._connection is not None and not self._connection.closed:
:return: This method does not return anything.
"""
if await self.is_connected():
await self._connection.close()
self._connection = None
logger.debug(f"Destroyed {self.database!r} database connection identified by {id(self._connection)}!")

async def _is_valid(self) -> bool:
if self._connection is not None:
logger.debug(f"Destroyed {self.database!r} database connection identified by {id(self._connection)}!")
self._connection = None

async def is_connected(self) -> bool:
"""Check if the client is connected.

:return: ``True`` if it is connected or ``False`` otherwise.
"""
if self._connection is None:
return False

Expand All @@ -114,28 +157,37 @@ async def _reset(self, **kwargs) -> None:

# noinspection PyUnusedLocal
async def _fetch_all(self) -> AsyncIterator[tuple]:
await self._create_cursor()
if self._cursor is None:
raise ProgrammingException("An operation must be executed before fetching any value.")

try:
async for row in self._cursor:
yield row
except ProgrammingError as exc:
raise ProgrammingException(str(exc))
except OperationalError as exc:
raise ConnectionException(f"There was not possible to connect to the database: {exc!r}")

# noinspection PyUnusedLocal
async def _execute(self, operation: AiopgDatabaseOperation) -> None:
if not isinstance(operation, AiopgDatabaseOperation):
raise ValueError(f"The operation must be a {AiopgDatabaseOperation!r} instance. Obtained: {operation!r}")

await self._create_cursor()
fn = partial(self._execute_cursor, operation=operation.query, parameters=operation.parameters)
await self.with_circuit_breaker(fn)

async def _execute_cursor(self, operation: str, parameters: dict):
if not await self.is_connected():
await self.recreate()

self._cursor = await self._connection.cursor(timeout=self._cursor_timeout)
try:
await self._cursor.execute(operation=operation.query, parameters=operation.parameters)
await self._cursor.execute(operation=operation, parameters=parameters)
except OperationalError as exc:
raise ConnectionException(f"There was not possible to connect to the database: {exc!r}")
except IntegrityError as exc:
raise IntegrityException(f"The requested operation raised a integrity error: {exc!r}")

async def _create_cursor(self):
if self._cursor is None:
self._cursor = await self._connection.cursor()

async def _destroy_cursor(self, **kwargs):
if self._cursor is not None:
if not self._cursor.closed:
Expand Down
Loading