
Add new sibling job in flow at runtime #703

Closed
blankenshipz opened this issue Aug 24, 2021 · 8 comments

Comments

@blankenshipz

As a worker, I'd like to be able to inject a new job into a flow at the same level as the current job (same parentId). I tried creating a new job with the same parentId via JobsOptions.parent, but the parent job still ran before the new child. Any advice on how to get my desired behavior?

@blankenshipz
Author

blankenshipz commented Aug 31, 2021

I confirmed that my sibling job does not get added to the dependencies set for the parent job. If I add it to that set manually, the parent does wait for my job, but once my job finishes the parent never starts; my manually added job was never removed from the parent's dependencies set, despite completing successfully and having parent.id set in JobsOptions.

@roggervalf
Collaborator

Hi @blankenshipz, I'm trying to understand your case. Could you please add a reproducible test case so I can take a look? You can also use https://github.com/taskforcesh/bullmq/blob/master/src/test/test_flow.ts as a reference for flow test cases.

@blankenshipz
Author

@roggervalf Sorry, I'm just now getting back to this. I was going through the code, and it looks to me like this should be possible; maybe I'm doing something incorrectly?

updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDependenciesKey, parentId, jobIdKey, returnvalue)

This line seems to imply that if you create a new job with a parent.id specified, the dependencies for the parent should automatically get updated, even if the parent already exists. Is that a correct understanding?

I tried this and it doesn't appear to happen. I also tried using the Redis SDK to update the dependencies set for the parent manually, but it didn't correct the ordering; the parent still ran before the new child and finished with these "logs":

Error: Job 187b7c07-9a3e-47e1-901f-ac85d259f9ad has pending dependencies

Any thoughts about what to do next? I'm not sure I can create a simple test case.

@blankenshipz
Author

blankenshipz commented Feb 1, 2022

@roggervalf @manast Here's my attempt at a test case:

  it('should process children before the parent when jobs have been added to the flow at runtime', async () => {
    const name = 'child-job';
    const values = [
      { bar: 'something' },
      { baz: 'something' },
      { qux: 'something' },
    ];

    const parentQueueName = `parent-queue-${v4()}`;

    let childrenProcessor,
      parentProcessor,
      processedChildren = 0;

    const processingChildren = new Promise<void>(
      resolve =>
        (childrenProcessor = async (job: Job) => {
          processedChildren++;

          if(processedChildren == 1) {
            await queue.add(name, { data: { qux: 'something', idx: 2}, parent: { id: job?.parent.id, queue: queue.name}})
          }

          console.log("Processing:  " + job.data.idx)

          return job.data.idx
        }),
    );

    const processingParent = new Promise<void>((resolve, reject) => [
      (parentProcessor = async (job: Job) => {
        try {
          const { processed, nextProcessedCursor } = await job.getDependencies({
            processed: {},
          });

          expect(nextProcessedCursor).to.be.equal(0);
          expect(Object.keys(processed)).to.have.length(3);

          const childrenValues = await job.getChildrenValues();

          resolve();
        } catch (err) {
          console.error(err);
          reject(err);
        }
      }),
    ]);

    const parentWorker = new Worker(parentQueueName, parentProcessor, {
      connection,
    });
    const childrenWorker = new Worker(queueName, childrenProcessor, {
      connection,
    });
    await parentWorker.waitUntilReady();
    await childrenWorker.waitUntilReady();

    const flow = new FlowProducer({ connection });
    const tree = await flow.add({
      name: 'parent-job',
      queueName: parentQueueName,
      data: {},
      children: [
        { name, data: { idx: 0, foo: 'bar' }, queueName },
        { name, data: { idx: 1, foo: 'baz' }, queueName },
      ],
    });

    expect(tree).to.have.property('job');
    expect(tree).to.have.property('children');

    const { children, job } = tree;
    const parentState = await job.getState();

    expect(parentState).to.be.eql('waiting-children');

    await processingChildren;
    await childrenWorker.close();

    expect(job.getDependenciesCount()).to.eql(3)

    await processingParent;
    await parentWorker.close();

    await flow.close();
    await removeAllQueueData(new IORedis(), parentQueueName);
  });

I'm not sure if this actually works as a test case, but the intention is that if a flow has already been submitted and a child job adds a new dependency, that dependency should be added to the parent.

In the test I simulate having one of the children add a sibling:

            await queue.add(name, { data: { qux: 'something', idx: 2}, parent: { id: job?.parent.id, queue: queue.name}})

and then I have an expectation that, after the children finish processing, the parent will have three children:

    await processingChildren;
    await childrenWorker.close();

    expect(job.getDependenciesCount()).to.eql(3)

This expectation fails right now, maybe I'm missing something?

@blankenshipz
Author

blankenshipz commented Feb 2, 2022

@roggervalf Here's a more succinct test:

  it("waits for jobs added to parents at runtime", async function () {
    const connection =  {
      host: 'redis',
      port: 6379,
    }
 
    const childQueue = new Queue('childQueue', { connection });
    const parentQueue = new Queue('parentQueue', { connection });

    let childrenProcessor, processedChildren = 0;

    const processingChildren = new Promise<void>(
      resolve => {
        childrenProcessor = async (job: Job) => {
          processedChildren++;

          // Allow for processing the first two children but not the third 
          // that was dynamically added
          if (processedChildren === 2) {
            // Resolve the promise so the childworker stops
            resolve();

            // I don't think this is needed but ensure the resolve works by waiting
            await new Promise(f => setTimeout(f, 1000));

            // Add a new child late in the game during this child's processing,
            // which should be awaited by the close()
            childQueue.add('child3',
             {
               data: { place: 'floors' },
               parent: { id: job.parent.id || "bad", queue: parentQueue.name }
             }
            );
          }
          return "something"
        }
      });

    const childWorker = new Worker(childQueue.name, childrenProcessor, { connection });
    await childWorker.waitUntilReady()

    const flowProducer = new FlowProducer({connection});
    const flow = await flowProducer.add({
      name: 'renovate-interior',
      queueName: parentQueue.name,
      children: [
        { name: 'child1', data: { place: 'ceiling' }, queueName: childQueue.name},
        { name: 'child2', data: { place: 'walls' }, queueName:  childQueue.name},
      ],
    })

    await processingChildren;
    await childWorker.close()

    // Failing; the flow is being moved to "waiting"
    expect(await flow.job.getState()).toEqual("waiting-children")
  });

@roggervalf
Collaborator

Hi @blankenshipz, I modified your first example a little bit; some data was being passed in an incorrect way:

  it('should process children before the parent when jobs have been added to the flow at runtime', async () => {
    const name = 'child-job';
    const values = [
      { bar: 'something' },
      { baz: 'something' },
      { qux: 'something' },
    ];

    const parentQueueName = `parent-queue-${v4()}`;

    let childrenProcessor,
      parentProcessor,
      processedChildren = 0;

    const processingChildren = new Promise<void>(
      resolve =>
        (childrenProcessor = async (job: Job) => {
          processedChildren++;

          if (processedChildren == 1) {
            await queue.add(
              name,
              { qux: 'something', idx: 2 },
              { parent: { id: job?.parent.id, queue: `bull:${parentQueueName}` } },
            );
          }

          console.log("Processing:  " + job.data.idx);

          if(processedChildren == 3) {
            resolve();
          }
          return job.data.idx;
        }),
    );

    const processingParent = new Promise<void>((resolve, reject) => [
      (parentProcessor = async (job: Job) => {
        try {
          const { processed, nextProcessedCursor } = await job.getDependencies({
            processed: {},
          });

          expect(nextProcessedCursor).to.be.equal(0);
          expect(Object.keys(processed)).to.have.length(3);
          resolve();
        } catch (err) {
          console.error(err);
          reject(err);
        }
      }),
    ]);

    const parentWorker = new Worker(parentQueueName, parentProcessor, {
      connection,
    });
    const childrenWorker = new Worker(queueName, childrenProcessor, {
      connection,
    });
    await parentWorker.waitUntilReady();
    await childrenWorker.waitUntilReady();

    const flow = new FlowProducer({ connection });
    const tree = await flow.add({
      name: 'parent-job',
      queueName: parentQueueName,
      data: {},
      children: [
        { name, data: { idx: 0, foo: 'bar' }, queueName },
        { name, data: { idx: 1, foo: 'baz' }, queueName },
      ],
    });

    expect(tree).to.have.property('job');
    expect(tree).to.have.property('children');

    const { job } = tree;
    const parentState = await job.getState();

    expect(parentState).to.be.eql('waiting-children');

    await processingChildren;
    await childrenWorker.close();
    const dependencyCount = await job.getDependenciesCount();

    expect(dependencyCount.processed).to.eql(3);

    await processingParent;
    await parentWorker.close();

    await flow.close();
    await removeAllQueueData(new IORedis(), parentQueueName);
  });

This one is working.
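For anyone skimming the diff between the two versions, the key change is the shape of the Queue.add call: the job payload goes in the second argument, while the parent linkage goes in the third argument (opts.parent), and parent.queue must be the prefixed queue key (e.g. `bull:${parentQueueName}`). A minimal Redis-free sketch contrasting the two call shapes (the helper functions and types here are illustrative, not part of BullMQ):

```typescript
// Illustrative only: models the three arguments of BullMQ's queue.add(name, data, opts).
interface ParentOpts { id: string; queue: string; }
interface AddArgs {
  name: string;
  data: Record<string, unknown>;
  opts?: { parent?: ParentOpts };
}

// Wrong: wrapping the payload and `parent` together in the data argument means
// BullMQ treats `parent` as plain job data and never links the jobs.
function wrongCall(name: string, parentId: string, parentQueueName: string): AddArgs {
  return {
    name,
    data: { data: { idx: 2 }, parent: { id: parentId, queue: parentQueueName } },
  };
}

// Right: payload in the data argument, parent in opts, with the "bull:" prefix
// on the parent queue key.
function rightCall(name: string, parentId: string, parentQueueName: string): AddArgs {
  return {
    name,
    data: { qux: 'something', idx: 2 },
    opts: { parent: { id: parentId, queue: `bull:${parentQueueName}` } },
  };
}
```

With the wrong shape, the parent job's dependencies set is never updated, which matches the behavior described earlier in the thread.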

@roggervalf
Collaborator

Also, maybe this pattern could help: https://docs.bullmq.io/patterns/process-step-jobs
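The gist of that pattern, as I understand the linked docs, is that instead of injecting sibling jobs at runtime, a single job advances through named steps and persists its current step so it can resume. A Redis-free sketch of that state machine (step names are illustrative; in a real processor the transition would be persisted with job.updateData):

```typescript
// Minimal sketch of the "process step jobs" pattern: one job loops through
// named steps until it reaches a terminal step.
enum Step {
  Initial = 'initial',
  Second = 'second',
  Finish = 'finish',
}

// Simulates a processor advancing through the steps, recording each one.
// In BullMQ the current step would be saved to job data between transitions.
function runSteps(start: Step = Step.Initial): Step[] {
  const visited: Step[] = [];
  let step = start;
  while (step !== Step.Finish) {
    switch (step) {
      case Step.Initial:
        visited.push(step);
        step = Step.Second; // persist via job.updateData in real code
        break;
      case Step.Second:
        visited.push(step);
        step = Step.Finish; // persist via job.updateData in real code
        break;
      default:
        throw new Error('invalid step');
    }
  }
  visited.push(Step.Finish);
  return visited;
}
```

Because the step is persisted, work that would otherwise be modeled as a dynamically added sibling can instead run as a later step of the same job.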

@roggervalf
Copy link
Collaborator

As we have a pattern to handle step jobs, I think this issue can be closed.
