Skip to content

Commit

Permalink
feat(sandbox): emulate moveToDelayed method (#2122) ref #2118
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Aug 20, 2023
1 parent 9253112 commit 4c4559b
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 10 deletions.
16 changes: 13 additions & 3 deletions src/classes/child-processor.ts
Expand Up @@ -59,7 +59,7 @@ export class ChildProcessor {
});
}

public async start(jobJson: JobJson): Promise<void> {
public async start(jobJson: JobJson, token?: string): Promise<void> {
if (this.status !== ChildStatus.Idle) {
return this.send({
cmd: ParentCommand.Error,
Expand All @@ -70,7 +70,8 @@ export class ChildProcessor {
this.currentJobPromise = (async () => {
try {
const job = wrapJob(jobJson, this.send);
const result = (await this.processor(job)) || {};
//console.log('el token', token, job.id)
const result = (await this.processor(job, token)) || {};
await this.send({
cmd: ParentCommand.Completed,
value: result,
Expand Down Expand Up @@ -144,7 +145,16 @@ function wrapJob(
});
},
/*
* Emulate the real job `update` function.
* Emulate the real job `moveToDelayed` function.
*/
moveToDelayed: async (timestamp: number, token?: string) => {
send({
cmd: ParentCommand.MoveToDelayed,
value: { timestamp, token },
});
},
/*
* Emulate the real job `updateData` function.
*/
updateData: async (data: any) => {
send({
Expand Down
2 changes: 1 addition & 1 deletion src/classes/main-base.ts
Expand Up @@ -20,7 +20,7 @@ export default (
await childProcessor.init(msg.value);
break;
case ChildCommand.Start:
await childProcessor.start(msg.job);
await childProcessor.start(msg.job, msg?.token);
break;
case ChildCommand.Stop:
break;
Expand Down
6 changes: 5 additions & 1 deletion src/classes/sandbox.ts
Expand Up @@ -6,14 +6,15 @@ const sandbox = <T, R, N extends string>(
processFile: any,
childPool: ChildPool,
) => {
return async function process(job: Job<T, R, N>): Promise<R> {
return async function process(job: Job<T, R, N>, token?: string): Promise<R> {
const child = await childPool.retain(processFile);
let msgHandler: any;
let exitHandler: any;

await child.send({
cmd: ChildCommand.Start,
job: job.asJSONSandbox(),
token,
});

const done: Promise<R> = new Promise((resolve, reject) => {
Expand All @@ -35,6 +36,9 @@ const sandbox = <T, R, N extends string>(
case ParentCommand.Log:
await job.log(msg.value);
break;
case ParentCommand.MoveToDelayed:
await job.moveToDelayed(msg.value?.timestamp, msg.value?.token);
break;
case ParentCommand.Update:
await job.updateData(msg.value);
break;
Expand Down
2 changes: 1 addition & 1 deletion src/classes/worker.ts
Expand Up @@ -651,7 +651,7 @@ export class Worker<

if (
err instanceof DelayedError ||
err.name == 'DelayedError' ||
err.message == 'DelayedError' ||
err instanceof WaitingChildrenError ||
err.name == 'WaitingChildrenError'
) {
Expand Down
1 change: 1 addition & 0 deletions src/interfaces/parent-command.ts
Expand Up @@ -5,6 +5,7 @@ export enum ParentCommand {
InitFailed,
InitCompleted,
Log,
MoveToDelayed,
Progress,
Update,
}
3 changes: 2 additions & 1 deletion src/interfaces/sandboxed-job.ts
Expand Up @@ -8,8 +8,9 @@ export interface SandboxedJob<T = any, R = any>
extends Omit<JobJson, 'data' | 'opts' | 'progress' | 'returnValue'> {
data: T;
opts: JobsOptions;
updateProgress: (value: object | number) => Promise<void>;
moveToDelayed: (timestamp: number, token?: string) => Promise<void>;
log: (row: any) => void;
updateData: (data: any) => Promise<void>;
updateProgress: (value: object | number) => Promise<void>;
returnValue: R;
}
21 changes: 21 additions & 0 deletions tests/fixtures/fixture_processor_move_to_delayed.js
@@ -0,0 +1,21 @@
/**
* A processor file to be used in tests.
*
*/
'use strict';

const { DelayedError } = require('../../dist/cjs/classes/delayed-error');
const delay = require('./delay');

module.exports = function (job, token) {
if (job.attemptsMade == 1) {
return delay(250)
.then(() => {
job.moveToDelayed(2500, token);
return delay(500);
})
.then(() => {
throw new DelayedError();
});
}
};
File renamed without changes.
2 changes: 1 addition & 1 deletion tests/fixtures/fixture_processor_with_extra_param.js
Expand Up @@ -6,7 +6,7 @@

const delay = require('./delay');

module.exports = function (job, extraParam) {
module.exports = function (job, token, extraParam) {
return delay(500).then(() => {
return 42;
});
Expand Down
49 changes: 47 additions & 2 deletions tests/test_sandboxed_process.ts
Expand Up @@ -73,7 +73,7 @@ function sandboxProcessTests(
await worker.close();
});

describe('when processor has more than 1 param', () => {
describe('when processor has more than 2 params', () => {
it('should ignore extra params, process and complete', async () => {
const processFile =
__dirname + '/fixtures/fixture_processor_with_extra_param.js';
Expand Down Expand Up @@ -429,7 +429,8 @@ function sandboxProcessTests(
});

it('should process and update data', async () => {
const processFile = __dirname + '/fixtures/fixture_processor_update.js';
const processFile =
__dirname + '/fixtures/fixture_processor_update_data.js';

const worker = new Worker(queueName, processFile, {
connection,
Expand Down Expand Up @@ -459,6 +460,50 @@ function sandboxProcessTests(
await worker.close();
});

it('should process and move to delayed', async () => {
const processFile =
__dirname + '/fixtures/fixture_processor_move_to_delayed.js';

const worker = new Worker(queueName, processFile, {
connection,
drainDelay: 1,
useWorkerThreads,
});

const delaying = new Promise<void>((resolve, reject) => {
queueEvents.on('delayed', async ({ delay }) => {
try {
expect(Number(delay)).to.be.greaterThanOrEqual(2500);
expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(
1,
);
expect(worker['childPool'].getAllFree()).to.have.lengthOf(0);
resolve();
} catch (err) {
reject(err);
}
});
});

const completing = new Promise<void>((resolve, reject) => {
worker.on('completed', async (job: Job) => {
expect(job.data.bar).to.be.equal('foo');
resolve();
});
});

const job = await queue.add('test', { bar: 'foo' });

await delaying;

const state = await queue.getJobState(job.id!);

expect(state).to.be.equal('delayed');

await completing;
await worker.close();
});

describe('when env variables are provided', () => {
it('shares env variables', async () => {
const processFile = __dirname + '/fixtures/fixture_processor_env.js';
Expand Down

0 comments on commit 4c4559b

Please sign in to comment.