Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Can't define a concurrency with a 0 value using the processors field of BullModuleOptions #590

Closed
noneedinmagic opened this issue Oct 14, 2020 · 3 comments

Comments

@noneedinmagic
Copy link

I'm submitting a...


[ ] Regression 
[x] Bug report
[ ] Feature request
[ ] Documentation issue or request
[ ] Support request => Please do not submit support request here, instead post your question on Stack Overflow.

Current behavior

This issue almost completely similar to issue #258 .
The only difference that I needed to set up sandboxed processors, so I ended up using BullModuleOptions.processors field, but setting zero-concurrency did not get any effect. After some testing I found out that this is because 0 is falsy value and skipped by the filter:

args = args.filter(arg => !!arg);

Expected behavior

Simple workaround:

args = args.filter(arg => !!arg);

Just one string to be changed to

args = args.filter(arg => arg !== undefined);

Very similar change was introduced in the #259
(0f891c3#diff-5088cd359ffe5490625cf08afdeaa0d6bf6b0f967735abd547fc8abd3a3c4119R82)

To be honest, I have no experience in submitting proper PRs on GitHub yet, so cannot do this myself right now, but I hope that I've given enough info/examples here :)

Environment


Nest version: 7.4.2
Nest bull version: 0.2.2 (0.1.2 also)

 
For Tooling issues:
- Node version: 12.18.0 
- Platform: Windows 

Others:

@noneedinmagic
Copy link
Author

Temporary workaround for anyone who might be stuck with this as well as me:

import { OnApplicationShutdown, Provider, DynamicModule } from '@nestjs/common';
import * as Bull from 'bull';
import { Queue } from 'bull';
import {
  BullModule,
  BullQueueProcessor,
  BullModuleOptions,
  BullModuleAsyncOptions,
  getQueueOptionsToken,
  getQueueToken,
} from '@nestjs/bull';
import {
  isAdvancedProcessor,
  isAdvancedSeparateProcessor,
  isProcessorCallback,
  isSeparateProcessor,
} from '@nestjs/bull/dist/utils/helpers';
import { createQueueOptionProviders } from '@nestjs/bull/dist/bull.providers';

function buildQueue(option: BullModuleOptions): Queue {
  const queue: Queue = new Bull(option.name ? option.name : 'default', option);
  if (option.processors) {
    option.processors.forEach((processor: BullQueueProcessor) => {
      let args = [];
      if (isAdvancedProcessor(processor)) {
        args.push(processor.name, processor.concurrency, processor.callback);
      } else if (isAdvancedSeparateProcessor(processor)) {
        args.push(processor.name, processor.concurrency, processor.path);
      } else if (isSeparateProcessor(processor)) {
        args.push(processor);
      } else if (isProcessorCallback(processor)) {
        args.push(processor);
      }
      // args = args.filter(arg => !!arg);
      args = args.filter((arg) => arg !== undefined); // FIX for concurrency == 0
      queue.process.call(queue, ...args);
    });
  }
  ((queue as unknown) as OnApplicationShutdown).onApplicationShutdown = function (
    this: Queue,
  ) {
    return this.close();
  };
  return queue;
}

function createQueueProviders(options: BullModuleOptions[]): Provider[] {
  return options.map((option) => ({
    provide: getQueueToken(option.name),
    useFactory: (o: BullModuleOptions) => {
      const queueName = o.name || option.name;
      return buildQueue({ ...o, name: queueName });
    },
    inject: [getQueueOptionsToken(option.name)],
  }));
}

export class AppBullModule extends BullModule {
  static registerQueue(...options: BullModuleOptions[]): DynamicModule {
    const queueProviders = createQueueProviders([].concat(options));
    const queueOptionProviders = createQueueOptionProviders([].concat(options));

    return {
      ...super.registerQueue(...options),
      providers: [...queueOptionProviders, ...queueProviders],
    };
  }

  static registerQueueAsync(
    ...options: BullModuleAsyncOptions[]
  ): DynamicModule {
    const queueProviders = createQueueProviders([].concat(options));
    // const asyncQueueOptionsProviders = options
    //   .map(queueOptions => this.createAsyncProviders(queueOptions))
    //   .reduce((a, b) => a.concat(b), []);

    const old = super.registerQueueAsync(...options);

    // TODO: check and avoid doubling providers if needed
    return {
      ...old,
      providers: [...old.providers, ...queueProviders],
    };
  }
}

Another solution is to create Queue directly by using Bull package, not NestJS wrapper.

@kamilmysliwiec
Copy link
Member

Would you like to create a PR for this issue?

@kamilmysliwiec
Copy link
Member

Fixed in 0.2.3

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants