Bull Module not connecting as expected #1950

Closed
2 of 4 tasks
centralizer opened this issue Jan 5, 2024 · 1 comment
Labels: bug, needs triage

Comments


centralizer commented Jan 5, 2024

Is there an existing issue for this?

  • I have searched the existing issues

Current behavior

I am having trouble getting BullModule to work in production environments such as Heroku and DigitalOcean. I get the same result on both, so I suspect I have misconfigured something, although I believe I have followed the docs.

In my main app module I have the following:

BullModule.forRootAsync({
  imports: [ConfigModule],
  inject: [ConfigService],
  useFactory: async (configService: ConfigService) => ({
    redis: {
      host: configService.get("REDIS_HOST"),
      port: configService.get("REDIS_PORT"),
      username: configService.get("REDIS_USER"),
      password: configService.get("REDIS_PASSWORD"),
    },
  }),
}),
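For reference, here is a minimal sketch of how the `redis` object above could be built defensively. Environment variables always arrive as strings, so the port is converted to a number, and managed Redis offerings often require TLS. Whether either point applies to this deployment is an assumption, not something the report confirms. `buildRedisOptions` and the `REDIS_TLS` variable are hypothetical names used only for illustration. `host`, `port`, `username`, `password`, and `tls` are ioredis connection options.

```typescript
// Subset of the ioredis connection options used in this sketch.
interface RedisOptions {
  host: string;
  port: number;
  username?: string;
  password?: string;
  tls?: Record<string, unknown>;
}

// Hypothetical helper: builds the `redis` object from environment values.
// REDIS_TLS is an assumed variable name, not one from the original report.
function buildRedisOptions(
  env: Record<string, string | undefined>,
): RedisOptions {
  // Environment values are strings, so convert the port explicitly.
  const port = Number(env.REDIS_PORT ?? 6379);
  if (!Number.isInteger(port) || port <= 0) {
    throw new Error(`REDIS_PORT is not a valid port: ${env.REDIS_PORT}`);
  }

  const options: RedisOptions = {
    host: env.REDIS_HOST ?? "127.0.0.1",
    port,
  };

  // Only set credentials that are actually present.
  if (env.REDIS_USER) options.username = env.REDIS_USER;
  if (env.REDIS_PASSWORD) options.password = env.REDIS_PASSWORD;

  // An empty `tls` object asks ioredis to connect over TLS with defaults.
  if (env.REDIS_TLS === "true") options.tls = {};

  return options;
}
```

Inside `useFactory`, the result could be returned as `{ redis: buildRedisOptions(process.env) }`, or the same conversion could be applied to the values read from `ConfigService`.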

I then have another module, imported into the main module, that is set up as follows. It calls registerQueue and registers the producers and consumers as providers:

@Module({
  imports: [
    TypeOrmModule.forFeature([TranscriptEntity]),
    BullModule.registerQueue(
      {
        name: "download-youtube-queue",
      },
      {
        name: "transcribe-queue",
      },
    ),
    AuthModule,
    HttpModule,
    MediaModule,
    UsersModule,
    YoutubModule,
  ],
  controllers: [TranscriptsController],
  providers: [
    AssemblyAiService,
    YoutubeDownloadConsumer,
    YoutubeDownloadProducer,
    TranscriptsService,
    TranscriptConsumer,
    TranscriptProducer,
    {
      provide: "TranscriptsRepositoryInterface",
      useClass: TranscriptsRepository,
    },
  ],
})
export class TranscriptsModule {}

When I run the application I do not get any errors; however, the @OnQueueError() listener in TranscriptConsumer is called over and over indefinitely. I am able to log into the Redis instance, so I am really not sure what is going on. Any assistance would be great. Thanks.

Below I have included both the producer and consumer

Consumer:

@Processor("transcribe-queue")
export class TranscriptConsumer {
  constructor(
    private readonly mediasService: MediasService,
    private readonly transcriptsService: TranscriptsService,
    private readonly assemblyAiService: AssemblyAiService,
  ) {}

  @OnQueueCompleted()
  private async onCompleted(job: Job) {
    const { data = {} } = job;

    if (data?.file) {
      fs.unlinkSync(data?.file);
    }
  }

  @OnQueueError()
  private async onError(job: Job) {
    const { data = {} } = job;

    if (data?.file) {
      fs.unlinkSync(data?.file);
    }
  }

  @OnQueueFailed()
  private async onFailed(job: Job) {
    const { data = {} } = job;

    if (data?.file) {
      fs.unlinkSync(data?.file);
    }
  }

  @Process("transcribe-file-job")
  private async handleJob(job: Job) {
    const { data = {} } = job;

    const transcription = await this.assemblyAiService.updoadAudio(data);

    if (transcription.status === "error") {
      Sentry.captureException(transcription.error);
    }

    if (transcription.text) {
      try {
        await this.transcriptsService.update(data.transcript.id, {
          text: transcription.text,
          transcribed: true,
        });

        const transcript = await this.transcriptsService.findOne(
          data.transcript.id,
        );

        const media = await this.mediasService.findOne(data.transcript.mediaId);
        
        media.transcript = transcript;
        await this.mediasService.save(media);
      } catch (e) {
        Sentry.captureException(e);
      }
    }
  }
}
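One detail worth noting about the consumer above: per the NestJS queue documentation, the `@OnQueueError()` handler is called with the triggering `Error`, not with a `Job`, so `onError` as written never inspects or logs the underlying error. A minimal sketch of surfacing it follows. `describeQueueError` is a hypothetical helper, and the decorator wiring is shown only as a comment so the block does not depend on NestJS being installed.

```typescript
// Hypothetical helper (not part of @nestjs/bull): turns whatever the
// queue "error" event delivers into a loggable string.
function describeQueueError(error: unknown): string {
  if (error instanceof Error) {
    // Node network errors such as ECONNREFUSED carry a `code` property.
    const code = (error as Error & { code?: string }).code;
    return code ? `${code}: ${error.message}` : error.message;
  }
  return String(error);
}

// Intended wiring inside TranscriptConsumer (assumed, shown as a comment):
//
//   @OnQueueError()
//   onError(error: Error) {
//     console.error("transcribe-queue error:", describeQueueError(error));
//   }
```

Logging the message this way should show whether the repeated calls come from a refused connection, an authentication failure, or something else.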

Producer:

@Injectable()
export class TranscriptProducer {
  constructor(@InjectQueue("transcribe-queue") private queue: Queue) {}

  public async queueJob(data: {
    transcript: TranscriptDataInterface;
    file: string;
  }) {
    await this.queue.add("transcribe-file-job", data);
  }
}

Minimum reproduction code

https://gist.github.com/centralizer/139552a11df0e8817e243ada1eef7b41

Steps to reproduce

No response

Expected behavior

The module should connect to Redis and process queued jobs without the error listener firing repeatedly.

Package version

10.0.1

Bull version

10.0.1

NestJS version

10.0.1

Node.js version

No response

In which operating systems have you tested?

  • macOS
  • Windows
  • Linux

Other

No response

centralizer added the bug and needs triage labels Jan 5, 2024
kamilmysliwiec (Member) commented

Thank you for taking the time to submit your report! From the looks of it, this could be better discussed on our Discord. If you haven't already, please join here and send a new post in the #⁠ 🐈 nestjs-help forum. Make sure to include a link to this issue, so you don't need to write it all again. We have a large community of helpful members, who will assist you in getting this to work.

nestjs locked and limited conversation to collaborators Jan 5, 2024