From d0386f2d097ea1af3b8de00e1724d33662a490b4 Mon Sep 17 00:00:00 2001 From: Vasilica Olariu Date: Mon, 6 Oct 2025 18:02:49 +0300 Subject: [PATCH 1/3] add some more logs --- src/shared/modules/global/queue-scheduler.service.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/shared/modules/global/queue-scheduler.service.ts b/src/shared/modules/global/queue-scheduler.service.ts index 47566d8..847882b 100644 --- a/src/shared/modules/global/queue-scheduler.service.ts +++ b/src/shared/modules/global/queue-scheduler.service.ts @@ -115,9 +115,16 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { } if (this.jobsHandlersMap.has(jobId)) { + this.logger.log( + `Found job handler for ${jobId}. Calling with '${resolution}' resolution.`, + ); this.jobsHandlersMap.get(jobId)?.call(null, resolution); this.jobsHandlersMap.delete(jobId); + this.logger.log('JobHandlers left:', [...this.jobsHandlersMap.keys()]); } else { + this.logger.log( + `No job handler found for ${jobId}. 
Calling with boss.'${resolution}' for queue ${queueName}.`, + ); await this.boss[resolution](queueName, jobId); } From f13ba12ba95109d560234c8e427fde0e7f5d0640 Mon Sep 17 00:00:00 2001 From: Vasilica Olariu Date: Mon, 6 Oct 2025 18:03:42 +0300 Subject: [PATCH 2/3] deploy to dev --- .circleci/config.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.circleci/config.yml b/.circleci/config.yml index 9a427ba..9b7a392 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -76,6 +76,7 @@ workflows: - develop - feat/ai-workflows - pm-1955_2 + - re-try-failed-jobs - 'build-prod': From 35ada6e4e83b62e93a084fe9e7174d81d3c7640b Mon Sep 17 00:00:00 2001 From: Vasilica Olariu Date: Tue, 7 Oct 2025 14:06:15 +0300 Subject: [PATCH 3/3] Handle job failure --- package.json | 4 +- pnpm-lock.yaml | 20 +++---- .../modules/global/queue-scheduler.service.ts | 26 +++++++-- .../modules/global/workflow-queue.handler.ts | 53 ++++++++++--------- 4 files changed, 62 insertions(+), 41 deletions(-) diff --git a/package.json b/package.json index 0f6f9fa..1e5cc2d 100644 --- a/package.json +++ b/package.json @@ -43,7 +43,7 @@ "lodash": "^4.17.21", "multer": "^2.0.1", "nanoid": "~5.1.2", - "pg-boss": "^11.0.2", + "pg-boss": "^11.0.5", "reflect-metadata": "^0.2.2", "rxjs": "^7.8.1", "tc-core-library-js": "appirio-tech/tc-core-library-js.git#v3.0.1" @@ -104,4 +104,4 @@ "coverageDirectory": "../coverage", "testEnvironment": "node" } -} +} \ No newline at end of file diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 508952d..9cba1a7 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -72,8 +72,8 @@ importers: specifier: ~5.1.2 version: 5.1.2 pg-boss: - specifier: ^11.0.2 - version: 11.0.2 + specifier: ^11.0.5 + version: 11.0.6 reflect-metadata: specifier: ^0.2.2 version: 0.2.2 @@ -2566,9 +2566,9 @@ packages: create-require@1.1.1: resolution: {integrity: sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==} - cron-parser@4.9.0: - resolution: 
{integrity: sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==} - engines: {node: '>=12.0.0'} + cron-parser@5.4.0: + resolution: {integrity: sha512-HxYB8vTvnQFx4dLsZpGRa0uHp6X3qIzS3ZJgJ9v6l/5TJMgeWQbLkR5yiJ5hOxGbc9+jCADDnydIe15ReLZnJA==} + engines: {node: '>=18'} cross-spawn@7.0.6: resolution: {integrity: sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA==} @@ -4022,8 +4022,8 @@ packages: perfect-debounce@1.0.0: resolution: {integrity: sha512-xCy9V055GLEqoFaHoC1SoLIaLmWctgCUaBaWxDZ7/Zx4CTyX7cJQLJOok/orfjZAh9kEYpjJa4d0KcJmCbctZA==} - pg-boss@11.0.2: - resolution: {integrity: sha512-33KQtJpBvsF1C0zkMk2fGKZAptssrSxlban8CqSLJkoY5cwhiJwZL8uOv4T4OkZgbmlcy3nDdBdYOnlRFM+Qcw==} + pg-boss@11.0.6: + resolution: {integrity: sha512-ZjRDDDeGd2UeM8BMY85f0z7r/koP4F2njGN2I6s2m64i/DxGdZW4zIUmE6CQ17CkMP9n8wnqebLFC6ICQTxkDQ==} engines: {node: '>=22'} pg-cloudflare@1.2.7: @@ -7905,7 +7905,7 @@ snapshots: create-require@1.1.1: {} - cron-parser@4.9.0: + cron-parser@5.4.0: dependencies: luxon: 3.7.2 @@ -9611,9 +9611,9 @@ snapshots: perfect-debounce@1.0.0: {} - pg-boss@11.0.2: + pg-boss@11.0.6: dependencies: - cron-parser: 4.9.0 + cron-parser: 5.4.0 pg: 8.16.3 serialize-error: 8.1.0 transitivePeerDependencies: diff --git a/src/shared/modules/global/queue-scheduler.service.ts b/src/shared/modules/global/queue-scheduler.service.ts index 847882b..ed6c889 100644 --- a/src/shared/modules/global/queue-scheduler.service.ts +++ b/src/shared/modules/global/queue-scheduler.service.ts @@ -14,6 +14,7 @@ import { policies, Queue } from 'pg-boss'; export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { private readonly logger: Logger = new Logger(QueueSchedulerService.name); private boss: PgBoss; + private $start; private jobsHandlersMap = new Map< string, @@ -46,7 +47,7 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { return; } - await this.boss.start(); + await 
(this.$start = this.boss.start()); } async onModuleDestroy() { @@ -114,6 +115,16 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { return; } + if (resolution === 'fail') { + // IMPORTANT! + // these 4 operations will update the cache for the active singletons in the database + // and will allow the jobs queue to go next or retry + await this.boss.cancel(queueName, jobId); + await this.boss.getQueueStats(queueName); + await this.boss.supervise(queueName); + await this.boss.resume(queueName, jobId); + } + if (this.jobsHandlersMap.has(jobId)) { this.logger.log( `Found job handler for ${jobId}. Calling with '${resolution}' resolution.`, @@ -122,13 +133,17 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { this.jobsHandlersMap.delete(jobId); this.logger.log('JobHandlers left:', [...this.jobsHandlersMap.keys()]); } else { - this.logger.log( - `No job handler found for ${jobId}. Calling with boss.'${resolution}' for queue ${queueName}.`, - ); await this.boss[resolution](queueName, jobId); } this.logger.log(`Job ${jobId} ${resolution} called.`); + + if (resolution === 'fail') { + const bossJob = await this.boss.getJobById(queueName, jobId); + if (bossJob && bossJob.retryCount >= bossJob.retryLimit) { + throw new Error('Job failed! 
Retry limit reached!'); + } + } } async handleWorkForQueues( @@ -142,7 +157,7 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { return; } - await this.boss.start(); + await this.$start; return Promise.all( queuesNames.map(async (queueName) => { const queue = await this.boss.getQueue(queueName); @@ -162,6 +177,7 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy { jobId: string, handler: (resolution?: string, result?: any) => void, ) { + this.logger.log(`Registering job handler for job ${jobId}.`); this.jobsHandlersMap.set(jobId, handler); } } diff --git a/src/shared/modules/global/workflow-queue.handler.ts b/src/shared/modules/global/workflow-queue.handler.ts index 4e515ef..4c9c65e 100644 --- a/src/shared/modules/global/workflow-queue.handler.ts +++ b/src/shared/modules/global/workflow-queue.handler.ts @@ -100,14 +100,19 @@ export class WorkflowQueueHandler implements OnModuleInit { // return not-resolved promise, // this will put a pause on the job // until it is marked as completed via webhook call - return new Promise((resolve, reject) => { + await new Promise((resolve, reject) => { this.scheduler.registerJobHandler( job.id, (resolution: string = 'complete', result: any) => { + this.logger.log( + `Job handler called with ${resolution} and ${result}`, + ); (resolution === 'fail' ? reject : resolve)(result); }, ); }); + + this.logger.log(`Job ${job.id} promise finished.`); } async handleWorkflowRunEvents(event: { @@ -263,24 +268,6 @@ export class WorkflowQueueHandler implements OnModuleInit { break; } - if (conclusion === 'FAILURE') { - await this.scheduler.completeJob( - (aiWorkflowRun as any).workflow.gitWorkflowId, - aiWorkflowRun.scheduledJobId as string, - 'fail', - ); - - this.logger.log({ - message: 'Workflow job failed. 
Calling retry.', - aiWorkflowRunId: aiWorkflowRun.id, - gitRunId: event.workflow_job.run_id, - jobId: event.workflow_job.id, - status: conclusion, - timestamp: new Date().toISOString(), - }); - break; - } - await this.prisma.aiWorkflowRun.update({ where: { id: aiWorkflowRun.id }, data: { @@ -289,13 +276,31 @@ export class WorkflowQueueHandler implements OnModuleInit { completedJobs: { increment: 1 }, }, }); - await this.scheduler.completeJob( - (aiWorkflowRun as any).workflow.gitWorkflowId, - aiWorkflowRun.scheduledJobId as string, - ); + + try { + await this.scheduler.completeJob( + (aiWorkflowRun as any).workflow.gitWorkflowId, + aiWorkflowRun.scheduledJobId as string, + conclusion === 'FAILURE' ? 'fail' : 'complete', + ); + + if (conclusion === 'FAILURE') { + this.logger.log({ + message: `Workflow job ${aiWorkflowRun.id} failed. Retrying!`, + aiWorkflowRunId: aiWorkflowRun.id, + gitRunId: event.workflow_job.run_id, + jobId: event.workflow_job.id, + status: conclusion, + timestamp: new Date().toISOString(), + }); + return; + } + } catch (e) { + this.logger.log(aiWorkflowRun.id, e.message); + } this.logger.log({ - message: 'Workflow job completed', + message: `Workflow job ${aiWorkflowRun.id} completed with conclusion: ${conclusion}`, aiWorkflowRunId: aiWorkflowRun.id, gitRunId: event.workflow_job.run_id, jobId: event.workflow_job.id,