Skip to content

Commit

Permalink
fix(worker): set blockTimeout as 0.001 when reach the time to get del…
Browse files Browse the repository at this point in the history
…ayed jobs [python] (#2478)
  • Loading branch information
roggervalf committed Mar 19, 2024
1 parent 7bf6418 commit b385034
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 29 deletions.
20 changes: 0 additions & 20 deletions docs/gitbook/python/changelog.md
Expand Up @@ -5,26 +5,6 @@
## v2.3.0 (2024-03-16)
### Feature
* **job:** Add log method [python] (#2476) ref #2472 ([`34946c4`](https://github.com/taskforcesh/bullmq/commit/34946c4b29cc9e7d5ae81f8fd170a2e539ac6279))
* **job:** Add removeChildDependency method ([#2435](https://github.com/taskforcesh/bullmq/issues/2435)) ([`1151022`](https://github.com/taskforcesh/bullmq/commit/1151022e4825fbb20cf1ef6ce1ff3e7fe929de5c))
* **worker:** Add support for naming workers ([`7ba2729`](https://github.com/taskforcesh/bullmq/commit/7ba27293615e443903cfdf7d0ff8be0052d061c4))
* **flow:** Add ignoreDependencyOnFailure option ([#2426](https://github.com/taskforcesh/bullmq/issues/2426)) ([`c7559f4`](https://github.com/taskforcesh/bullmq/commit/c7559f4f0a7fa51764ad43b4f46bb9d55ac42d0d))

### Fix
* Move fast-glob and minimatch as dev-dependencies ([#2452](https://github.com/taskforcesh/bullmq/issues/2452)) ([`cf13b31`](https://github.com/taskforcesh/bullmq/commit/cf13b31ca552bcad53f40fe5668a907cf02e0a2e))
* **worker:** Set blockTimeout as 0.001 when reach the time to get delayed jobs (#2455) fixes #2450 ([`2de15ca`](https://github.com/taskforcesh/bullmq/commit/2de15ca1019517f7ce11f3734fff316a3e4ab894))
* **deps:** Replaced glob by fast-glob due to security advisory ([`91cf9a9`](https://github.com/taskforcesh/bullmq/commit/91cf9a9253370ea76df48c27a7e0fcf8d7504c81))
* **sandbox:** Extend SandboxedJob from JobJsonSandbox (#2446) fixes #2439 ([`7606e36`](https://github.com/taskforcesh/bullmq/commit/7606e3611f1cc18b1585c08b0f7fd9cb90749c9c))
* **add-job:** Fix parent job cannot be replaced error message ([#2441](https://github.com/taskforcesh/bullmq/issues/2441)) ([`1e9a13f`](https://github.com/taskforcesh/bullmq/commit/1e9a13fc0dc9de810ef75a042fbfeeae5b571ffe))
* **flow:** Remove failed children references on auto removal ([#2432](https://github.com/taskforcesh/bullmq/issues/2432)) ([`8a85207`](https://github.com/taskforcesh/bullmq/commit/8a85207cf3c552ebab37baca3c395821b9804b37))
* **redis-connection:** Close redis connection even when initializing (#2425) fixes #2385 ([`1bc26a6`](https://github.com/taskforcesh/bullmq/commit/1bc26a64871b85a2d1f6799a9b73b60f8bf9fa90))

### Documentation
* **pro:** Update changelog with v7.3.0 ([#2474](https://github.com/taskforcesh/bullmq/issues/2474)) ([`efab7af`](https://github.com/taskforcesh/bullmq/commit/efab7af98b3dd1cff070794e4cbb23133449ddcf))
* Fix typo in elasticache section ([#2471](https://github.com/taskforcesh/bullmq/issues/2471)) ([`2436c09`](https://github.com/taskforcesh/bullmq/commit/2436c09a38890dc58dc5c4189bc1cffe98249bef))
* **bullmq-pro:** Update changelog with v7 ([#2470](https://github.com/taskforcesh/bullmq/issues/2470)) ([`63c01e8`](https://github.com/taskforcesh/bullmq/commit/63c01e84d375fde372bbff4849cd8c37c6bb0177))
* Add infisical in usedBy ([`ada23fa`](https://github.com/taskforcesh/bullmq/commit/ada23fa3e6ff0d38d281c9e947ee43f1f57812a2))
* **guide:** Fix queueEvents.on failed description ([#2420](https://github.com/taskforcesh/bullmq/issues/2420)) ([`415f389`](https://github.com/taskforcesh/bullmq/commit/415f389e41ae692a6e8c3da90fdb5d58a58985eb))
* **guide:** Fix markdown syntax in prioritized jobs ([#2421](https://github.com/taskforcesh/bullmq/issues/2421)) ([`4c54873`](https://github.com/taskforcesh/bullmq/commit/4c548735d3c2489f2bdc1cfa870703db6f11f752))

## v2.2.4 (2024-02-13)
### Fix
Expand Down
11 changes: 10 additions & 1 deletion python/bullmq/redis_connection.py
Expand Up @@ -7,6 +7,7 @@
TimeoutError
)
import warnings
from bullmq.utils import isRedisVersionLowerThan

class RedisConnection:
"""
Expand All @@ -16,6 +17,10 @@ class RedisConnection:
minimum_version = '5.0.0'
recommended_minimum_version = '6.2.0'

capabilities = {
"canDoubleTimeout": False
}

def __init__(self, redisOpts: dict | str = {}):
self.version = None
retry = Retry(ExponentialBackoff(), 3)
Expand Down Expand Up @@ -56,4 +61,8 @@ async def getRedisVersion(self):
warnings.warn(f'IMPORTANT! Eviction policy is {doc.get("maxmemory_policy")}. It should be "noeviction"')

self.version = doc.get("redis_version")
return doc.get("redis_version")

self.capabilities = {
"canDoubleTimeout": not isRedisVersionLowerThan(self.version, '6.0.0')
}
return self.version
26 changes: 18 additions & 8 deletions python/bullmq/worker.py
Expand Up @@ -7,13 +7,16 @@
from bullmq.job import Job
from bullmq.timer import Timer
from bullmq.types import WorkerOptions
from bullmq.utils import isRedisVersionLowerThan, extract_result
from bullmq.utils import extract_result

import asyncio
import traceback
import time
import math

maximum_block_timeout = 10
minimum_block_timeout = 0.001

class Worker(EventEmitter):
def __init__(self, name: str, processor: Callable[[Job, str], asyncio.Future], opts: WorkerOptions = {}):
super().__init__()
Expand Down Expand Up @@ -146,14 +149,11 @@ def nextJobFromJobData(self, job_data = None, job_id: str = None, limit_until: i
return job_instance

async def waitForJob(self):
timeout = max(min(self.blockUntil - int(time.time() * 1000)
if self.blockUntil else 5000, 5000) / 1000, 0.00001)

redis_version = await self.blockingRedisConnection.getRedisVersion()
# Only Redis v6.0.0 and above supports doubles as block time
timeout = int(math.ceil(timeout)) if isRedisVersionLowerThan(redis_version, '6.0.0') else timeout
block_timeout = self.getBlockTimeout(self.blockUntil)
block_timeout = block_timeout if self.blockingRedisConnection.capabilities.get("canDoubleTimeout", False) else math.ceil(block_timeout)
block_timeout = min(block_timeout, maximum_block_timeout)

result = await self.bclient.bzpopmin(self.scripts.keys["marker"], timeout)
result = await self.bclient.bzpopmin(self.scripts.keys["marker"], block_timeout)
if result:
[_key, member, score] = result

Expand All @@ -163,6 +163,16 @@ async def waitForJob(self):
return 0
return 0

def getBlockTimeout(self, block_until: int):
if block_until:
block_delay = block_until - int(time.time() * 1000)
if block_delay < 1:
return minimum_block_timeout
else:
return block_delay / 1000
else:
return max(self.opts.get("drainDelay", 5), minimum_block_timeout)

async def processJob(self, job: Job, token: str):
try:
self.jobs.add((job, token))
Expand Down

0 comments on commit b385034

Please sign in to comment.