Skip to content

Commit

Permalink
add eaxamples badges and bump to 0.1.1
Browse files Browse the repository at this point in the history
  • Loading branch information
mosquito committed Jun 9, 2023
1 parent 623e473 commit 5a43d6f
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 16 deletions.
70 changes: 70 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,77 @@
[![PyPI - License](https://img.shields.io/pypi/l/patio-redis)](https://pypi.org/project/patio-redis) [![Wheel](https://img.shields.io/pypi/wheel/patio-redis)](https://pypi.org/project/patio-redis) [![Mypy](http://www.mypy-lang.org/static/mypy_badge.svg)]() [![PyPI](https://img.shields.io/pypi/v/patio-redis)](https://pypi.org/project/patio-redis) [![PyPI](https://img.shields.io/pypi/pyversions/patio-redis)](https://pypi.org/project/patio-redis) [![Coverage Status](https://coveralls.io/repos/github/patio-python/patio-redis/badge.svg?branch=master)](https://coveralls.io/github/patio-python/patio-redis?branch=master) ![tox](https://github.com/patio-python/patio-redis/workflows/tests/badge.svg?branch=master)

PATIO Redis
===========

PATIO is an acronym for **P**ython **A**synchronous **T**ask for Async**IO**.

This package provides Redis broker implementation.

Example
-------

### Worker

```python
import asyncio
import operator
from functools import reduce

from patio import Registry, ThreadPoolExecutor

from patio_redis import RedisBroker


rpc = Registry(project="test", strict=True)


@rpc("mul")
def mul(*args):
return reduce(operator.mul, args)


async def main():
async with ThreadPoolExecutor(rpc, max_workers=16) as executor:
async with RedisBroker(
executor, url="redis://127.0.0.1:6379", max_connections=50,
) as broker:
await broker.join()


if __name__ == "__main__":
asyncio.run(main())

```

### Producer

```python
import asyncio

from patio import NullExecutor, Registry

from patio_redis import RedisBroker


rpc = Registry(project="test", strict=True)


async def main():
async with NullExecutor(rpc) as executor:
async with RedisBroker(
executor, url="redis://127.0.0.1/", max_connections=50,
) as broker:
for i in range(50):
print(
await asyncio.gather(
*[
broker.call("mul", i, j, timeout=1)
for j in range(200)
]
),
)


if __name__ == "__main__":
asyncio.run(main())
```
4 changes: 3 additions & 1 deletion examples/multiplication-producer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio

from patio import NullExecutor, Registry

from patio_redis import RedisBroker


Expand All @@ -9,7 +11,7 @@
async def main():
async with NullExecutor(rpc) as executor:
async with RedisBroker(
executor, url="redis://127.0.0.1/", max_connections=50
executor, url="redis://127.0.0.1/", max_connections=50,
) as broker:
for i in range(50):
print(
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "patio-redis"
version = "0.1.0"
version = "0.1.1"
description = "Redis broker implementation for PATIO"
authors = ["Dmitry Orlov <me@mosquito.su>"]
license = "MIT"
Expand Down
29 changes: 15 additions & 14 deletions tests/test_broker.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
import asyncio
import operator
import os
import time
from functools import reduce
from typing import AsyncGenerator, Union, Any, Callable
import operator
from typing import Any, AsyncGenerator, Callable, Union

import pytest
from patio import ThreadPoolExecutor, Registry, NullExecutor
from patio import NullExecutor, Registry, ThreadPoolExecutor

from patio_redis import RedisBroker


rpc: Registry[Callable[..., Any]] = Registry(project="test", strict=True)


@rpc('mul')
@rpc("mul")
def mul(*args: Union[int, float]) -> Union[int, float]:
return reduce(operator.mul, args)


@rpc('div')
@rpc("div")
def div(*args: Union[int, float]) -> Union[int, float]:
return reduce(operator.truediv, args)


@rpc('sleeper')
@rpc("sleeper")
def sleeper(interval: int) -> None:
return time.sleep(interval)

Expand All @@ -34,12 +35,12 @@ async def thread_executor() -> AsyncGenerator[Any, ThreadPoolExecutor]:
yield executor


REDIS_URL = os.getenv('REDIS_URL', "redis://127.0.0.1:6379/")
REDIS_URL = os.getenv("REDIS_URL", "redis://127.0.0.1:6379/")


async def test_simple(thread_executor: ThreadPoolExecutor):
async with RedisBroker(
thread_executor, url=REDIS_URL, max_connections=10
thread_executor, url=REDIS_URL, max_connections=10,
) as broker:
assert await broker.call(mul, 1, 2, 3, 4, 5) == 120

Expand All @@ -52,19 +53,19 @@ async def test_simple(thread_executor: ThreadPoolExecutor):

async def test_simple_split(thread_executor: ThreadPoolExecutor):
async with RedisBroker(
thread_executor, url=REDIS_URL, max_connections=10
thread_executor, url=REDIS_URL, max_connections=10,
):
async with NullExecutor(
Registry(project="test", strict=True)
Registry(project="test", strict=True),
) as executor:
async with RedisBroker(
executor, url=REDIS_URL, max_connections=10
executor, url=REDIS_URL, max_connections=10,
) as producer:

assert await producer.call('mul', 1, 2, 3, 4, 5) == 120
assert await producer.call("mul", 1, 2, 3, 4, 5) == 120

with pytest.raises(ZeroDivisionError):
assert await producer.call('div', 1, 2, 3, 4, 0)
assert await producer.call("div", 1, 2, 3, 4, 0)

with pytest.raises(asyncio.TimeoutError):
assert await producer.call('sleeper', 2, timeout=1)
assert await producer.call("sleeper", 2, timeout=1)

0 comments on commit 5a43d6f

Please sign in to comment.