Skip to content

This issue was moved to a discussion.

You can continue the conversation there. Go to discussion →

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

job moves to completed when receiving data but before processing it. #804

Closed
pnodet opened this issue Oct 11, 2021 · 3 comments
Closed

job moves to completed when receiving data but before processing it. #804

pnodet opened this issue Oct 11, 2021 · 3 comments

Comments

@pnodet
Copy link

pnodet commented Oct 11, 2021

Hi,

I am trying to fetch a large amount of data and thought to use bull to help handling it.
I am quite new to bull but you will find below roughly how I dit it.

However, the job is moved to complete when the fetch action is done.
The job will then pipe the stream and process the data.

const worker = new Worker('downloads', async job => {
	const responseStream = await fetch(job.data.url);
	responseStream.body
		.pipe(ndjson.parse())
		.on('data', async currentGame => {
			const game = await generateGame(currentGame);
			await game.populate('user');
			await game.save()
		.on('pause', () => {
			console.log('pause');
		})
		.on('end', () => {
			console.log('end');
		})
		.on('error', error => {
			console.log(new Error(error));
		});
}, {connection});

worker.on('completed', job => {
	console.log(`${job.id} has completed!`);
});
@mull
Copy link

mull commented Oct 13, 2021

Your second statement, starting with responseStream.body, does not return a promise. Hence, the worker finishes immediately. You want to either return a promise or find something you can await.

@pnodet
Copy link
Author

pnodet commented Oct 13, 2021

Hey, thanks for the answer!

Do you know where can I find documentation on this? When does the job know it should end? Is there any option to make the job open until I manually move it to completed ?

I will admit I am having a hard time implementing this.

I update the code but the completed event is still triggered before any progress.

const worker = new Worker('downloads', async job => {
    fetch(job.data.url).
        then(responseStream => {
            new Promise((resolve, reject) => {
                responseStream.body
                    .pipe(ndjson.parse())
		    .on('data', async current => {
			const game = await generateGame(currentGame);
			await game.populate('user');
			await game.save()
		    })
		    .on('end', resolve)
		    .on('error', reject);
		    })
		    .then(() => {
			console.log(`end`);
		    })
		    .catch(error => {
			console.error(error);
		    });
            });
    },
    {connection},
);

@manast
Copy link
Contributor

manast commented Oct 14, 2021

You are still not returning the promise, try this:

const worker = new Worker(
  "downloads",
  async (job) => 
    fetch(job.data.url).then((responseStream) =>
      new Promise((resolve, reject) => {
        responseStream.body
          .pipe(ndjson.parse())
          .on("data", async (current) => {
            const game = await generateGame(currentGame);
            await game.populate("user");
            await game.save();
          })
          .on("end", resolve)
          .on("error", reject);
      })
    ),
  { connection }
);

@taskforcesh taskforcesh locked and limited conversation to collaborators Oct 14, 2021
@manast manast closed this as completed Oct 14, 2021

This issue was moved to a discussion.

You can continue the conversation there. Go to discussion →

Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants