Skip to content

Commit

Permalink
feat: added option to specify error pubtopic (#102)
Browse files Browse the repository at this point in the history
* feat: added option to specify error pubtopic

* simplified; added tests and debug statements

* misc fixes

* autopropagate errors in other test modules

* maintain current ergo error queue auto delete status quo for now

* updated ergo graph

* updated version

* pinned importlib-metadata to 4.13.0

Co-authored-by: Zach Schubert <zschubert@nautiluslabs.co>
  • Loading branch information
xytxytxyt and zachschubert committed Oct 19, 2022
1 parent e9da646 commit 75788fe
Show file tree
Hide file tree
Showing 13 changed files with 109 additions and 11 deletions.
1 change: 1 addition & 0 deletions dev-requirements.txt
Expand Up @@ -12,3 +12,4 @@ requests
docker
pytest
pytest-timeout
importlib-metadata==4.13.0
2 changes: 2 additions & 0 deletions ergo/amqp_invoker.py
Expand Up @@ -125,6 +125,8 @@ def _handle_message_inner(self, message_in: Message) -> None:
message_in.traceback = str(err)
message_in.scope.metadata['timestamp'] = dt.isoformat()
self._publish(message_in, self._error_queue.name)
if self._invocable.config.error_pubtopic is not None:
self._publish(message_in, str(PubTopic(self._invocable.config.error_pubtopic)))

def _publish(self, ergo_message: Message, routing_key: str) -> None:
amqp_message = encodes(ergo_message).encode("utf-8")
Expand Down
10 changes: 10 additions & 0 deletions ergo/config.py
Expand Up @@ -22,6 +22,7 @@ def __init__(self, config: Dict[str, str]):
self._namespace: Optional[str] = config.get('namespace', 'local')
self._pubtopic: str = config.get('pubtopic')
self._subtopic: str = config.get('subtopic')
self._error_pubtopic: Optional[str] = config.get('error_pubtopic')
self._host: Optional[str] = config.get('host')
self._exchange: Optional[str] = config.get('exchange')
self._protocol: str = config.get('protocol', 'stack') # http, amqp, stdio, stack
Expand Down Expand Up @@ -86,6 +87,15 @@ def pubtopic(self, val: str) -> None:
"""
self._pubtopic = val

@property
def error_pubtopic(self) -> Optional[str]:
"""Summary.
Returns:
TYPE: Description
"""
return self._error_pubtopic

@property
def func(self) -> Optional[str]:
"""Summary.
Expand Down
9 changes: 8 additions & 1 deletion ergo/schematic.py
@@ -1,5 +1,6 @@
"""Summary."""
import glob
import itertools
import os
import sys
from typing import Dict, Generator, List, Tuple, Union
Expand Down Expand Up @@ -80,6 +81,9 @@ def topics(dot: graphviz.Digraph, configs: List[Dict[str, Union[None, str, List[
for topic_element in format_topic('pubtopic', config):
dot.edge(format_component(config)[0], topic_element[0])
dot.node(*topic_element, shape='box')
for topic_element in format_topic('error_pubtopic', config):
dot.edge(format_component(config)[0], topic_element[0])
dot.node(*topic_element, shape='octagon')
for topic_element in format_topic('subtopic', config):
dot.node(*topic_element, shape='box')
dot.edge(topic_element[0], format_component(config)[0])
Expand All @@ -94,7 +98,10 @@ def derived_topics(dot: graphviz.Digraph, configs: List[Dict[str, Union[None, st
"""
for pub in configs:
for sub in configs:
for pub_topic in format_topic('pubtopic', pub):
for pub_topic in itertools.chain(
format_topic('pubtopic', pub),
format_topic('error_pubtopic', pub),
):
for sub_topic in format_topic('subtopic', sub):
if sub_topic[0] == pub_topic[0]:
continue
Expand Down
2 changes: 1 addition & 1 deletion ergo/version.py
Expand Up @@ -7,7 +7,7 @@
import subprocess
import sys

VERSION = '0.10.1'
VERSION = '0.11.0'


def get_version() -> str:
Expand Down
2 changes: 1 addition & 1 deletion test/integration/conftest.py
Expand Up @@ -14,7 +14,7 @@ def amqp_broker():
start_rabbitmq_broker()


@pytest.fixture(autouse=True)
@pytest.fixture()
def propagate_amqp_errors():
with propagate_errors():
yield
Expand Down
9 changes: 8 additions & 1 deletion test/integration/test_amqp/test_amqp.py
@@ -1,9 +1,16 @@
from test.integration.utils.amqp import AMQPComponent, ComponentFailure
from test.integration.utils.amqp import AMQPComponent, ComponentFailure, propagate_errors

import pytest

from ergo.context import Context


@pytest.fixture(autouse=True)
def propagate_amqp_errors():
with propagate_errors():
yield


"""
test_product
"""
Expand Down
8 changes: 7 additions & 1 deletion test/integration/test_amqp/test_argument_binding.py
@@ -1,10 +1,16 @@
from test.integration.utils.amqp import AMQPComponent, ComponentFailure, Queue, publish
from test.integration.utils.amqp import AMQPComponent, ComponentFailure, Queue, publish, propagate_errors

import pytest

from ergo.context import Context


@pytest.fixture(autouse=True)
def propagate_amqp_errors():
with propagate_errors():
yield


"""
test_bind_falsey_argument
Expand Down
35 changes: 35 additions & 0 deletions test/integration/test_amqp/test_error_pubtopic.py
@@ -0,0 +1,35 @@
from ergo.message import Message

from test.integration.utils.amqp import AMQPComponent, Queue, publish


def assert_false():
assert False


def test_no_error_pubtopic():
component = AMQPComponent(assert_false)
error_queue = Queue(name=component.error_queue_name, auto_delete=False)
with component, error_queue:
publish({}, component.subtopic)
error_message = error_queue.get()

assert isinstance(error_message, Message)
assert error_message.error['type'] == 'AssertionError'


def test_error_pubtopic():
error_key = 'test.error_pubtopic'
component = AMQPComponent(assert_false, error_pubtopic=error_key)
error_queue = Queue(name=component.error_queue_name, auto_delete=False)
error_pubtopic = Queue(routing_key=error_key)
with component, error_queue, error_pubtopic:
publish({}, component.subtopic)

error_queue_message = error_queue.get()
assert isinstance(error_queue_message, Message)
assert error_queue_message.error['type'] == 'AssertionError'

error_pubtopic_message = error_pubtopic.get()
assert isinstance(error_pubtopic_message, Message)
assert error_pubtopic_message.error['type'] == 'AssertionError'
9 changes: 8 additions & 1 deletion test/integration/test_amqp/test_reply_to.py
@@ -1,10 +1,17 @@
from test.integration.utils.amqp import SHORT_TIMEOUT, AMQPComponent, Queue, publish
from test.integration.utils.amqp import SHORT_TIMEOUT, AMQPComponent, Queue, publish, propagate_errors
from typing import List, Optional

import pytest

from ergo.context import Context


@pytest.fixture(autouse=True)
def propagate_amqp_errors():
with propagate_errors():
yield


"""
test_shout
"""
Expand Down
10 changes: 9 additions & 1 deletion test/integration/test_amqp/test_scope.py
@@ -1,9 +1,17 @@
from test.integration.utils.amqp import AMQPComponent
from test.integration.utils.amqp import AMQPComponent, propagate_errors
from typing import Optional
import pytest

from ergo.context import Context
from ergo.scope import Scope


@pytest.fixture(autouse=True)
def propagate_amqp_errors():
with propagate_errors():
yield


"""
test_simple_scope
"""
Expand Down
10 changes: 9 additions & 1 deletion test/integration/test_gateway/test_gateway.py
@@ -1,7 +1,15 @@
from functools import partial
from multiprocessing.pool import ThreadPool
from test.integration.utils.amqp import AMQPComponent, await_components
from test.integration.utils.amqp import AMQPComponent, await_components, propagate_errors
from test.integration.utils.gateway import HTTPGateway
import pytest


@pytest.fixture(autouse=True)
def propagate_amqp_errors():
with propagate_errors():
yield


"""
test_double
Expand Down
13 changes: 10 additions & 3 deletions test/integration/utils/amqp.py
Expand Up @@ -130,10 +130,13 @@ def await_components(channel: Optional[Channel]=None):


class Queue:
def __init__(self, routing_key, name: Optional[str] = None, **kombu_opts):
def __init__(self, routing_key: Optional[str] = None, name: Optional[str] = None, **kombu_opts):
assert routing_key is not None or name is not None, 'routing_key and name cannot both be missing'
self.name = name or f"test:{routing_key}"
self.routing_key = routing_key
self._kombu_opts = {"auto_delete": True, "durable": False, **kombu_opts}
if 'auto_delete' not in kombu_opts:
kombu_opts['auto_delete'] = True
self._kombu_opts = {"durable": False, **kombu_opts}
self._in_context: bool = False

def get(self, block=True, timeout=LONG_TIMEOUT) -> Message:
Expand All @@ -145,7 +148,11 @@ def __enter__(self):
self._in_context = True
self._channel: Channel = CONNECTION.channel()
exchange = kombu.Exchange(EXCHANGE, type="topic", channel=self._channel)
self._spec = kombu.Queue(self.name, exchange=exchange, routing_key=str(SubTopic(self.routing_key)), no_ack=True, **self._kombu_opts)
if self.routing_key:
routing_key = str(SubTopic(self.routing_key))
else:
routing_key = None
self._spec = kombu.Queue(self.name, exchange=exchange, routing_key=routing_key, no_ack=True, **self._kombu_opts)
self._queue = kombu.simple.SimpleQueue(self._channel, self._spec, serializer="raw")
return self

Expand Down

0 comments on commit 75788fe

Please sign in to comment.