Skip to content

Commit

Permalink
stronger linter rules
Browse files Browse the repository at this point in the history
  • Loading branch information
mosquito committed Sep 17, 2018
1 parent 29ae3a5 commit 62d3bfd
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 44 deletions.
5 changes: 1 addition & 4 deletions pylama.ini
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
[pylama]
ignore=C901,E0603,E252,E402,E711,E712,E722,W0401
ignore=C901
skip = aio_pika/pika

[pylama:pycodestyle]
max_line_length = 80

[pylama:tests/_*.py]
ignore=E0100
10 changes: 5 additions & 5 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from functools import wraps

import shortuuid
from typing import Generator, Any
from yarl import URL

from aio_pika import Connection, connect, Channel, Queue, Exchange
Expand Down Expand Up @@ -39,15 +38,16 @@ def get_random_name(self, *args):


class BaseTestCase(AsyncTestCase):
async def create_connection(self, cleanup=True) -> Generator[Any, None, Connection]:
async def create_connection(self, cleanup=True) -> Connection:
client = await connect(AMQP_URL, loop=self.loop)

if cleanup:
self.addCleanup(client.close)

return client

async def create_channel(self, connection=None, cleanup=True, **kwargs) -> Generator[Any, None, Channel]:
async def create_channel(self, connection=None,
cleanup=True, **kwargs) -> Channel:
if connection is None:
connection = await self.create_connection()

Expand All @@ -58,7 +58,7 @@ async def create_channel(self, connection=None, cleanup=True, **kwargs) -> Gener

return channel

async def declare_queue(self, *args, **kwargs) -> Generator[Any, None, Queue]:
async def declare_queue(self, *args, **kwargs) -> Queue:
if 'channel' not in kwargs:
channel = await self.create_channel()
else:
Expand All @@ -68,7 +68,7 @@ async def declare_queue(self, *args, **kwargs) -> Generator[Any, None, Queue]:
self.addCleanup(queue.delete)
return queue

async def declare_exchange(self, *args, **kwargs) -> Generator[Any, None, Exchange]:
async def declare_exchange(self, *args, **kwargs) -> Exchange:
if 'channel' not in kwargs:
channel = await self.create_channel()
else:
Expand Down
151 changes: 116 additions & 35 deletions tests/test_amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
import aio_pika.exceptions
from copy import copy
from aio_pika import connect, Message, DeliveryMode, Channel
from aio_pika.exceptions import MessageProcessError, ProbableAuthenticationError
from aio_pika.exceptions import (
MessageProcessError, ProbableAuthenticationError
)
from aio_pika.exchange import ExchangeType
from aio_pika.tools import wait
from unittest import mock
Expand Down Expand Up @@ -69,7 +71,10 @@ async def test_temporary_queue(self):

body = os.urandom(32)

await channel.default_exchange.publish(Message(body=body), routing_key=queue.name)
await channel.default_exchange.publish(
Message(body=body),
routing_key=queue.name
)

message = await queue.get()

Expand All @@ -84,7 +89,12 @@ async def test_internal_exchange(self):
exchange_name = self.get_random_name("internal", "exchange")

channel = await client.channel()
exchange = await self.declare_exchange(exchange_name, auto_delete=True, internal=True, channel=channel)
exchange = await self.declare_exchange(
exchange_name,
auto_delete=True,
internal=True,
channel=channel
)
queue = await self.declare_queue(auto_delete=True, channel=channel)

await queue.bind(exchange, routing_key)
Expand All @@ -110,23 +120,42 @@ async def test_declare_exchange_with_passive_flag(self):
channel = await client.channel()

with pytest.raises(aio_pika.exceptions.ChannelClosed):
await self.declare_exchange(exchange_name, auto_delete=True, passive=True, channel=channel)
await self.declare_exchange(
exchange_name,
auto_delete=True,
passive=True,
channel=channel
)

channel1 = await client.channel()
channel2 = await client.channel()

await self.declare_exchange(exchange_name, auto_delete=True, passive=False, channel=channel1)
await self.declare_exchange(
exchange_name,
auto_delete=True,
passive=False,
channel=channel1
)

# Check ignoring different exchange options
await self.declare_exchange(exchange_name, auto_delete=False, passive=True, channel=channel2)
await self.declare_exchange(
exchange_name,
auto_delete=False,
passive=True,
channel=channel2
)

async def test_simple_publish_and_receive(self):
queue_name = self.get_random_name("test_connection")
routing_key = self.get_random_name()

channel = await self.create_channel()
exchange = await self.declare_exchange('direct', auto_delete=True, channel=channel)
queue = await self.declare_queue(queue_name, auto_delete=True, channel=channel)
exchange = await self.declare_exchange(
'direct', auto_delete=True, channel=channel
)
queue = await self.declare_queue(
queue_name, auto_delete=True, channel=channel
)

await queue.bind(exchange, routing_key)

Expand All @@ -153,8 +182,12 @@ async def test_simple_publish_without_confirm(self):
routing_key = self.get_random_name()

channel = await self.create_channel(publisher_confirms=False)
exchange = await self.declare_exchange('direct', auto_delete=True, channel=channel)
queue = await self.declare_queue(queue_name, auto_delete=True, channel=channel)
exchange = await self.declare_exchange(
'direct', auto_delete=True, channel=channel
)
queue = await self.declare_queue(
queue_name, auto_delete=True, channel=channel
)

await queue.bind(exchange, routing_key)

Expand All @@ -176,13 +209,17 @@ async def test_simple_publish_without_confirm(self):

await queue.unbind(exchange, routing_key)

async def test_simple_publish_and_receive_delivery_mode_explicitly_none(self):
async def test_simple_publish_and_receive_delivery_mode_explicitly(self):
queue_name = self.get_random_name("test_connection")
routing_key = self.get_random_name()

channel = await self.create_channel()
exchange = await self.declare_exchange('direct', auto_delete=True, channel=channel)
queue = await self.declare_queue(queue_name, auto_delete=True, channel=channel)
exchange = await self.declare_exchange(
'direct', auto_delete=True, channel=channel
)
queue = await self.declare_queue(
queue_name, auto_delete=True, channel=channel
)

await queue.bind(exchange, routing_key)

Expand Down Expand Up @@ -210,8 +247,12 @@ async def test_simple_publish_and_receive_to_bound_exchange(self):
dest_name = self.get_random_name("destination", "exchange")

channel = await self.create_channel()
src_exchange = await self.declare_exchange(src_name, auto_delete=True, channel=channel)
dest_exchange = await self.declare_exchange(dest_name, auto_delete=True, channel=channel)
src_exchange = await self.declare_exchange(
src_name, auto_delete=True, channel=channel
)
dest_exchange = await self.declare_exchange(
dest_name, auto_delete=True, channel=channel
)
queue = await self.declare_queue(auto_delete=True, channel=channel)

await queue.bind(dest_exchange, routing_key)
Expand Down Expand Up @@ -241,8 +282,12 @@ async def test_incoming_message_info(self):
routing_key = self.get_random_name()

channel = await self.create_channel()
exchange = await self.declare_exchange('direct', auto_delete=True, channel=channel)
queue = await self.declare_queue(queue_name, auto_delete=True, channel=channel)
exchange = await self.declare_exchange(
'direct', auto_delete=True, channel=channel
)
queue = await self.declare_queue(
queue_name, auto_delete=True, channel=channel
)

await queue.bind(exchange, routing_key)

Expand Down Expand Up @@ -307,8 +352,12 @@ async def test_context_process(self):
routing_key = self.get_random_name()

channel = await self.create_channel()
exchange = await self.declare_exchange('direct', auto_delete=True, channel=channel)
queue = await self.declare_queue(queue_name, auto_delete=True, channel=channel)
exchange = await self.declare_exchange(
'direct', auto_delete=True, channel=channel
)
queue = await self.declare_queue(
queue_name, auto_delete=True, channel=channel
)

await queue.bind(exchange, routing_key)

Expand Down Expand Up @@ -370,12 +419,16 @@ async def test_context_process(self):

incoming_message = await queue.get(timeout=5)
with pytest.raises(AssertionError):
with incoming_message.process(requeue=True, reject_on_redelivered=True):
with incoming_message.process(
requeue=True, reject_on_redelivered=True
):
raise AssertionError

incoming_message = await queue.get(timeout=5)
with pytest.raises(AssertionError):
with incoming_message.process(requeue=True, reject_on_redelivered=True):
with incoming_message.process(
requeue=True, reject_on_redelivered=True
):
raise AssertionError

self.assertEqual(incoming_message.locked, True)
Expand All @@ -387,8 +440,12 @@ async def test_context_process_redelivery(self):
routing_key = self.get_random_name()

channel = await self.create_channel()
exchange = await self.declare_exchange('direct', auto_delete=True, channel=channel)
queue = await self.declare_queue(queue_name, auto_delete=True, channel=channel)
exchange = await self.declare_exchange(
'direct', auto_delete=True, channel=channel
)
queue = await self.declare_queue(
queue_name, auto_delete=True, channel=channel
)

await queue.bind(exchange, routing_key)

Expand All @@ -405,18 +462,25 @@ async def test_context_process_redelivery(self):
incoming_message = await queue.get(timeout=5)

with pytest.raises(AssertionError):
with incoming_message.process(requeue=True, reject_on_redelivered=True):
with incoming_message.process(
requeue=True, reject_on_redelivered=True
):
raise AssertionError

incoming_message = await queue.get(timeout=5)

with mock.patch('aio_pika.message.log') as message_logger:
with pytest.raises(Exception):
with incoming_message.process(requeue=True, reject_on_redelivered=True):
with incoming_message.process(
requeue=True, reject_on_redelivered=True
):
raise Exception

self.assertTrue(message_logger.info.called)
self.assertEqual(message_logger.info.mock_calls[0][1][1].body, incoming_message.body)
self.assertEqual(
message_logger.info.mock_calls[0][1][1].body,
incoming_message.body
)

self.assertEqual(incoming_message.body, body)

Expand Down Expand Up @@ -478,7 +542,8 @@ async def test_ack_multiple(self):
msg = Message(body)
await exchange.publish(msg, routing_key)

# ack only last mesage with multiple flag, first message should be acked too
# ack only last mesage with multiple flag, first
# message should be acked too
await queue.get(timeout=5)
last_message = await queue.get(timeout=5)
last_message.ack(multiple=True)
Expand Down Expand Up @@ -736,7 +801,11 @@ async def test_connection_refused(self):
await connect('amqp://guest:guest@localhost:9999', loop=self.loop)

async def test_wrong_credentials(self):
amqp_url = AMQP_URL.with_user(uuid.uuid4().hex).with_password(uuid.uuid4().hex)
amqp_url = AMQP_URL.with_user(
uuid.uuid4().hex
).with_password(
uuid.uuid4().hex
)

with pytest.raises(ProbableAuthenticationError):
await connect(
Expand Down Expand Up @@ -768,9 +837,13 @@ async def dlx_handle(message):
self.assertEqual(message.routing_key, dlx_routing_key)
f.set_result(True)

direct_exchange = await self.declare_exchange('direct', channel=channel, auto_delete=True) # type:
# aio_pika.Exchange
dlx_exchange = await channel.declare_exchange('dlx', ExchangeType.DIRECT, auto_delete=True)
direct_exchange = await self.declare_exchange(
'direct', channel=channel, auto_delete=True
) # type: aio_pika.Exchange

dlx_exchange = await channel.declare_exchange(
'dlx', ExchangeType.DIRECT, auto_delete=True
)

direct_queue = await channel.declare_queue(
"%s_direct_queue" % suffix,
Expand Down Expand Up @@ -831,7 +904,9 @@ async def test_connection_close(self):
await exchange.publish(msg, routing_key)

channel = await client.channel()
exchange = await channel.declare_exchange('direct', auto_delete=True)
exchange = await channel.declare_exchange(
'direct', auto_delete=True
)
finally:
await exchange.delete()
await wait((client.close(), client.closing), loop=self.loop)
Expand Down Expand Up @@ -935,7 +1010,9 @@ async def test_expiration(self):
message = await f

self.assertEqual(message.body, body)
self.assertEqual(message.headers['x-death'][0]['original-expiration'], '500')
self.assertEqual(
message.headers['x-death'][0]['original-expiration'], '500'
)

await wait((client.close(), client.closing), loop=self.loop)

Expand Down Expand Up @@ -1091,7 +1168,9 @@ async def test_message_nack(self):
channel = await client.channel()
queue = await channel.declare_queue(queue_name, auto_delete=True)

await channel.default_exchange.publish(Message(body=body), routing_key=queue_name)
await channel.default_exchange.publish(
Message(body=body), routing_key=queue_name
)

message = await queue.get() # type: aio_pika.IncomingMessage

Expand Down Expand Up @@ -1227,7 +1306,9 @@ async def test_async_with_connection(self):

async with conn:

channel2 = await self.create_channel(connection=conn, cleanup=False)
channel2 = await self.create_channel(
connection=conn, cleanup=False
)

queue = await channel2.declare_queue(
self.get_random_name("queue", "is_async", "for"),
Expand Down

0 comments on commit 62d3bfd

Please sign in to comment.