Skip to content

Commit

Permalink
black
Browse files Browse the repository at this point in the history
  • Loading branch information
maximdanilchenko committed Aug 10, 2018
1 parent 0f0ad3f commit 9d2337a
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions async_pq/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async def put(self, *entities: str) -> None:
zip(entities),
)

async def pop(self, limit: int=1, with_ack: bool=True) -> Tuple[int, list]:
async def pop(self, limit: int = 1, with_ack: bool = True) -> Tuple[int, list]:
"""
Get <limit> records from queue.
If with_ack == True, then it needs acknowledgement
Expand All @@ -34,8 +34,9 @@ async def pop(self, limit: int=1, with_ack: bool=True) -> Tuple[int, list]:
RETURNING r_id
"""
)
data = await self._connection.fetch(
f"""
data = (
await self._connection.fetch(
f"""
UPDATE {self._queue_table_name}
SET q_request_id=$1
WHERE q_id IN (
Expand All @@ -48,9 +49,11 @@ async def pop(self, limit: int=1, with_ack: bool=True) -> Tuple[int, list]:
)
RETURNING q_data;
""",
request_id,
limit,
) or []
request_id,
limit,
)
or []
)
if not data or not with_ack:
await self.ack(request_id)
return request_id, [i[0] for i in data]
Expand Down Expand Up @@ -89,7 +92,7 @@ async def return_unacked(self, timeout: int) -> None:
DELETE FROM {self._requests_table_name}
WHERE r_status='wait' AND created_at < current_timestamp - $1::interval
""",
dt.timedelta(seconds=timeout)
dt.timedelta(seconds=timeout),
)

async def clean_acked_queue(self) -> None:
Expand Down Expand Up @@ -147,4 +150,3 @@ async def find_queue(self, name: str) -> Queue:
if not await self.is_exists_queue(name):
await self._new_queue(name)
return Queue(name, self._connection)

0 comments on commit 9d2337a

Please sign in to comment.