Skip to content

Commit

Permalink
make implementation compatible
Browse files Browse the repository at this point in the history
  • Loading branch information
mattbennett committed Nov 22, 2018
1 parent 30ade73 commit 3a10df6
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 9 deletions.
19 changes: 11 additions & 8 deletions nameko_amqp_retry/rpc.py
@@ -1,13 +1,11 @@
import sys

from nameko.messaging import HeaderEncoder, QueueConsumer
from nameko.rpc import Rpc as NamekoRpc
from nameko.rpc import RpcConsumer as NamekoRpcConsumer

from nameko_amqp_retry import (
Backoff, BackoffPublisher, expect_backoff_exception)
from nameko_amqp_retry import Backoff, BackoffPublisher, expect_backoff_exception
from nameko_amqp_retry.constants import (
CALL_ID_STACK_HEADER_KEY, RPC_METHOD_ID_HEADER_KEY)
CALL_ID_STACK_HEADER_KEY, RPC_METHOD_ID_HEADER_KEY
)


class RpcConsumer(NamekoRpcConsumer):
Expand All @@ -27,10 +25,9 @@ def handle_message(self, body, message):
self.handle_result(message, None, exc_info)


class Rpc(NamekoRpc, HeaderEncoder):
class Rpc(NamekoRpc):

rpc_consumer = RpcConsumer()
queue_consumer = QueueConsumer()
backoff_publisher = BackoffPublisher()

def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -65,7 +62,13 @@ def handle_result(self, message, worker_ctx, result, exc_info):
self.backoff_publisher.republish(
exc, message, target_queue
)
self.queue_consumer.ack_message(message)
try:
# pylint: disable=no-member
self.rpc_consumer.consumer.ack_message(message)
except AttributeError:
# nameko 2.x backwards compatibilty
# pylint: disable=no-member
self.rpc_consumer.queue_consumer.ack_message(message)
return result, exc_info

except Backoff.Expired:
Expand Down
3 changes: 3 additions & 0 deletions test/__init__.py
@@ -1,4 +1,7 @@
import sys

import pkg_resources

PY3 = sys.version_info >= (3, 0)
PY34 = PY3 and sys.version_info[1] == 4
NAMEKO3 = pkg_resources.get_distribution("nameko").version.split('.')[0] == '3'
10 changes: 9 additions & 1 deletion test/test_retry.py
Expand Up @@ -19,6 +19,8 @@
from nameko_amqp_retry.messaging import consume
from nameko_amqp_retry.rpc import rpc

from test import NAMEKO3


class QuickBackoff(Backoff):
schedule = (100,)
Expand Down Expand Up @@ -324,7 +326,7 @@ def callback(worker_ctx, result, exc_info):
with entrypoint_waiter(container, 'method', callback=callback):
rpc_proxy.service.method("msg")

assert call_stacks == [
expected = [
[
'standalone_rpc_proxy.call.0',
'service.method.1'
Expand All @@ -349,6 +351,12 @@ def callback(worker_ctx, result, exc_info):
],
]

if NAMEKO3: # pragma: no cover
for stack in expected:
stack[0] = stack[0].replace("proxy", "client").replace("call", "0")

assert call_stacks == expected

@pytest.mark.usefixtures('predictable_call_ids')
def test_events_call_stack(self, container, dispatch_event):
""" Event handler backoff extends call stack
Expand Down

0 comments on commit 3a10df6

Please sign in to comment.