Skip to content

Commit

Permalink
feat(flows): allow parent on root jobs in addBulk method (#1488) ref #…
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Oct 24, 2022
1 parent 6191040 commit 92308e5
Show file tree
Hide file tree
Showing 2 changed files with 218 additions and 96 deletions.
17 changes: 16 additions & 1 deletion src/classes/flow-producer.ts
Expand Up @@ -336,7 +336,22 @@ export class FlowProducer extends EventEmitter {
* @returns
*/
/**
 * Adds one flow job tree per entry in `nodes` to the given Redis multi.
 *
 * For each root node, any `opts.parent` the caller supplied is resolved into
 * the parent job key and its `:dependencies` set key, and forwarded to
 * `addNode` so root jobs of a bulk add can themselves be children of an
 * existing job (see BullMQ #1488).
 *
 * @param multi - ioredis pipeline the job-creation commands are queued on.
 * @param nodes - root flow jobs to add; each may carry `opts.parent`.
 * @returns The created job trees, in the same order as `nodes`.
 */
protected addNodes(multi: ChainableCommander, nodes: FlowJob[]): JobNode[] {
  return nodes.map(node => {
    const parentOpts = node?.opts?.parent;
    // getParentKey returns undefined when no parent opts are provided,
    // in which case the dependencies key must also be omitted.
    const parentKey = getParentKey(parentOpts);
    const parentDependenciesKey = parentKey
      ? `${parentKey}:dependencies`
      : undefined;

    return this.addNode({
      multi,
      node,
      parent: {
        parentOpts,
        parentDependenciesKey,
      },
    });
  });
}

private async getNode(client: RedisClient, node: NodeOpts): Promise<JobNode> {
Expand Down
297 changes: 202 additions & 95 deletions tests/test_flow.ts
Expand Up @@ -2100,122 +2100,229 @@ describe('flows', () => {
await removeAllQueueData(new IORedis(), parentQueueName);
});

it('should process jobs when using addBulk', async () => {
const name = 'child-job';
const values = [
{ idx: 0, bar: 'something' },
{ idx: 1, baz: 'something' },
];
describe('.addBulk', () => {
it('should allow parent opts on the root job', async () => {
const name = 'child-job';
const values = [{ bar: 'something' }, { baz: 'something' }];

const rootQueueName = 'root-queue';
const parentQueueName = `parent-queue-${v4()}`;
const grandparentQueueName = `grandparent-queue-${v4()}`;
const grandparentQueue = new Queue(grandparentQueueName, { connection });
const grandparentJob = await grandparentQueue.add('grandparent', {
foo: 'bar',
});

let childrenProcessor,
rootProcessor,
processedChildren = 0,
processedRoot = 0;
const processingChildren = new Promise<void>((resolve, reject) => [
(childrenProcessor = async (job: Job) => {
try {
processedChildren++;
if (processedChildren === values.length) {
resolve();
}
return values[job.data.idx];
} catch (err) {
reject(err);
}
}),
]);
let childrenProcessor,
parentProcessor,
processedChildren = 0;
const processingChildren = new Promise<void>(
resolve =>
(childrenProcessor = async (job: Job) => {
processedChildren++;

const processingRoot = new Promise<void>((resolve, reject) => [
(rootProcessor = async (job: Job) => {
try {
const childrenValues = await job.getChildrenValues();
const index = job.name === 'root-job-1' ? 0 : 1;
const jobKey = queue.toKey(trees[index].children[0].job.id);
expect(childrenValues[jobKey]).to.be.deep.equal(values[index]);
if (processedChildren == values.length) {
resolve();
}
return values[job.data.idx];
}),
);

const processingParent = new Promise<void>((resolve, reject) => [
(parentProcessor = async (job: Job) => {
try {
const { processed, nextProcessedCursor } =
await job.getDependencies({
processed: {},
});
expect(nextProcessedCursor).to.be.equal(0);
expect(Object.keys(processed)).to.have.length(2);

const childrenValues = await job.getChildrenValues();

processedRoot++;
if (processedRoot === 2) {
for (let i = 0; i < values.length; i++) {
const jobKey = queue.toKey(tree.children[i].job.id);
expect(childrenValues[jobKey]).to.be.deep.equal(values[i]);
}
resolve();
} catch (err) {
console.error(err);
reject(err);
}
return processedRoot;
} catch (err) {
console.error(err);
reject(err);
}
}),
]);
}),
]);

const flow = new FlowProducer({ connection });
const trees = await flow.addBulk([
{
name: 'root-job-1',
queueName: rootQueueName,
data: {},
children: [
{
name,
data: { idx: 0, foo: 'bar' },
queueName,
},
],
},
{
name: 'root-job-2',
queueName: rootQueueName,
data: {},
children: [
{
name,
data: { idx: 1, foo: 'baz' },
queueName,
const parentWorker = new Worker(parentQueueName, parentProcessor, {
connection,
});
const childrenWorker = new Worker(queueName, childrenProcessor, {
connection,
});
await parentWorker.waitUntilReady();
await childrenWorker.waitUntilReady();

const flow = new FlowProducer({ connection });
const [tree] = await flow.addBulk([
{
name: 'parent-job',
queueName: parentQueueName,
data: {},
children: [
{ name, data: { idx: 0, foo: 'bar' }, queueName },
{ name, data: { idx: 1, foo: 'baz' }, queueName },
],
opts: {
parent: {
id: grandparentJob.id,
queue: `bull:${grandparentQueueName}`,
},
},
],
},
]);
},
]);

expect(trees).to.have.length(2);
expect(tree).to.have.property('job');
expect(tree).to.have.property('children');

expect(trees[0]).to.have.property('job');
expect(trees[0]).to.have.property('children');
const { children, job } = tree;

expect(trees[1]).to.have.property('job');
expect(trees[1]).to.have.property('children');
expect(job.parentKey).to.be.equal(
`bull:${grandparentQueueName}:${grandparentJob.id}`,
);
const parentState = await job.getState();

const firstJob = trees[0];
const isFirstJobWaitingChildren = await firstJob.job.isWaitingChildren();
expect(isFirstJobWaitingChildren).to.be.true;
expect(firstJob.children).to.have.length(1);
expect(parentState).to.be.eql('waiting-children');
expect(children).to.have.length(2);

expect(firstJob.children[0].job.id).to.be.ok;
expect(firstJob.children[0].job.data.foo).to.be.eql('bar');
expect(firstJob.children).to.have.length(1);
await processingChildren;
await childrenWorker.close();

const secondJob = trees[1];
const isSecondJobWaitingChildren = await secondJob.job.isWaitingChildren();
expect(isSecondJobWaitingChildren).to.be.true;
expect(secondJob.children).to.have.length(1);
await processingParent;
await parentWorker.close();

expect(secondJob.children[0].job.id).to.be.ok;
expect(secondJob.children[0].job.data.foo).to.be.eql('baz');
await flow.close();

const parentWorker = new Worker(rootQueueName, rootProcessor, {
connection,
});
const childrenWorker = new Worker(queueName, childrenProcessor, {
connection,
await grandparentQueue.close();
await removeAllQueueData(new IORedis(), grandparentQueueName);
await removeAllQueueData(new IORedis(), parentQueueName);
});

await processingChildren;
await childrenWorker.close();
it('should process jobs', async () => {
const name = 'child-job';
const values = [
{ idx: 0, bar: 'something' },
{ idx: 1, baz: 'something' },
];

await processingRoot;
await parentWorker.close();
const rootQueueName = 'root-queue';

await flow.close();
let childrenProcessor,
rootProcessor,
processedChildren = 0,
processedRoot = 0;
const processingChildren = new Promise<void>((resolve, reject) => [
(childrenProcessor = async (job: Job) => {
try {
processedChildren++;
if (processedChildren === values.length) {
resolve();
}
return values[job.data.idx];
} catch (err) {
reject(err);
}
}),
]);

const processingRoot = new Promise<void>((resolve, reject) => [
(rootProcessor = async (job: Job) => {
try {
const childrenValues = await job.getChildrenValues();
const index = job.name === 'root-job-1' ? 0 : 1;
const jobKey = queue.toKey(trees[index].children[0].job.id);
expect(childrenValues[jobKey]).to.be.deep.equal(values[index]);

await removeAllQueueData(new IORedis(), rootQueueName);
processedRoot++;
if (processedRoot === 2) {
resolve();
}
return processedRoot;
} catch (err) {
console.error(err);
reject(err);
}
}),
]);

const flow = new FlowProducer({ connection });
const trees = await flow.addBulk([
{
name: 'root-job-1',
queueName: rootQueueName,
data: {},
children: [
{
name,
data: { idx: 0, foo: 'bar' },
queueName,
},
],
},
{
name: 'root-job-2',
queueName: rootQueueName,
data: {},
children: [
{
name,
data: { idx: 1, foo: 'baz' },
queueName,
},
],
},
]);

expect(trees).to.have.length(2);

expect(trees[0]).to.have.property('job');
expect(trees[0]).to.have.property('children');

expect(trees[1]).to.have.property('job');
expect(trees[1]).to.have.property('children');

const firstJob = trees[0];
const isFirstJobWaitingChildren = await firstJob.job.isWaitingChildren();
expect(isFirstJobWaitingChildren).to.be.true;
expect(firstJob.children).to.have.length(1);

expect(firstJob.children[0].job.id).to.be.ok;
expect(firstJob.children[0].job.data.foo).to.be.eql('bar');
expect(firstJob.children).to.have.length(1);

const secondJob = trees[1];
const isSecondJobWaitingChildren =
await secondJob.job.isWaitingChildren();
expect(isSecondJobWaitingChildren).to.be.true;
expect(secondJob.children).to.have.length(1);

expect(secondJob.children[0].job.id).to.be.ok;
expect(secondJob.children[0].job.data.foo).to.be.eql('baz');

const parentWorker = new Worker(rootQueueName, rootProcessor, {
connection,
});
const childrenWorker = new Worker(queueName, childrenProcessor, {
connection,
});

await processingChildren;
await childrenWorker.close();

await processingRoot;
await parentWorker.close();

await flow.close();

await removeAllQueueData(new IORedis(), rootQueueName);
});
});

describe('remove', () => {
Expand Down

0 comments on commit 92308e5

Please sign in to comment.