diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d73cda5190..b3e1ded7fa 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -116,26 +116,26 @@ jobs: - run: yarn build - run: BULLMQ_TEST_PREFIX={b} yarn test - node-upstash: - runs-on: ubuntu-latest - continue-on-error: true - - env: - node-version: lts/* - REDIS_HOST: ${{ secrets.REDIS_HOST }} - - name: testing node@lts/*, upstash@latest - steps: - - name: Checkout repository - uses: actions/checkout@v3 # v3 - - name: Use Node.js ${{ env.node-version }} - uses: actions/setup-node@v3 # v3 - with: - node-version: lts/* - cache: 'yarn' - - run: yarn install --ignore-engines --frozen-lockfile --non-interactive - - run: yarn build - - run: yarn test + # node-upstash: + # runs-on: ubuntu-latest + # continue-on-error: true + + # env: + # node-version: lts/* + # REDIS_HOST: ${{ secrets.REDIS_HOST }} + + # name: testing node@lts/*, upstash@latest + # steps: + # - name: Checkout repository + # uses: actions/checkout@v3 # v3 + # - name: Use Node.js ${{ env.node-version }} + # uses: actions/setup-node@v3 # v3 + # with: + # node-version: lts/* + # cache: 'yarn' + # - run: yarn install --ignore-engines --frozen-lockfile --non-interactive + # - run: yarn build + # - run: yarn test python: runs-on: ubuntu-latest diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 40c30e5b6b..154c6e3fca 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -835,8 +835,7 @@ export class Scripts { } } - async moveToActive(token: string, jobId?: string) { - const client = await this.queue.client; + async moveToActive(client: RedisClient, token: string, jobId?: string) { const opts = this.queue.opts as WorkerOptions; const queueKeys = this.queue.keys; diff --git a/src/classes/worker.ts b/src/classes/worker.ts index afe0464df6..add6bb13ab 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -397,22 +397,57 @@ export class Worker< let tokenPostfix = 0; + const client = await this.client; + const bclient = await this.blockingConnection.client; + while (!this.closing) { + let numTotal = asyncFifoQueue.numTotal(); while ( !this.waiting && - asyncFifoQueue.numTotal() < this.opts.concurrency && - (!this.limitUntil || asyncFifoQueue.numTotal() == 0) + numTotal < this.opts.concurrency && + (!this.limitUntil || numTotal == 0) ) { const token = `${this.id}:${tokenPostfix++}`; - asyncFifoQueue.add( - this.retryIfFailed>( - () => this.getNextJob(token), - this.opts.runRetryDelay, - ), + + const fetchedJob = this.retryIfFailed>( + () => this._getNextJob(client, bclient, token, { block: true }), + this.opts.runRetryDelay, ); + asyncFifoQueue.add(fetchedJob); + + numTotal = asyncFifoQueue.numTotal(); + + if (this.waiting && numTotal > 1) { + // We have a job waiting but we have others that we could start processing already + break; + } + + // We await here so that we fetch jobs in sequence, this is important to avoid unnecessary calls + // to Redis in high concurrency scenarios. + const job = await fetchedJob; + + // No more jobs waiting but we have others that could start processing already + if (!job && numTotal > 1) { + break; + } + + // If there are potential jobs to be processed and blockUntil is set, we should exit to avoid waiting + // for processing this job. + if (this.blockUntil) { + break; + } } - const job = await asyncFifoQueue.fetch(); + let job: Job | void; + // Since there can be undefined jobs in the queue (when a job fails or queue is empty) + // we iterate until we find a job. + while (!job && asyncFifoQueue.numTotal() > 0) { + job = await asyncFifoQueue.fetch(); + } if (job) { const token = job.token; @@ -420,7 +455,7 @@ export class Worker< this.retryIfFailed>( () => this.processJob( - job, + >job, token, () => asyncFifoQueue.numTotal() <= this.opts.concurrency, jobsInProgress, @@ -444,7 +479,18 @@ export class Worker< * @param token - worker token to be assigned to retrieved job * @returns a Job or undefined if no job was available in the queue. */ - async getNextJob( + async getNextJob(token: string, { block = true }: GetNextJobOptions = {}) { + return this._getNextJob( + await this.client, + await this.blockingConnection.client, + token, + { block }, + ); + } + + private async _getNextJob( + client: RedisClient, + bclient: RedisClient, token: string, { block = true }: GetNextJobOptions = {}, ): Promise | undefined> { @@ -461,14 +507,10 @@ export class Worker< } if (this.drained && block && !this.limitUntil && !this.waiting) { + this.waiting = this.waitForJob(bclient); try { - this.waiting = this.waitForJob(); - try { - const jobId = await this.waiting; - return this.moveToActive(token, jobId); - } finally { - this.waiting = null; - } + const jobId = await this.waiting; + return this.moveToActive(client, token, jobId); } catch (err) { // Swallow error if locally paused or closing since we did force a disconnection if ( @@ -477,6 +519,8 @@ export class Worker< ) { throw err; } + } finally { + this.waiting = null; } } else { if (this.limitUntil) { @@ -484,7 +528,7 @@ export class Worker< this.abortDelayController = new AbortController(); await this.delay(this.limitUntil, this.abortDelayController); } - return this.moveToActive(token); + return this.moveToActive(client, token); } } @@ -505,6 +549,7 @@ export class Worker< } protected async moveToActive( + client: RedisClient, token: string, jobId?: string, ): Promise> { @@ -512,15 +557,21 @@ export class Worker< // block timeout. if (jobId && jobId.startsWith('0:')) { this.blockUntil = parseInt(jobId.split(':')[1]) || 0; + + // Remove marker from active list. + await client.lrem(this.keys.active, 1, jobId); + if (this.blockUntil > 0) { + return; + } } const [jobData, id, limitUntil, delayUntil] = - await this.scripts.moveToActive(token, jobId); - return this.nextJobFromJobData(jobData, id, limitUntil, delayUntil, token); + await this.scripts.moveToActive(client, token, jobId); + this.updateDelays(limitUntil, delayUntil); + + return this.nextJobFromJobData(jobData, id, token); } - private async waitForJob() { - // I am not sure returning here this quick is a good idea, the main - // loop could stay looping at a very high speed and consume all CPU time. + private async waitForJob(bclient: RedisClient): Promise { if (this.paused) { return; } @@ -529,30 +580,35 @@ export class Worker< const opts: WorkerOptions = this.opts; if (!this.closing) { - const client = await this.blockingConnection.client; - let blockTimeout = Math.max( this.blockUntil ? (this.blockUntil - Date.now()) / 1000 : opts.drainDelay, - 0.01, + 0, ); - // Only Redis v6.0.0 and above supports doubles as block time - blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout - ? blockTimeout - : Math.ceil(blockTimeout); + let jobId; - // We restrict the maximum block timeout to 10 second to avoid - // blocking the connection for too long in the case of reconnections - // reference: https://github.com/taskforcesh/bullmq/issues/1658 - blockTimeout = Math.min(blockTimeout, maximumBlockTimeout); + // Blocking for less than 50ms is useless. + if (blockTimeout > 0.05) { + blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout + ? blockTimeout + : Math.ceil(blockTimeout); - const jobId = await client.brpoplpush( - this.keys.wait, - this.keys.active, - blockTimeout, - ); + // We restrict the maximum block timeout to 10 second to avoid + // blocking the connection for too long in the case of reconnections + // reference: https://github.com/taskforcesh/bullmq/issues/1658 + blockTimeout = Math.min(blockTimeout, maximumBlockTimeout); + + jobId = await bclient.brpoplpush( + this.keys.wait, + this.keys.active, + blockTimeout, + ); + } else { + jobId = await bclient.rpoplpush(this.keys.wait, this.keys.active); + } + this.blockUntil = 0; return jobId; } } catch (error) { @@ -578,29 +634,22 @@ export class Worker< await delay(milliseconds || DELAY_TIME_1, abortController); } + private updateDelays(limitUntil = 0, delayUntil = 0) { + this.limitUntil = Math.max(limitUntil, 0) || 0; + this.blockUntil = Math.max(delayUntil, 0) || 0; + } + protected async nextJobFromJobData( jobData?: JobJsonRaw, jobId?: string, - limitUntil?: number, - delayUntil?: number, token?: string, ): Promise> { if (!jobData) { if (!this.drained) { this.emit('drained'); this.drained = true; - this.blockUntil = 0; } - } - - // Update limitUntil and delayUntil - // TODO: Refactor out of this function - this.limitUntil = Math.max(limitUntil, 0) || 0; - if (delayUntil) { - this.blockUntil = Math.max(delayUntil, 0) || 0; - } - - if (jobData) { + } else { this.drained = false; const job = this.createJob(jobData, jobId); job.token = token; @@ -631,13 +680,9 @@ export class Worker< ); this.emit('completed', job, result, 'active'); const [jobData, jobId, limitUntil, delayUntil] = completed || []; - return this.nextJobFromJobData( - jobData, - jobId, - limitUntil, - delayUntil, - token, - ); + this.updateDelays(limitUntil, delayUntil); + + return this.nextJobFromJobData(jobData, jobId, token); } }; diff --git a/src/commands/addJob-9.lua b/src/commands/addJob-9.lua index bbb8bf64ea..22089f98e7 100644 --- a/src/commands/addJob-9.lua +++ b/src/commands/addJob-9.lua @@ -136,14 +136,17 @@ if waitChildrenKey ~= nil then rcall("ZADD", waitChildrenKey, timestamp, jobId) rcall("XADD", KEYS[8], "MAXLEN", "~", maxEvents, "*", "event", "waiting-children", "jobId", jobId) elseif (delayedTimestamp ~= 0) then + local delayedKey = KEYS[5] local score = delayedTimestamp * 0x1000 + bit.band(jobCounter, 0xfff) - rcall("ZADD", KEYS[5], score, jobId) + + rcall("ZADD", delayedKey, score, jobId) rcall("XADD", KEYS[8], "MAXLEN", "~", maxEvents, "*", "event", "delayed", "jobId", jobId, "delay", delayedTimestamp) + -- If wait list is empty, and this delayed job is the next one to be processed, -- then we need to signal the workers by adding a dummy job (jobId 0:delay) to the wait list. local target = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2]) - addDelayMarkerIfNeeded(target, KEYS[5]) + addDelayMarkerIfNeeded(target, delayedKey) else local target, paused = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2]) diff --git a/src/commands/includes/addDelayMarkerIfNeeded.lua b/src/commands/includes/addDelayMarkerIfNeeded.lua index 1abbe05ae3..af4e829521 100644 --- a/src/commands/includes/addDelayMarkerIfNeeded.lua +++ b/src/commands/includes/addDelayMarkerIfNeeded.lua @@ -6,10 +6,22 @@ --- @include "getNextDelayedTimestamp" local function addDelayMarkerIfNeeded(targetKey, delayedKey) - if rcall("LLEN", targetKey) == 0 then + local waitLen = rcall("LLEN", targetKey) + if waitLen <= 1 then local nextTimestamp = getNextDelayedTimestamp(delayedKey) if nextTimestamp ~= nil then - rcall("LPUSH", targetKey, "0:" .. nextTimestamp) + -- Check if there is already a marker with older timestamp + -- if there is, we need to replace it. + if waitLen == 1 then + local marker = rcall("LINDEX", targetKey, 0) + local oldTimestamp = tonumber(marker:sub(3)) + if oldTimestamp and oldTimestamp > nextTimestamp then + rcall("LSET", targetKey, 0, "0:" .. nextTimestamp) + end + else + -- if there is no marker, then we need to add one + rcall("LPUSH", targetKey, "0:" .. nextTimestamp) + end end end end diff --git a/tests/test_clean.ts b/tests/test_clean.ts index f6a720e3a3..90274c557d 100644 --- a/tests/test_clean.ts +++ b/tests/test_clean.ts @@ -499,7 +499,7 @@ describe('Cleaner', () => { async (job, token) => { if (job.name === 'child') { await delay(100); - throw new Error('error'); + throw new Error('forced child error'); } let step = job.data.step; while (step !== Step.Finish) { @@ -513,6 +513,7 @@ describe('Cleaner', () => { id: job.id!, queue: job.queueQualifiedName, }, + removeDependencyOnFailure: true, }, ); await delay(1000); @@ -562,7 +563,7 @@ describe('Cleaner', () => { ); await new Promise(resolve => { - worker.on('failed', async () => { + worker.on('failed', async job => { await queue.clean(0, 0, 'failed'); resolve(); }); diff --git a/tests/test_delay.ts b/tests/test_delay.ts index 1155cd5f20..bbad901a86 100644 --- a/tests/test_delay.ts +++ b/tests/test_delay.ts @@ -255,7 +255,7 @@ describe('Delayed jobs', function () { reject(err); } - await delay(500); + await delay(100); }; const processing = new Promise((resolve, reject) => { @@ -292,8 +292,9 @@ describe('Delayed jobs', function () { await worker2.close(); }); + // Add test where delays overlap so that we can see that indeed the jobs are processed concurrently. it('should process delayed jobs concurrently respecting delay and concurrency', async function () { - const delay = 250; + const delay_ = 250; const concurrency = 100; const margin = 1.5; let numJobs = 10; @@ -308,11 +309,11 @@ describe('Delayed jobs', function () { expect( delayed, 'waited at least delay time', - ).to.be.greaterThanOrEqual(delay); + ).to.be.greaterThanOrEqual(delay_); expect( delayed, 'waited less than delay time and margin', - ).to.be.lessThan(delay * margin); + ).to.be.lessThan(delay_ * margin); } catch (err) { console.error(err); reject(err); @@ -325,11 +326,13 @@ describe('Delayed jobs', function () { ); }); + let index = 1; while (numJobs) { numJobs -= 1; - await queue.add('my-queue', { foo: 'bar' }, { delay }); + await queue.add('my-queue', { foo: 'bar', index }, { delay: delay_ }); + index += 1; if (numJobs) { - await new Promise(resolve => setTimeout(resolve, 300)); + await delay(1000); } } @@ -339,7 +342,7 @@ describe('Delayed jobs', function () { describe('when failed jobs are retried and moved to delayed', function () { it('processes jobs without getting stuck', async () => { - const countJobs = 32; + const countJobs = 2; const concurrency = 50; const processedJobs: { data: any }[] = []; diff --git a/tests/test_job.ts b/tests/test_job.ts index a7f63cd93f..232de72f21 100644 --- a/tests/test_job.ts +++ b/tests/test_job.ts @@ -336,10 +336,13 @@ describe('Job', function () { ); const delayedJobs = await queue.addBulk(jobsDataWithDelay); + const startTime = Date.now(); // Remove all jobs await Promise.all(delayedJobs.map(job => job.remove())); await Promise.all(waitingJobs.map(job => job.remove())); + expect(Date.now() - startTime).to.be.lessThan(4000); + const countJobs = await queue.getJobCountByTypes('waiting', 'delayed'); expect(countJobs).to.be.equal(0); }); diff --git a/tests/test_rate_limiter.ts b/tests/test_rate_limiter.ts index 1239599532..7396bd1ebb 100644 --- a/tests/test_rate_limiter.ts +++ b/tests/test_rate_limiter.ts @@ -659,6 +659,7 @@ describe('Rate Limiter', function () { describe('when there are more added jobs than max limiter', () => { it('processes jobs as max limiter from the beginning', async function () { + const numJobs = 400; this.timeout(5000); let parallelJobs = 0; @@ -685,10 +686,10 @@ describe('Rate Limiter', function () { }); const allCompleted = new Promise(resolve => { - worker.on('completed', after(400, resolve)); + worker.on('completed', after(numJobs, resolve)); }); - const jobs = Array(400) + const jobs = Array(numJobs) .fill('') .map((_, index) => { return { @@ -707,6 +708,7 @@ describe('Rate Limiter', function () { describe('when rate limit is max 1', () => { it('processes jobs as max limiter from the beginning', async function () { + const numJobs = 5; this.timeout(5000); let parallelJobs = 0; @@ -733,10 +735,10 @@ describe('Rate Limiter', function () { }); const allCompleted = new Promise(resolve => { - worker.on('completed', after(5, resolve)); + worker.on('completed', after(numJobs, resolve)); }); - const jobs = Array(5) + const jobs = Array(numJobs) .fill('') .map((_, index) => { return { @@ -784,9 +786,9 @@ describe('Rate Limiter', function () { async job => { const { priority } = job.opts; - priorityBuckets[priority] = priorityBuckets[priority] - 1; + priorityBuckets[priority!] = priorityBuckets[priority!] - 1; - for (let p = 1; p < priority; p++) { + for (let p = 1; p < priority!; p++) { if (priorityBuckets[p] > 0) { const before = JSON.stringify(priorityBucketsBefore); const after = JSON.stringify(priorityBuckets); diff --git a/tests/test_worker.ts b/tests/test_worker.ts index d45a547160..60ba98e755 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -1757,44 +1757,47 @@ describe('workers', function () { let nbProcessing = 0; let pendingMessageToProcess = 8; let wait = 10; + let worker; - const worker = new Worker( - queueName, - async () => { - try { - nbProcessing++; - expect(nbProcessing).to.be.lessThan(5); + const processing = new Promise((resolve, reject) => { + worker = new Worker( + queueName, + async () => { + try { + nbProcessing++; + expect(nbProcessing).to.be.lessThan(5); - wait += 100; + wait += 100; - await delay(wait); - //We should not have 4 more in parallel. - //At the end, due to empty list, no new job will process, so nbProcessing will decrease. - expect(nbProcessing).to.be.eql( - Math.min(pendingMessageToProcess, 4), - ); - pendingMessageToProcess--; - nbProcessing--; - } catch (err) { - console.error(err); - } - }, - { - connection, - prefix, - concurrency: 4, - }, - ); - await worker.waitUntilReady(); + await delay(wait); - const waiting = new Promise((resolve, reject) => { - worker.on('completed', after(8, resolve)); - worker.on('failed', reject); + // We should not have 4 more in parallel. + // At the end, due to empty list, no new job will process, so nbProcessing will decrease. + expect(nbProcessing).to.be.eql( + Math.min(pendingMessageToProcess, 4), + ); + pendingMessageToProcess--; + nbProcessing--; + + if (pendingMessageToProcess == 0) { + resolve(); + } + } catch (err) { + reject(err); + } + }, + { + connection, + prefix, + concurrency: 4, + }, + ); }); + await worker.waitUntilReady(); await Promise.all(times(8, () => queue.add('test', {}))); - await waiting; + await processing; await worker.close(); }); @@ -2577,13 +2580,11 @@ describe('workers', function () { setTimeout(() => { timeoutReached = true; }, timeout); - console.log(step, timeoutReached, job.data.timeout); while (step !== Step.Finish) { switch (step) { case Step.Initial: { await delay(1000); if (timeoutReached) { - console.log('reaached1'); throw new Error('Timeout'); } await job.updateData({