Skip to content

Conversation

vas3a
Copy link
Collaborator

@vas3a vas3a commented Oct 7, 2025

Handle failed job. Clear cached queue stats before re-trying/

@vas3a vas3a requested a review from kkartunov October 7, 2025 11:09
engines: {node: '>=12.0.0'}
cron-parser@5.4.0:
resolution: {integrity: sha512-HxYB8vTvnQFx4dLsZpGRa0uHp6X3qIzS3ZJgJ9v6l/5TJMgeWQbLkR5yiJ5hOxGbc9+jCADDnydIe15ReLZnJA==}
engines: {node: '>=18'}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The updated cron-parser package now requires Node.js version >=18. Ensure that the deployment environment supports this version to prevent runtime issues.

perfect-debounce@1.0.0: {}

pg-boss@11.0.2:
pg-boss@11.0.6:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure that the update from pg-boss@11.0.2 to pg-boss@11.0.6 is compatible with the rest of the codebase and does not introduce any breaking changes.

pg-boss@11.0.6:
dependencies:
cron-parser: 4.9.0
cron-parser: 5.4.0
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verify that the update from cron-parser: 4.9.0 to cron-parser: 5.4.0 is compatible with the current implementation and does not introduce any breaking changes.

export class QueueSchedulerService implements OnModuleInit, OnModuleDestroy {
private readonly logger: Logger = new Logger(QueueSchedulerService.name);
private boss: PgBoss;
private $start;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider renaming the variable $start to a more descriptive name that indicates its purpose or usage within the service. Using a more descriptive name can improve code readability and maintainability.


if (resolution === 'fail') {
// IMPORTANT!
// thes 4 operations will update the cache for the active singletons in the database
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo in comment: 'thes' should be 'these'.

if (resolution === 'fail') {
const bossJob = await this.boss.getJobById(queueName, jobId);
if (bossJob && bossJob.retryCount >= bossJob.retryLimit) {
throw new Error('Job failed! Retry limit reached!');
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider logging a message before throwing an error to provide more context about the failure.

}

await this.boss.start();
await this.$start;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The line await this.$start; seems to be incorrect. It looks like it should be await this.boss.start(); instead. Ensure that the method start() is being called on the correct object, which appears to be this.boss based on the previous code.

jobId: string,
handler: (resolution?: string, result?: any) => void,
) {
this.logger.log(`Registering job handler for job ${jobId}.`);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding error handling or validation to ensure that jobId and handler are valid before logging and setting them in jobsHandlersMap. This can prevent potential runtime errors if invalid data is passed.

// this will put a pause on the job
// until it is marked as completed via webhook call
return new Promise<void>((resolve, reject) => {
await new Promise<void>((resolve, reject) => {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change from return to await here suggests that the surrounding function is asynchronous. Ensure that the function signature reflects this by including async if it is not already present.

this.scheduler.registerJobHandler(
job.id,
(resolution: string = 'complete', result: any) => {
this.logger.log(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider providing more context in the log message for better traceability. For example, include the job ID in the log message to make it easier to identify which job the log entry pertains to.

await this.scheduler.completeJob(
(aiWorkflowRun as any).workflow.gitWorkflowId,
aiWorkflowRun.scheduledJobId as string,
conclusion === 'FAILURE' ? 'fail' : 'complete',
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The conclusion parameter is being used to determine the status passed to completeJob. Ensure that conclusion is correctly set and validated before this point to avoid unexpected behavior.

return;
}
} catch (e) {
this.logger.log(aiWorkflowRun.id, e.message);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider providing more context in the error logging. Currently, only the aiWorkflowRun.id and e.message are logged. Including additional information such as the stack trace or error type might be helpful for debugging.

Copy link
Contributor

@kkartunov kkartunov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good and we tested it well.

@kkartunov kkartunov merged commit b3f3235 into develop Oct 7, 2025
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants