Skip to content

Commit

Permalink
feat(worker): better handling of concurrency when fetching jobs (#2242)
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Nov 18, 2023
1 parent edc31a6 commit d2e2035
Show file tree
Hide file tree
Showing 10 changed files with 201 additions and 132 deletions.
40 changes: 20 additions & 20 deletions .github/workflows/test.yml
Expand Up @@ -116,26 +116,26 @@ jobs:
- run: yarn build
- run: BULLMQ_TEST_PREFIX={b} yarn test

node-upstash:
runs-on: ubuntu-latest
continue-on-error: true

env:
node-version: lts/*
REDIS_HOST: ${{ secrets.REDIS_HOST }}

name: testing node@lts/*, upstash@latest
steps:
- name: Checkout repository
uses: actions/checkout@v3 # v3
- name: Use Node.js ${{ env.node-version }}
uses: actions/setup-node@v3 # v3
with:
node-version: lts/*
cache: 'yarn'
- run: yarn install --ignore-engines --frozen-lockfile --non-interactive
- run: yarn build
- run: yarn test
# node-upstash:
# runs-on: ubuntu-latest
# continue-on-error: true

# env:
# node-version: lts/*
# REDIS_HOST: ${{ secrets.REDIS_HOST }}

# name: testing node@lts/*, upstash@latest
# steps:
# - name: Checkout repository
# uses: actions/checkout@v3 # v3
# - name: Use Node.js ${{ env.node-version }}
# uses: actions/setup-node@v3 # v3
# with:
# node-version: lts/*
# cache: 'yarn'
# - run: yarn install --ignore-engines --frozen-lockfile --non-interactive
# - run: yarn build
# - run: yarn test

python:
runs-on: ubuntu-latest
Expand Down
3 changes: 1 addition & 2 deletions src/classes/scripts.ts
Expand Up @@ -835,8 +835,7 @@ export class Scripts {
}
}

async moveToActive(token: string, jobId?: string) {
const client = await this.queue.client;
async moveToActive(client: RedisClient, token: string, jobId?: string) {
const opts = this.queue.opts as WorkerOptions;

const queueKeys = this.queue.keys;
Expand Down
163 changes: 104 additions & 59 deletions src/classes/worker.ts
Expand Up @@ -397,30 +397,65 @@ export class Worker<

let tokenPostfix = 0;

const client = await this.client;
const bclient = await this.blockingConnection.client;

while (!this.closing) {
let numTotal = asyncFifoQueue.numTotal();
while (
!this.waiting &&
asyncFifoQueue.numTotal() < this.opts.concurrency &&
(!this.limitUntil || asyncFifoQueue.numTotal() == 0)
numTotal < this.opts.concurrency &&
(!this.limitUntil || numTotal == 0)
) {
const token = `${this.id}:${tokenPostfix++}`;
asyncFifoQueue.add(
this.retryIfFailed<void | Job<DataType, ResultType, NameType>>(
() => this.getNextJob(token),
this.opts.runRetryDelay,
),

const fetchedJob = this.retryIfFailed<void | Job<
DataType,
ResultType,
NameType
>>(
() => this._getNextJob(client, bclient, token, { block: true }),
this.opts.runRetryDelay,
);
asyncFifoQueue.add(fetchedJob);

numTotal = asyncFifoQueue.numTotal();

if (this.waiting && numTotal > 1) {
// We have a job waiting but we have others that we could start processing already
break;
}

// We await here so that we fetch jobs in sequence, this is important to avoid unnecessary calls
// to Redis in high concurrency scenarios.
const job = await fetchedJob;

// No more jobs waiting but we have others that could start processing already
if (!job && numTotal > 1) {
break;
}

// If there are potential jobs to be processed and blockUntil is set, we should exit to avoid waiting
// for processing this job.
if (this.blockUntil) {
break;
}
}

const job = await asyncFifoQueue.fetch();
let job: Job<DataType, ResultType, NameType> | void;
// Since there can be undefined jobs in the queue (when a job fails or queue is empty)
// we iterate until we find a job.
while (!job && asyncFifoQueue.numTotal() > 0) {
job = await asyncFifoQueue.fetch();
}

if (job) {
const token = job.token;
asyncFifoQueue.add(
this.retryIfFailed<void | Job<DataType, ResultType, NameType>>(
() =>
this.processJob(
job,
<Job<DataType, ResultType, NameType>>job,
token,
() => asyncFifoQueue.numTotal() <= this.opts.concurrency,
jobsInProgress,
Expand All @@ -444,7 +479,18 @@ export class Worker<
* @param token - worker token to be assigned to retrieved job
* @returns a Job or undefined if no job was available in the queue.
*/
async getNextJob(
async getNextJob(token: string, { block = true }: GetNextJobOptions = {}) {
return this._getNextJob(
await this.client,
await this.blockingConnection.client,
token,
{ block },
);
}

private async _getNextJob(
client: RedisClient,
bclient: RedisClient,
token: string,
{ block = true }: GetNextJobOptions = {},
): Promise<Job<DataType, ResultType, NameType> | undefined> {
Expand All @@ -461,14 +507,10 @@ export class Worker<
}

if (this.drained && block && !this.limitUntil && !this.waiting) {
this.waiting = this.waitForJob(bclient);
try {
this.waiting = this.waitForJob();
try {
const jobId = await this.waiting;
return this.moveToActive(token, jobId);
} finally {
this.waiting = null;
}
const jobId = await this.waiting;
return this.moveToActive(client, token, jobId);
} catch (err) {
// Swallow error if locally paused or closing since we did force a disconnection
if (
Expand All @@ -477,14 +519,16 @@ export class Worker<
) {
throw err;
}
} finally {
this.waiting = null;
}
} else {
if (this.limitUntil) {
this.abortDelayController?.abort();
this.abortDelayController = new AbortController();
await this.delay(this.limitUntil, this.abortDelayController);
}
return this.moveToActive(token);
return this.moveToActive(client, token);
}
}

Expand All @@ -505,22 +549,29 @@ export class Worker<
}

protected async moveToActive(
client: RedisClient,
token: string,
jobId?: string,
): Promise<Job<DataType, ResultType, NameType>> {
// If we get the special delayed job ID, we pick the delay as the next
// block timeout.
if (jobId && jobId.startsWith('0:')) {
this.blockUntil = parseInt(jobId.split(':')[1]) || 0;

// Remove marker from active list.
await client.lrem(this.keys.active, 1, jobId);
if (this.blockUntil > 0) {
return;
}
}
const [jobData, id, limitUntil, delayUntil] =
await this.scripts.moveToActive(token, jobId);
return this.nextJobFromJobData(jobData, id, limitUntil, delayUntil, token);
await this.scripts.moveToActive(client, token, jobId);
this.updateDelays(limitUntil, delayUntil);

return this.nextJobFromJobData(jobData, id, token);
}

private async waitForJob() {
// I am not sure returning here this quick is a good idea, the main
// loop could stay looping at a very high speed and consume all CPU time.
private async waitForJob(bclient: RedisClient): Promise<string> {
if (this.paused) {
return;
}
Expand All @@ -529,30 +580,35 @@ export class Worker<
const opts: WorkerOptions = <WorkerOptions>this.opts;

if (!this.closing) {
const client = await this.blockingConnection.client;

let blockTimeout = Math.max(
this.blockUntil
? (this.blockUntil - Date.now()) / 1000
: opts.drainDelay,
0.01,
0,
);

// Only Redis v6.0.0 and above supports doubles as block time
blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout
? blockTimeout
: Math.ceil(blockTimeout);
let jobId;

// We restrict the maximum block timeout to 10 second to avoid
// blocking the connection for too long in the case of reconnections
// reference: https://github.com/taskforcesh/bullmq/issues/1658
blockTimeout = Math.min(blockTimeout, maximumBlockTimeout);
// Blocking for less than 50ms is useless.
if (blockTimeout > 0.05) {
blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout
? blockTimeout
: Math.ceil(blockTimeout);

const jobId = await client.brpoplpush(
this.keys.wait,
this.keys.active,
blockTimeout,
);
// We restrict the maximum block timeout to 10 second to avoid
// blocking the connection for too long in the case of reconnections
// reference: https://github.com/taskforcesh/bullmq/issues/1658
blockTimeout = Math.min(blockTimeout, maximumBlockTimeout);

jobId = await bclient.brpoplpush(
this.keys.wait,
this.keys.active,
blockTimeout,
);
} else {
jobId = await bclient.rpoplpush(this.keys.wait, this.keys.active);
}
this.blockUntil = 0;
return jobId;
}
} catch (error) {
Expand All @@ -578,29 +634,22 @@ export class Worker<
await delay(milliseconds || DELAY_TIME_1, abortController);
}

/**
 * Records the rate-limit and delay horizons reported by the Redis scripts.
 *
 * @param limitUntil - timestamp (ms) until which the queue is rate limited; defaults to 0.
 * @param delayUntil - timestamp (ms) of the next delayed job; defaults to 0.
 *
 * Values are clamped to be non-negative; the trailing `|| 0` also coerces
 * NaN (e.g. from a missing script reply) back to 0.
 */
private updateDelays(limitUntil = 0, delayUntil = 0) {
  const sanitize = (ts: number): number => Math.max(ts, 0) || 0;

  this.limitUntil = sanitize(limitUntil);
  this.blockUntil = sanitize(delayUntil);
}

protected async nextJobFromJobData(
jobData?: JobJsonRaw,
jobId?: string,
limitUntil?: number,
delayUntil?: number,
token?: string,
): Promise<Job<DataType, ResultType, NameType>> {
if (!jobData) {
if (!this.drained) {
this.emit('drained');
this.drained = true;
this.blockUntil = 0;
}
}

// Update limitUntil and delayUntil
// TODO: Refactor out of this function
this.limitUntil = Math.max(limitUntil, 0) || 0;
if (delayUntil) {
this.blockUntil = Math.max(delayUntil, 0) || 0;
}

if (jobData) {
} else {
this.drained = false;
const job = this.createJob(jobData, jobId);
job.token = token;
Expand Down Expand Up @@ -631,13 +680,9 @@ export class Worker<
);
this.emit('completed', job, result, 'active');
const [jobData, jobId, limitUntil, delayUntil] = completed || [];
return this.nextJobFromJobData(
jobData,
jobId,
limitUntil,
delayUntil,
token,
);
this.updateDelays(limitUntil, delayUntil);

return this.nextJobFromJobData(jobData, jobId, token);
}
};

Expand Down
7 changes: 5 additions & 2 deletions src/commands/addJob-9.lua
Expand Up @@ -136,14 +136,17 @@ if waitChildrenKey ~= nil then
rcall("ZADD", waitChildrenKey, timestamp, jobId)
rcall("XADD", KEYS[8], "MAXLEN", "~", maxEvents, "*", "event", "waiting-children", "jobId", jobId)
elseif (delayedTimestamp ~= 0) then
local delayedKey = KEYS[5]
local score = delayedTimestamp * 0x1000 + bit.band(jobCounter, 0xfff)
rcall("ZADD", KEYS[5], score, jobId)

rcall("ZADD", delayedKey, score, jobId)
rcall("XADD", KEYS[8], "MAXLEN", "~", maxEvents, "*", "event", "delayed", "jobId", jobId,
"delay", delayedTimestamp)

-- If wait list is empty, and this delayed job is the next one to be processed,
-- then we need to signal the workers by adding a dummy job (jobId 0:delay) to the wait list.
local target = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2])
addDelayMarkerIfNeeded(target, KEYS[5])
addDelayMarkerIfNeeded(target, delayedKey)
else
local target, paused = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2])

Expand Down
16 changes: 14 additions & 2 deletions src/commands/includes/addDelayMarkerIfNeeded.lua
Expand Up @@ -6,10 +6,22 @@
--- @include "getNextDelayedTimestamp"

local function addDelayMarkerIfNeeded(targetKey, delayedKey)
if rcall("LLEN", targetKey) == 0 then
local waitLen = rcall("LLEN", targetKey)
if waitLen <= 1 then
local nextTimestamp = getNextDelayedTimestamp(delayedKey)
if nextTimestamp ~= nil then
rcall("LPUSH", targetKey, "0:" .. nextTimestamp)
-- Check if there is already a marker with older timestamp
-- if there is, we need to replace it.
if waitLen == 1 then
local marker = rcall("LINDEX", targetKey, 0)
local oldTimestamp = tonumber(marker:sub(3))
if oldTimestamp and oldTimestamp > nextTimestamp then
rcall("LSET", targetKey, 0, "0:" .. nextTimestamp)
end
else
-- if there is no marker, then we need to add one
rcall("LPUSH", targetKey, "0:" .. nextTimestamp)
end
end
end
end
5 changes: 3 additions & 2 deletions tests/test_clean.ts
Expand Up @@ -499,7 +499,7 @@ describe('Cleaner', () => {
async (job, token) => {
if (job.name === 'child') {
await delay(100);
throw new Error('error');
throw new Error('forced child error');
}
let step = job.data.step;
while (step !== Step.Finish) {
Expand All @@ -513,6 +513,7 @@ describe('Cleaner', () => {
id: job.id!,
queue: job.queueQualifiedName,
},
removeDependencyOnFailure: true,
},
);
await delay(1000);
Expand Down Expand Up @@ -562,7 +563,7 @@ describe('Cleaner', () => {
);

await new Promise<void>(resolve => {
worker.on('failed', async () => {
worker.on('failed', async job => {
await queue.clean(0, 0, 'failed');
resolve();
});
Expand Down

0 comments on commit d2e2035

Please sign in to comment.