Skip to content

Commit

Permalink
Merge pull request #185 from onefinestay/rework_standalone_event_disp…
Browse files Browse the repository at this point in the history
…acher

standalone event dispatcher to connect on demand
  • Loading branch information
davidszotten committed Dec 30, 2014
2 parents 6cef7cd + 42bc7ec commit 8a96cae
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 37 deletions.
2 changes: 2 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ Unreleased
* Rename the standalone rpc proxy to `ServiceRpcProxy` and add a
`ClusterRpcProxy`, using a single reply queue for communicating with multiple
remote services.
* Make the standalone event dispatcher more shell-friendly, connecting on
demand


Version 1.14.0
Expand Down
4 changes: 2 additions & 2 deletions nameko/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,8 @@ def _run_worker(self, worker_ctx, handle_result):
with _log_time('ran handler for %s', worker_ctx):
result = method(*worker_ctx.args, **worker_ctx.kwargs)
except Exception as exc:
_log.debug('error handling worker %s: %s', worker_ctx, exc,
exc_info=True)
_log.info('error handling worker %s: %s', worker_ctx, exc,
exc_info=True)
exc_info = sys.exc_info()

if handle_result is not None:
Expand Down
39 changes: 21 additions & 18 deletions nameko/standalone/events.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,41 @@
from contextlib import contextmanager

from kombu.common import maybe_declare
from kombu.pools import producers, connections
from kombu import Connection

from nameko.constants import DEFAULT_RETRY_POLICY
from nameko.events import get_event_exchange
from nameko.events import get_event_exchange, Event
from nameko.messaging import AMQP_URI_CONFIG_KEY


@contextmanager
def event_dispatcher(container_service_name, nameko_config, **kwargs):
""" Yield a function that dispatches events claiming to originate from
def event_dispatcher(nameko_config, **kwargs):
""" Returns a function that dispatches events claiming to originate from
a service called `container_service_name`.
Enables services not hosted by nameko to dispatch events into a nameko
cluster.
"""
conn = Connection(nameko_config[AMQP_URI_CONFIG_KEY])
exchange = get_event_exchange(container_service_name)

kwargs = kwargs.copy()
retry = kwargs.pop('retry', True)
retry_policy = kwargs.pop('retry_policy', DEFAULT_RETRY_POLICY)

with connections[conn].acquire(block=True) as connection:
maybe_declare(exchange, connection)
def dispatch(service_name, event_type, event_data):
conn = Connection(nameko_config[AMQP_URI_CONFIG_KEY])

with producers[conn].acquire(block=True) as producer:
exchange = get_event_exchange(service_name)
if isinstance(event_type, type) and issubclass(event_type, Event):
event_type = event_type.type

def dispatch(evt):
msg = evt.data
routing_key = evt.type
with connections[conn].acquire(block=True) as connection:
maybe_declare(exchange, connection)
with producers[conn].acquire(block=True) as producer:
msg = event_data
routing_key = event_type
producer.publish(
msg, exchange=exchange, routing_key=routing_key,
retry=retry, retry_policy=retry_policy, **kwargs)

yield dispatch
msg,
exchange=exchange,
routing_key=routing_key,
retry=retry,
retry_policy=retry_policy,
**kwargs)
return dispatch
6 changes: 3 additions & 3 deletions test/standalone/test_event_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def test_dispatch(container_factory, rabbit_config):

msg = "msg"

with event_dispatcher('srcservice', config) as dispatch:
with entrypoint_waiter(container, 'handler', timeout=1):
dispatch(TestEvent(msg))
dispatch = event_dispatcher(config)
with entrypoint_waiter(container, 'handler', timeout=1):
dispatch('srcservice', TestEvent.type, msg)
handler_called.assert_called_once_with(msg)
4 changes: 2 additions & 2 deletions test/test_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ def test_service_disconnect_with_active_async_worker(

# dispatch an event
data = uuid.uuid4().hex
with event_dispatcher('srcservice', rabbit_config) as dispatch:
dispatch(ExampleEvent(data))
dispatch = event_dispatcher(rabbit_config)
dispatch('srcservice', ExampleEvent, data)

# `handle` will have been called twice with the same the `data`, because
# rabbit will have redelivered the un-ack'd message from the first call
Expand Down
4 changes: 2 additions & 2 deletions test/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,8 +591,8 @@ def test_custom_event_handler(rabbit_manager, rabbit_config, start_containers):
start_containers(CustomHandler, ('custom-events',))

payload = {'custom': 'data'}
with standalone_dispatcher('srcservice', rabbit_config) as dispatch:
dispatch(ExampleEvent(payload))
dispatch = standalone_dispatcher(rabbit_config)
dispatch('srcservice', ExampleEvent.type, payload)

# wait for it to arrive
with eventlet.timeout.Timeout(EVENTS_TIMEOUT):
Expand Down
8 changes: 4 additions & 4 deletions test/test_service_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ def check_consumers():

# test events (both services will receive if in "broadcast" mode)
event_data = "msg"
with event_dispatcher('srcservice', rabbit_config) as dispatch:
dispatch(TestEvent(event_data))
dispatch = event_dispatcher(rabbit_config)
dispatch('srcservice', TestEvent, event_data)

with eventlet.Timeout(1):
while len(received) < 2:
Expand Down Expand Up @@ -240,8 +240,8 @@ def test_runner_with_duplicate_services(runner_factory, rabbit_config):

# test events (only one service is hosted)
event_data = "msg"
with event_dispatcher('srcservice', rabbit_config) as dispatch:
dispatch(TestEvent(event_data))
dispatch = event_dispatcher(rabbit_config)
dispatch('srcservice', TestEvent, event_data)

with eventlet.Timeout(1):
while len(received) == 0:
Expand Down
12 changes: 6 additions & 6 deletions test/testing/test_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,10 @@ def handler_two(self, msg):

# dispatch an event to handler_two
msg = "msg"
with event_dispatcher('srcservice', rabbit_config) as dispatch:
dispatch = event_dispatcher(rabbit_config)

with entrypoint_waiter(container, 'handler_two'):
dispatch(ExampleEvent(msg))
with entrypoint_waiter(container, 'handler_two'):
dispatch('srcservice', ExampleEvent, msg)

# method_called should have exactly one call, derived from the event
# handler and not from the disabled @once entrypoint
Expand Down Expand Up @@ -368,9 +368,9 @@ def test_entrypoint_waiter(container_factory, rabbit_config):
class ExampleEvent(Event):
type = "eventtype"

with event_dispatcher('srcservice', rabbit_config) as dispatch:
with entrypoint_waiter(container, 'handle'):
dispatch(ExampleEvent(""))
dispatch = event_dispatcher(rabbit_config)
with entrypoint_waiter(container, 'handle'):
dispatch('srcservice', ExampleEvent, "")


def test_entrypoint_waiter_bad_entrypoint(container_factory, rabbit_config):
Expand Down

0 comments on commit 8a96cae

Please sign in to comment.