From 4c4559b3c678313b3727c9781a6d3f963bcfda4e Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Sun, 20 Aug 2023 17:18:29 -0600 Subject: [PATCH] feat(sandbox): emulate moveToDelayed method (#2122) ref #2118 --- src/classes/child-processor.ts | 16 ++++-- src/classes/main-base.ts | 2 +- src/classes/sandbox.ts | 6 ++- src/classes/worker.ts | 2 +- src/interfaces/parent-command.ts | 1 + src/interfaces/sandboxed-job.ts | 3 +- .../fixture_processor_move_to_delayed.js | 21 ++++++++ ...te.js => fixture_processor_update_data.js} | 0 .../fixture_processor_with_extra_param.js | 2 +- tests/test_sandboxed_process.ts | 49 ++++++++++++++++++- 10 files changed, 92 insertions(+), 10 deletions(-) create mode 100644 tests/fixtures/fixture_processor_move_to_delayed.js rename tests/fixtures/{fixture_processor_update.js => fixture_processor_update_data.js} (100%) diff --git a/src/classes/child-processor.ts b/src/classes/child-processor.ts index 39c960c89e..97ca5dc94a 100644 --- a/src/classes/child-processor.ts +++ b/src/classes/child-processor.ts @@ -59,7 +59,7 @@ export class ChildProcessor { }); } - public async start(jobJson: JobJson): Promise { + public async start(jobJson: JobJson, token?: string): Promise { if (this.status !== ChildStatus.Idle) { return this.send({ cmd: ParentCommand.Error, @@ -70,7 +70,8 @@ export class ChildProcessor { this.currentJobPromise = (async () => { try { const job = wrapJob(jobJson, this.send); - const result = (await this.processor(job)) || {}; + //console.log('el token', token, job.id) + const result = (await this.processor(job, token)) || {}; await this.send({ cmd: ParentCommand.Completed, value: result, @@ -144,7 +145,16 @@ function wrapJob( }); }, /* - * Emulate the real job `update` function. + * Emulate the real job `moveToDelayed` function. + */ + moveToDelayed: async (timestamp: number, token?: string) => { + send({ + cmd: ParentCommand.MoveToDelayed, + value: { timestamp, token }, + }); + }, + /* + * Emulate the real job `updateData` function. */ updateData: async (data: any) => { send({ diff --git a/src/classes/main-base.ts b/src/classes/main-base.ts index f8441c1380..c83dc59b21 100644 --- a/src/classes/main-base.ts +++ b/src/classes/main-base.ts @@ -20,7 +20,7 @@ export default ( await childProcessor.init(msg.value); break; case ChildCommand.Start: - await childProcessor.start(msg.job); + await childProcessor.start(msg.job, msg?.token); break; case ChildCommand.Stop: break; diff --git a/src/classes/sandbox.ts b/src/classes/sandbox.ts index 93c6045630..b6c1af4af5 100644 --- a/src/classes/sandbox.ts +++ b/src/classes/sandbox.ts @@ -6,7 +6,7 @@ const sandbox = ( processFile: any, childPool: ChildPool, ) => { - return async function process(job: Job): Promise { + return async function process(job: Job, token?: string): Promise { const child = await childPool.retain(processFile); let msgHandler: any; let exitHandler: any; @@ -14,6 +14,7 @@ const sandbox = ( await child.send({ cmd: ChildCommand.Start, job: job.asJSONSandbox(), + token, }); const done: Promise = new Promise((resolve, reject) => { @@ -35,6 +36,9 @@ const sandbox = ( case ParentCommand.Log: await job.log(msg.value); break; + case ParentCommand.MoveToDelayed: + await job.moveToDelayed(msg.value?.timestamp, msg.value?.token); + break; case ParentCommand.Update: await job.updateData(msg.value); break; diff --git a/src/classes/worker.ts b/src/classes/worker.ts index 1f1a50cce2..1a11671c5a 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -651,7 +651,7 @@ export class Worker< if ( err instanceof DelayedError || - err.name == 'DelayedError' || + err.message == 'DelayedError' || err instanceof WaitingChildrenError || err.name == 'WaitingChildrenError' ) { diff --git a/src/interfaces/parent-command.ts b/src/interfaces/parent-command.ts index 04916c2800..d87a4e24f0 100644 --- a/src/interfaces/parent-command.ts +++ b/src/interfaces/parent-command.ts @@ -5,6 +5,7 @@ export enum ParentCommand { InitFailed, InitCompleted, Log, + MoveToDelayed, Progress, Update, } diff --git a/src/interfaces/sandboxed-job.ts b/src/interfaces/sandboxed-job.ts index 27c05ef2bc..b61b937b28 100644 --- a/src/interfaces/sandboxed-job.ts +++ b/src/interfaces/sandboxed-job.ts @@ -8,8 +8,9 @@ export interface SandboxedJob extends Omit { data: T; opts: JobsOptions; - updateProgress: (value: object | number) => Promise; + moveToDelayed: (timestamp: number, token?: string) => Promise; log: (row: any) => void; updateData: (data: any) => Promise; + updateProgress: (value: object | number) => Promise; returnValue: R; } diff --git a/tests/fixtures/fixture_processor_move_to_delayed.js b/tests/fixtures/fixture_processor_move_to_delayed.js new file mode 100644 index 0000000000..3a385f8c6d --- /dev/null +++ b/tests/fixtures/fixture_processor_move_to_delayed.js @@ -0,0 +1,21 @@ +/** + * A processor file to be used in tests. + * + */ +'use strict'; + +const { DelayedError } = require('../../dist/cjs/classes/delayed-error'); +const delay = require('./delay'); + +module.exports = function (job, token) { + if (job.attemptsMade == 1) { + return delay(250) + .then(() => { + job.moveToDelayed(2500, token); + return delay(500); + }) + .then(() => { + throw new DelayedError(); + }); + } +}; diff --git a/tests/fixtures/fixture_processor_update.js b/tests/fixtures/fixture_processor_update_data.js similarity index 100% rename from tests/fixtures/fixture_processor_update.js rename to tests/fixtures/fixture_processor_update_data.js diff --git a/tests/fixtures/fixture_processor_with_extra_param.js b/tests/fixtures/fixture_processor_with_extra_param.js index d94f5970c8..9a3c426145 100644 --- a/tests/fixtures/fixture_processor_with_extra_param.js +++ b/tests/fixtures/fixture_processor_with_extra_param.js @@ -6,7 +6,7 @@ const delay = require('./delay'); -module.exports = function (job, extraParam) { +module.exports = function (job, token, extraParam) { return delay(500).then(() => { return 42; }); diff --git a/tests/test_sandboxed_process.ts b/tests/test_sandboxed_process.ts index 3a964c89ab..5829e1a4c7 100644 --- a/tests/test_sandboxed_process.ts +++ b/tests/test_sandboxed_process.ts @@ -73,7 +73,7 @@ function sandboxProcessTests( await worker.close(); }); - describe('when processor has more than 1 param', () => { + describe('when processor has more than 2 params', () => { it('should ignore extra params, process and complete', async () => { const processFile = __dirname + '/fixtures/fixture_processor_with_extra_param.js'; @@ -429,7 +429,8 @@ function sandboxProcessTests( }); it('should process and update data', async () => { - const processFile = __dirname + '/fixtures/fixture_processor_update.js'; + const processFile = + __dirname + '/fixtures/fixture_processor_update_data.js'; const worker = new Worker(queueName, processFile, { connection, @@ -459,6 +460,50 @@ function sandboxProcessTests( await worker.close(); }); + it('should process and move to delayed', async () => { + const processFile = + __dirname + '/fixtures/fixture_processor_move_to_delayed.js'; + + const worker = new Worker(queueName, processFile, { + connection, + drainDelay: 1, + useWorkerThreads, + }); + + const delaying = new Promise((resolve, reject) => { + queueEvents.on('delayed', async ({ delay }) => { + try { + expect(Number(delay)).to.be.greaterThanOrEqual(2500); + expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf( + 1, + ); + expect(worker['childPool'].getAllFree()).to.have.lengthOf(0); + resolve(); + } catch (err) { + reject(err); + } + }); + }); + + const completing = new Promise((resolve, reject) => { + worker.on('completed', async (job: Job) => { + expect(job.data.bar).to.be.equal('foo'); + resolve(); + }); + }); + + const job = await queue.add('test', { bar: 'foo' }); + + await delaying; + + const state = await queue.getJobState(job.id!); + + expect(state).to.be.equal('delayed'); + + await completing; + await worker.close(); + }); + describe('when env variables are provided', () => { it('shares env variables', async () => { const processFile = __dirname + '/fixtures/fixture_processor_env.js';