diff --git a/dev-requirements.txt b/dev-requirements.txt index 8141779d..a7b6262b 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -12,3 +12,4 @@ requests docker pytest pytest-timeout +importlib-metadata==4.13.0 diff --git a/ergo/amqp_invoker.py b/ergo/amqp_invoker.py index 53373db8..e4ae1653 100644 --- a/ergo/amqp_invoker.py +++ b/ergo/amqp_invoker.py @@ -125,6 +125,8 @@ def _handle_message_inner(self, message_in: Message) -> None: message_in.traceback = str(err) message_in.scope.metadata['timestamp'] = dt.isoformat() self._publish(message_in, self._error_queue.name) + if self._invocable.config.error_pubtopic is not None: + self._publish(message_in, str(PubTopic(self._invocable.config.error_pubtopic))) def _publish(self, ergo_message: Message, routing_key: str) -> None: amqp_message = encodes(ergo_message).encode("utf-8") diff --git a/ergo/config.py b/ergo/config.py index b0e70947..0ab66d3c 100644 --- a/ergo/config.py +++ b/ergo/config.py @@ -22,6 +22,7 @@ def __init__(self, config: Dict[str, str]): self._namespace: Optional[str] = config.get('namespace', 'local') self._pubtopic: str = config.get('pubtopic') self._subtopic: str = config.get('subtopic') + self._error_pubtopic: Optional[str] = config.get('error_pubtopic') self._host: Optional[str] = config.get('host') self._exchange: Optional[str] = config.get('exchange') self._protocol: str = config.get('protocol', 'stack') # http, amqp, stdio, stack @@ -86,6 +87,15 @@ def pubtopic(self, val: str) -> None: """ self._pubtopic = val + @property + def error_pubtopic(self) -> Optional[str]: + """Summary. + + Returns: + TYPE: Description + """ + return self._error_pubtopic + @property def func(self) -> Optional[str]: """Summary. diff --git a/ergo/schematic.py b/ergo/schematic.py index 5a366600..d1bca22b 100644 --- a/ergo/schematic.py +++ b/ergo/schematic.py @@ -1,5 +1,6 @@ """Summary.""" import glob +import itertools import os import sys from typing import Dict, Generator, List, Tuple, Union @@ -80,6 +81,9 @@ def topics(dot: graphviz.Digraph, configs: List[Dict[str, Union[None, str, List[ for topic_element in format_topic('pubtopic', config): dot.edge(format_component(config)[0], topic_element[0]) dot.node(*topic_element, shape='box') + for topic_element in format_topic('error_pubtopic', config): + dot.edge(format_component(config)[0], topic_element[0]) + dot.node(*topic_element, shape='octagon') for topic_element in format_topic('subtopic', config): dot.node(*topic_element, shape='box') dot.edge(topic_element[0], format_component(config)[0]) @@ -94,7 +98,10 @@ def derived_topics(dot: graphviz.Digraph, configs: List[Dict[str, Union[None, st """ for pub in configs: for sub in configs: - for pub_topic in format_topic('pubtopic', pub): + for pub_topic in itertools.chain( + format_topic('pubtopic', pub), + format_topic('error_pubtopic', pub), + ): for sub_topic in format_topic('subtopic', sub): if sub_topic[0] == pub_topic[0]: continue diff --git a/ergo/version.py b/ergo/version.py index 4aa8585c..9be134f4 100644 --- a/ergo/version.py +++ b/ergo/version.py @@ -7,7 +7,7 @@ import subprocess import sys -VERSION = '0.10.1' +VERSION = '0.11.0' def get_version() -> str: diff --git a/test/integration/conftest.py b/test/integration/conftest.py index 1a8bc2a9..82905e41 100644 --- a/test/integration/conftest.py +++ b/test/integration/conftest.py @@ -14,7 +14,7 @@ def amqp_broker(): start_rabbitmq_broker() -@pytest.fixture(autouse=True) +@pytest.fixture() def propagate_amqp_errors(): with propagate_errors(): yield diff --git a/test/integration/test_amqp/test_amqp.py b/test/integration/test_amqp/test_amqp.py index c19876ed..a815e7b5 100644 --- a/test/integration/test_amqp/test_amqp.py +++ b/test/integration/test_amqp/test_amqp.py @@ -1,9 +1,16 @@ -from test.integration.utils.amqp import AMQPComponent, ComponentFailure +from test.integration.utils.amqp import AMQPComponent, ComponentFailure, propagate_errors import pytest from ergo.context import Context + +@pytest.fixture(autouse=True) +def propagate_amqp_errors(): + with propagate_errors(): + yield + + """ test_product """ diff --git a/test/integration/test_amqp/test_argument_binding.py b/test/integration/test_amqp/test_argument_binding.py index 89993b99..2afd2a2b 100644 --- a/test/integration/test_amqp/test_argument_binding.py +++ b/test/integration/test_amqp/test_argument_binding.py @@ -1,10 +1,16 @@ -from test.integration.utils.amqp import AMQPComponent, ComponentFailure, Queue, publish +from test.integration.utils.amqp import AMQPComponent, ComponentFailure, Queue, publish, propagate_errors import pytest from ergo.context import Context +@pytest.fixture(autouse=True) +def propagate_amqp_errors(): + with propagate_errors(): + yield + + """ test_bind_falsey_argument diff --git a/test/integration/test_amqp/test_error_pubtopic.py b/test/integration/test_amqp/test_error_pubtopic.py new file mode 100644 index 00000000..9716b342 --- /dev/null +++ b/test/integration/test_amqp/test_error_pubtopic.py @@ -0,0 +1,35 @@ +from ergo.message import Message + +from test.integration.utils.amqp import AMQPComponent, Queue, publish + + +def assert_false(): + assert False + + +def test_no_error_pubtopic(): + component = AMQPComponent(assert_false) + error_queue = Queue(name=component.error_queue_name, auto_delete=False) + with component, error_queue: + publish({}, component.subtopic) + error_message = error_queue.get() + + assert isinstance(error_message, Message) + assert error_message.error['type'] == 'AssertionError' + + +def test_error_pubtopic(): + error_key = 'test.error_pubtopic' + component = AMQPComponent(assert_false, error_pubtopic=error_key) + error_queue = Queue(name=component.error_queue_name, auto_delete=False) + error_pubtopic = Queue(routing_key=error_key) + with component, error_queue, error_pubtopic: + publish({}, component.subtopic) + + error_queue_message = error_queue.get() + assert isinstance(error_queue_message, Message) + assert error_queue_message.error['type'] == 'AssertionError' + + error_pubtopic_message = error_pubtopic.get() + assert isinstance(error_pubtopic_message, Message) + assert error_pubtopic_message.error['type'] == 'AssertionError' diff --git a/test/integration/test_amqp/test_reply_to.py b/test/integration/test_amqp/test_reply_to.py index ef108c36..2f54a6a6 100644 --- a/test/integration/test_amqp/test_reply_to.py +++ b/test/integration/test_amqp/test_reply_to.py @@ -1,10 +1,17 @@ -from test.integration.utils.amqp import SHORT_TIMEOUT, AMQPComponent, Queue, publish +from test.integration.utils.amqp import SHORT_TIMEOUT, AMQPComponent, Queue, publish, propagate_errors from typing import List, Optional import pytest from ergo.context import Context + +@pytest.fixture(autouse=True) +def propagate_amqp_errors(): + with propagate_errors(): + yield + + """ test_shout """ diff --git a/test/integration/test_amqp/test_scope.py b/test/integration/test_amqp/test_scope.py index fbe088ee..d828176b 100644 --- a/test/integration/test_amqp/test_scope.py +++ b/test/integration/test_amqp/test_scope.py @@ -1,9 +1,17 @@ -from test.integration.utils.amqp import AMQPComponent +from test.integration.utils.amqp import AMQPComponent, propagate_errors from typing import Optional +import pytest from ergo.context import Context from ergo.scope import Scope + +@pytest.fixture(autouse=True) +def propagate_amqp_errors(): + with propagate_errors(): + yield + + """ test_simple_scope """ diff --git a/test/integration/test_gateway/test_gateway.py b/test/integration/test_gateway/test_gateway.py index f5bb3c21..294729a8 100644 --- a/test/integration/test_gateway/test_gateway.py +++ b/test/integration/test_gateway/test_gateway.py @@ -1,7 +1,15 @@ from functools import partial from multiprocessing.pool import ThreadPool -from test.integration.utils.amqp import AMQPComponent, await_components +from test.integration.utils.amqp import AMQPComponent, await_components, propagate_errors from test.integration.utils.gateway import HTTPGateway +import pytest + + +@pytest.fixture(autouse=True) +def propagate_amqp_errors(): + with propagate_errors(): + yield + """ test_double diff --git a/test/integration/utils/amqp.py b/test/integration/utils/amqp.py index 8e2b6477..e3dc791d 100644 --- a/test/integration/utils/amqp.py +++ b/test/integration/utils/amqp.py @@ -130,10 +130,13 @@ def await_components(channel: Optional[Channel]=None): class Queue: - def __init__(self, routing_key, name: Optional[str] = None, **kombu_opts): + def __init__(self, routing_key: Optional[str] = None, name: Optional[str] = None, **kombu_opts): + assert routing_key is not None or name is not None, 'routing_key and name cannot both be missing' self.name = name or f"test:{routing_key}" self.routing_key = routing_key - self._kombu_opts = {"auto_delete": True, "durable": False, **kombu_opts} + if 'auto_delete' not in kombu_opts: + kombu_opts['auto_delete'] = True + self._kombu_opts = {"durable": False, **kombu_opts} self._in_context: bool = False def get(self, block=True, timeout=LONG_TIMEOUT) -> Message: @@ -145,7 +148,11 @@ def __enter__(self): self._in_context = True self._channel: Channel = CONNECTION.channel() exchange = kombu.Exchange(EXCHANGE, type="topic", channel=self._channel) - self._spec = kombu.Queue(self.name, exchange=exchange, routing_key=str(SubTopic(self.routing_key)), no_ack=True, **self._kombu_opts) + if self.routing_key: + routing_key = str(SubTopic(self.routing_key)) + else: + routing_key = None + self._spec = kombu.Queue(self.name, exchange=exchange, routing_key=routing_key, no_ack=True, **self._kombu_opts) self._queue = kombu.simple.SimpleQueue(self._channel, self._spec, serializer="raw") return self