Skip to content

Commit

Permalink
feat(job): pass queueName into sandbox (#1053) fixes #1050 ref #1051
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Feb 8, 2022
1 parent 8fc4b7e commit 12bb19c
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 5 deletions.
15 changes: 13 additions & 2 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
WorkerOptions,
RedisClient,
} from '../interfaces';
import { JobState } from '../types';
import { JobState, JobJsonSandbox } from '../types';
import {
errorObject,
isEmpty,
Expand Down Expand Up @@ -317,6 +317,17 @@ export class Job<
};
}

/**
* Prepares a job to be passed to Sandbox.
* @returns
*/
asJSONSandbox(): JobJsonSandbox {
return {
...this.asJSON(),
queueName: this.queueName,
};
}

/**
* Updates a job's data
*
Expand Down Expand Up @@ -737,7 +748,7 @@ export class Job<

/**
* Returns a promise the resolves when the job has completed (containing the return value of the job),
* or rejects when the job has failed (containing the failedReason).
* or rejects when the job has failed (containing the failedReason).
*
* @param queueEvents - Instance of QueueEvents.
* @param ttl - Time in milliseconds to wait for job to finish before timing out.
Expand Down
2 changes: 1 addition & 1 deletion src/classes/sandbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const sandbox = <T, R, N extends string>(

await parentSend(child, {
cmd: ChildCommand.Start,
job: job.asJSON(),
job: job.asJSONSandbox(),
});

const done: Promise<R> = new Promise((resolve, reject) => {
Expand Down
1 change: 1 addition & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './finished-target';
export * from './job-json-sandbox';
export * from './job-type';
3 changes: 3 additions & 0 deletions src/types/job-json-sandbox.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { JobJson } from '../interfaces';

export type JobJsonSandbox = JobJson & { queueName: string };
13 changes: 13 additions & 0 deletions tests/fixtures/fixture_processor_queueName.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/**
* A processor file to be used in tests.
*
*/
'use strict';

const delay = require('./delay');

module.exports = function (job) {
return delay(500).then(() => {
return job.queueName;
});
};
35 changes: 33 additions & 2 deletions tests/test_sandboxed_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ describe('sandboxed process', () => {
drainDelay: 1,
});

const completing = new Promise<void>((resolve, reject) => {
const completing = new Promise<void>(resolve => {
worker.on('completed', async (job: Job, value: any) => {
expect(job.data).to.be.eql({ foo: 'bar' });
expect(value).to.be.eql(1);
Expand Down Expand Up @@ -144,7 +144,7 @@ describe('sandboxed process', () => {
drainDelay: 1,
});

const completing = new Promise<void>((resolve, reject) => {
const completing = new Promise<void>(resolve => {
worker.on('completed', async (job: Job, value: any) => {
expect(job.data).to.be.eql({ foo: 'bar' });
expect(value).to.be.eql(1);
Expand Down Expand Up @@ -359,6 +359,37 @@ describe('sandboxed process', () => {
});
});

it('includes queueName', async () => {
const processFile = __dirname + '/fixtures/fixture_processor_queueName.js';

const worker = new Worker(queueName, processFile, {
connection,
drainDelay: 1,
});

const completing = new Promise<void>((resolve, reject) => {
worker.on('completed', async (job: Job, value: any) => {
try {
expect(job.data).to.be.eql({ foo: 'bar' });
expect(value).to.be.eql(queueName);
expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(0);
expect(worker['childPool'].free[processFile]).to.have.lengthOf(1);
await worker.close();
resolve();
} catch (err) {
await worker.close();
reject(err);
}
});
});

await queue.add('test', { foo: 'bar' });

await completing;

await worker.close();
});

it('should process and fail', async () => {
const processFile = __dirname + '/fixtures/fixture_processor_fail.js';

Expand Down

0 comments on commit 12bb19c

Please sign in to comment.