Skip to content

Commit

Permalink
Merge pull request #240 from onefinestay/rabbit_error_reporting
Browse files Browse the repository at this point in the history
check amqp uri for better error messages
  • Loading branch information
davidszotten committed Apr 13, 2015
2 parents 07895c7 + e639780 commit 441fe29
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 3 deletions.
44 changes: 44 additions & 0 deletions nameko/amqp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from __future__ import absolute_import

import amqp
from kombu import Connection
from kombu.transport.pyamqp import Transport
import six


class ConnectionTester(amqp.Connection):
"""Kombu doesn't have any good facilities for diagnosing rabbit
connection errors, e.g. bad credentials, or unknown vhost. This hack
attempts some heuristic diagnosis"""

def __init__(self, *args, **kwargs):
try:
super(ConnectionTester, self).__init__(*args, **kwargs)
except IOError as exc:
if not hasattr(self, '_wait_tune_ok'):
raise
elif self._wait_tune_ok:
six.raise_from(IOError(
'Error connecting to broker, probably caused by invalid'
' credentials'
), exc)
else:
six.raise_from(IOError(
'Error connecting to broker, probably caused by using an'
' invalid or unauthorized vhost'
), exc)


class TestTransport(Transport):
Connection = ConnectionTester


def verify_amqp_uri(amqp_uri):
connection = Connection(amqp_uri)
if connection.transport_cls != 'amqp':
# Can't use these heuristics. Fall back to the existing error behaviour
return

transport = TestTransport(connection.transport.client)
with transport.establish_connection():
pass
4 changes: 4 additions & 0 deletions nameko/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from kombu.mixins import ConsumerMixin
import six

from nameko.amqp import verify_amqp_uri
from nameko.constants import DEFAULT_RETRY_POLICY, AMQP_URI_CONFIG_KEY
from nameko.exceptions import ContainerBeingKilled
from nameko.extensions import (
Expand Down Expand Up @@ -115,6 +116,8 @@ def setup(self):
exchange = self.exchange
queue = self.queue

verify_amqp_uri(self.amqp_uri)

with self.get_connection() as conn:
if queue is not None:
maybe_declare(queue, conn)
Expand Down Expand Up @@ -175,6 +178,7 @@ def _handle_thread_exited(self, gt):
def setup(self):
self.amqp_uri = self.container.config[AMQP_URI_CONFIG_KEY]
self.prefetch_count = self.container.max_workers
verify_amqp_uri(self.amqp_uri)

def start(self):
if not self._starting:
Expand Down
5 changes: 4 additions & 1 deletion nameko/standalone/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from kombu import Connection
from kombu.common import maybe_declare

from nameko.amqp import verify_amqp_uri
from nameko.containers import WorkerContext
from nameko.extensions import Entrypoint
from nameko.exceptions import RpcConnectionError, RpcTimeout
Expand Down Expand Up @@ -74,7 +75,9 @@ def _setup_queue(self):

def register_provider(self, provider):
self.provider = provider
self.connection = Connection(provider.container.config['AMQP_URI'])
amqp_uri = provider.container.config['AMQP_URI']
verify_amqp_uri(amqp_uri)
self.connection = Connection(amqp_uri)
self.queue = provider.queue
self._setup_queue()
message_iterator = self._poll_messages()
Expand Down
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"mock>=1.0.1",
"path.py>=6.2",
"requests>=1.2.0",
"six>=1.4.0",
"six>=1.9.0",
"werkzeug>=0.9",
],
extras_require={
Expand All @@ -31,6 +31,7 @@
"pylint==1.0.0",
"pytest==2.4.2",
"pytest-timeout==0.4",
"urllib3==1.10.2",
"websocket-client==0.23.0",
],
'docs': [
Expand Down
51 changes: 51 additions & 0 deletions test/test_amqp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import socket

import pytest
from urllib3.util import parse_url, Url

from nameko.amqp import verify_amqp_uri


@pytest.fixture
def uris(rabbit_config):
amqp_uri = rabbit_config['AMQP_URI']
scheme, auth, host, port, path, _, _ = parse_url(amqp_uri)
bad_port = Url(scheme, auth, host, port + 1, path).url
bad_user = Url(scheme, 'invalid:invalid', host, port, path).url
bad_vhost = Url(scheme, auth, host, port, '/unknown').url
return {
'good': amqp_uri,
'bad_port': bad_port,
'bad_user': bad_user,
'bad_vhost': bad_vhost,
}


def test_good(uris):
amqp_uri = uris['good']
verify_amqp_uri(amqp_uri)


def test_bad_user(uris):
amqp_uri = uris['bad_user']
with pytest.raises(IOError) as exc_info:
verify_amqp_uri(amqp_uri)
message = str(exc_info.value)
assert 'Error connecting to broker' in message
assert 'invalid credentials' in message


def test_bad_vhost(uris):
amqp_uri = uris['bad_vhost']
with pytest.raises(IOError) as exc_info:
verify_amqp_uri(amqp_uri)
message = str(exc_info.value)
assert 'Error connecting to broker' in message
assert 'invalid or unauthorized vhost' in message


def test_other_error(uris):
# other errors bubble
amqp_uri = uris['bad_port']
with pytest.raises(socket.error):
verify_amqp_uri(amqp_uri)
3 changes: 2 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ deps =
oldest: path.py==6.2
py27-oldest: requests==1.2.0
{py33,py34}-oldest: requests==2.0.0
oldest: six==1.4.0
oldest: six==1.9.0
oldest: werkzeug==0.9

pinned: eventlet==0.17.1
pinned: kombu==3.0.24
pinned: mock==1.0.1
pinned: path.py==7.2
pinned: requests==2.5.3
pinned: six==1.9.0
pinned: werkzeug==0.9.6

latest: eventlet # need something
Expand Down

0 comments on commit 441fe29

Please sign in to comment.