Skip to content

Commit

Permalink
Merge pull request #581 from ztane/flatpatch2
Browse files Browse the repository at this point in the history
Python 3 support, rebased on top of the current pika master
  • Loading branch information
gmr committed May 19, 2015
2 parents c7a72cd + 4954d38 commit 072ba73
Show file tree
Hide file tree
Showing 33 changed files with 632 additions and 605 deletions.
6 changes: 5 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ language: python
python:
- 2.6
- 2.7
- 3.3
- 3.4
before_install:
- sudo add-apt-repository "deb http://us.archive.ubuntu.com/ubuntu/ trusty main restricted universe multiverse"
- sudo add-apt-repository "deb http://us.archive.ubuntu.com/ubuntu/ trusty-updates main restricted universe multiverse"
- sudo apt-get update -qq
- sudo apt-get install libev-dev/trusty
install:
- if [[ $TRAVIS_PYTHON_VERSION == '2.6' ]]; then pip install unittest2; fi
- if [[ $TRAVIS_PYTHON_VERSION == '2.6' ]]; then pip install unittest2 ordereddict; fi
- if [[ $TRAVIS_PYTHON_VERSION == '2.7' ]]; then pip install pyev; fi
- if [[ $TRAVIS_PYTHON_VERSION == '3.3' ]]; then pip install pyev; fi
- if [[ $TRAVIS_PYTHON_VERSION == '3.4' ]]; then pip install pyev; fi
- pip install --use-mirrors -r test-requirements.pip
services:
- rabbitmq
Expand Down
2 changes: 1 addition & 1 deletion examples/twisted_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def _read_item(self, item, queue, callback):
callback(item)

def _read_item_err(self, error):
print error
print(error)

def send(self):
"""If connected, send all waiting messages."""
Expand Down
3 changes: 2 additions & 1 deletion pika/adapters/asyncore_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import time

from pika.adapters import base_connection
from pika.compat import dictkeys

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -65,7 +66,7 @@ def handle_write(self):
def process_timeouts(self):
"""Process the self._timeouts event stack"""
start_time = time.time()
for timeout_id in self._timeouts.keys():
for timeout_id in dictkeys(self._timeouts):
if self._timeouts[timeout_id]['deadline'] <= start_time:
callback = self._timeouts[timeout_id]['callback']
del self._timeouts[timeout_id]
Expand Down
13 changes: 7 additions & 6 deletions pika/adapters/blocking_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from pika import spec
from pika import utils
from pika.adapters import base_connection
from pika.compat import dictkeys, unicode_type

if os.name == 'java':
from select import cpython_compatible_select as select_function
Expand Down Expand Up @@ -248,7 +249,7 @@ def process_data_events(self):

def process_timeouts(self):
"""Process the self._timeouts event stack"""
for timeout_id in self._timeouts.keys():
for timeout_id in dictkeys(self._timeouts):
if self._deadline_passed(timeout_id):
self._call_timeout_method(self._timeouts.pop(timeout_id))

Expand Down Expand Up @@ -344,7 +345,7 @@ def _deadline_passed(self, timeout_id):
:rtype: bool
"""
if timeout_id not in self._timeouts.keys():
if timeout_id not in self._timeouts:
return False
return self._timeouts[timeout_id]['deadline'] <= time.time()

Expand Down Expand Up @@ -545,7 +546,7 @@ def basic_publish(self, exchange, routing_key, body,
if mandatory:
self._response = None

if isinstance(body, unicode):
if isinstance(body, unicode_type):
body = body.encode('utf-8')

if self._confirmation:
Expand Down Expand Up @@ -685,7 +686,7 @@ def close(self, reply_code=0, reply_text="Normal Shutdown"):
# If there are any consumers, cancel them as well
if self._consumers:
LOGGER.debug('Cancelling %i consumers', len(self._consumers))
for consumer_tag in self._consumers.keys():
for consumer_tag in dictkeys(self._consumers):
self.basic_cancel(consumer_tag=consumer_tag)
self._set_state(self.CLOSING)
self._rpc(spec.Channel.Close(reply_code, reply_text, 0, 0), None,
Expand All @@ -701,7 +702,7 @@ def consume(self, queue, no_ack=False, exclusive=False):
Example:
for method, properties, body in channel.consume('queue'):
print body
print(body)
channel.basic_ack(method.delivery_tag)
You should call BlockingChannel.cancel() when you escape out of the
Expand Down Expand Up @@ -994,7 +995,7 @@ def stop_consuming(self, consumer_tag=None):
if consumer_tag:
self.basic_cancel(consumer_tag)
else:
for consumer_tag in self._consumers.keys():
for consumer_tag in dictkeys(self._consumers):
self.basic_cancel(consumer_tag)
self.wait = True

Expand Down
6 changes: 4 additions & 2 deletions pika/adapters/select_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from operator import itemgetter
from collections import defaultdict

from pika.compat import dictkeys

from pika.adapters.base_connection import BaseConnection

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -305,7 +307,7 @@ def stop(self):

try:
# Send byte to interrupt the poll loop, use write() for consitency.
os.write(self._w_interrupt.fileno(), 'X')
os.write(self._w_interrupt.fileno(), b'X')
except OSError as err:
if err.errno != errno.EWOULDBLOCK:
raise
Expand Down Expand Up @@ -354,7 +356,7 @@ def _process_fd_events(self, fd_event_map, write_only):

self._processing_fd_event_map = fd_event_map

for fileno in fd_event_map.keys():
for fileno in dictkeys(fd_event_map):
if fileno not in fd_event_map:
# the fileno has been removed from the map under our feet.
continue
Expand Down
10 changes: 4 additions & 6 deletions pika/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from pika import frame
from pika import amqp_object
from pika.compat import xrange, canonical_str

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -35,15 +36,12 @@ def name_or_value(value):
if isinstance(value, amqp_object.AMQPObject):
return value.NAME

# Cast the value to a string, encoding it if it's unicode
try:
return str(value)
except UnicodeEncodeError:
return str(value.encode('utf-8'))
# Cast the value to a str (python 2 and python 3); encoding as UTF-8 on Python 2
return canonical_str(value)


def sanitize_prefix(function):
"""Automatically call _name_or_value on the prefix passed in."""
"""Automatically call name_or_value on the prefix passed in."""

@functools.wraps(function)
def wrapper(*args, **kwargs):
Expand Down
15 changes: 9 additions & 6 deletions pika/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import pika.exceptions as exceptions
import pika.spec as spec
from pika.utils import is_callable
from pika.compat import unicode_type, dictkeys


LOGGER = logging.getLogger(__name__)
MAX_CHANNELS = 32768
Expand Down Expand Up @@ -206,8 +208,9 @@ def basic_consume(self, consumer_callback,
self._validate_channel_and_callback(consumer_callback)

# If a consumer tag was not passed, create one
consumer_tag = consumer_tag or 'ctag%i.%s' % (self.channel_number,
uuid.uuid4().get_hex())
if not consumer_tag:
consumer_tag = ('ctag%i.%s' % (self.channel_number,
uuid.uuid4().hex)).encode()

if consumer_tag in self._consumers or consumer_tag in self._cancelled:
raise exceptions.DuplicateConsumerTag(consumer_tag)
Expand Down Expand Up @@ -296,7 +299,7 @@ def basic_publish(self, exchange, routing_key, body,
raise exceptions.ChannelClosed()
if immediate:
LOGGER.warning('The immediate flag is deprecated in RabbitMQ')
if isinstance(body, unicode):
if isinstance(body, unicode_type):
body = body.encode('utf-8')
properties = properties or spec.BasicProperties()
self._send_method(spec.Basic.Publish(exchange=exchange,
Expand Down Expand Up @@ -387,7 +390,7 @@ def close(self, reply_code=0, reply_text="Normal Shutdown"):
LOGGER.info('Channel.close(%s, %s)', reply_code, reply_text)
if self._consumers:
LOGGER.debug('Cancelling %i consumers', len(self._consumers))
for consumer_tag in self._consumers.keys():
for consumer_tag in dictkeys(self._consumers):
self.basic_cancel(consumer_tag=consumer_tag)
self._set_state(self.CLOSING)
self._rpc(spec.Channel.Close(reply_code, reply_text, 0, 0),
Expand Down Expand Up @@ -428,7 +431,7 @@ def consumer_tags(self):
:rtype: list
"""
return list(self._consumers.keys())
return dictkeys(self._consumers)

def exchange_bind(self,
callback=None,
Expand Down Expand Up @@ -1146,7 +1149,7 @@ def _finish(self):
"""
content = (self._method_frame, self._header_frame,
''.join(self._body_fragments))
b''.join(self._body_fragments))
self._reset()
return content

Expand Down
94 changes: 94 additions & 0 deletions pika/compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import sys as _sys


PY2 = _sys.version_info < (3,)
PY3 = not PY2


if not PY2:
# these were moved around for Python 3
from urllib.parse import unquote as url_unquote, urlencode

# Python 3 does not have basestring anymore; we include
# *only* the str here as this is used for textual data.
basestring = (str,)

# for assertions that the data is either encoded or non-encoded text
str_or_bytes = (str, bytes)

# xrange is gone, replace it with range
xrange = range

# the unicode type is str
unicode_type = str


def dictkeys(dct):
"""
Returns a list of keys of dictionary
dict.keys returns a view that works like .keys in Python 2
*except* any modifications in the dictionary will be visible
(and will cause errors if the view is being iterated over while
it is modified).
"""

return list(dct.keys())


def byte(*args):
"""
This is the same as Python 2 `chr(n)` for bytes in Python 3
Returns a single byte `bytes` for the given int argument (we
optimize it a bit here by passing the positional argument tuple
directly to the bytes constructor.
"""
return bytes(args)

class long(int):
"""
A marker class that signifies that the integer value should be
serialized as `l` instead of `I`
"""

def __repr__(self):
return str(self) + 'L'

def canonical_str(value):
"""
Return the canonical str value for the string.
In both Python 3 and Python 2 this is str.
"""

return str(value)

else:
from urllib import unquote as url_unquote, urlencode

basestring = basestring
str_or_bytes = basestring
xrange = xrange
unicode_type = unicode
dictkeys = dict.keys
byte = chr
long = long

def canonical_str(value):
"""
Returns the canonical string value of the given string.
In Python 2 this is the value unchanged if it is an str, otherwise
it is the unicode value encoded as UTF-8.
"""

try:
return str(value)
except UnicodeEncodeError:
return str(value.encode('utf-8'))


def as_bytes(value):
if not isinstance(value, bytes):
return value.encode('UTF-8')
return value

0 comments on commit 072ba73

Please sign in to comment.