Skip to content

Commit

Permalink
limits in clearning
Browse files Browse the repository at this point in the history
  • Loading branch information
maximdanilchenko committed Oct 15, 2018
1 parent b3d8303 commit 7d9831e
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 12 deletions.
41 changes: 30 additions & 11 deletions async_pq/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

from asyncpg import Connection

DELETE_LIMIT_SM = 100
DELETE_LIMIT_BIG = 10000


class Queue:
def __init__(self, name: str, connection: Connection):
Expand Down Expand Up @@ -85,25 +88,41 @@ async def unack(self, request_id: int) -> bool:
return True
return False

async def return_unacked(self, timeout: int) -> None:
async def return_unacked(self, timeout: int, limit: int=DELETE_LIMIT_SM) -> int:
""" Delete unacked request (queue entities will be with request_id=NULL) """
await self._connection.execute(
return await self._connection.fetchval(
f"""
DELETE FROM {self._requests_table_name}
WHERE r_status='wait' AND created_at < current_timestamp - $1::interval
WITH deleted AS (
DELETE FROM {self._requests_table_name}
WHERE ctid IN (
SELECT ctid
FROM {self._requests_table_name}
WHERE r_status='wait' AND created_at < current_timestamp - $1::interval
LIMIT $2
)
RETURNING *
)
SELECT count(*) FROM deleted
""",
dt.timedelta(seconds=timeout),
limit,
)

async def clean_acked_queue(self) -> None:
async def clean_acked_queue(self, limit: int=DELETE_LIMIT_BIG) -> int:
""" Delete acked queue entities (request will not be deleted) """
await self._connection.execute(
return await self._connection.fetchval(
f"""
DELETE FROM {self._queue_table_name}
WHERE q_request_id in (
SELECT r_id FROM {self._requests_table_name} where r_status='done'
)
"""
WITH deleted AS (
DELETE FROM {self._queue_table_name}
WHERE q_request_id in (
SELECT r_id FROM {self._requests_table_name} where r_status='done'
LIMIT $1
)
RETURNING *
)
SELECT count(*) FROM deleted
""",
limit
)


Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def readme(fname):

setup(
name='async-pq',
version='0.2.1',
version='0.2.2',
description='Python async api for creating and managing queues in postgres',
long_description=readme('README.md'),
long_description_content_type="text/markdown",
Expand Down

0 comments on commit 7d9831e

Please sign in to comment.