Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@
UUID,
)

from psycopg2 import (
IntegrityError,
)
from psycopg2.sql import (
SQL,
Composable,
Expand All @@ -24,6 +21,7 @@
NULL_UUID,
Config,
DatabaseMixin,
IntegrityException,
)

from ...exceptions import (
Expand All @@ -42,7 +40,7 @@ class PostgreSqlEventRepository(DatabaseMixin, EventRepository):

@classmethod
def _from_config(cls, config: Config, **kwargs) -> Optional[EventRepository]:
return super()._from_config(config, **config.get_database_by_name("event"), **kwargs)
return super()._from_config(config, database_key=None, **kwargs)

async def _setup(self):
"""Setup miscellaneous repository thing.
Expand All @@ -65,7 +63,7 @@ async def _submit(self, entry: EventEntry, **kwargs) -> EventEntry:

try:
response = await self.submit_query_and_fetchone(query, params, lock=lock)
except IntegrityError:
except IntegrityException:
raise EventRepositoryConflictException(
f"{entry!r} could not be submitted due to a key (uuid, version, transaction) collision",
await self.offset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class PostgreSqlSnapshotSetup(DatabaseMixin):

@classmethod
def _from_config(cls: Type[T], config: Config, **kwargs) -> T:
return cls(**config.get_database_by_name("snapshot"), **kwargs)
return cls(database_key=None, **kwargs)

async def _setup(self) -> None:
await self.submit_query('CREATE EXTENSION IF NOT EXISTS "uuid-ossp";', lock="uuid-ossp")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def _from_config(cls, config: Config, **kwargs) -> PostgreSqlSnapshotRepository:
if "writer" not in kwargs:
kwargs["writer"] = PostgreSqlSnapshotWriter.from_config(config, **kwargs)

return cls(**config.get_database_by_name("snapshot"), **kwargs)
return cls(database_key=None, **kwargs)

async def _setup(self) -> None:
await self.writer.setup()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,10 @@ async def find_entries(
qb = PostgreSqlSnapshotQueryBuilder(name, condition, ordering, limit, transaction_uuids, exclude_deleted)
query, parameters = qb.build()

async with self.cursor() as cursor:
# noinspection PyTypeChecker
await cursor.execute(query, parameters)
if streaming_mode:
async for row in cursor:
# noinspection PyArgumentList
yield SnapshotEntry(*row)
return
else:
rows = await cursor.fetchall()
for row in rows:
yield SnapshotEntry(*row)
async_iterable = self.submit_query_and_iter(query, parameters)
if streaming_mode:
async for row in async_iterable:
yield SnapshotEntry(*row)
else:
for row in [row async for row in async_iterable]:
yield SnapshotEntry(*row)
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class PostgreSqlTransactionRepository(DatabaseMixin, TransactionRepository):

@classmethod
def _from_config(cls, config: Config, **kwargs) -> Optional[PostgreSqlTransactionRepository]:
return super()._from_config(config, **config.get_database_by_name("transaction"), **kwargs)
return super()._from_config(config, database_key=None, **kwargs)

async def _setup(self):
await self.submit_query('CREATE EXTENSION IF NOT EXISTS "uuid-ossp";', lock="uuid-ossp")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import unittest

import aiopg

from minos.aggregate import (
EventRepository,
PostgreSqlEventRepository,
)
from minos.common import (
AiopgDatabaseClient,
DatabaseClientPool,
)
from minos.common.testing import (
Expand All @@ -18,6 +17,7 @@
)


# noinspection SqlNoDataSourceInspection
class TestPostgreSqlEventRepositorySubmit(EventRepositorySubmitTestCase, PostgresAsyncTestCase):
__test__ = True

Expand All @@ -30,20 +30,18 @@ def test_constructor(self):
pool = DatabaseClientPool.from_config(self.config)
repository = PostgreSqlEventRepository(pool)
self.assertIsInstance(repository, PostgreSqlEventRepository)
self.assertEqual(pool, repository.pool)
self.assertIsInstance(repository.pool, DatabaseClientPool)

def test_from_config(self):
repository = PostgreSqlEventRepository.from_config(self.config)
repository_config = self.config.get_database_by_name("event")
self.assertEqual(repository_config["database"], repository.pool.database)
self.assertIsInstance(repository.pool, DatabaseClientPool)

async def test_setup(self):
async with aiopg.connect(**self.config.get_default_database()) as connection:
async with connection.cursor() as cursor:
await cursor.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'aggregate_event');"
)
response = (await cursor.fetchone())[0]
async with AiopgDatabaseClient(**self.config.get_default_database()) as client:
await client.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'aggregate_event');"
)
response = (await client.fetch_one())[0]
self.assertTrue(response)


Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import unittest

import aiopg

from minos.aggregate import (
PostgreSqlSnapshotSetup,
)
from minos.common import (
AiopgDatabaseClient,
)
from minos.common.testing import (
PostgresAsyncTestCase,
)
Expand All @@ -13,27 +14,25 @@
)


# noinspection SqlNoDataSourceInspection
class TestPostgreSqlSnapshotSetup(AggregateTestCase, PostgresAsyncTestCase):
async def test_setup_snapshot_table(self):
async with PostgreSqlSnapshotSetup.from_config(self.config):
async with aiopg.connect(**self.config.get_default_database()) as connection:
async with connection.cursor() as cursor:
await cursor.execute(
"SELECT EXISTS (SELECT FROM pg_tables "
"WHERE schemaname = 'public' AND tablename = 'snapshot');"
)
observed = (await cursor.fetchone())[0]
async with AiopgDatabaseClient(**self.config.get_default_database()) as client:
await client.execute(
"SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = 'public' AND tablename = 'snapshot');"
)
observed = (await client.fetch_one())[0]
self.assertEqual(True, observed)

async def test_setup_snapshot_aux_offset_table(self):
async with PostgreSqlSnapshotSetup.from_config(self.config):
async with aiopg.connect(**self.config.get_default_database()) as connection:
async with connection.cursor() as cursor:
await cursor.execute(
"SELECT EXISTS (SELECT FROM pg_tables WHERE "
"schemaname = 'public' AND tablename = 'snapshot_aux_offset');"
)
observed = (await cursor.fetchone())[0]
async with AiopgDatabaseClient(**self.config.get_default_database()) as client:
await client.execute(
"SELECT EXISTS (SELECT FROM pg_tables WHERE "
"schemaname = 'public' AND tablename = 'snapshot_aux_offset');"
)
observed = (await client.fetch_one())[0]
self.assertEqual(True, observed)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
uuid4,
)

import aiopg
from psycopg2.extras import (
Json,
)
Expand All @@ -29,6 +28,7 @@
)
from minos.common import (
NULL_UUID,
AiopgDatabaseClient,
)
from minos.common.testing import (
PostgresAsyncTestCase,
Expand Down Expand Up @@ -426,8 +426,8 @@ async def test_build_complex(self):
self.assertEqual(self._flatten_parameters(expected_parameters), self._flatten_parameters(observed[1]))

async def _flatten_query(self, query) -> str:
async with aiopg.connect(**self.config.get_default_database()) as connection:
return query.as_string(connection.raw)
async with AiopgDatabaseClient(**self.config.get_default_database()) as client:
return query.as_string(client.connection.raw)

@staticmethod
def _flatten_parameters(parameters) -> dict:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
TransactionEntry,
TransactionStatus,
)
from minos.common import (
DatabaseClientPool,
)
from minos.common.testing import (
PostgresAsyncTestCase,
)
Expand Down Expand Up @@ -94,8 +97,7 @@ def test_type(self):

def test_from_config(self):
reader = PostgreSqlSnapshotReader.from_config(self.config)
snapshot_config = self.config.get_database_by_name("snapshot")
self.assertEqual(snapshot_config["database"], reader.pool.database)
self.assertIsInstance(reader.pool, DatabaseClientPool)

async def test_find_by_uuid(self):
condition = Condition.IN("uuid", [self.uuid_2, self.uuid_3])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
TransactionStatus,
)
from minos.common import (
DatabaseClientPool,
NotProvidedException,
current_datetime,
)
Expand Down Expand Up @@ -101,8 +102,7 @@ def test_type(self):
self.assertTrue(issubclass(PostgreSqlSnapshotWriter, PostgreSqlSnapshotSetup))

def test_from_config(self):
snapshot_config = self.config.get_database_by_name("snapshot")
self.assertEqual(snapshot_config["database"], self.writer.pool.database)
self.assertIsInstance(self.writer.pool, DatabaseClientPool)

def test_from_config_raises(self):
with self.assertRaises(NotProvidedException):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
uuid4,
)

import aiopg

from minos.aggregate import (
PostgreSqlTransactionRepository,
TransactionEntry,
Expand All @@ -13,6 +11,7 @@
TransactionStatus,
)
from minos.common import (
AiopgDatabaseClient,
DatabaseClientPool,
)
from minos.common.testing import (
Expand All @@ -23,6 +22,7 @@
)


# noinspection SqlNoDataSourceInspection
class TestPostgreSqlTransactionRepository(AggregateTestCase, PostgresAsyncTestCase):
def setUp(self) -> None:
super().setUp()
Expand Down Expand Up @@ -50,16 +50,14 @@ def test_constructor(self):

def test_from_config(self):
repository = PostgreSqlTransactionRepository.from_config(self.config)
repository_config = self.config.get_database_by_name("event")
self.assertEqual(repository_config["database"], repository.pool.database)
self.assertIsInstance(repository.pool, DatabaseClientPool)

async def test_setup(self):
async with aiopg.connect(**self.config.get_default_database()) as connection:
async with connection.cursor() as cursor:
await cursor.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'aggregate_transaction');"
)
response = (await cursor.fetchone())[0]
async with AiopgDatabaseClient(**self.config.get_default_database()) as client:
await client.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'aggregate_transaction');"
)
response = (await client.fetch_one())[0]
self.assertTrue(response)

async def test_submit(self):
Expand Down
7 changes: 5 additions & 2 deletions packages/core/minos-microservice-aggregate/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,11 @@ def __init__(self, key=None, *args, **kwargs):
key = "fake"
super().__init__(key, *args, **kwargs)

async def __aexit__(self, exc_type, exc_val, exc_tb):
return
async def acquire(self) -> None:
"""For testing purposes."""

async def release(self):
"""For testing purposes."""


class FakeLockPool(LockPool):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,20 @@
MinosConfig,
)
from .database import (
AiopgDatabaseClient,
DatabaseClient,
DatabaseClientBuilder,
DatabaseClientException,
DatabaseClientPool,
DatabaseLock,
DatabaseLockPool,
DatabaseMixin,
IntegrityException,
PostgreSqlLock,
PostgreSqlLockPool,
PostgreSqlMinosDatabase,
PostgreSqlPool,
UnableToConnectException,
)
from .datetime import (
NULL_DATETIME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ def _get_database_by_name(self, prefix: str):
data = self.get_by_key(prefix)
data.pop("records", None)
data.pop("retry", None)
if "client" in data:
data["client"] = import_module(data["client"])
return data

def _get_discovery(self) -> dict[str, Any]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ def _get_injections(self) -> list[Union[InjectableMixin, type[InjectableMixin]]]

def _get_databases(self) -> dict[str, dict[str, Any]]:
data = deepcopy(self.get_by_key("databases"))
for database in data.values():
if "client" in database:
database["client"] = import_module(database["client"])
return data

def _get_interfaces(self) -> dict[str, dict[str, Any]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@
DatabaseMixin,
PostgreSqlMinosDatabase,
)
from .clients import (
AiopgDatabaseClient,
DatabaseClient,
DatabaseClientBuilder,
DatabaseClientException,
IntegrityException,
UnableToConnectException,
)
from .locks import (
DatabaseLock,
PostgreSqlLock,
Expand Down
Loading