Exceptions swallowed #26

Closed
rantecki opened this issue Feb 1, 2018 · 5 comments

Comments

@rantecki

rantecki commented Feb 1, 2018

Any exceptions raised in the promise appear to be swallowed, invoking the notorious 'Unhandled promise rejection' message under Node.js.

Is there any way to catch/handle errors that occur while processing a queue?

Perhaps a q.onError(callback) kind of thing?

Of course, one could just add a global handler for unhandled rejections via process.on('unhandledRejection', ...), but you then lose the ability to trap and handle errors within a particular context.
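
For reference, a minimal sketch of that global handler in Node.js. The `unhandled` array and the error messages are made up for illustration. The handler receives only the rejection reason and the rejected promise, so nothing ties the error back to a particular queue or job:

```javascript
// Collect every rejection that no .catch() handled (illustrative only).
const unhandled = [];

process.on('unhandledRejection', (reason, promise) => {
  // `promise` is the rejected promise itself; it carries no queue context.
  unhandled.push(reason);
  console.error('Unhandled rejection:', reason.message);
});

// Stand-in for a queued job whose returned promise nobody handles.
Promise.reject(new Error('some error 2'));

// This rejection is handled locally, so it never reaches the global handler.
Promise.reject(new Error('handled locally')).catch(() => {});
```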

@lirbank

lirbank commented Feb 14, 2018

To catch the error, you can do this:

q.add(() => someJob()).catch((error) => {
  console.log(error);
})

Or using async/await:

(async () => {
  try {
    await q.add(() => someJob());
  } catch(error) {
    console.log(error)
  }
})();

Full example:

import Queue from 'p-queue';

async function someJob(i) {
  const delay = Math.round(Math.random() * 1000);
  console.log('start', i.toString().padStart(2), delay);
  await new Promise(r => setTimeout(r, delay));
  console.log('end  ', i.toString().padStart(2));
  if (i === 2) throw new Error('some error ' + i);
  return 'some result';
}

const q = new Queue({concurrency: 3});

let c = 0;
q.add(() => someJob(c++)).catch(error => {
  console.log(error);
});
q.add(() => someJob(c++)).catch(error => {
  console.log(error);
});
q.add(() => someJob(c++)).catch(error => {
  console.log(error);
});
q.add(() => someJob(c++)).catch(error => {
  console.log(error);
});

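// Alternatively, using async/await. Reset the counter rather than
// redeclaring it, since a second `let c` in the same scope is a SyntaxError.
c = 0;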
(async () => {
  try {
    await q.add(() => someJob(c++));
    await q.add(() => someJob(c++));
    await q.add(() => someJob(c++));
    await q.add(() => someJob(c++));
  } catch (e) {
    console.log(e);
  }
})();

StackBlitz fiddle here: https://stackblitz.com/edit/react-ps6xjs (open your browser console to see the results).

@rantecki
Author

rantecki commented Feb 20, 2018

Thanks for this. I appreciate the fiddle.

In the async/await example, it appears that you're waiting for each job to finish before you even add the next job to the queue. That kind of defeats the purpose of the queue, doesn't it?

I guess I was hoping for something more like this:

  q.add(() => someJob(c++));
  q.add(() => someJob(c++));
  q.add(() => someJob(c++));
  q.add(() => someJob(c++));
  try {
    await q.onIdle();
  } catch (e) {
    console.log(e);
  }

Or even this:

  try {
    q.add(() => someJob(c++));
    q.add(() => someJob(c++));
    q.add(() => someJob(c++));
    q.add(() => someJob(c++));
    await q.onIdle();
  } catch (e) {
    console.log(e);
  }

Unfortunately the error is uncaught in these cases.

This does however seem to work:

  const jobs = [];
  jobs.push(q.add(() => someJob(c++)));
  jobs.push(q.add(() => someJob(c++)));
  jobs.push(q.add(() => someJob(c++)));
  jobs.push(q.add(() => someJob(c++)));
  try {
    await Promise.all(jobs);
  } catch (error) {
    console.log(error);
  }
  await q.onIdle();

Note that in this case the Promise.all() is really doing the job of q.onIdle(), making the q.onIdle() call fairly redundant (apart from waiting for the remaining non-errored jobs to finish). In my view this seems like a doubling-up and q.onIdle() should really be able to do this by itself without the extra boilerplate.
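
To illustrate the "remaining non-errored jobs" point: Promise.all() rejects as soon as the first job fails, while the other jobs keep running in the background. The sketch below uses plain timer-based promises as stand-ins for the promises returned by q.add(); `fakeJob` and `demoFailFast` are hypothetical names, and Promise.allSettled() needs Node.js 12.9 or later.

```javascript
// Stand-in for a queued job: settles after `ms` milliseconds.
function fakeJob(id, ms, shouldFail = false) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      if (shouldFail) reject(new Error('job ' + id + ' failed'));
      else resolve('job ' + id + ' done');
    }, ms);
  });
}

async function demoFailFast() {
  const finished = [];
  const jobs = [
    fakeJob(0, 60).then(r => { finished.push(0); return r; }),
    fakeJob(1, 10, true),
    fakeJob(2, 80).then(r => { finished.push(2); return r; }),
  ];

  let caught = null;
  let finishedAtCatch = -1;
  try {
    await Promise.all(jobs);
  } catch (error) {
    // We get here when job 1 rejects; jobs 0 and 2 are still pending.
    caught = error;
    finishedAtCatch = finished.length;
  }

  // allSettled() never rejects: it waits for every job and reports each outcome.
  const outcomes = await Promise.allSettled(jobs);
  const statuses = outcomes.map(o => o.status);
  return { caught, finishedAtCatch, finished, statuses };
}
```

With real queue promises, the Promise.allSettled() call would play the role of the trailing q.onIdle() in the snippet above, while also exposing every failure rather than only the first.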

Thoughts?

@lirbank

lirbank commented Feb 22, 2018

That's right, Promise.all is your friend when handling (errors of) parallel promises. It has nothing to do with p-queue; it's just how promises are handled in general. I was just trying to give a (too) simple example of error handling with promises. Great that you figured it out 😃

I guess the use case for p-queue is something like this: imagine you have 100,000 items you need to write to the DB (it could be a REST call or anything else that takes resources or time). Then you can take one of the following two avenues in vanilla JavaScript:

for (const item of items) {
  await writeToDB(item);
}

This writes them one at a time. Ugh, this will be slow, so let's parallelize:

await Promise.all(items.map(item => writeToDB(item)));

OK, this should be much faster. But yikes, all of the writes will go off at once. Do we really want to fire 100,000 write requests at once (or an unknown number, if the length of items is arbitrary)? Can our DB handle it?

So we want option two, but with a cap on the number of writes in flight at the same time. Enter p-queue!

await Promise.all(items.map(item => q.add(() => writeToDB(item))))

Now p-queue will prevent more than X writeToDB calls from running at any given time (X being the queue's concurrency setting), while we're still awaiting all the promises.
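
To make the idea concrete, here is a minimal sketch of a concurrency limiter. It is not p-queue's actual implementation, just an illustration of the technique; `createLimiter`, `demoLimiter` and the fake `writeToDB` are hypothetical names.

```javascript
// Returns an add(task) function that runs at most `concurrency` tasks at once.
function createLimiter(concurrency) {
  let active = 0;
  const waiting = [];

  function next() {
    if (active >= concurrency || waiting.length === 0) return;
    active++;
    const { task, resolve, reject } = waiting.shift();
    // Promise.resolve().then(task) turns a synchronous throw into a rejection.
    Promise.resolve()
      .then(task)
      .then(resolve, reject) // settle the promise handed back by add()
      .finally(() => {
        active--;
        next(); // a slot is free, start the next waiting task
      });
  }

  return function add(task) {
    return new Promise((resolve, reject) => {
      waiting.push({ task, resolve, reject });
      next();
    });
  };
}

async function demoLimiter() {
  const add = createLimiter(3);
  let running = 0;
  let maxRunning = 0;

  // Fake DB write that records how many writes overlap.
  const writeToDB = async (item) => {
    running++;
    maxRunning = Math.max(maxRunning, running);
    await new Promise(r => setTimeout(r, 5));
    running--;
    return item * 2;
  };

  const items = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
  const results = await Promise.all(items.map(item => add(() => writeToDB(item))));
  return { results, maxRunning };
}
```

As with q.add(), each add() call returns a promise for that one task, so errors are still handled per task or through Promise.all().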

@rantecki
Author

What you say makes sense. I probably don't need to use onIdle() at all. Thanks for the help.

@catamphetamine

catamphetamine commented Feb 22, 2019

A follow-up on @rantecki's proposed code snippet:

I would advise against using try { await Promise.all(tasks) } there because it would defeat the whole point of using the queue for parallelizing the workload.

This code would work for not swallowing errors that are thrown from someJob():

  q.add(() => someJob(...));
  q.add(() => someJob(...));
  q.add(() => someJob(...));
  q.add(() => someJob(...));
  try {
    await q.onIdle();
  } catch (e) {
    // `someJob` threw an error.
    // Handle it somehow.
    // For example, print it to the console.
    console.error(e);
  }

My Queue implementation:

import PQueue from 'p-queue';

import waitForPromises from './waitForPromises.js';

// `Queue` executes tasks in parallel within the limits of `concurrency`.
export default class Queue {
  constructor({
    concurrency = getDefaultConcurrency()
  } = {}) {
    this.promises = [];
    this.queue = new PQueue({ concurrency, autoStart: false });
  }

  add(task) {
    this.promises.push(this.queue.add(task));
  }

  addAll(tasks) {
    for (const task of tasks) {
      this.add(task);
    }
    // `p-queue.addAll()` returns a `Promise.all()` which is not a correct way
    // to wait for all those `Promise`s. Instead, it should've done something like
    // `Promise.allSettled()` and then check for any "rejected" ones like
    // `waitForPromises()` does.
    // this.promises.push(this.queue.addAll(tasks));
  }

  // Returns a `Promise`.
  _waitUntilFinished() {
    const promises = this.promises;
    this.promises = [];
    // `p-queue` doesn't properly do `try/catch` during `await p-queue.onIdle()`.
    // https://github.com/sindresorhus/p-queue/issues/26
    // https://github.com/sindresorhus/p-queue/issues/29
    // This `Queue` implementation fixes that by `await`ing on `this.promises`.
    return waitForPromises(promises).then(() => {
      // If there were any new tasks added to the queue since it has been started
      // then wait for those new tasks to end too.
      if (this.promises.length > 0) {
        return this._waitUntilFinished();
      }
    });
  }

  async run() {
    this.promises = [this.queue.onIdle()].concat(this.promises);
    this.queue.start();
    await this._waitUntilFinished();
  }
}

function getDefaultConcurrency() {
  if (typeof QUEUE_CONCURRENCY !== 'undefined') {
    return QUEUE_CONCURRENCY;
  }
  return 10;
}

waitForPromises.js:

export default async function waitForPromises(promises, { printErrors = defaultPrintErrors } = {}) {
  const errors = [];
  const results = await Promise.allSettled(promises);
  for (const result of results) {
    if (result.status === 'fulfilled') {
      // The promise has finished.
    } else {
      // The promise has errored.
      const error = result.reason;
      errors.push(error);
    }
  }
  if (errors.length > 0) {
    if (printErrors) {
      printErrors(errors);
    }
    const error = new Error(errors[0].message);
    error.errors = errors;
    throw error;
  }
}

function defaultPrintErrors(errors) {
  console.error(`* ${errors.length} Promise${errors.length === 1 ? '' : 's'} Errored *`)
  for (const error of errors) {
    console.error(error.stack || error);
    // There may be deeply nested properties in an error
    // if it was thrown from a stack of `waitForPromises()` calls.
    // Example: `error.errors[0].errors[0].errors[]`.
    console.error(JSON.stringify(error, null, 2));
  }
}

Usage:

  errorHandlingQueue.addAll(...)
  errorHandlingQueue.addAll(...)
  await errorHandlingQueue.run()
