V1.2 #6

Merged · 11 commits · Aug 11, 2023
7 changes: 3 additions & 4 deletions docs/conf.py
@@ -17,12 +17,11 @@

# -- Project information -----------------------------------------------------

project = 'Purse'
copyright = '2021, Plataux LLC'
author = 'Plataux Tech'
project = 'Redis Purse'
author = 'MK'

# The full version, including alpha/beta/rc tags
release = '0.1'
release = '1.2'


# -- General configuration ---------------------------------------------------
377 changes: 186 additions & 191 deletions poetry.lock

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions pyproject.toml
@@ -1,7 +1,7 @@

[tool.poetry]
name = "redis-purse"
version = "1.1.0"
version = "1.2.0"
description = "High Level Asyncio interface to redis"
license = "Apache-2.0"
authors = ["mk <mk@plataux.com>"]
@@ -26,15 +26,15 @@ packages = [
[tool.poetry.dependencies]
python = ">=3.8,<4"
pydantic = "^1.10"
redis = "^4.5"
redis = "^4.6"

[tool.poetry.dev-dependencies]
flake8 = "^4.0"
tox = "^3.24"
pytest = "^7.3"
pytest-cov = "^4.0"
mypy = "^1.2"
sphinx = "^4.4"
tox = "^3.28"
pytest = "^7.4"
pytest-cov = "^4.1"
mypy = "^1.5"
sphinx = "^4.5"
sphinx-rtd-theme = "^1.2"
recommonmark = "^0.7"

68 changes: 52 additions & 16 deletions src/purse/collections.py
@@ -119,6 +119,8 @@ class RedisKeySpace(Generic[T]):
__slots__ = ('redis', 'prefix', '_value_type')

def __init__(self, redis: Redis, prefix: str, value_type: Type[T]):
if value_type not in (str, bytes, dict) and not issubclass(value_type, BaseModel):
raise TypeError(f'Invalid value_type {value_type}')
self.redis = redis
self.prefix = prefix
self._value_type: Type[T] = value_type
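
This guard, repeated across every collection constructor in this diff, rejects unsupported value types at construction time instead of failing later during serialization. A minimal sketch of the new behavior, assuming RedisKeySpace is importable from purse and r is a redis.asyncio.Redis client (both assumptions, not shown in this diff):

from purse import RedisKeySpace  # assumed export
from redis.asyncio import Redis

r = Redis()  # assumed local default connection

ks = RedisKeySpace(r, "app:", dict)  # fine: dict is a supported value_type
try:
    RedisKeySpace(r, "app:", int)    # int is neither str/bytes/dict nor a BaseModel
except TypeError as e:
    print(e)                         # Invalid value_type <class 'int'>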
@@ -140,7 +142,7 @@ async def set(self, key: str, value: T, ex: int | timedelta | None = None,
:param xx: Only set the key if it already exists.
:param keepttl: Retain the time to live associated with the key.
:return: True if key was set, False if not
:raises ValueError: if the type mismatches the Generic[T] or is None
:raises ValueError: raised if the type mismatches the Generic[T] or is None

"""
args: Any = [self.prefix + key, _obj_to_raw(self._value_type, value)]
@@ -300,7 +302,7 @@ async def pexpire(self, key, millis: int | timedelta):
async def persist(self, key):
"""
Remove the existing timeout on key, turning the key from volatile
(a key with an expire set) to persistent (a key that will never expire as no timeout is associated).
(a key with an expiration set) to persistent (a key that will never expire as no timeout is associated).

:param key:
"""
@@ -441,6 +443,8 @@ class RedisHash(Generic[T], RedisKey):
__slots__ = ('_value_type',)

def __init__(self, redis: Redis, rkey, value_type: Type[T]):
if value_type not in (str, bytes, dict) and not issubclass(value_type, BaseModel):
raise TypeError(f'Invalid value_type {value_type}')
super().__init__(redis, rkey)
self._value_type: Type[T] = value_type

@@ -632,15 +636,17 @@ async def _typed_iter():
else:
yield k3, v3

else:

elif issubclass(self._value_type, bytes):
async def _typed_iter():
async for k4, v4 in raw_it:
if isinstance(v4, str):
yield k4.decode(), v4.encode()
else:
yield k4.decode(), v4

else:
assert False

_item_iter: AsyncIterator[Tuple[str, T]] = _typed_iter()

return _item_iter
@@ -675,6 +681,8 @@ class RedisSet(Generic[T], RedisKey):
__slots__ = ('_value_type',)

def __init__(self, redis: Redis, rkey: str, value_type: Type[T]):
if value_type not in (str, bytes, dict) and not issubclass(value_type, BaseModel):
raise TypeError(f'Invalid value_type {value_type}')
super().__init__(redis, rkey)
self._value_type: Type[T] = value_type

@@ -747,6 +755,8 @@ class RedisSortedSet(Generic[T], RedisKey):
__slots__ = ('_value_type',)

def __init__(self, redis: Redis, rkey: str, value_type: Type[T]):
if value_type not in (str, bytes, dict) and not issubclass(value_type, BaseModel):
raise TypeError(f'Invalid value_type {value_type}')
super().__init__(redis, rkey)
self._value_type: Type[T] = value_type

@@ -793,7 +803,7 @@ async def score_multi(self, members: List[T]) -> List[Tuple[T, float]]:
"""
provide the score of a single SortedSet member, or multiple members at once.

aioredis 2.0 doesn't implement the ZMSCORE command yet, so we invoking them
aioredis 2.0 doesn't implement the ZMSCORE command yet, so we invoke them
in a pipeline instead

:param members:
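
A minimal sketch of the pipeline fallback the docstring describes, assuming a redis.asyncio.Redis client r and a sorted set at the hypothetical key "scores": one ZSCORE per member is queued on a single pipeline, so all scores come back in one round trip.

async with r.pipeline(transaction=False) as pipe:
    for member in (b"a", b"b", b"c"):
        pipe.zscore("scores", member)  # queued, not sent yet
    results = await pipe.execute()     # [float | None, ...] in member order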
@@ -942,6 +952,8 @@ class RedisList(Generic[T], RedisKey):
__slots__ = ('_value_type',)

def __init__(self, redis: Redis, rkey: str, value_type: Type[T]):
if value_type not in (str, bytes, dict) and not issubclass(value_type, BaseModel):
raise TypeError(f'Invalid value_type {value_type}')
super().__init__(redis, rkey)
self._value_type: Type[T] = value_type

@@ -1032,6 +1044,18 @@ async def slice(self, start: int, stop: int) -> List[T]:
return _list_from_raw(self._value_type, raw_res)

def values(self, batch_size: Union[int, None] = 10) -> AsyncIterator[T]:
"""
iterate over the list, yielding the values

example usage:

async for value in redis_list.values():
print(value)


:param batch_size: how many values to fetch per round trip; None, or a size no smaller than the list, fetches everything in one call
:return: an async iterator over the list's typed values
"""

async def _typed_iter():
if not batch_size or (list_len := await self.len()) <= batch_size:
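
A hedged usage sketch for the batching branch above, assuming redis_list is the RedisList[str] from the docstring example; a larger batch_size means fewer round trips when streaming a long list:

async for value in redis_list.values(batch_size=100):
    print(value)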
@@ -1069,6 +1093,8 @@ class RedisQueue(Generic[T], RedisKey):
__slots__ = ('_value_type',)

def __init__(self, redis: Redis, rkey: str, value_type: Type[T]):
if value_type not in (str, bytes, dict) and not issubclass(value_type, BaseModel):
raise TypeError(f'Invalid value_type {value_type}')
super().__init__(redis, rkey)
self._value_type: Type[T] = value_type

@@ -1077,12 +1103,13 @@ async def put(self, item: T):

async def get(self, timeout: float = 0) -> T:
t: Any = timeout
_, res = await self.redis.brpop(self.rkey, timeout=t)

if res is None:
output = await self.redis.brpop(self.rkey, timeout=t)

if output is None:
raise asyncio.QueueEmpty("RedisQueue Empty")

return _obj_from_raw(self._value_type, res)
return _obj_from_raw(self._value_type, output[1])

async def get_nowait(self) -> T:
res = await self.redis.rpop(self.rkey)
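
With the fix above, a BRPOP timeout now surfaces as asyncio.QueueEmpty instead of a TypeError from unpacking None. A hypothetical consumer sketch, assuming a connected client redis_conn and a key name "jobs" (both assumptions):

import asyncio

queue = RedisQueue(redis_conn, "jobs", str)
try:
    item = await queue.get(timeout=1)
except asyncio.QueueEmpty:
    item = None  # nothing arrived within one second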
@@ -1104,6 +1131,8 @@ class RedisLifoQueue(Generic[T], RedisKey):
__slots__ = ('_value_type',)

def __init__(self, redis: Redis, rkey: str, value_type: Type[T]):
if value_type not in (str, bytes, dict) and not issubclass(value_type, BaseModel):
raise TypeError(f'Invalid value_type {value_type}')
super().__init__(redis, rkey)
self._value_type: Type[T] = value_type

@@ -1112,12 +1141,12 @@ async def put(self, item: T):

async def get(self, timeout: float = 0) -> T:
t: Any = timeout
_, res = await self.redis.brpop(self.rkey, timeout=t)
output = await self.redis.brpop(self.rkey, timeout=t)

if res is None:
if output is None:
raise asyncio.QueueEmpty("RedisQueue Empty")

return _obj_from_raw(self._value_type, res)
return _obj_from_raw(self._value_type, output[1])

async def get_nowait(self) -> T:
res = await self.redis.rpop(self.rkey)
@@ -1139,21 +1168,28 @@ class RedisPriorityQueue(Generic[T], RedisKey):
__slots__ = ('_value_type',)

def __init__(self, redis: Redis, rkey: str, value_type: Type[T]):
if value_type not in (str, bytes, dict) and not issubclass(value_type, BaseModel):
raise TypeError(f'Invalid value_type {value_type}')
super().__init__(redis, rkey)
self._value_type: Type[T] = value_type

async def put(self, item: Tuple[T, int]):
raw = str(_obj_to_raw(self._value_type, item[0]))
return await self.redis.zadd(self.rkey, {f"{uuid4()}:{raw}": item[1]})
raw: str | bytes = _obj_to_raw(self._value_type, item[0])

_u = str(uuid4())

_prefix = _u if isinstance(raw, str) else _u.encode()

return await self.redis.zadd(self.rkey, {_prefix + raw: item[1]})

async def get(self, timeout: float = 0) -> Tuple[T, int]:
t: Any = timeout
res = await self.redis.bzpopmin(self.rkey, timeout=t)
output = await self.redis.bzpopmin(self.rkey, timeout=t)

if res is None:
if output is None:
raise asyncio.QueueEmpty("RedisQueue Empty")

return _obj_from_raw(self._value_type, res[1][37:]), int(res[2])
return _obj_from_raw(self._value_type, output[1][36:]), int(output[2])

async def get_nowait(self) -> Tuple[T, int]:
_, res = await self.redis.zpopmin(self.rkey)
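A note on the RedisPriorityQueue change above: a Redis sorted set cannot hold duplicate members, so two equal payloads would collapse into one entry. Prepending a random UUID keeps them distinct, and because a string UUID is always 36 characters, get() can slice the payload back out with output[1][36:] (the old slice of 37 accounted for the now-removed ":" separator). A small standalone sketch of the width invariant:

from uuid import uuid4

u = str(uuid4())
assert len(u) == 36              # fixed width, hence the [36:] slice in get()
member = u + "payload"
assert member[36:] == "payload"  # get() recovers the raw payload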
57 changes: 57 additions & 0 deletions tests/test_redlock.py
@@ -0,0 +1,57 @@


from purse.redlock import Redlock
from purse import RedisList
import asyncio
from random import random

import pytest
from ctx import Context


@pytest.fixture(scope="session")
def ctx() -> Context:
ctx = Context()
yield ctx


def test_redlock(ctx):
async def do_job(n):

rlock = Redlock("redlock:list_lock", [ctx.redis_conn])
rlist = RedisList(ctx.redis_conn, "redis_list", str)

for x in range(n):
async with rlock:
cl = await rlist.len()

if cl == 0:
await rlist.append("0")
current_num = 0
else:
current_num = int(await rlist.getitem(-1))

# This sleep simulates the processing time of the job - up to 100ms here
await asyncio.sleep(0.1 * random())

# Get the job done, which is add 1 to the last number
current_num += 1

print(f"the task {asyncio.current_task().get_name()} working on item #: {current_num}")

await rlist.append(str(current_num))

async def main():
rlist = RedisList(ctx.redis_conn, "redis_list", str)
await rlist.clear()

# run 10 concurrent tasks, each performing 10 increments
await asyncio.gather(
*[asyncio.create_task(do_job(10)) for _ in range(10)]
)

assert [x async for x in rlist] == [str(x) for x in range(101)]

return "success"

asyncio.run(main())