Skip to content

Commit

Permalink
Merge pull request #14 from olxbr/feature/hook-fail-ignore
Browse files Browse the repository at this point in the history
Hook can fail without crashing all process.
  • Loading branch information
gligneul committed Feb 27, 2020
2 parents 8cfeb26 + 04f9d88 commit 8653bd7
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 8 deletions.
2 changes: 1 addition & 1 deletion barterdude/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from barterdude.monitor import Monitor


class BarterDude():
class BarterDude:
def __init__( # nosec
self,
hostname: str = "127.0.0.1",
Expand Down
38 changes: 31 additions & 7 deletions barterdude/monitor.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,39 @@
from logging import getLogger
from asyncio import gather
from asyncworker.rabbitmq.message import RabbitMQMessage
from typing import Iterable, Callable, Optional, Any


class Monitor:
def __init__(self, *hooks):
def __init__(self, *hooks: Iterable):
self.__hooks = hooks
self.__logger = getLogger('barterdude')

async def dispatch_before_consume(self, message):
await gather(*[hook.before_consume(message) for hook in self.__hooks])
async def _callback(self,
method: Callable[[RabbitMQMessage], Optional[Any]],
message: RabbitMQMessage,
error: Optional[Exception] = None):
try:
return await (method(message, error) if error else method(message))
except Exception as e:
self.__logger.warning(f"Error on hook method {method}: %s", e)

async def dispatch_on_success(self, message):
await gather(*[hook.on_success(message) for hook in self.__hooks])
def _prepare_callbacks(self, method_name: str,
message: RabbitMQMessage,
error: Optional[Exception] = None):
callbacks = []
for hook in self.__hooks:
callbacks.append(
self._callback(getattr(hook, method_name), message, error)
)
return callbacks

async def dispatch_on_fail(self, message, error):
await gather(*[hook.on_fail(message, error) for hook in self.__hooks])
async def dispatch_before_consume(self, message: RabbitMQMessage):
await gather(*self._prepare_callbacks("before_consume", message))

async def dispatch_on_success(self, message: RabbitMQMessage):
await gather(*self._prepare_callbacks("on_success", message))

async def dispatch_on_fail(self, message: RabbitMQMessage,
error: Exception):
await gather(*self._prepare_callbacks("on_fail", message, error))
13 changes: 13 additions & 0 deletions tests_integration/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from barterdude.hooks import BaseHook
from asyncworker.rabbitmq.message import RabbitMQMessage


class ErrorHook(BaseHook):
async def on_success(self, message: RabbitMQMessage):
raise NotImplementedError

async def on_fail(self, message: RabbitMQMessage, error: Exception):
raise NotImplementedError

async def before_consume(self, message: RabbitMQMessage):
raise NotImplementedError
46 changes: 46 additions & 0 deletions tests_integration/test_rabbitmq_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from barterdude.hooks.healthcheck import Healthcheck
from barterdude.hooks.logging import Logging
from barterdude.hooks.metrics.prometheus import Prometheus
from helpers import ErrorHook
from asyncworker.connections import AMQPConnection
from random import choices
from string import ascii_uppercase
Expand Down Expand Up @@ -87,6 +88,25 @@ async def handler(message):

await self.app.shutdown()

async def test_process_messages_successfully_even_with_crashed_hook(self):
received_messages = set()

monitor = Monitor(ErrorHook())
@self.app.consume_amqp([self.input_queue], coroutines=1,
monitor=monitor)
async def handler(message):
nonlocal received_messages
received_messages.add(message.body["key"])

await self.app.startup()
await self.send_all_messages()
await asyncio.sleep(1)

for message in self.messages:
self.assertTrue(message["key"] in received_messages)

await self.app.shutdown()

async def test_process_one_message_and_publish(self):
@self.app.consume_amqp([self.input_queue], coroutines=1)
async def forward(message):
Expand Down Expand Up @@ -226,6 +246,32 @@ async def handler(message):

await self.app.shutdown()

async def test_obtains_healthcheck_even_with_crashed_hook(self):
monitor = Monitor(ErrorHook(), Healthcheck(self.app))

@self.app.consume_amqp([self.input_queue], monitor)
async def handler(message):
pass

await self.app.startup()
await self.queue_manager.put(
routing_key=self.input_queue,
data=self.messages[0]
)
await asyncio.sleep(1)

async with aiohttp.ClientSession() as session:
timeout = aiohttp.ClientTimeout(total=1)
url = 'http://localhost:8080/healthcheck'
async with session.get(url, timeout=timeout) as response:
status_code = response.status
text = await response.text()

self.assertEquals(status_code, 200)
self.assertEquals(text, "Bater like a pro! Success rate: 1.0")

await self.app.shutdown()

async def test_obtains_prometheus_metrics(self):
labels = {"app_name": "barterdude_consumer"}
monitor = Monitor(Prometheus(self.app, labels))
Expand Down

0 comments on commit 8653bd7

Please sign in to comment.