From a2c42900d6eeae1e81e20a4532cff05dd768d4b3 Mon Sep 17 00:00:00 2001 From: Vasilica Olariu Date: Mon, 6 Oct 2025 17:13:20 +0300 Subject: [PATCH] Fixes for workflow retries. also make sure to handle existing promises first --- .../modules/global/queue-scheduler.service.ts | 16 ++++++++++++---- .../modules/global/workflow-queue.handler.ts | 19 ++++++++----------- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/src/shared/modules/global/queue-scheduler.service.ts b/src/shared/modules/global/queue-scheduler.service.ts index 7db31b1..47566d8 100644 --- a/src/shared/modules/global/queue-scheduler.service.ts +++ b/src/shared/modules/global/queue-scheduler.service.ts @@ -15,7 +15,10 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { private readonly logger: Logger = new Logger(QueueSchedulerService.name); private boss: PgBoss; - private jobsHandlersMap = new Map void>(); + private jobsHandlersMap = new Map< + string, + (resolution?: 'fail' | 'complete', result?: any) => void + >(); get isEnabled() { return String(process.env.DISPATCH_AI_REVIEW_WORKFLOWS) === 'true'; @@ -111,11 +114,13 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { return; } - await this.boss[resolution](queueName, jobId); if (this.jobsHandlersMap.has(jobId)) { - this.jobsHandlersMap.get(jobId)?.call(null); + this.jobsHandlersMap.get(jobId)?.call(null, resolution); this.jobsHandlersMap.delete(jobId); + } else { + await this.boss[resolution](queueName, jobId); } + this.logger.log(`Job ${jobId} ${resolution} called.`); } @@ -146,7 +151,10 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { ); } - registerJobHandler(jobId: string, handler: () => void) { + registerJobHandler( + jobId: string, + handler: (resolution?: string, result?: any) => void, + ) { this.jobsHandlersMap.set(jobId, handler); } } diff --git a/src/shared/modules/global/workflow-queue.handler.ts b/src/shared/modules/global/workflow-queue.handler.ts index effffb2..4e515ef 100644 --- a/src/shared/modules/global/workflow-queue.handler.ts +++ b/src/shared/modules/global/workflow-queue.handler.ts @@ -93,14 +93,20 @@ export class WorkflowQueueHandler implements OnModuleInit { data: { status: 'DISPATCHED', scheduledJobId: job.id, + completedJobs: 0, }, }); // return not-resolved promise, // this will put a pause on the job // until it is marked as completed via webhook call - return new Promise((resolve) => { - this.scheduler.registerJobHandler(job.id, () => resolve()); + return new Promise((resolve, reject) => { + this.scheduler.registerJobHandler( + job.id, + (resolution: string = 'complete', result: any) => { + (resolution === 'fail' ? reject : resolve)(result); + }, + ); }); } @@ -258,15 +264,6 @@ export class WorkflowQueueHandler implements OnModuleInit { } if (conclusion === 'FAILURE') { - // reset data for aiWorkflowRun - await this.prisma.aiWorkflowRun.update({ - where: { id: aiWorkflowRun.id }, - data: { - status: 'INIT', - completedJobs: 0, - }, - }); - await this.scheduler.completeJob( (aiWorkflowRun as any).workflow.gitWorkflowId, aiWorkflowRun.scheduledJobId as string,