Bug? - Error in processor permanently breaks entire queue #956

Closed
gottlike opened this issue Dec 20, 2021 · 18 comments · Fixed by #991

@gottlike

gottlike commented Dec 20, 2021

Consider the following setup:

// Queuer
import { Queue } from 'bullmq'


const queue = new Queue('mailer'),
      mails = [
        {
          from: "manast@taskforce.sh",
          subject: "This is a simple test",
          text: "An email sent using BullMQ",
          to: 'user@localhost',
          attachments: [{ filename: 'test.txt', content: Buffer.from('hello world!', 'utf-8') }]
        },
        {
          from: "manast@taskforce.sh",
          subject: "This is a simple test",
          text: "An email sent using BullMQ",
          to: 'user@localhost',
          attachments: []
        },
        {
          from: "manast@taskforce.sh",
          subject: "This is a simple test",
          text: "An email sent using BullMQ",
          to: 'user@localhost',
          attachments: []
        },
        {
          from: "manast@taskforce.sh",
          subject: "This is a simple test",
          text: "An email sent using BullMQ",
          to: 'user@localhost',
          attachments: []
        }
      ]


;(async () => {
  for (let mail of mails) {
    await queue.add('mails', mail, {
      attempts: 0,
      backoff: {
        type: 'exponential',
        delay: 1000
      }
    })
  }
})()

// Logger
import { QueueEvents } from 'bullmq'


const queueEvents = new QueueEvents('mailer')

queueEvents.on('failed', async ({ jobId, failedReason }) => {
  console.log(`Job ${jobId} permanently failed, because: ${failedReason}`)
})

// Processor
import nodemailer from 'nodemailer'
import { Job } from 'bullmq'


const transporter = nodemailer.createTransport({ sendmail: true, newline: 'unix' })

export default async (job: Job) => {
  const response = await transporter.sendMail(job.data)
  console.log(job.id, response.messageId)
}

When running this, the queue breaks when the first erroneous mail is processed. The "logger" will console.log something like Failed job 9880, because: The "chunk" argument must be of type string or an instance of Buffer or Uint8Array. Received an instance of Object, which is correct and expected. However, all subsequent jobs fail too, even though they are valid and Nodemailer does not throw an error for them.

Am I missing something?

@manast
Contributor

manast commented Dec 20, 2021

OK, so you say all subsequent jobs fail too. With what error exactly?

@gottlike
Author

gottlike commented Dec 20, 2021

Same error, only the job ID is updated:

image

@gottlike
Author

Wait, actually this is not reproducible with the example I have given. I'll give you a working example, one moment.

@gottlike
Author

gottlike commented Dec 20, 2021

Okay, so I updated my comments with a piece of code that actually triggers the issue.

Interestingly, when I add the attachment to the queue and get the data via job.data the error occurs. However, when I directly overwrite the property inside the processor like this, it runs fine:

job.data.attachments = [{ filename: 'test.txt', content: Buffer.from('hello world!', 'utf-8') }]

When I console.log the original job.data.attachments that I get back from BullMQ, this is the output:

[
  { filename: 'test.txt', content: { type: 'Buffer', data: [Array] } }
]

So, to me it looks like it could be an issue with queueing Buffer data? Regardless, an error of any kind probably shouldn't lead to the queue being broken.
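
The logged shape above, `{ type: 'Buffer', data: [...] }`, is what `Buffer.prototype.toJSON()` produces, so it is consistent with the job data passing through a JSON round trip before it reaches the processor. The sketch below assumes that is what happens here (the thread does not confirm it) and reproduces the effect with plain `JSON.stringify`/`JSON.parse`. It also shows one possible producer-side workaround: enqueueing the attachment as a base64 string together with Nodemailer's `encoding` attachment option. All variable names are illustrative only.

```typescript
// Simulate what a JSON-based job store does to a Buffer attachment.
// Assumption: job data is serialized with JSON.stringify and parsed back later.
const original = {
  filename: "test.txt",
  content: Buffer.from("hello world!", "utf-8"),
};

const roundTripped = JSON.parse(JSON.stringify(original));

// The Buffer comes back as a plain object: { type: "Buffer", data: [...] }.
const isStillBuffer: boolean = Buffer.isBuffer(roundTripped.content);
const serializedType: string = roundTripped.content.type;

// Producer-side alternative: a base64 string survives JSON unchanged.
// Nodemailer can decode string content when `encoding` is set on the attachment.
const jsonSafe = {
  filename: "test.txt",
  content: Buffer.from("hello world!", "utf-8").toString("base64"),
  encoding: "base64",
};
const jsonSafeRoundTripped = JSON.parse(JSON.stringify(jsonSafe));

console.log(isStillBuffer, serializedType, jsonSafeRoundTripped.content);
```

With this approach the processor can pass `job.data` to `sendMail` as-is, at the cost of roughly a third more stored bytes per attachment.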

@Pho3niX90

Have you tried Buffer.from(content.data)?
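
A minimal sketch of that suggestion on the consumer side, assuming the attachments arrive in the serialized `{ type: 'Buffer', data: [...] }` shape shown earlier. `reviveAttachments`, `isSerializedBuffer`, and the `Attachment` interface are hypothetical names introduced for illustration; they are not part of BullMQ or Nodemailer.

```typescript
// Shape produced by Buffer.prototype.toJSON() after a JSON round trip.
interface SerializedBuffer {
  type: "Buffer";
  data: number[];
}

// Hypothetical attachment shape, limited to the fields used in this thread.
interface Attachment {
  filename: string;
  content: unknown;
}

// Type guard: true only for objects that look like a serialized Buffer.
function isSerializedBuffer(value: unknown): value is SerializedBuffer {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as { type?: unknown; data?: unknown };
  return candidate.type === "Buffer" && Array.isArray(candidate.data);
}

// Returns a copy of the list with serialized Buffers turned back into real
// Buffers; every other attachment is passed through untouched.
function reviveAttachments(attachments: Attachment[]): Attachment[] {
  return attachments.map((attachment) =>
    isSerializedBuffer(attachment.content)
      ? { ...attachment, content: Buffer.from(attachment.content.data) }
      : attachment
  );
}
```

In the processor this could be applied just before sending, for example by passing `{ ...job.data, attachments: reviveAttachments(job.data.attachments ?? []) }` to `transporter.sendMail`.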

@gottlike
Author

I found a way of fixing the application, but the issue with BullMQ kind of remains: It should not fail all subsequent jobs, once this type of error happens. This is quite concerning for use in production.

@Pho3niX90

> I found a way of fixing the application, but the issue with BullMQ kind of remains: It should not fail all subsequent jobs, once this type of error happens. This is quite concerning for use in production.

You should probably add a try/catch around the problematic line of code. I personally haven't seen it force-fail all other jobs, unless all those jobs in the screenshot were running concurrently on the same worker. If not, they probably fail due to the same issue, judging from the console logs.

I have witnessed, however, a worker ceasing to accept any more jobs and going offline when an unhandled exception occurred.

@gottlike
Author

Yeah, I tried try .. catch and inline .catch, but the issue always persists. Also the jobs are definitely not processed concurrently. Really don't know why BullMQ behaves that way 🤷🏼‍♂️. I'll just have to make really sure that the input is correct.

@manast
Contributor

manast commented Dec 25, 2021

> I found a way of fixing the application, but the issue with BullMQ kind of remains: It should not fail all subsequent jobs, once this type of error happens. This is quite concerning for use in production.

BullMQ does not behave in this way, a failing job cannot break the queue. However, I see you are not using a QueueScheduler instance in your setup. Since you want to use retries, this is needed otherwise the delayed jobs will never be promoted. Have you checked the tutorials for exactly this use case? https://blog.taskforce.sh/implementing-mail-microservice-with-bullmq/

@gottlike
Author

gottlike commented Dec 25, 2021

> BullMQ does not behave in this way, a failing job cannot break the queue. However, I see you are not using a QueueScheduler instance in your setup. Since you want to use retries, this is needed otherwise the delayed jobs will never be promoted. Have you checked the tutorials for exactly this use case? https://blog.taskforce.sh/implementing-mail-microservice-with-bullmq/

When starting the project, I took your tutorial as a template, so yes (and thanks for the tutorial) :)

Could you please try to run this newly created repo (npm start) on your local machine: https://github.com/gottlike/tmp-mq

At least on my machine this reproduces the issue. However, I'm using Node v16.12.0 on an M1 (Apple Silicon) Mac with the latest Redis inside Docker. Maybe that's part of the problem, in case you cannot reproduce the issue on your machine.

Edit: Merry Christmas btw 🎄

@manast
Contributor

manast commented Dec 26, 2021

image

@gottlike
Author

gottlike commented Dec 26, 2021

You are probably not using Node v16: https://stackoverflow.com/a/69063138/974647

If you don't want to update to v16 permanently, you can temporarily install/use it with NVM: https://www.digitalocean.com/community/tutorials/nodejs-node-version-manager

@gottlike
Author

@manast Have you been able to test it with v16 on your machine?

@manast
Contributor

manast commented Jan 14, 2022

I haven't dug into it yet, but if I had to guess, this is a bug in Node.js. Maybe you could try other versions? I ask because this code also reproduces the issue, which makes absolutely no sense:

import nodemailer from "nodemailer";

// Types
import { Job } from "bullmq";

const transporter = nodemailer.createTransport({
  sendmail: true,
  newline: "unix",
});

export default async (job: Job) => {
  try {
    const response = await transporter.sendMail(job.data);
    console.log("This is not printed")
  } catch (err) {
    console.log("This is not printed either")
    console.log(err);
  }
};

That is, the jobs are not even failing, yet the queue still breaks, so there must be some weird intra-process side effect at play here.

@manast
Contributor

manast commented Jan 14, 2022

If there were no bug in Node, one of those console.log calls would print, but neither does. It is as if the Node.js process itself crashes or something...
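
One way this symptom can arise in plain Node.js, offered as an assumption rather than a confirmed diagnosis of this issue: an error thrown from a `setImmediate` or timer callback is not part of the awaited promise chain, so a `try`/`catch` around the `await` never sees it and the process exits on the uncaught exception. The sketch below reproduces that behaviour in a throwaway child process. The `send` function and the "bad chunk" message are hypothetical stand-ins, not Nodemailer code.

```typescript
import { spawnSync } from "node:child_process";

// Child script: the awaited promise never settles, and the error is thrown
// later from a setImmediate callback, outside the try/catch.
const childScript = `
  function send() {
    return new Promise(() => {
      setImmediate(() => { throw new TypeError("bad chunk"); });
    });
  }
  (async () => {
    try {
      await send();
      console.log("resolved");
    } catch (err) {
      console.log("caught");
    }
  })();
`;

// Run the script in a separate Node.js process so the crash can be observed
// without taking down the current one.
const crashResult = spawnSync(process.execPath, ["-e", childScript], {
  encoding: "utf-8",
});

const crashExitCode = crashResult.status; // non-zero when the child crashed
const crashStdout = crashResult.stdout;   // output of the two console.log calls, if any
const crashStderr = crashResult.stderr;   // stack trace printed by Node.js

console.log({ crashExitCode, crashStdout });
```

If this is what happens inside a worker, wrapping the call in `try`/`catch` or adding `.catch` would not help, which would match the behaviour reported earlier in the thread.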

@manast
Contributor

manast commented Jan 14, 2022

I found that the call stack for the error looks like this:

node:internal/streams/writable:312
      throw new ERR_INVALID_ARG_TYPE(
      ^

TypeError [ERR_INVALID_ARG_TYPE]: The "chunk" argument must be of type string or an instance of Buffer or Uint8Array. Received an instance of Object
    at new NodeError (node:internal/errors:371:5)
    at _write (node:internal/streams/writable:312:13)
    at PassThrough.Writable.end (node:internal/streams/writable:609:17)
    at Immediate.<anonymous> (/Users/manuelastudillo/Dev/tmp-mq/node_modules/nodemailer/lib/mime-node/index.js:978:46)
    at processImmediate (node:internal/timers:464:21) {
  code: 'ERR_INVALID_ARG_TYPE'
}

This supports the assumption that it is a bug in Node, since the error is produced in the core and probably breaks V8. The question now is whether a sandboxed processor in BullMQ could somehow detect this and spawn a new child process. I am spending some time investigating whether this is possible.
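
The general idea of isolating a processor in a child process can be sketched with Node's standard library alone. This is not BullMQ's sandboxed-processor implementation and says nothing about how the eventual fix works; it only illustrates, under simplifying assumptions (one fresh child per job, synchronous execution), how a parent can treat a crashed child as a failed job and carry on. `runJobIsolated`, `processorSource`, and the `JOB_PAYLOAD` environment variable are hypothetical names.

```typescript
import { spawnSync } from "node:child_process";

type JobOutcome = "completed" | "failed";

// Hypothetical processor source. It crashes its process from a setImmediate
// callback when the payload is "bad", which no try/catch in the job handler
// could intercept.
const processorSource = `
  const payload = process.env.JOB_PAYLOAD;
  setImmediate(() => {
    if (payload === "bad") throw new TypeError("bad chunk");
    console.log("sent " + payload);
  });
`;

// Runs one job in a fresh child process and maps the exit status to an outcome.
function runJobIsolated(payload: string): JobOutcome {
  const child = spawnSync(process.execPath, ["-e", processorSource], {
    encoding: "utf-8",
    env: { ...process.env, JOB_PAYLOAD: payload },
  });
  return child.status === 0 ? "completed" : "failed";
}

// The first job crashes its child; the following jobs are unaffected.
const outcomes: JobOutcome[] = ["bad", "good-1", "good-2"].map((payload) =>
  runJobIsolated(payload)
);

console.log(outcomes);
```

A real worker would more likely keep a long-lived child and replace it only after an unexpected exit, since spawning a process per job is expensive.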

@github-actions

🎉 This issue has been resolved in version 1.64.1 🎉

Your semantic-release bot 📦🚀

@gottlike
Author

Great, thanks for following up and making BullMQ even more robust!
