Skip to content

Commit

Permalink
readme
Browse files Browse the repository at this point in the history
  • Loading branch information
maximdanilchenko committed Oct 16, 2018
1 parent 86596d3 commit 1ba08c5
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 13 deletions.
6 changes: 3 additions & 3 deletions async_pq/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from asyncpg import Connection

DELETE_LIMIT_SM = 1000 # limit of requests deleted in one query
DELETE_LIMIT = 1000 # limit of requests deleted in one query


class Queue:
Expand Down Expand Up @@ -87,7 +87,7 @@ async def unack(self, request_id: int) -> bool:
return True
return False

async def return_unacked(self, timeout: int, limit: int=DELETE_LIMIT_SM) -> int:
async def return_unacked(self, timeout: int, limit: int=DELETE_LIMIT) -> int:
""" Delete unacked request (queue entities will be with request_id=NULL) """
return await self._connection.fetchval(
f"""
Expand All @@ -107,7 +107,7 @@ async def return_unacked(self, timeout: int, limit: int=DELETE_LIMIT_SM) -> int:
limit,
)

async def clean_acked_queue(self, limit: int=DELETE_LIMIT_SM) -> int:
async def clean_acked_queue(self, limit: int=DELETE_LIMIT) -> int:
""" Delete acked queue entities (request will not be deleted) """
return await self._connection.fetchval(
f"""
Expand Down
30 changes: 20 additions & 10 deletions docs/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
Python async api for creating and managing queues in postgres

[![Travis CI](https://travis-ci.org/maximdanilchenko/async-pq.svg?branch=master)](https://travis-ci.org/maximdanilchenko/async-pq)
[![PyPI version](https://badge.fury.io/py/async-pq.svg)](https://badge.fury.io/py/async-pq)
[![Documentation Status](https://readthedocs.org/projects/async-pq/badge/?version=latest)](https://async-pq.readthedocs.io/en/latest/?badge=latest)
[![codecov](https://codecov.io/gh/maximdanilchenko/async-pq/branch/master/graph/badge.svg)](https://codecov.io/gh/maximdanilchenko/async-pq)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/ambv/black)

Postgres is not best solution for storing and managing queues,
but sometimes we have no choice.
Can work with millions of entities and thousands of
requests in one queue. Production ready.

## Install
```
> pip install async-pq
Expand All @@ -18,27 +26,29 @@ conn = await asyncpg.connect('postgresql://postgres@localhost/test')
```

```QueueFabric.find_queue``` method will create needed
tables in database if it is new queue.
tables (one for requests and one for messages) in database if it is new queue.
Also it has ```is_exists_queue``` method for situations when you
need to know that it will be the new queue.
need to know that it is exists or not and will be the new queue.
```python
from async_pq import Queue, QueueFabric

queue: Queue = await QueueFabric(conn).find_queue('items')
```
## Operations with queue
Put new items (dumped JSONs) in queue:
Put new items (should be dumped JSONs) in queue:
```python
await queue.put('{"id":1,"data":[1,2,3]}', '{"id":2,"data":[3,2,6]}')
```

Pop items from queue with some ```limit``` (it is possible to use acknowledge pattern):
Pop items from queue with some ```limit```.
It will create one request and return its id.
It is useful when you want to use acknowledgement pattern:
```python
# If with_ack=False (default from > 0.2.1), massage will be acknowledged in place automatically
request_id, data = await queue.pop(limit=2, with_ack=True)
```

Acknowledge request:
To acknowledge request use ```ack``` method:
```python
# returns False if request does not found or acked already
is_acked: bool = await queue.ack(request_id)
Expand All @@ -50,14 +60,14 @@ Or vice versa:
is_unacked: bool = await queue.unack(request_id)
```

Return to queue all unacknowledged massages older than ```timeout``` seconds
(default limit=1000 entities):
You can return unacknowledged massages older than ```timeout``` seconds
(default limit=1000 requests) to queue:
```python
await queue.return_unacked(timeout=300)
requests_number = await queue.return_unacked(timeout=300, limit=500)
```

Clean queue (delete acknowledged massages) to not overfill database with old data
(default limit=1000 entities):
(default limit= messages of 1000 requests):
```python
await queue.clean_acked_queue()
requests_number = await queue.clean_acked_queue(limit=500)
```

0 comments on commit 1ba08c5

Please sign in to comment.