Skip to content

Commit

Permalink
Merge pull request #1260 from seenureddy/adding_enum_type
Browse files Browse the repository at this point in the history
Adding enum type
  • Loading branch information
lukebakken committed Jan 27, 2021
2 parents 4fec852 + 60c9f97 commit a548024
Show file tree
Hide file tree
Showing 22 changed files with 92 additions and 39 deletions.
3 changes: 2 additions & 1 deletion docs/examples/tornado_consumer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ consumer.py::
from pika import adapters
import pika
import logging
from pika.exchange_type import ExchangeType

LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
'-35s %(lineno) -5d: %(message)s')
Expand All @@ -27,7 +28,7 @@ consumer.py::

"""
EXCHANGE = 'message'
EXCHANGE_TYPE = 'topic'
EXCHANGE_TYPE = ExchangeType.topic
QUEUE = 'text'
ROUTING_KEY = 'example.text'

Expand Down
3 changes: 2 additions & 1 deletion examples/asynchronous_consumer_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import time
import pika
from pika.exchange_type import ExchangeType

LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
'-35s %(lineno) -5d: %(message)s')
Expand All @@ -25,7 +26,7 @@ class ExampleConsumer(object):
"""
EXCHANGE = 'message'
EXCHANGE_TYPE = 'topic'
EXCHANGE_TYPE = ExchangeType.topic
QUEUE = 'text'
ROUTING_KEY = 'example.text'

Expand Down
3 changes: 2 additions & 1 deletion examples/asynchronous_publisher_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import json
import pika
from pika.exchange_type import ExchangeType

LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
'-35s %(lineno) -5d: %(message)s')
Expand All @@ -25,7 +26,7 @@ class ExamplePublisher(object):
"""
EXCHANGE = 'message'
EXCHANGE_TYPE = 'topic'
EXCHANGE_TYPE = ExchangeType.topic
PUBLISH_INTERVAL = 1
QUEUE = 'text'
ROUTING_KEY = 'example.text'
Expand Down
3 changes: 2 additions & 1 deletion examples/asyncio_consumer_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pika

from pika.adapters.asyncio_connection import AsyncioConnection
from pika.exchange_type import ExchangeType

LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
'-35s %(lineno) -5d: %(message)s')
Expand All @@ -27,7 +28,7 @@ class ExampleConsumer(object):
"""
EXCHANGE = 'message'
EXCHANGE_TYPE = 'topic'
EXCHANGE_TYPE = ExchangeType.topic
QUEUE = 'text'
ROUTING_KEY = 'example.text'

Expand Down
3 changes: 2 additions & 1 deletion examples/basic_consumer_threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import threading
import time
import pika
from pika.exchange_type import ExchangeType

LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
'-35s %(lineno) -5d: %(message)s')
Expand Down Expand Up @@ -54,7 +55,7 @@ def on_message(ch, method_frame, _header_frame, body, args):
channel = connection.channel()
channel.exchange_declare(
exchange="test_exchange",
exchange_type="direct",
exchange_type=ExchangeType.direct,
passive=False,
durable=True,
auto_delete=False)
Expand Down
3 changes: 2 additions & 1 deletion examples/blocking_consume_recover_multiple_hosts.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import functools
import random
import pika
from pika.exchange_type import ExchangeType


def on_message(ch, method_frame, _header_frame, body, userdata=None):
Expand All @@ -29,7 +30,7 @@ def on_message(ch, method_frame, _header_frame, body, userdata=None):
channel = connection.channel()
channel.exchange_declare(
exchange='test_exchange',
exchange_type='direct',
exchange_type=ExchangeType.direct,
passive=False,
durable=True,
auto_delete=False)
Expand Down
3 changes: 2 additions & 1 deletion examples/consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import functools
import logging
import pika
from pika.exchange_type import ExchangeType

LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
'-35s %(lineno) -5d: %(message)s')
Expand All @@ -26,7 +27,7 @@ def main():
channel = connection.channel()
channel.exchange_declare(
exchange='test_exchange',
exchange_type='direct',
exchange_type=ExchangeType.direct,
passive=False,
durable=True,
auto_delete=False)
Expand Down
5 changes: 3 additions & 2 deletions examples/consumer_queued.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import json
import threading
import pika
from pika.exchange_type import ExchangeType

body_buffer = []
lock = threading.Lock()
Expand All @@ -17,9 +18,9 @@
consumer_channel = connection.channel()
bind_channel = connection.channel()

main_channel.exchange_declare(exchange='com.micex.sten', exchange_type='direct')
main_channel.exchange_declare(exchange='com.micex.sten', exchange_type=ExchangeType.direct)
main_channel.exchange_declare(
exchange='com.micex.lasttrades', exchange_type='direct')
exchange='com.micex.lasttrades', exchange_type=ExchangeType.direct)

queue = main_channel.queue_declare('', exclusive=True).method.queue
queue_tickers = main_channel.queue_declare('', exclusive=True).method.queue
Expand Down
5 changes: 3 additions & 2 deletions examples/consumer_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import json
import logging
import pika
from pika.exchange_type import ExchangeType

print('pika version: %s' % pika.__version__)

Expand All @@ -14,9 +15,9 @@
consumer_channel = connection.channel()
bind_channel = connection.channel()

main_channel.exchange_declare(exchange='com.micex.sten', exchange_type='direct')
main_channel.exchange_declare(exchange='com.micex.sten', exchange_type=ExchangeType.direct)
main_channel.exchange_declare(
exchange='com.micex.lasttrades', exchange_type='direct')
exchange='com.micex.lasttrades', exchange_type=ExchangeType.direct)

queue = main_channel.queue_declare('', exclusive=True).method.queue
queue_tickers = main_channel.queue_declare('', exclusive=True).method.queue
Expand Down
5 changes: 3 additions & 2 deletions examples/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@
import json
import random
import pika
from pika.exchange_type import ExchangeType

print('pika version: %s' % pika.__version__)

connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
main_channel = connection.channel()

main_channel.exchange_declare(exchange='com.micex.sten', exchange_type='direct')
main_channel.exchange_declare(exchange='com.micex.sten', exchange_type=ExchangeType.direct)
main_channel.exchange_declare(
exchange='com.micex.lasttrades', exchange_type='direct')
exchange='com.micex.lasttrades', exchange_type=ExchangeType.direct)

tickers = {
'MXSE.EQBR.LKOH': (1933, 1940),
Expand Down
3 changes: 2 additions & 1 deletion examples/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import logging
import pika
from pika.exchange_type import ExchangeType

logging.basicConfig(level=logging.DEBUG)

Expand All @@ -12,7 +13,7 @@
channel = connection.channel()
channel.exchange_declare(
exchange="test_exchange",
exchange_type="direct",
exchange_type=ExchangeType.direct,
passive=False,
durable=True,
auto_delete=False)
Expand Down
5 changes: 3 additions & 2 deletions examples/twisted_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import pika
from pika import spec
from pika.adapters import twisted_connection
from pika.exchange_type import ExchangeType

PREFETCH_COUNT = 2

Expand Down Expand Up @@ -111,7 +112,7 @@ def setup_read(self, exchange, routing_key, callback):
if exchange:
yield self._channel.exchange_declare(
exchange=exchange,
exchange_type='topic',
exchange_type=ExchangeType.topic,
durable=True,
auto_delete=False)

Expand Down Expand Up @@ -172,7 +173,7 @@ def send_message(self, exchange, routing_key, msg):
system='Pika:=>')
yield self._channel.exchange_declare(
exchange=exchange,
exchange_type='topic',
exchange_type=ExchangeType.topic,
durable=True,
auto_delete=False)
prop = spec.BasicProperties(delivery_mode=2)
Expand Down
3 changes: 2 additions & 1 deletion pika/adapters/blocking_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

# NOTE: import SelectConnection after others to avoid circular depenency
from pika.adapters import select_connection
from pika.exchange_type import ExchangeType

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -2341,7 +2342,7 @@ def confirm_delivery(self):

def exchange_declare(self,
exchange,
exchange_type='direct',
exchange_type=ExchangeType.direct,
passive=False,
durable=False,
auto_delete=False,
Expand Down
3 changes: 2 additions & 1 deletion pika/adapters/twisted_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from pika import exceptions, spec
from pika.adapters.utils import nbio_interface
from pika.adapters.utils.io_services_utils import check_callback_arg
from pika.exchange_type import ExchangeType

# Twistisms
# pylint: disable=C0111,C0103
Expand Down Expand Up @@ -786,7 +787,7 @@ def exchange_bind(self, destination, source, routing_key='',

def exchange_declare(self,
exchange,
exchange_type='direct',
exchange_type=ExchangeType.direct,
passive=False,
durable=False,
auto_delete=False,
Expand Down
6 changes: 5 additions & 1 deletion pika/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
import collections
import logging
import uuid
from enum import Enum

import pika.frame as frame
import pika.exceptions as exceptions
import pika.spec as spec
import pika.validators as validators
from pika.compat import unicode_type, dictkeys, is_integer
from pika.exchange_type import ExchangeType

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -625,7 +627,7 @@ def exchange_bind(self,

def exchange_declare(self,
exchange,
exchange_type='direct',
exchange_type=ExchangeType.direct,
passive=False,
durable=False,
auto_delete=False,
Expand Down Expand Up @@ -657,6 +659,8 @@ def exchange_declare(self,
validators.require_string(exchange, 'exchange')
self._raise_if_not_open()
nowait = validators.rpc_completion_callback(callback)
if isinstance(exchange_type, Enum):
exchange_type = exchange_type.value
return self._rpc(
spec.Exchange.Declare(0, exchange, exchange_type, passive, durable,
auto_delete, internal, nowait, arguments or
Expand Down
8 changes: 8 additions & 0 deletions pika/exchange_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from enum import Enum


class ExchangeType(Enum) :
direct = 'direct'
fanout = 'fanout'
headers = 'headers'
topic = 'topic'
3 changes: 2 additions & 1 deletion pika/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from pika import amqp_object
from pika import data
from pika.compat import str_or_bytes, unicode_type
from pika.exchange_type import ExchangeType

# Python 3 support for str object
str = bytes
Expand Down Expand Up @@ -693,7 +694,7 @@ class Declare(amqp_object.Method):
INDEX = 0x0028000A # 40, 10; 2621450
NAME = 'Exchange.Declare'

def __init__(self, ticket=0, exchange=None, type='direct', passive=False, durable=False, auto_delete=False, internal=False, nowait=False, arguments=None):
def __init__(self, ticket=0, exchange=None, type=ExchangeType.direct, passive=False, durable=False, auto_delete=False, internal=False, nowait=False, arguments=None):
self.ticket = ticket
self.exchange = exchange
self.type = type
Expand Down
1 change: 1 addition & 0 deletions test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ mock
nose
tornado
twisted
enum34; python_version == '2.7' or (python_version >= '3.0' and python_version <= '3.4')
9 changes: 5 additions & 4 deletions tests/acceptance/async_adapter_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from pika.compat import as_bytes, time_now
import pika.connection
import pika.exceptions
from pika.exchange_type import ExchangeType
import pika.frame

from . import async_test_base
Expand Down Expand Up @@ -569,7 +570,7 @@ def on_deleted(self, _frame):
class TestExchangeDeclareAndDelete(AsyncTestCase, AsyncAdapters):
DESCRIPTION = "Create and delete and exchange"

X_TYPE = 'direct'
X_TYPE = ExchangeType.direct

def begin(self, channel):
self.name = self.__class__.__name__ + ':' + uuid.uuid1().hex
Expand All @@ -592,8 +593,8 @@ def on_exchange_delete(self, frame):
class TestExchangeRedeclareWithDifferentValues(AsyncTestCase, AsyncAdapters):
DESCRIPTION = "should close chan: re-declared exchange w/ diff params"

X_TYPE1 = 'direct'
X_TYPE2 = 'topic'
X_TYPE1 = ExchangeType.direct
X_TYPE2 = ExchangeType.topic

def begin(self, channel):
self.name = self.__class__.__name__ + ':' + uuid.uuid1().hex
Expand Down Expand Up @@ -646,7 +647,7 @@ def begin(self, channel):
exch_name = base_exch_name + ':' + str(i)
cb = functools.partial(self.on_bad_result, exch_name)
channel.exchange_declare(exch_name,
exchange_type='direct',
exchange_type=ExchangeType.direct,
passive=True,
callback=cb)
channel.close()
Expand Down
3 changes: 2 additions & 1 deletion tests/acceptance/async_test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import pika
from pika import adapters
from pika.adapters import select_connection
from pika.exchange_type import ExchangeType

from ..threaded_test_wrapper import create_run_in_thread_decorator

Expand Down Expand Up @@ -263,7 +264,7 @@ def start(self, adapter, ioloop_factory):

def begin(self, channel):
self.channel.exchange_declare(self.exchange,
exchange_type='direct',
exchange_type=ExchangeType.direct,
passive=False,
durable=False,
auto_delete=True,
Expand Down

0 comments on commit a548024

Please sign in to comment.