Skip to content

Commit

Permalink
Merge pull request #356 from minos-framework/0.6.1
Browse files Browse the repository at this point in the history
0.6.1
  • Loading branch information
Sergio García Prado committed Apr 1, 2022
2 parents 72cd513 + b86750e commit 4cd6aed
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 30 deletions.
52 changes: 51 additions & 1 deletion packages/core/minos-microservice-common/HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -308,4 +308,54 @@ History
* Add `Object` base class with the purpose to avoid issues related with multi-inheritance and mixins.
* Add `Port` base class as the base class for ports.
* Add `CircuitBreakerMixin` class to provide circuit breaker functionalities.
* Add `SetupMixin` class as a replacement of the `MinosSetup` class.
* Add `SetupMixin` class as a replacement of the `MinosSetup` class.

### Update Guide (from 0.5.x to 0.6.0)

* Add `@Injectable` decorator to classes that injections:

```python
from minos.common import Injectable


@Injectable("THE_INJECTION_NAME")
class MyInjectableClass:
...
```

* Add `minos-http-aiohttp` package:

```shell
poetry add minos-http-aiohttp@^0.6
```

* Add `HttpConnector` instance to `service.injections` section of `config.yml` file:

```yaml
...
service:
injections:
http_connector: minos.plugins.aiohttp.AioHttpConnector
...
...
...
```

* Add `routers` section to `config.yml` file:

```yaml
...
routers:
- minos.networks.BrokerRouter
- minos.networks.PeriodicRouter
- minos.networks.RestHttpRouter
...
```

* Update `minos.common.Config` usages according to the new provided API:
* Most common issues come from calls like `config.query_repository._asdict()`, that must be transformed to `config.get_database_by_name("query")`

0.6.1 (2022-04-01)
------------------

* Fix bug that didn't show the correct exception traceback when microservice failures occurred.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
__author__ = "Minos Framework Devs"
__email__ = "hey@minos.run"
__version__ = "0.6.0"
__version__ = "0.6.1"

from .builders import (
BuildableMixin,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ def launch(self) -> NoReturn:
logger.info("Stopping microservice...")
exception = exc
except Exception as exc: # pragma: no cover
logger.exception("Stopping microservice due to an unhandled exception...")
exception = exc
finally:
self.graceful_shutdown(exception)
Expand All @@ -143,14 +144,14 @@ def graceful_launch(self) -> None:
:return: This method does not return anything.
"""
self.loop.run_until_complete(gather(self.setup(), self.entrypoint.__aenter__()))
self.loop.run_until_complete(self.setup())

def graceful_shutdown(self, err: Exception = None) -> None:
"""Shutdown the execution gracefully.
:return: This method does not return anything.
"""
self.loop.run_until_complete(gather(self.entrypoint.__aexit__(None, err, None), self.destroy()))
self.loop.run_until_complete(self.destroy())

@cached_property
def entrypoint(self) -> Entrypoint:
Expand Down Expand Up @@ -199,9 +200,10 @@ async def _setup(self) -> None:
:return: This method does not return anything.
"""
await self.injector.wire_and_setup_injections(
modules=self._external_modules + self._internal_modules, packages=self._external_packages
)
modules = self._external_modules + self._internal_modules
packages = self._external_packages
self.injector.wire_injections(modules=modules, packages=packages)
await gather(self.injector.setup_injections(), self.entrypoint.__aenter__())

@property
def _internal_modules(self) -> list[ModuleType]:
Expand All @@ -212,7 +214,8 @@ async def _destroy(self) -> None:
:return: This method does not return anything.
"""
await self.injector.unwire_and_destroy_injections()
await gather(self.entrypoint.__aexit__(None, None, None), self.injector.destroy_injections())
self.injector.unwire_injections()

@property
def injections(self) -> dict[str, InjectableMixin]:
Expand Down
2 changes: 1 addition & 1 deletion packages/core/minos-microservice-common/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "minos-microservice-common"
version = "0.6.0"
version = "0.6.1"
description = "The common core of the Minos Framework"
readme = "README.md"
repository = "https://github.com/minos-framework/minos-python"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import warnings
from unittest.mock import (
AsyncMock,
MagicMock,
call,
patch,
)
Expand Down Expand Up @@ -106,37 +107,67 @@ async def test_loop(self):
self.assertEqual(call(), mock_loop.call_args)

async def test_setup(self):
mock = AsyncMock()
self.launcher.injector.wire_and_setup_injections = mock
await self.launcher.setup()
wire_mock = MagicMock()
setup_mock = AsyncMock()
mock_entrypoint_aenter = AsyncMock()

self.launcher.injector.wire_injections = wire_mock
self.launcher.injector.setup_injections = setup_mock

with patch("minos.common.launchers._create_loop") as mock_loop:
loop = FakeLoop()
mock_loop.return_value = loop
with patch("minos.common.launchers._create_entrypoint") as mock_entrypoint:
entrypoint = FakeEntrypoint()
mock_entrypoint.return_value = entrypoint

entrypoint.__aenter__ = mock_entrypoint_aenter

await self.launcher.setup()

self.assertEqual(1, mock.call_count)
self.assertEqual(1, wire_mock.call_count)
import tests
from minos import (
common,
)

self.assertEqual(0, len(mock.call_args.args))
self.assertEqual(2, len(mock.call_args.kwargs))
observed = mock.call_args.kwargs["modules"]
self.assertEqual(0, len(wire_mock.call_args.args))
self.assertEqual(2, len(wire_mock.call_args.kwargs))
observed = wire_mock.call_args.kwargs["modules"]

self.assertIn(tests, observed)
self.assertIn(common, observed)

self.assertEqual(["tests"], mock.call_args.kwargs["packages"])
self.assertEqual(["tests"], wire_mock.call_args.kwargs["packages"])

await self.launcher.destroy()
self.assertEqual(1, setup_mock.call_count)
self.assertEqual(1, mock_entrypoint_aenter.call_count)

async def test_destroy(self):
self.launcher.injector.wire_and_setup_injections = AsyncMock()
self.launcher._setup = AsyncMock()
await self.launcher.setup()

mock = AsyncMock()
self.launcher.injector.unwire_and_destroy_injections = mock
await self.launcher.destroy()
destroy_mock = AsyncMock()
unwire_mock = MagicMock()
mock_entrypoint_aexit = AsyncMock()

self.launcher.injector.destroy_injections = destroy_mock
self.launcher.injector.unwire_injections = unwire_mock

with patch("minos.common.launchers._create_loop") as mock_loop:
loop = FakeLoop()
mock_loop.return_value = loop
with patch("minos.common.launchers._create_entrypoint") as mock_entrypoint:
entrypoint = FakeEntrypoint()
mock_entrypoint.return_value = entrypoint

entrypoint.__aexit__ = mock_entrypoint_aexit

await self.launcher.destroy()

self.assertEqual(1, mock.call_count)
self.assertEqual(call(), mock.call_args)
self.assertEqual(1, unwire_mock.call_count)
self.assertEqual(1, destroy_mock.call_count)
self.assertEqual(1, mock_entrypoint_aexit.call_count)

def test_launch(self):
mock_setup = AsyncMock()
Expand All @@ -157,8 +188,8 @@ def test_launch(self):

self.launcher.launch()

self.assertEqual(1, mock_entrypoint.call_count)
self.assertEqual(1, mock_loop.call_count)
self.assertEqual(1, mock_setup.call_count)
self.assertEqual(1, mock_destroy.call_count)


class TestEntryPointLauncherLoop(unittest.TestCase):
Expand Down
6 changes: 5 additions & 1 deletion packages/plugins/minos-broker-kafka/HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,8 @@
## 0.6.0 (2022-03-28)

* Add `KafkaCircuitBreakerMixin` to integrate `minos.common.CircuitBreakerMixin` into the `KafkaBrokerPublisher` and `KafkaBrokerSubscriber` classes to be tolerant to connection failures to `kafka`.
* Add `KafkaBrokerPublisherBuilder` and `KafkaBrokerBuilderMixin` classes to ease the building process.
* Add `KafkaBrokerPublisherBuilder` and `KafkaBrokerBuilderMixin` classes to ease the building process.

## 0.6.1 (2022-04-01)

* Improve `KafkaBrokerSubscriber`'s destroying process.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
__author__ = "Minos Framework Devs"
__email__ = "hey@minos.run"
__version__ = "0.6.0"
__version__ = "0.6.1"

from .common import (
KafkaBrokerBuilderMixin,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from aiokafka import (
AIOKafkaConsumer,
ConsumerStoppedError,
)
from cached_property import (
cached_property,
Expand Down Expand Up @@ -166,7 +167,12 @@ def admin_client(self):
return KafkaAdminClient(bootstrap_servers=f"{self.host}:{self.port}")

async def _receive(self) -> BrokerMessage:
record = await self.client.getone()
try:
record = await self.client.getone()
except ConsumerStoppedError as exc:
if self.already_destroyed:
raise StopAsyncIteration
raise exc
bytes_ = record.value
message = BrokerMessage.from_avro_bytes(bytes_)
return message
Expand Down
2 changes: 1 addition & 1 deletion packages/plugins/minos-broker-kafka/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "minos-broker-kafka"
version = "0.6.0"
version = "0.6.1"
description = "The kafka plugin of the Minos Framework"
readme = "README.md"
repository = "https://github.com/minos-framework/minos-python"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from aiokafka import (
AIOKafkaConsumer,
ConsumerStoppedError,
)
from kafka import (
KafkaAdminClient,
Expand Down Expand Up @@ -206,6 +207,22 @@ async def test_receive(self):
self.assertEqual(messages[0], await subscriber.receive())
self.assertEqual(messages[1], await subscriber.receive())

async def test_receive_already_stopped_raises(self):
subscriber = KafkaBrokerSubscriber.from_config(CONFIG_FILE_PATH, topics={"foo", "bar"})
get_mock = AsyncMock(side_effect=ConsumerStoppedError)
subscriber.client.getone = get_mock

with self.assertRaises(StopAsyncIteration):
await subscriber.receive()

async def test_receive_stopped(self):
async with KafkaBrokerSubscriber.from_config(CONFIG_FILE_PATH, topics={"foo", "bar"}) as subscriber:
get_mock = AsyncMock(side_effect=ConsumerStoppedError)
subscriber.client.getone = get_mock

with self.assertRaises(ConsumerStoppedError):
await subscriber.receive()


class TestKafkaBrokerSubscriberBuilder(unittest.TestCase):
def setUp(self) -> None:
Expand Down

0 comments on commit 4cd6aed

Please sign in to comment.