zrangebyscore returning wrong type. #3144

Open · jeanluciano opened this issue Feb 13, 2024 · 0 comments

Version: happens on Redis server 6 and 7, using redis-py redis[hiredis]==4.3.4

Platform: Python 3.8 - 3.11, on the GitHub Actions ubuntu-22.04 runner: https://github.com/actions/runner-images/blob/ubuntu22/20240204.1/images/ubuntu/Ubuntu2204-Readme.md

Description:
I've been working on adding limited support for WATCH and transactions to Redis Cluster. Right now we have a flaky test that fails because zrangebyscore sometimes returns an int, specifically 0, instead of a list. We are not able to reproduce this locally; it only happens during our CI runs in GHA. The test fails about half the time, which makes it hard to reproduce. I'm mostly interested in reasons why zrangebyscore would return 0 instead of a list, since when it works and the set is empty it still returns an empty list.
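
For reference, this is the contract we rely on: even against a missing or empty sorted set, zrangebyscore returns an empty list, never an int. A minimal sanity-check sketch against a plain non-cluster client on localhost (the key name 'no-such-queue' is made up):

import asyncio

from redis.asyncio import Redis


async def check_return_type() -> None:
    redis = Redis()
    # 'no-such-queue' is a hypothetical key that does not exist
    result = await redis.zrangebyscore('no-such-queue', min='-inf', max='+inf')
    # expected: an empty list, never an int
    assert isinstance(result, list) and result == []
    await redis.close()


asyncio.run(check_return_type())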

Test in question

from collections import Counter
from random import shuffle
import asyncio


async def test_mung(arq_redis_cluster: ArqRedisCluster, cluster_worker):
    """
    check a job can't be enqueued multiple times with the same id
    """
    counter = Counter()

    async def count(ctx, v):
        counter[v] += 1

    tasks = []
    for i in range(50):
        tasks += [
            arq_redis_cluster.enqueue_job('count', i, _job_id=f'v-{i}'),
            arq_redis_cluster.enqueue_job('count', i, _job_id=f'v-{i}'),
        ]
    shuffle(tasks)

    await asyncio.gather(*tasks)
    await cluster_worker.main()
    assert counter.most_common(1)[0][1] == 1  # no job got enqueued twice

Our enqueue_job implementation:

    async def enqueue_job(
        self,
        function: str,
        *args: Any,
        _job_id: Optional[str] = None,
        _queue_name: Optional[str] = None,
        _defer_until: Optional[datetime] = None,
        _defer_by: Union[None, int, float, timedelta] = None,
        _expires: Union[None, int, float, timedelta] = None,
        _job_try: Optional[int] = None,
        **kwargs: Any,
    ) -> Optional[Job]:

        if _queue_name is None:
            _queue_name = self.default_queue_name
        job_id = _job_id or uuid4().hex
        job_key = job_key_prefix + job_id
        assert not (_defer_until and _defer_by), "use either 'defer_until' or 'defer_by' or neither, not both"

        defer_by_ms = to_ms(_defer_by)
        expires_ms = to_ms(_expires)

        async with self.pipeline(transaction=True) as pipe:
            await pipe.watch(job_key)
            if await pipe.exists(job_key, result_key_prefix + job_id):
                await pipe.reset()
                return None

            enqueue_time_ms = timestamp_ms()
            score = enqueue_time_ms if defer_by_ms is None else enqueue_time_ms + defer_by_ms
            expires_ms = expires_ms or score - enqueue_time_ms + self.expires_extra_ms

            job = serialize_job(function, args, kwargs, _job_try, enqueue_time_ms, serializer=self.job_serializer)
            pipe.multi()
            pipe.psetex(job_key, expires_ms, job)  # type: ignore[no-untyped-call]
            pipe.zadd("queue_name", {job_id: score})  # type: ignore[unused-coroutine]
            try:
                await pipe.execute()
            except WatchError:
                # job got enqueued since we checked 'job_exists'
                return None

        the_job = Job(job_id, redis=self, _queue_name=_queue_name, _deserializer=self.job_deserializer)
        return the_job
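
For context, the enqueue path above follows redis-py's optimistic-locking pattern: WATCH the key, inspect it in immediate-execution mode, buffer the writes after multi(), and let execute() raise WatchError if the watched key changed underneath us. A stripped-down sketch of that pattern (key and value names are illustrative):

from redis.asyncio import Redis
from redis.exceptions import WatchError


async def set_if_absent(redis: Redis, key: str, value: str) -> bool:
    async with redis.pipeline(transaction=True) as pipe:
        await pipe.watch(key)       # immediate-execution mode from here on
        if await pipe.exists(key):  # runs right away, returns an int
            await pipe.reset()
            return False
        pipe.multi()                # switch to buffered (MULTI) mode
        pipe.set(key, value)        # queued, executed on EXEC
        try:
            await pipe.execute()    # raises WatchError if `key` changed
        except WatchError:
            return False
    return True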

The worker's main loop:

    async def main(self) -> None:
        if self._pool is None:
            self._pool = await create_pool(
                self.redis_settings,
                job_deserializer=self.job_deserializer,
                job_serializer=self.job_serializer,
                default_queue_name=self.queue_name,
                expires_extra_ms=self.expires_extra_ms,
            )
        
        logger.info('Starting worker for %d functions: %s', len(self.functions), ', '.join(self.functions))
       
        self.ctx['redis'] = self.pool
        if self.on_startup:
            await self.on_startup(self.ctx)

        async for _ in poll(self.poll_delay_s):  # noqa F841
            await self._poll_iteration()

            if self.burst:
                if 0 <= self.max_burst_jobs <= self._jobs_started():
                    await asyncio.gather(*self.tasks.values())
                    return None
                queued_jobs = await self.pool.zcard(self.queue_name)
                if queued_jobs == 0:
                    await asyncio.gather(*self.tasks.values())
                    return None

    async def _poll_iteration(self) -> None:
        """
        Get ids of pending jobs from the main queue sorted-set data structure and start those jobs, remove
        any finished tasks from self.tasks.
        """
        count = self.queue_read_limit
        if self.burst and self.max_burst_jobs >= 0:
            burst_jobs_remaining = self.max_burst_jobs - self._jobs_started()
            if burst_jobs_remaining < 1:
                return
            count = min(burst_jobs_remaining, count)
        if self.allow_pick_jobs:
            if self.job_counter < self.max_jobs:
                now = timestamp_ms()
                job_ids = await self.pool.zrangebyscore(  # this sometimes returns an int
                    self.queue_name, min=float('-inf'), max=now, start=self._queue_read_offset, num=count
                )

                await self.start_jobs(job_ids)  # fails here, since we expect a list
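
Until the root cause is clear, we're considering a defensive guard around this call so the crash becomes a logged, skipped poll iteration instead of a failure. A sketch, assuming the surrounding _poll_iteration context above; this is containment, not a fix:

                job_ids = await self.pool.zrangebyscore(
                    self.queue_name, min=float('-inf'), max=now, start=self._queue_read_offset, num=count
                )
                if not isinstance(job_ids, list):
                    # seen intermittently in CI: an int (0) instead of a list
                    logger.warning('zrangebyscore returned %r, skipping this poll iteration', job_ids)
                    return
                await self.start_jobs(job_ids)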