Skip to content
Browse files

Merge remote-tracking branch 'upstream/master'

  • Loading branch information...
2 parents e65062e + 1148c3f commit 64af09d1e987d3a0fe84347d4a6b663c4db85f3d @joestump joestump committed
View
2 AUTHORS
@@ -7,6 +7,7 @@ Adam Nelson <adam@varud.com>
Adam Wentz
Alexandre Bourget <alexandre.bourget@savoirfairelinux.com>
Andrew Watts
+Andrii Kostenko <andrey@kostenko.name>
Andy McCurdy <andy@andymccurdy.com>
Anton Gyllenberg <anton@iki.fi>
Ask Solem <ask@celeryproject.org>
@@ -33,6 +34,7 @@ John Spray <jcspray@gmail.com>
Keith Fitzgerald <ghostrocket@me.com>
Marcin Lulek (ergo) <info@webreactor.eu>
Mher Movsisyan <mher.movsisyan@gmail.com>
+Michael Barrett <mb@eventbrite.com>
Nitzan Miron <bug.assembla@bugbug.me>
Noah Kantrowitz <noah@coderanger.net>
Pascal Hartig <phartig@rdrei.net>
View
24 Changelog
@@ -2,6 +2,30 @@
Change history
================
+.. _version-2.1.5:
+
+2.1.5
+=====
+:release-date: 2012-04-13 3:30 P.M GMT
+
+* The url parser removed more than the first leading slash (Issue #121).
+
+* SQLAlchemy: Can now specify url using + separator
+
+ Example::
+
+ BrokerConnection("sqla+mysql://localhost/db")
+
+* Better support for anonymous queues (Issue #116).
+
+ Contributed by Michael Barrett.
+
+* ``Connection.as_uri`` now quotes url parts (Issue #117).
+
+* Beanstalk: Can now set message TTR as a message property.
+
+ Contributed by Andrii Kostenko
+
.. _version-2.1.4:
2.1.4
View
2 README.rst
@@ -2,7 +2,7 @@
kombu - Messaging Framework for Python
#############################################
-:Version: 2.1.3
+:Version: 2.1.5
Synopsis
========
View
2 kombu/__init__.py
@@ -1,7 +1,7 @@
"""Messaging Framework for Python"""
from __future__ import absolute_import
-VERSION = (2, 1, 3)
+VERSION = (2, 1, 5)
__version__ = ".".join(map(str, VERSION[0:3])) + "".join(VERSION[3:])
__author__ = "Ask Solem"
__contact__ = "ask@celeryproject.org"
View
41 kombu/connection.py
@@ -17,25 +17,21 @@
from contextlib import contextmanager
from copy import copy
-from functools import wraps
+from functools import partial, wraps
from itertools import count
+from urllib import quote
from Queue import Empty
# jython breaks on relative import for .exceptions for some reason
# (Issue #112)
from kombu import exceptions
from .transport import get_transport_cls
-from .utils import retry_over_time
+from .utils import cached_property, retry_over_time
from .utils.compat import OrderedDict, LifoQueue as _LifoQueue
from .utils.url import parse_url
_LOG_CONNECTION = os.environ.get("KOMBU_LOG_CONNECTION", False)
_LOG_CHANNEL = os.environ.get("KOMBU_LOG_CHANNEL", False)
-#: Connection info -> URI
-URI_FORMAT = """\
-%(transport)s://%(userid)s@%(hostname)s%(port)s/%(virtual_host)s\
-"""
-
__all__ = ["parse_url", "BrokerConnection", "Resource",
"ConnectionPool", "ChannelPool"]
@@ -73,8 +69,6 @@ class BrokerConnection(object):
>>> conn.release()
"""
- URI_FORMAT = URI_FORMAT
-
port = None
virtual_host = "/"
connect_timeout = 5
@@ -84,7 +78,7 @@ class BrokerConnection(object):
_default_channel = None
_transport = None
_logger = None
- skip_uri_transports = set(["sqlalchemy", "sqlakombu.transport.Transport"])
+ uri_passthrough = set(["sqla", "sqlalchemy"])
def __init__(self, hostname="localhost", userid=None,
password=None, virtual_host=None, port=None, insist=False,
@@ -96,9 +90,12 @@ def __init__(self, hostname="localhost", userid=None,
"port": port, "insist": insist, "ssl": ssl,
"transport": transport, "connect_timeout": connect_timeout,
"login_method": login_method}
- if hostname and "://" in hostname \
- and transport not in self.skip_uri_transports:
- params.update(parse_url(hostname))
+ if hostname and "://" in hostname:
+ if '+' in hostname[:hostname.index("://")]:
+ # e.g. sqla+mysql://root:masterkey@localhost/
+ params["transport"], params["hostname"] = hostname.split('+')
+ else:
+ params.update(parse_url(hostname))
self._init_params(**params)
# backend_cls argument will be removed shortly.
@@ -364,6 +361,9 @@ def __eqhash__(self):
return hash("|".join(map(str, self.info().itervalues())))
def as_uri(self, include_password=False):
+ if self.transport_cls in self.uri_passthrough:
+ return self.transport_cls + '+' + self.hostname
+ quoteS = partial(quote, safe="") # strict quote
fields = self.info()
port = fields["port"]
userid = fields["userid"]
@@ -371,11 +371,11 @@ def as_uri(self, include_password=False):
transport = fields["transport"]
url = "%s://" % transport
if userid:
- url += userid
+ url += quoteS(userid)
if include_password and password:
- url += ':' + password
+ url += ':' + quoteS(password)
url += '@'
- url += fields["hostname"]
+ url += quoteS(fields["hostname"])
# If the transport equals 'mongodb' the
# hostname contains a full mongodb connection
@@ -383,7 +383,7 @@ def as_uri(self, include_password=False):
if port and transport != "mongodb":
url += ':' + str(port)
- url += '/' + fields["virtual_host"]
+ url += '/' + quote(fields["virtual_host"])
return url
def Pool(self, limit=None, preload=None):
@@ -569,6 +569,13 @@ def transport(self):
self._transport = self.create_transport()
return self._transport
+ @cached_property
+ def manager(self):
+ return self.transport.manager
+
+ def get_manager(self, *args, **kwargs):
+ return self.transport.get_manager(*args, **kwargs)
+
@property
def connection_errors(self):
"""List of exceptions that may be raised by the connection."""
View
10 kombu/entity.py
@@ -381,12 +381,12 @@ def when_bound(self):
def declare(self, nowait=False):
"""Declares the queue, the exchange and binds the queue to
the exchange."""
- name = self.name
- if name:
- if self.exchange:
- self.exchange.declare(nowait)
+ if self.exchange:
+ self.exchange.declare(nowait)
self.queue_declare(nowait, passive=False)
- if name:
+ # self.name should be set by queue_declare in the case that
+ # we're working with anonymous queues
+ if self.name:
self.queue_bind(nowait)
return self.name
View
3 kombu/tests/test_transport_base.py
@@ -13,9 +13,12 @@ class test_StdChannel(TestCase):
def setUp(self):
self.conn = BrokerConnection("memory://")
self.channel = self.conn.channel()
+ self.channel.queues.clear()
+ self.conn.connection.state.clear()
def test_Consumer(self):
q = Queue("foo")
+ print(self.channel.queues)
cons = self.channel.Consumer(q)
self.assertIsInstance(cons, Consumer)
self.assertIs(cons.channel, self.channel)
View
5 kombu/tests/test_transport_mongodb.py
@@ -1,5 +1,7 @@
from __future__ import absolute_import
+from nose import SkipTest
+
from kombu.connection import BrokerConnection
from .utils import TestCase, skip_if_not_module
@@ -18,6 +20,9 @@ def test_url_parser(self):
from kombu.transport import mongodb
from pymongo.errors import ConfigurationError
+ raise SkipTest(
+ "Test is functional: it actually connects to mongod")
+
class Transport(mongodb.Transport):
Connection = MockConnection
View
1 kombu/transport/__init__.py
@@ -50,6 +50,7 @@ def __inner():
"couchdb": "kombu.transport.couchdb.Transport",
"django": "kombu.transport.django.Transport",
"sqlalchemy": "kombu.transport.sqlalchemy.Transport",
+ "sqla": "kombu.transport.sqlalchemy.Transport",
"ghettoq.taproot.Redis": _ghettoq("Redis", "redis", "redis"),
"ghettoq.taproot.Database": _ghettoq("Database", "django", "django"),
"ghettoq.taproot.MongoDB": _ghettoq("MongoDB", "mongodb"),
View
14 kombu/transport/amqplib.py
@@ -307,3 +307,17 @@ def default_connection_params(self):
return {"userid": "guest", "password": "guest",
"port": self.default_port,
"hostname": "localhost", "login_method": "AMQPLAIN"}
+
+ def get_manager(self, hostname=None, port=None, userid=None,
+ password=None):
+ import pyrabbit
+ c = self.client
+ opt = c.transport_options.get
+ host = (hostname if hostname is not None
+ else opt("manager_hostname", c.hostname))
+ port = port if port is not None else opt("manager_port", 55672)
+ return pyrabbit.Client("%s:%s" % (host, port),
+ userid if userid is not None
+ else opt("manager_userid", c.userid),
+ password if password is not None
+ else opt("manager_password", c.password))
View
19 kombu/transport/base.py
@@ -13,6 +13,7 @@
from ..serialization import decode
from ..compression import decompress
from ..exceptions import MessageStateError
+from ..utils import cached_property
ACKNOWLEDGED_STATES = frozenset(["ACK", "REJECTED", "REQUEUED"])
@@ -162,8 +163,19 @@ def payload(self):
return self._decoded_cache
+class Management(object):
+
+ def __init__(self, transport):
+ self.transport = transport
+
+ def list_bindings(self):
+ raise NotImplementedError(
+ "Your transport does not implement list_bindings")
+
+
class Transport(object):
"""Base class for transports."""
+ Management = Management
#: The :class:`~kombu.connection.BrokerConnection` owning this instance.
client = None
@@ -201,3 +213,10 @@ def verify_connection(self, connection):
@property
def default_connection_params(self):
return {}
+
+ def get_manager(self, *args, **kwargs):
+ return self.Management(self)
+
+ @cached_property
+ def manager(self):
+ return self.get_manager()
View
7 kombu/transport/beanstalk.py
@@ -43,9 +43,14 @@ def _parse_job(self, job):
return item, dest
def _put(self, queue, message, **kwargs):
+ extra = {}
priority = message["properties"]["delivery_info"]["priority"]
+ ttr = message["properties"].get("ttr")
+ if ttr is not None:
+ extra["ttr"] = ttr
+
self.client.use(queue)
- self.client.put(dumps(message), priority=priority)
+ self.client.put(dumps(message), priority=priority, **extra)
def _get(self, queue):
if queue not in self.client.watching():
View
2 kombu/transport/redis.py
@@ -320,7 +320,7 @@ def _create_client(self):
raise ValueError(
"Database name must be int between 0 and limit - 1")
- return self.Client(host=conninfo.hostname,
+ return self.Client(host=conninfo.hostname or "127.0.0.1",
port=conninfo.port or DEFAULT_PORT,
db=database,
password=conninfo.password)
View
30 kombu/transport/virtual/__init__.py
@@ -65,12 +65,12 @@ class BrokerState(object):
bindings = None
def __init__(self, exchanges=None, bindings=None):
- if exchanges is None:
- exchanges = {}
- if bindings is None:
- bindings = {}
- self.exchanges = exchanges
- self.bindings = bindings
+ self.exchanges = {} if exchanges is None else exchanges
+ self.bindings = {} if bindings is None else bindings
+
+ def clear(self):
+ self.exchanges.clear()
+ self.bindings.clear()
class QoS(object):
@@ -630,17 +630,29 @@ def cycle(self):
return self._cycle
+class Management(base.Management):
+
+ def __init__(self, transport):
+ super(Management, self).__init__(transport)
+ self.channel = transport.client.channel()
+
+ def get_bindings(self):
+ return [dict(destination=q, source=e, routing_key=r)
+ for q, e, r in self.channel.list_bindings()]
+
+ def close(self):
+ self.channel.close()
+
+
class Transport(base.Transport):
"""Virtual transport.
:param client: :class:`~kombu.connection.BrokerConnection` instance
"""
- #: channel class used.
Channel = Channel
-
- #: cycle class used.
Cycle = FairCycle
+ Management = Management
#: :class:`BrokerState` containing declared exchanges and
#: bindings (set by constructor).
View
2 kombu/utils/__init__.py
@@ -259,7 +259,7 @@ def reprkwargs(kwargs, sep=', ', fmt="%s=%s"):
def reprcall(name, args=(), kwargs=(), sep=', '):
- return "%s(%s%s%s)" % (name, sep.join(map(_safe_repr, args)),
+ return "%s(%s%s%s)" % (name, sep.join(map(_safe_repr, args or ())),
(args and kwargs) and sep or "",
reprkwargs(kwargs, sep))
View
3 kombu/utils/url.py
@@ -20,7 +20,8 @@ def _parse_url(url):
# This enables the use of replica sets and sharding.
# See pymongo.Connection() for more info.
hostname = schemeless if scheme == 'mongodb' else parts.hostname
- path = (parts.path or '').lstrip('/')
+ path = parts.path or ''
+ path = path[1:] if path and path[0] == '/' else path
return (scheme, unquote(hostname or '') or None,
int(parts.port) if parts.port else None,
unquote(parts.username or '') or None,

0 comments on commit 64af09d

Please sign in to comment.
Something went wrong with that request. Please try again.