Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/taskforcesh/bullmq into f…
Browse files Browse the repository at this point in the history
…ix/force-timeout-for-bzpopmin
  • Loading branch information
manast committed Apr 29, 2024
2 parents a7fa029 + 8020f99 commit dbb7f48
Show file tree
Hide file tree
Showing 9 changed files with 348 additions and 163 deletions.
12 changes: 12 additions & 0 deletions docs/gitbook/changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
## [5.7.6](https://github.com/taskforcesh/bullmq/compare/v5.7.5...v5.7.6) (2024-04-27)


### Bug Fixes

* **redis-connection:** increase redis retry strategy backoff ([#2546](https://github.com/taskforcesh/bullmq/issues/2546)) [python] ([6cf7712](https://github.com/taskforcesh/bullmq/commit/6cf77122da845e5b0afa1607348cf06602679329))


### Performance Improvements

* **worker:** do not call bzpopmin when blockDelay is lower or equal 0 ([#2544](https://github.com/taskforcesh/bullmq/issues/2544)) ref [#2466](https://github.com/taskforcesh/bullmq/issues/2466) ([9760b85](https://github.com/taskforcesh/bullmq/commit/9760b85dfbcc9b3c744f616961ef939e8951321d))

## [5.7.5](https://github.com/taskforcesh/bullmq/compare/v5.7.4...v5.7.5) (2024-04-24)


Expand Down
411 changes: 293 additions & 118 deletions docs/gitbook/python/changelog.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bullmq",
"version": "5.7.5",
"version": "5.7.6",
"description": "Queue for messages and jobs based on Redis",
"homepage": "https://bullmq.io/",
"main": "./dist/cjs/index.js",
Expand Down
2 changes: 1 addition & 1 deletion python/bullmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
A background job processor and message queue for Python based on Redis.
"""
__version__ = "2.7.3"
__version__ = "2.7.5"
__author__ = 'Taskforce.sh Inc.'
__credits__ = 'Taskforce.sh Inc.'

Expand Down
2 changes: 1 addition & 1 deletion python/bullmq/redis_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class RedisConnection:

def __init__(self, redisOpts: dict | str | redis.Redis = {}):
self.version = None
retry = Retry(ExponentialBackoff(), 3)
retry = Retry(ExponentialBackoff(cap=20, base=1), 20)
retry_errors = [BusyLoadingError, ConnectionError, TimeoutError]

if isinstance(redisOpts, redis.Redis):
Expand Down
7 changes: 6 additions & 1 deletion python/bullmq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,18 @@ async def runStalledJobsCheck(self):

async def close(self, force: bool = False):
"""
Close the worker
Closes the worker and related redis connections.
This method waits for current jobs to finalize before returning.
"""
self.closing = True
if force:
self.forceClosing = True
self.cancelProcessing()

if not force and len(self.processing) > 0:
await asyncio.wait(self.processing, return_when=asyncio.ALL_COMPLETED)

await self.blockingRedisConnection.close()
await self.redisConnection.close()

Expand Down
2 changes: 1 addition & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "bullmq"
version = "2.7.3"
version = "2.7.5"
description='BullMQ for Python'
readme="README.md"
authors = [
Expand Down
59 changes: 29 additions & 30 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -621,35 +621,32 @@ will never work with more accuracy than 1ms. */
if (!this.closing) {
let blockTimeout = this.getBlockTimeout(blockUntil);

blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout
? blockTimeout
: Math.ceil(blockTimeout);

// We restrict the maximum block timeout to 10 second to avoid
// blocking the connection for too long in the case of reconnections
// reference: https://github.com/taskforcesh/bullmq/issues/1658
blockTimeout = Math.min(blockTimeout, maximumBlockTimeout);

// We cannot trust that the blocking connection stays blocking forever
// due to issues in Redis and IORedis, so we will reconnect if we
// don't get a response in the expected time.
const timeout = setTimeout(async () => {
await this.blockingConnection.disconnect();
await this.blockingConnection.reconnect();
}, blockTimeout * 1000 + 1000);

this.updateDelays(); // reset delays to avoid reusing same values in next iteration

// Markers should only be used for un-blocking, so we will handle them in this
// function only.
const result = await bclient.bzpopmin(this.keys.marker, blockTimeout);
clearTimeout(timeout);

if (result) {
const [_key, member, score] = result;

if (member) {
return parseInt(score);
if (blockTimeout > 0) {
blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout
? blockTimeout
: Math.ceil(blockTimeout);

// We cannot trust that the blocking connection stays blocking forever
// due to issues in Redis and IORedis, so we will reconnect if we
// don't get a response in the expected time.
const timeout = setTimeout(async () => {
await this.blockingConnection.disconnect();
await this.blockingConnection.reconnect();
}, blockTimeout * 1000 + 1000);

this.updateDelays(); // reset delays to avoid reusing same values in next iteration

// Markers should only be used for un-blocking, so we will handle them in this
// function only.
const result = await bclient.bzpopmin(this.keys.marker, blockTimeout);
clearTimeout(timeout);

if (result) {
const [_key, member, score] = result;

if (member) {
return parseInt(score);
}
}
}

Expand All @@ -673,7 +670,9 @@ will never work with more accuracy than 1ms. */
if (blockUntil) {
const blockDelay = blockUntil - Date.now();
// when we reach the time to get new jobs
if (blockDelay < this.minimumBlockTimeout * 1000) {
if (blockDelay <= 0) {
return blockDelay;
} else if (blockDelay < this.minimumBlockTimeout * 1000) {
return this.minimumBlockTimeout;
} else {
// We restrict the maximum block timeout to 10 second to avoid
Expand Down
14 changes: 4 additions & 10 deletions tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -949,23 +949,17 @@ describe('workers', function () {

describe('when blockUntil is greater than 0', () => {
describe('when blockUntil is lower than date now value', () => {
it('returns minimumBlockTimeout', async () => {
it('returns blockDelay value lower or equal 0', async () => {
const worker = new Worker(queueName, async () => {}, {
connection,
prefix,
autorun: false,
});
await worker.waitUntilReady();

if (isRedisVersionLowerThan(worker.redisVersion, '7.0.8')) {
expect(worker['getBlockTimeout'](Date.now() - 1)).to.be.equal(
0.002,
);
} else {
expect(worker['getBlockTimeout'](Date.now() - 1)).to.be.equal(
0.001,
);
}
expect(
worker['getBlockTimeout'](Date.now() - 1),
).to.be.lessThanOrEqual(0);
await worker.close();
});
});
Expand Down

0 comments on commit dbb7f48

Please sign in to comment.