Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ workflows:
- develop
- feat/ai-workflows
- pm-1955_2
- re-try-failed-jobs


- 'build-prod':
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
"lodash": "^4.17.21",
"multer": "^2.0.1",
"nanoid": "~5.1.2",
"pg-boss": "^11.0.2",
"pg-boss": "^11.0.5",
"reflect-metadata": "^0.2.2",
"rxjs": "^7.8.1",
"tc-core-library-js": "appirio-tech/tc-core-library-js.git#v3.0.1"
Expand Down Expand Up @@ -104,4 +104,4 @@
"coverageDirectory": "../coverage",
"testEnvironment": "node"
}
}
}
20 changes: 10 additions & 10 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 25 additions & 2 deletions src/shared/modules/global/queue-scheduler.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { policies, Queue } from 'pg-boss';
export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy {
private readonly logger: Logger = new Logger(QueueSchedulerService.name);
private boss: PgBoss;
private $start;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider renaming the variable $start to a more descriptive name that indicates its purpose or usage within the service. Using a more descriptive name can improve code readability and maintainability.


private jobsHandlersMap = new Map<
string,
Expand Down Expand Up @@ -46,7 +47,7 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy {
return;
}

await this.boss.start();
await (this.$start = this.boss.start());
}

async onModuleDestroy() {
Expand Down Expand Up @@ -114,14 +115,35 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy {
return;
}

if (resolution === 'fail') {
// IMPORTANT!
// thes 4 operations will update the cache for the active singletons in the database
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in comment: 'thes' should be 'these'.

// and will allow the jobs queue to go next or retry
await this.boss.cancel(queueName, jobId);
await this.boss.getQueueStats(queueName);
await this.boss.supervise(queueName);
await this.boss.resume(queueName, jobId);
}

if (this.jobsHandlersMap.has(jobId)) {
this.logger.log(
`Found job handler for ${jobId}. Calling with '${resolution}' resolution.`,
);
this.jobsHandlersMap.get(jobId)?.call(null, resolution);
this.jobsHandlersMap.delete(jobId);
this.logger.log('JobHandlers left:', [...this.jobsHandlersMap.keys()]);
} else {
await this.boss[resolution](queueName, jobId);
}

this.logger.log(`Job ${jobId} ${resolution} called.`);

if (resolution === 'fail') {
const bossJob = await this.boss.getJobById(queueName, jobId);
if (bossJob && bossJob.retryCount >= bossJob.retryLimit) {
throw new Error('Job failed! Retry limit reached!');
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider logging a message before throwing an error to provide more context about the failure.

}
}
}

async handleWorkForQueues<T>(
Expand All @@ -135,7 +157,7 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy {
return;
}

await this.boss.start();
await this.$start;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The line await this.$start; seems to be incorrect. It looks like it should be await this.boss.start(); instead. Ensure that the method start() is being called on the correct object, which appears to be this.boss based on the previous code.

return Promise.all(
queuesNames.map(async (queueName) => {
const queue = await this.boss.getQueue(queueName);
Expand All @@ -155,6 +177,7 @@ export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy {
jobId: string,
handler: (resolution?: string, result?: any) => void,
) {
this.logger.log(`Registering job handler for job ${jobId}.`);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding error handling or validation to ensure that jobId and handler are valid before logging and setting them in jobsHandlersMap. This can prevent potential runtime errors if invalid data is passed.

this.jobsHandlersMap.set(jobId, handler);
}
}
53 changes: 29 additions & 24 deletions src/shared/modules/global/workflow-queue.handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,19 @@ export class WorkflowQueueHandler implements OnModuleInit {
// return not-resolved promise,
// this will put a pause on the job
// until it is marked as completed via webhook call
return new Promise<void>((resolve, reject) => {
await new Promise<void>((resolve, reject) => {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change from return to await here suggests that the surrounding function is asynchronous. Ensure that the function signature reflects this by including async if it is not already present.

this.scheduler.registerJobHandler(
job.id,
(resolution: string = 'complete', result: any) => {
this.logger.log(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider providing more context in the log message for better traceability. For example, include the job ID in the log message to make it easier to identify which job the log entry pertains to.

`Job handler called with ${resolution} and ${result}`,
);
(resolution === 'fail' ? reject : resolve)(result);
},
);
});

this.logger.log(`Job ${job.id} promise finished.`);
}

async handleWorkflowRunEvents(event: {
Expand Down Expand Up @@ -263,24 +268,6 @@ export class WorkflowQueueHandler implements OnModuleInit {
break;
}

if (conclusion === 'FAILURE') {
await this.scheduler.completeJob(
(aiWorkflowRun as any).workflow.gitWorkflowId,
aiWorkflowRun.scheduledJobId as string,
'fail',
);

this.logger.log({
message: 'Workflow job failed. Calling retry.',
aiWorkflowRunId: aiWorkflowRun.id,
gitRunId: event.workflow_job.run_id,
jobId: event.workflow_job.id,
status: conclusion,
timestamp: new Date().toISOString(),
});
break;
}

await this.prisma.aiWorkflowRun.update({
where: { id: aiWorkflowRun.id },
data: {
Expand All @@ -289,13 +276,31 @@ export class WorkflowQueueHandler implements OnModuleInit {
completedJobs: { increment: 1 },
},
});
await this.scheduler.completeJob(
(aiWorkflowRun as any).workflow.gitWorkflowId,
aiWorkflowRun.scheduledJobId as string,
);

try {
await this.scheduler.completeJob(
(aiWorkflowRun as any).workflow.gitWorkflowId,
aiWorkflowRun.scheduledJobId as string,
conclusion === 'FAILURE' ? 'fail' : 'complete',
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The conclusion parameter is being used to determine the status passed to completeJob. Ensure that conclusion is correctly set and validated before this point to avoid unexpected behavior.

);

if (conclusion === 'FAILURE') {
this.logger.log({
message: `Workflow job ${aiWorkflowRun.id} failed. Retrying!`,
aiWorkflowRunId: aiWorkflowRun.id,
gitRunId: event.workflow_job.run_id,
jobId: event.workflow_job.id,
status: conclusion,
timestamp: new Date().toISOString(),
});
return;
}
} catch (e) {
this.logger.log(aiWorkflowRun.id, e.message);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider providing more context in the error logging. Currently, only the aiWorkflowRun.id and e.message are logged. Including additional information such as the stack trace or error type might be helpful for debugging.

}

this.logger.log({
message: 'Workflow job completed',
message: `Workflow job ${aiWorkflowRun.id} completed with conclusion: ${conclusion}`,
aiWorkflowRunId: aiWorkflowRun.id,
gitRunId: event.workflow_job.run_id,
jobId: event.workflow_job.id,
Expand Down