Skip to content

Commit

Permalink
tests
Browse files Browse the repository at this point in the history
  • Loading branch information
maximdanilchenko committed Oct 15, 2018
1 parent 7d9831e commit 2daccd9
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 3 deletions.
5 changes: 2 additions & 3 deletions async_pq/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

from asyncpg import Connection

DELETE_LIMIT_SM = 100
DELETE_LIMIT_BIG = 10000
DELETE_LIMIT_SM = 10


class Queue:
Expand Down Expand Up @@ -108,7 +107,7 @@ async def return_unacked(self, timeout: int, limit: int=DELETE_LIMIT_SM) -> int:
limit,
)

async def clean_acked_queue(self, limit: int=DELETE_LIMIT_BIG) -> int:
async def clean_acked_queue(self, limit: int=DELETE_LIMIT_SM) -> int:
""" Delete acked queue entities (request will not be deleted) """
return await self._connection.fetchval(
f"""
Expand Down
96 changes: 96 additions & 0 deletions tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,32 @@ async def pop_with_double_ack(self, new_queue, put_and_pop):
async def wrong_ack(self, new_queue):
return await new_queue.ack(42)

@pytest.fixture
async def put_and_return_unacked(self, new_queue):
await new_queue.put('"first"', '"second"', '"third"', '"forth"')
await new_queue.pop(limit=3, with_ack=True)
return await new_queue.return_unacked(0)

@pytest.fixture
async def put_and_return_unacked_with_limit(self, new_queue):
await new_queue.put('"first"', '"second"', '"third"', '"forth"')
await new_queue.pop(limit=1, with_ack=True)
await new_queue.pop(limit=1, with_ack=True)
await new_queue.pop(limit=1, with_ack=True)
return await new_queue.return_unacked(0, limit=2)

@pytest.fixture
async def clean_acked(self, new_queue):
await new_queue.put('"first"', '"second"', '"third"', '"forth"')
await new_queue.pop(limit=10, with_ack=False)
return await new_queue.clean_acked_queue()

@pytest.fixture
async def clean_acked_with_limit(self, new_queue):
await new_queue.put('"first"', '"second"', '"third"', '"forth"')
await new_queue.pop(limit=10, with_ack=False)
return await new_queue.clean_acked_queue(limit=2)

async def test_fabric(self, pg_connection):
queue = await QueueFabric(pg_connection).find_queue('items')
assert isinstance(queue, Queue)
Expand Down Expand Up @@ -124,3 +150,73 @@ async def test_double_ack(self, pop_with_double_ack):

async def test_wrong_ack(self, wrong_ack):
assert wrong_ack is False

async def test_return_unacked(self, new_queue, put_and_return_unacked, pg_connection):
assert put_and_return_unacked == 1
all_queue = await pg_connection.fetch(
f"""
SELECT * from {new_queue._queue_table_name} ORDER BY q_id
"""
)
assert [list(r) for r in all_queue] == [
[1, '"first"', None],
[2, '"second"', None],
[3, '"third"', None],
[4, '"forth"', None],
]
all_requests = await pg_connection.fetch(
f"""
SELECT * from {new_queue._requests_table_name}
"""
)
assert all_requests == []

async def test_return_unacked_with_limit(self, new_queue, put_and_return_unacked_with_limit, pg_connection):
assert put_and_return_unacked_with_limit == 2
all_queue = await pg_connection.fetch(
f"""
SELECT * from {new_queue._queue_table_name} ORDER BY q_id
"""
)
assert [list(r) for r in all_queue] == [
[1, '"first"', None],
[2, '"second"', None],
[3, '"third"', 3],
[4, '"forth"', None],
]
all_requests = await pg_connection.fetch(
f"""
SELECT * from {new_queue._requests_table_name}
"""
)
assert [list(r)[:2] for r in all_requests] == [[3, 'wait']]

async def test_clean_acked(self, new_queue, clean_acked, pg_connection):
assert clean_acked == 4
all_queue = await pg_connection.fetch(
f"""
SELECT * from {new_queue._queue_table_name} ORDER BY q_id
"""
)
assert all_queue == []
all_requests = await pg_connection.fetch(
f"""
SELECT * from {new_queue._requests_table_name}
"""
)
assert [list(r)[:2] for r in all_requests] == [[1, 'done']]

async def test_clean_acked_with_limit(self, new_queue, clean_acked_with_limit, pg_connection):
assert clean_acked_with_limit == 4
all_queue = await pg_connection.fetch(
f"""
SELECT * from {new_queue._queue_table_name} ORDER BY q_id
"""
)
assert all_queue == []
all_requests = await pg_connection.fetch(
f"""
SELECT * from {new_queue._requests_table_name}
"""
)
assert [list(r)[:2] for r in all_requests] == [[1, 'done']]

0 comments on commit 2daccd9

Please sign in to comment.