Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added option to specify error pubtopic #102

Merged
merged 8 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions ergo/amqp_invoker.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ def _handle_message_inner(self, message_in: Message) -> None:
message_in.traceback = str(err)
message_in.scope.metadata['timestamp'] = dt.isoformat()
self._publish(message_in, self._error_queue.name)
if self._invocable.config.error_pubtopic is not None:
self._publish(message_in, str(PubTopic(self._invocable.config.error_pubtopic)))

def _publish(self, ergo_message: Message, routing_key: str) -> None:
amqp_message = encodes(ergo_message).encode("utf-8")
Expand Down
10 changes: 10 additions & 0 deletions ergo/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def __init__(self, config: Dict[str, str]):
self._namespace: Optional[str] = config.get('namespace', 'local')
self._pubtopic: str = config.get('pubtopic')
self._subtopic: str = config.get('subtopic')
self._error_pubtopic: Optional[str] = config.get('error_pubtopic')
self._host: Optional[str] = config.get('host')
self._exchange: Optional[str] = config.get('exchange')
self._protocol: str = config.get('protocol', 'stack') # http, amqp, stdio, stack
Expand Down Expand Up @@ -86,6 +87,15 @@ def pubtopic(self, val: str) -> None:
"""
self._pubtopic = val

@property
def error_pubtopic(self) -> Optional[str]:
"""Summary.

Returns:
TYPE: Description
"""
return self._error_pubtopic

@property
def func(self) -> Optional[str]:
"""Summary.
Expand Down
9 changes: 8 additions & 1 deletion ergo/schematic.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Summary."""
import glob
import itertools
import os
import sys
from typing import Dict, Generator, List, Tuple, Union
Expand Down Expand Up @@ -80,6 +81,9 @@ def topics(dot: graphviz.Digraph, configs: List[Dict[str, Union[None, str, List[
for topic_element in format_topic('pubtopic', config):
dot.edge(format_component(config)[0], topic_element[0])
dot.node(*topic_element, shape='box')
for topic_element in format_topic('error_pubtopic', config):
dot.edge(format_component(config)[0], topic_element[0])
dot.node(*topic_element, shape='octagon')
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

chose octagon because it is used for stop signs: http://www.trafficsign.us/signshape.html

sample:
image

for topic_element in format_topic('subtopic', config):
dot.node(*topic_element, shape='box')
dot.edge(topic_element[0], format_component(config)[0])
Expand All @@ -94,7 +98,10 @@ def derived_topics(dot: graphviz.Digraph, configs: List[Dict[str, Union[None, st
"""
for pub in configs:
for sub in configs:
for pub_topic in format_topic('pubtopic', pub):
for pub_topic in itertools.chain(
format_topic('pubtopic', pub),
format_topic('error_pubtopic', pub),
):
for sub_topic in format_topic('subtopic', sub):
if sub_topic[0] == pub_topic[0]:
continue
Expand Down
2 changes: 1 addition & 1 deletion ergo/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import subprocess
import sys

VERSION = '0.10.1'
VERSION = '0.11.0'


def get_version() -> str:
Expand Down
2 changes: 1 addition & 1 deletion test/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def amqp_broker():
start_rabbitmq_broker()


@pytest.fixture(autouse=True)
@pytest.fixture()
def propagate_amqp_errors():
with propagate_errors():
yield
Expand Down
9 changes: 8 additions & 1 deletion test/integration/test_amqp/test_amqp.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
from test.integration.utils.amqp import AMQPComponent, ComponentFailure
from test.integration.utils.amqp import AMQPComponent, ComponentFailure, propagate_errors

import pytest

from ergo.context import Context


@pytest.fixture(autouse=True)
def propagate_amqp_errors():
with propagate_errors():
yield


"""
test_product
"""
Expand Down
8 changes: 7 additions & 1 deletion test/integration/test_amqp/test_argument_binding.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
from test.integration.utils.amqp import AMQPComponent, ComponentFailure, Queue, publish
from test.integration.utils.amqp import AMQPComponent, ComponentFailure, Queue, publish, propagate_errors

import pytest

from ergo.context import Context


@pytest.fixture(autouse=True)
def propagate_amqp_errors():
with propagate_errors():
yield


"""
test_bind_falsey_argument

Expand Down
35 changes: 35 additions & 0 deletions test/integration/test_amqp/test_error_pubtopic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from ergo.message import Message

from test.integration.utils.amqp import AMQPComponent, Queue, publish


def assert_false():
assert False


def test_no_error_pubtopic():
component = AMQPComponent(assert_false)
error_queue = Queue(name=component.error_queue_name, auto_delete=False)
with component, error_queue:
publish({}, component.subtopic)
error_message = error_queue.get()

assert isinstance(error_message, Message)
assert error_message.error['type'] == 'AssertionError'


def test_error_pubtopic():
error_key = 'test.error_pubtopic'
component = AMQPComponent(assert_false, error_pubtopic=error_key)
error_queue = Queue(name=component.error_queue_name, auto_delete=False)
error_pubtopic = Queue(routing_key=error_key)
with component, error_queue, error_pubtopic:
publish({}, component.subtopic)

error_queue_message = error_queue.get()
assert isinstance(error_queue_message, Message)
assert error_queue_message.error['type'] == 'AssertionError'

error_pubtopic_message = error_pubtopic.get()
assert isinstance(error_pubtopic_message, Message)
assert error_pubtopic_message.error['type'] == 'AssertionError'
9 changes: 8 additions & 1 deletion test/integration/test_amqp/test_reply_to.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
from test.integration.utils.amqp import SHORT_TIMEOUT, AMQPComponent, Queue, publish
from test.integration.utils.amqp import SHORT_TIMEOUT, AMQPComponent, Queue, publish, propagate_errors
from typing import List, Optional

import pytest

from ergo.context import Context


@pytest.fixture(autouse=True)
def propagate_amqp_errors():
with propagate_errors():
yield


"""
test_shout
"""
Expand Down
10 changes: 9 additions & 1 deletion test/integration/test_amqp/test_scope.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
from test.integration.utils.amqp import AMQPComponent
from test.integration.utils.amqp import AMQPComponent, propagate_errors
from typing import Optional
import pytest

from ergo.context import Context
from ergo.scope import Scope


@pytest.fixture(autouse=True)
def propagate_amqp_errors():
with propagate_errors():
yield


"""
test_simple_scope
"""
Expand Down
10 changes: 9 additions & 1 deletion test/integration/test_gateway/test_gateway.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
from functools import partial
from multiprocessing.pool import ThreadPool
from test.integration.utils.amqp import AMQPComponent, await_components
from test.integration.utils.amqp import AMQPComponent, await_components, propagate_errors
from test.integration.utils.gateway import HTTPGateway
import pytest


@pytest.fixture(autouse=True)
def propagate_amqp_errors():
with propagate_errors():
yield


"""
test_double
Expand Down
13 changes: 10 additions & 3 deletions test/integration/utils/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,13 @@ def await_components(channel: Optional[Channel]=None):


class Queue:
def __init__(self, routing_key, name: Optional[str] = None, **kombu_opts):
def __init__(self, routing_key: Optional[str] = None, name: Optional[str] = None, **kombu_opts):
assert routing_key is not None or name is not None, 'routing_key and name cannot both be missing'
self.name = name or f"test:{routing_key}"
self.routing_key = routing_key
self._kombu_opts = {"auto_delete": True, "durable": False, **kombu_opts}
if 'auto_delete' not in kombu_opts:
kombu_opts['auto_delete'] = True
self._kombu_opts = {"durable": False, **kombu_opts}
self._in_context: bool = False

def get(self, block=True, timeout=LONG_TIMEOUT) -> Message:
Expand All @@ -145,7 +148,11 @@ def __enter__(self):
self._in_context = True
self._channel: Channel = CONNECTION.channel()
exchange = kombu.Exchange(EXCHANGE, type="topic", channel=self._channel)
self._spec = kombu.Queue(self.name, exchange=exchange, routing_key=str(SubTopic(self.routing_key)), no_ack=True, **self._kombu_opts)
if self.routing_key:
routing_key = str(SubTopic(self.routing_key))
else:
routing_key = None
self._spec = kombu.Queue(self.name, exchange=exchange, routing_key=routing_key, no_ack=True, **self._kombu_opts)
self._queue = kombu.simple.SimpleQueue(self._channel, self._spec, serializer="raw")
return self

Expand Down