From b49a0ee6ce20853d25af9656e21acba8e26523c0 Mon Sep 17 00:00:00 2001 From: billsedison Date: Mon, 15 Sep 2025 22:23:51 +0800 Subject: [PATCH] Topcoder NestJS Autopilot Phase Transition Logic Optimization --- src/autopilot/dto/autopilot.dto.ts | 13 +- .../interfaces/autopilot.interface.ts | 22 +- src/autopilot/services/autopilot.service.ts | 420 +++++++++++++++-- src/autopilot/services/scheduler.service.ts | 169 ++++++- src/challenge/challenge-api.service.ts | 28 +- src/common/constants/operators.constants.ts | 77 ++++ src/kafka/dto/produce-message.dto.ts | 13 +- src/kafka/templates/autopilot.template.ts | 13 +- src/recovery/recovery.service.ts | 223 ++++++++- src/sync/sync.service.ts | 66 ++- start-local.sh | 4 +- test/autopilot.e2e-spec.ts | 424 +++++++++++++++++- 12 files changed, 1363 insertions(+), 109 deletions(-) create mode 100644 src/common/constants/operators.constants.ts diff --git a/src/autopilot/dto/autopilot.dto.ts b/src/autopilot/dto/autopilot.dto.ts index e98ab13..3e83248 100644 --- a/src/autopilot/dto/autopilot.dto.ts +++ b/src/autopilot/dto/autopilot.dto.ts @@ -6,6 +6,7 @@ import { IsDateString, IsUUID, } from 'class-validator'; +import { AutopilotOperator } from '../interfaces/autopilot.interface'; export class PhaseTransitionDto { @IsDateString() @@ -23,8 +24,8 @@ export class PhaseTransitionDto { @IsEnum(['START', 'END']) state: 'START' | 'END'; - @IsString() - operator: string; + @IsEnum(AutopilotOperator) + operator: AutopilotOperator | string; @IsString() projectStatus: string; @@ -40,8 +41,8 @@ export class ChallengeUpdateDto { @IsString() status: string; - @IsString() - operator: string; + @IsEnum(AutopilotOperator) + operator: AutopilotOperator | string; } export class CommandDto { @@ -57,6 +58,6 @@ export class CommandDto { @IsObject() parameters: Record; - @IsString() - operator: string; + @IsEnum(AutopilotOperator) + operator: AutopilotOperator | string; } diff --git a/src/autopilot/interfaces/autopilot.interface.ts b/src/autopilot/interfaces/autopilot.interface.ts index d134bf3..22e1548 100644 --- a/src/autopilot/interfaces/autopilot.interface.ts +++ b/src/autopilot/interfaces/autopilot.interface.ts @@ -1,3 +1,19 @@ +export enum AutopilotOperator { + // System operators for internal autopilot operations + SYSTEM = 'system', + SYSTEM_SCHEDULER = 'system-scheduler', + SYSTEM_NEW_CHALLENGE = 'system-new-challenge', + SYSTEM_RECOVERY = 'system-recovery', + SYSTEM_SYNC = 'system-sync', + SYSTEM_PHASE_CHAIN = 'system-phase-chain', + + // Administrative operators + ADMIN = 'admin', + + // User operators (when operator comes from external sources) + USER = 'user', +} + export interface BaseMessage { topic: string; originator: string; @@ -10,7 +26,7 @@ export interface PhaseTransitionPayload { phaseId: string; // Changed from number to string to support UUIDs from the API phaseTypeName: string; state: 'START' | 'END'; - operator: string; + operator: AutopilotOperator | string; // Allow both enum and string for flexibility projectStatus: string; date?: string; challengeId: string; // Changed to string to support UUIDs @@ -20,7 +36,7 @@ export interface ChallengeUpdatePayload { projectId: number; challengeId: string; // Changed to string to support UUIDs status: string; - operator: string; + operator: AutopilotOperator | string; // Allow both enum and string for flexibility date?: string; // This nested structure may be present in Kafka messages from the API phases?: { @@ -31,7 +47,7 @@ export interface ChallengeUpdatePayload { export interface CommandPayload { command: string; - operator: string; + operator: AutopilotOperator | string; // Allow both enum and string for flexibility projectId?: number; challengeId?: string; // Added challengeId for new command handling date?: string; diff --git a/src/autopilot/services/autopilot.service.ts b/src/autopilot/services/autopilot.service.ts index 69e2e44..eff460d 100644 --- a/src/autopilot/services/autopilot.service.ts +++ b/src/autopilot/services/autopilot.service.ts @@ -4,8 +4,10 @@ import { PhaseTransitionPayload, ChallengeUpdatePayload, CommandPayload, + AutopilotOperator, } from '../interfaces/autopilot.interface'; import { ChallengeApiService } from '../../challenge/challenge-api.service'; +import { IPhase } from '../../challenge/interfaces/challenge.interface'; import { AUTOPILOT_COMMANDS } from '../../common/constants/commands.constants'; @Injectable() @@ -17,7 +19,24 @@ export class AutopilotService { constructor( private readonly schedulerService: SchedulerService, private readonly challengeApiService: ChallengeApiService, - ) {} + ) { + // Set up the phase chain callback to handle next phase opening and scheduling + this.schedulerService.setPhaseChainCallback( + ( + challengeId: string, + projectId: number, + projectStatus: string, + nextPhases: IPhase[], + ) => { + void this.openAndScheduleNextPhases( + challengeId, + projectId, + projectStatus, + nextPhases, + ); + }, + ); + } schedulePhaseTransition(phaseData: PhaseTransitionPayload): string { try { @@ -117,16 +136,49 @@ export class AutopilotService { `Consumed phase transition event: ${JSON.stringify(message)}`, ); - if (message.state === 'END') { - const canceled = this.cancelPhaseTransition( - message.challengeId, - message.phaseId, - ); - if (canceled) { - this.logger.log( - `Cleaned up job for phase ${message.phaseId} (challenge ${message.challengeId}) from registry after consuming event.`, - ); - } + if (message.state === 'START') { + // Advance the phase (open it) using the scheduler service + void (async () => { + try { + await this.schedulerService.advancePhase(message); + this.logger.log( + `Successfully processed START event for phase ${message.phaseId} (challenge ${message.challengeId})`, + ); + } catch (error) { + const err = error as Error; + this.logger.error( + `Failed to advance phase ${message.phaseId} for challenge ${message.challengeId}: ${err.message}`, + err.stack, + ); + } + })(); + } else if (message.state === 'END') { + // Advance the phase (close it) using the scheduler service + void (async () => { + try { + await this.schedulerService.advancePhase(message); + this.logger.log( + `Successfully processed END event for phase ${message.phaseId} (challenge ${message.challengeId})`, + ); + + // Clean up the scheduled job after closing the phase + const canceled = this.cancelPhaseTransition( + message.challengeId, + message.phaseId, + ); + if (canceled) { + this.logger.log( + `Cleaned up job for phase ${message.phaseId} (challenge ${message.challengeId}) from registry after consuming event.`, + ); + } + } catch (error) { + const err = error as Error; + this.logger.error( + `Failed to advance phase ${message.phaseId} for challenge ${message.challengeId}: ${err.message}`, + err.stack, + ); + } + })(); } } @@ -147,25 +199,50 @@ export class AutopilotService { return; } - for (const phase of challengeDetails.phases) { - if (!phase.scheduledEndDate) { - this.logger.warn( - `Phase ${phase.id} for new challenge ${challenge.challengeId} has no scheduled end date. Skipping.`, - ); - continue; - } - const phaseData: PhaseTransitionPayload = { - projectId: challengeDetails.projectId, - challengeId: challengeDetails.id, - phaseId: phase.id, - phaseTypeName: phase.name, - state: 'END', - operator: 'system-new-challenge', - projectStatus: challengeDetails.status, - date: phase.scheduledEndDate, - }; - this.schedulePhaseTransition(phaseData); + // Find the next phase that should be scheduled (similar to PhaseAdvancer logic) + const nextPhase = this.findNextPhaseToSchedule(challengeDetails.phases); + + if (!nextPhase) { + this.logger.log( + `No phase needs to be scheduled for new challenge ${challenge.challengeId}`, + ); + return; } + + // Determine if we should schedule for start or end based on phase state + const now = new Date(); + const shouldOpen = + !nextPhase.isOpen && + !nextPhase.actualEndDate && + new Date(nextPhase.scheduledStartDate) <= now; + + const scheduleDate = shouldOpen + ? nextPhase.scheduledStartDate + : nextPhase.scheduledEndDate; + const state = shouldOpen ? 'START' : 'END'; + + if (!scheduleDate) { + this.logger.warn( + `Next phase ${nextPhase.id} for new challenge ${challenge.challengeId} has no scheduled ${shouldOpen ? 'start' : 'end'} date. Skipping.`, + ); + return; + } + + const phaseData: PhaseTransitionPayload = { + projectId: challengeDetails.projectId, + challengeId: challengeDetails.id, + phaseId: nextPhase.id, + phaseTypeName: nextPhase.name, + state, + operator: AutopilotOperator.SYSTEM_NEW_CHALLENGE, + projectStatus: challengeDetails.status, + date: scheduleDate, + }; + + this.schedulePhaseTransition(phaseData); + this.logger.log( + `Scheduled next phase ${nextPhase.name} (${nextPhase.id}) for new challenge ${challenge.challengeId}`, + ); } catch (error) { const err = error as Error; this.logger.error( @@ -191,25 +268,50 @@ export class AutopilotService { return; } - for (const phase of challengeDetails.phases) { - if (!phase.scheduledEndDate) continue; - - const payload: PhaseTransitionPayload = { - projectId: challengeDetails.projectId, - challengeId: challengeDetails.id, - phaseId: phase.id, - phaseTypeName: phase.name, - operator: message.operator, - projectStatus: challengeDetails.status, - date: phase.scheduledEndDate, - state: 'END', - }; + // Find the next phase that should be scheduled (similar to PhaseAdvancer logic) + const nextPhase = this.findNextPhaseToSchedule(challengeDetails.phases); + if (!nextPhase) { this.logger.log( - `Rescheduling updated phase from notification: ${challengeDetails.id}:${phase.id}`, + `No phase needs to be rescheduled for updated challenge ${message.challengeId}`, ); - this.reschedulePhaseTransition(challengeDetails.id, payload); + return; } + + // Determine if we should schedule for start or end based on phase state + const now = new Date(); + const shouldOpen = + !nextPhase.isOpen && + !nextPhase.actualEndDate && + new Date(nextPhase.scheduledStartDate) <= now; + + const scheduleDate = shouldOpen + ? nextPhase.scheduledStartDate + : nextPhase.scheduledEndDate; + const state = shouldOpen ? 'START' : 'END'; + + if (!scheduleDate) { + this.logger.warn( + `Next phase ${nextPhase.id} for updated challenge ${message.challengeId} has no scheduled ${shouldOpen ? 'start' : 'end'} date. Skipping.`, + ); + return; + } + + const payload: PhaseTransitionPayload = { + projectId: challengeDetails.projectId, + challengeId: challengeDetails.id, + phaseId: nextPhase.id, + phaseTypeName: nextPhase.name, + operator: message.operator, + projectStatus: challengeDetails.status, + date: scheduleDate, + state, + }; + + this.logger.log( + `Rescheduling next phase ${nextPhase.name} (${nextPhase.id}) for updated challenge ${message.challengeId}`, + ); + this.reschedulePhaseTransition(challengeDetails.id, payload); } catch (error) { const err = error as Error; this.logger.error( @@ -328,6 +430,208 @@ export class AutopilotService { } } + /** + * Find the next phase that should be scheduled based on current phase state. + * Similar logic to PhaseAdvancer.js - only schedule the next phase that should advance. + */ + private findNextPhaseToSchedule(phases: IPhase[]): IPhase | null { + const now = new Date(); + + // First, check for phases that should be open but aren't + const phasesToOpen = phases.filter((phase) => { + if (phase.isOpen || phase.actualEndDate) { + return false; // Already open or already ended + } + + const startTime = new Date(phase.scheduledStartDate); + if (startTime > now) { + return false; // Not time to start yet + } + + // Check if predecessor requirements are met + if (!phase.predecessor) { + return true; // No predecessor, ready to start + } + + const predecessor = phases.find((p) => p.phaseId === phase.predecessor); + return predecessor && predecessor.actualEndDate; // Predecessor has ended + }); + + if (phasesToOpen.length > 0) { + // Return the earliest phase that should be opened + return phasesToOpen.sort( + (a, b) => + new Date(a.scheduledStartDate).getTime() - + new Date(b.scheduledStartDate).getTime(), + )[0]; + } + + // Next, check for open phases that should be closed + const openPhases = phases.filter((phase) => phase.isOpen); + + if (openPhases.length > 0) { + // Return the earliest phase that should end + return openPhases.sort( + (a, b) => + new Date(a.scheduledEndDate).getTime() - + new Date(b.scheduledEndDate).getTime(), + )[0]; + } + + // Finally, look for future phases that need to be scheduled + const futurePhases = phases.filter( + (phase) => + !phase.actualStartDate && // hasn't started yet + !phase.actualEndDate && // hasn't ended yet + phase.scheduledStartDate && // has a scheduled start date + new Date(phase.scheduledStartDate) > now, // starts in the future + ); + + if (futurePhases.length === 0) { + return null; + } + + // Find phases that are ready to start (no predecessor or predecessor is closed) + const readyPhases = futurePhases.filter((phase) => { + if (!phase.predecessor) { + return true; // No predecessor, ready to start + } + + // Check if predecessor has ended + const predecessor = phases.find((p) => p.phaseId === phase.predecessor); + return predecessor && predecessor.actualEndDate; + }); + + if (readyPhases.length === 0) { + return null; + } + + // Return the earliest scheduled phase + return readyPhases.sort( + (a, b) => + new Date(a.scheduledStartDate).getTime() - + new Date(b.scheduledStartDate).getTime(), + )[0]; + } + + /** + * Open and schedule next phases in the transition chain + */ + async openAndScheduleNextPhases( + challengeId: string, + projectId: number, + projectStatus: string, + nextPhases: IPhase[], + ): Promise { + if (!nextPhases || nextPhases.length === 0) { + this.logger.log( + `[PHASE CHAIN] No next phases to open for challenge ${challengeId}`, + ); + return; + } + + this.logger.log( + `[PHASE CHAIN] Opening and scheduling ${nextPhases.length} next phases for challenge ${challengeId}`, + ); + + let processedCount = 0; + for (const nextPhase of nextPhases) { + try { + // Step 1: Open the phase first + this.logger.log( + `[PHASE CHAIN] Opening phase ${nextPhase.name} (${nextPhase.id}) for challenge ${challengeId}`, + ); + + const openResult = await this.challengeApiService.advancePhase( + challengeId, + nextPhase.id, + 'open', + ); + + if (!openResult.success) { + this.logger.error( + `[PHASE CHAIN] Failed to open phase ${nextPhase.name} (${nextPhase.id}) for challenge ${challengeId}: ${openResult.message}`, + ); + continue; + } + + this.logger.log( + `[PHASE CHAIN] Successfully opened phase ${nextPhase.name} (${nextPhase.id}) for challenge ${challengeId}`, + ); + + // Step 2: Schedule the phase for closure (use updated phase data from API response) + const updatedPhase = + openResult.updatedPhases?.find((p) => p.id === nextPhase.id) || + nextPhase; + + if (!updatedPhase.scheduledEndDate) { + this.logger.warn( + `[PHASE CHAIN] Opened phase ${nextPhase.name} (${nextPhase.id}) has no scheduled end date, skipping scheduling`, + ); + continue; + } + + // Check if this phase is already scheduled to avoid duplicates + const phaseKey = `${challengeId}:${nextPhase.id}`; + if (this.activeSchedules.has(phaseKey)) { + this.logger.log( + `[PHASE CHAIN] Phase ${nextPhase.name} (${nextPhase.id}) is already scheduled, skipping`, + ); + continue; + } + + const nextPhaseData: PhaseTransitionPayload = { + projectId, + challengeId, + phaseId: updatedPhase.id, + phaseTypeName: updatedPhase.name, + state: 'END', + operator: AutopilotOperator.SYSTEM_PHASE_CHAIN, + projectStatus, + date: updatedPhase.scheduledEndDate, + }; + + const jobId = this.schedulePhaseTransition(nextPhaseData); + processedCount++; + this.logger.log( + `[PHASE CHAIN] Scheduled opened phase ${updatedPhase.name} (${updatedPhase.id}) for closure at ${updatedPhase.scheduledEndDate} with job ID: ${jobId}`, + ); + } catch (error) { + const err = error as Error; + this.logger.error( + `[PHASE CHAIN] Failed to open and schedule phase ${nextPhase.name} (${nextPhase.id}) for challenge ${challengeId}: ${err.message}`, + err.stack, + ); + } + } + + this.logger.log( + `[PHASE CHAIN] Successfully opened and scheduled ${processedCount} out of ${nextPhases.length} next phases for challenge ${challengeId}`, + ); + } + + /** + * @deprecated Use openAndScheduleNextPhases instead + * Schedule next phases in the transition chain + */ + scheduleNextPhases( + challengeId: string, + projectId: number, + projectStatus: string, + nextPhases: IPhase[], + ): void { + this.logger.warn( + `[PHASE CHAIN] scheduleNextPhases is deprecated, use openAndScheduleNextPhases instead`, + ); + // Convert to async call + void this.openAndScheduleNextPhases( + challengeId, + projectId, + projectStatus, + nextPhases, + ); + } + getActiveSchedules(): Map { return new Map(this.activeSchedules); } @@ -341,4 +645,32 @@ export class AutopilotService { activeSchedules: this.getActiveSchedules(), }; } + + /** + * Get detailed information about scheduled transitions for debugging + */ + getScheduledTransitionsDetails(): { + totalScheduled: number; + byChallenge: Record; + scheduledJobs: Map; + } { + const activeSchedules = this.getActiveSchedules(); + const scheduledJobs = + this.schedulerService.getAllScheduledTransitionsWithData(); + const byChallenge: Record = {}; + + for (const [phaseKey] of activeSchedules) { + const [challengeId] = phaseKey.split(':'); + if (!byChallenge[challengeId]) { + byChallenge[challengeId] = []; + } + byChallenge[challengeId].push(phaseKey); + } + + return { + totalScheduled: activeSchedules.size, + byChallenge, + scheduledJobs, + }; + } } diff --git a/src/autopilot/services/scheduler.service.ts b/src/autopilot/services/scheduler.service.ts index a5b161d..5c5744f 100644 --- a/src/autopilot/services/scheduler.service.ts +++ b/src/autopilot/services/scheduler.service.ts @@ -5,6 +5,7 @@ import { ChallengeApiService } from '../../challenge/challenge-api.service'; import { PhaseTransitionMessage, PhaseTransitionPayload, + AutopilotOperator, } from '../interfaces/autopilot.interface'; import { KAFKA_TOPICS } from '../../kafka/constants/topics'; @@ -12,6 +13,14 @@ import { KAFKA_TOPICS } from '../../kafka/constants/topics'; export class SchedulerService { private readonly logger = new Logger(SchedulerService.name); private scheduledJobs = new Map(); + private phaseChainCallback: + | (( + challengeId: string, + projectId: number, + projectStatus: string, + nextPhases: any[], + ) => Promise | void) + | null = null; constructor( private schedulerRegistry: SchedulerRegistry, @@ -19,6 +28,17 @@ export class SchedulerService { private readonly challengeApiService: ChallengeApiService, ) {} + setPhaseChainCallback( + callback: ( + challengeId: string, + projectId: number, + projectStatus: string, + nextPhases: any[], + ) => Promise | void, + ): void { + this.phaseChainCallback = callback; + } + schedulePhaseTransition(phaseData: PhaseTransitionPayload): string { const { challengeId, phaseId, date: endTime } = phaseData; const jobId = `${challengeId}:${phaseId}`; @@ -41,6 +61,35 @@ export class SchedulerService { () => { void (async () => { try { + // Before triggering the event, check if the phase still needs the transition + const phaseDetails = + await this.challengeApiService.getPhaseDetails( + phaseData.challengeId, + phaseData.phaseId, + ); + + if (!phaseDetails) { + this.logger.warn( + `Phase ${phaseData.phaseId} not found in challenge ${phaseData.challengeId}, skipping scheduled transition`, + ); + return; + } + + // Check if the phase is in the expected state for the transition + if (phaseData.state === 'END' && !phaseDetails.isOpen) { + this.logger.warn( + `Scheduled END transition for phase ${phaseData.phaseId} but it's already closed, skipping`, + ); + return; + } + + if (phaseData.state === 'START' && phaseDetails.isOpen) { + this.logger.warn( + `Scheduled START transition for phase ${phaseData.phaseId} but it's already open, skipping`, + ); + return; + } + await this.triggerKafkaEvent(phaseData); // Call advancePhase method when phase transition is triggered @@ -111,10 +160,38 @@ export class SchedulerService { } public async triggerKafkaEvent(data: PhaseTransitionPayload) { + // Validate phase state before sending the event + const phaseDetails = await this.challengeApiService.getPhaseDetails( + data.challengeId, + data.phaseId, + ); + + if (!phaseDetails) { + this.logger.error( + `Cannot trigger event: Phase ${data.phaseId} not found in challenge ${data.challengeId}`, + ); + return; + } + + // Check if the phase is in the expected state + if (data.state === 'END' && !phaseDetails.isOpen) { + this.logger.warn( + `Skipping END event for phase ${data.phaseId} - it's already closed`, + ); + return; + } + + if (data.state === 'START' && phaseDetails.isOpen) { + this.logger.warn( + `Skipping START event for phase ${data.phaseId} - it's already open`, + ); + return; + } + const payload: PhaseTransitionPayload = { ...data, - state: 'END', - operator: 'system-scheduler', + state: data.state || 'END', // Default to END if not specified + operator: AutopilotOperator.SYSTEM_SCHEDULER, date: new Date().toISOString(), }; @@ -128,7 +205,7 @@ export class SchedulerService { }; await this.kafkaService.produce(KAFKA_TOPICS.PHASE_TRANSITION, message); this.logger.log( - `Successfully sent transition event for challenge ${data.challengeId}, phase ${data.phaseId}`, + `Successfully sent ${payload.state} transition event for challenge ${data.challengeId}, phase ${data.phaseId}`, ); } catch (error: unknown) { const err = error as Error; @@ -145,16 +222,100 @@ export class SchedulerService { `Advancing phase ${data.phaseId} for challenge ${data.challengeId}`, ); + // Check current phase state to determine the correct operation + const phaseDetails = await this.challengeApiService.getPhaseDetails( + data.challengeId, + data.phaseId, + ); + + if (!phaseDetails) { + this.logger.error( + `Phase ${data.phaseId} not found in challenge ${data.challengeId}`, + ); + return; + } + + // Determine operation based on transition state and current phase state + let operation: 'open' | 'close'; + + if (data.state === 'START') { + operation = 'open'; + } else if (data.state === 'END') { + operation = 'close'; + } else { + // Fallback: determine based on current phase state + operation = phaseDetails.isOpen ? 'close' : 'open'; + } + + // Validate that the operation makes sense + if (operation === 'open' && phaseDetails.isOpen) { + this.logger.warn( + `Phase ${data.phaseId} is already open, skipping open operation`, + ); + return; + } + + if (operation === 'close' && !phaseDetails.isOpen) { + this.logger.warn( + `Phase ${data.phaseId} is already closed, skipping close operation`, + ); + return; + } + + this.logger.log( + `Phase ${data.phaseId} is currently ${phaseDetails.isOpen ? 'open' : 'closed'}, will ${operation} it`, + ); + const result = await this.challengeApiService.advancePhase( data.challengeId, data.phaseId, - 'close', // Assuming 'close' operation when phase ends + operation, ); if (result.success) { this.logger.log( `Successfully advanced phase ${data.phaseId} for challenge ${data.challengeId}: ${result.message}`, ); + + // Handle phase transition chain - open and schedule next phases if they exist + if ( + result.next?.operation === 'open' && + result.next.phases && + result.next.phases.length > 0 + ) { + try { + if (this.phaseChainCallback) { + this.logger.log( + `[PHASE CHAIN] Triggering phase chain callback for challenge ${data.challengeId} with ${result.next.phases.length} next phases`, + ); + const callbackResult = this.phaseChainCallback( + data.challengeId, + data.projectId, + data.projectStatus, + result.next.phases, + ); + + // Handle both sync and async callbacks + if (callbackResult instanceof Promise) { + await callbackResult; + } + } else { + this.logger.warn( + `[PHASE CHAIN] Phase chain callback not set, cannot open and schedule next phases for challenge ${data.challengeId}`, + ); + } + } catch (error) { + const err = error as Error; + this.logger.error( + `[PHASE CHAIN] Error in phase chain callback for challenge ${data.challengeId}: ${err.message}`, + err.stack, + ); + } + } else { + this.logger.log( + `[PHASE CHAIN] No next phases to open and schedule for challenge ${data.challengeId}`, + ); + } } else { this.logger.error( `Failed to advance phase ${data.phaseId} for challenge ${data.challengeId}: ${result.message}`, diff --git a/src/challenge/challenge-api.service.ts b/src/challenge/challenge-api.service.ts index bc0759f..bc09c0f 100644 --- a/src/challenge/challenge-api.service.ts +++ b/src/challenge/challenge-api.service.ts @@ -18,6 +18,12 @@ interface ChallengeFiltersDto { interface PhaseAdvanceResponseDto { success: boolean; message: string; + hasWinningSubmission?: boolean; + updatedPhases?: IPhase[]; + next?: { + operation?: 'open' | 'close'; + phases?: IPhase[]; + }; } // Corrected: Interface to correctly extend AxiosError for type safety @@ -221,7 +227,19 @@ export class ChallengeApiService { phaseId: string, operation: 'open' | 'close', ): Promise { - const body = { phaseId, operation }; + // Get the phase name from the phase ID - the API expects phase name, not phase ID + const phaseName = await this.getPhaseTypeName(challengeId, phaseId); + if (!phaseName || phaseName === 'Unknown') { + this.logger.error( + `Cannot advance phase: Phase with ID ${phaseId} not found in challenge ${challengeId}`, + ); + return { + success: false, + message: `Phase with ID ${phaseId} not found in challenge ${challengeId}`, + }; + } + + const body = { phase: phaseName, operation }; try { const headers = await this.getAuthHeader(); const response = await firstValueFrom( @@ -241,6 +259,14 @@ export class ChallengeApiService { status: err.response?.status, statusText: err.response?.statusText, data: err.response?.data, + request: { + method: 'POST', + url: `${this.challengeApiUrl}/challenges/${challengeId}/advance-phase`, + body, + headers: await this.getAuthHeader().catch(() => ({ + error: 'Failed to get auth header', + })), + }, }, ); return { success: false, message: 'Failed to advance phase' }; diff --git a/src/common/constants/operators.constants.ts b/src/common/constants/operators.constants.ts new file mode 100644 index 0000000..01d4d78 --- /dev/null +++ b/src/common/constants/operators.constants.ts @@ -0,0 +1,77 @@ +/** + * Autopilot Operator Constants + * + * This file documents the valid operator values used throughout the autopilot system. + * These operators identify the source or initiator of various autopilot operations. + */ + +import { AutopilotOperator } from '../../autopilot/interfaces/autopilot.interface'; + +/** + * System operators used by internal autopilot services + */ +export const SYSTEM_OPERATORS = { + /** General system operations */ + SYSTEM: AutopilotOperator.SYSTEM, + + /** Operations initiated by the scheduler service when executing scheduled phase transitions */ + SYSTEM_SCHEDULER: AutopilotOperator.SYSTEM_SCHEDULER, + + /** Operations initiated when handling new challenge creation events */ + SYSTEM_NEW_CHALLENGE: AutopilotOperator.SYSTEM_NEW_CHALLENGE, + + /** Operations initiated by the recovery service during application bootstrap */ + SYSTEM_RECOVERY: AutopilotOperator.SYSTEM_RECOVERY, + + /** Operations initiated by the sync service during periodic synchronization */ + SYSTEM_SYNC: AutopilotOperator.SYSTEM_SYNC, + + /** Operations initiated when opening next phases in the transition chain */ + SYSTEM_PHASE_CHAIN: AutopilotOperator.SYSTEM_PHASE_CHAIN, +} as const; + +/** + * Administrative operators for manual operations + */ +export const ADMIN_OPERATORS = { + /** Administrative operations (manual commands, overrides, etc.) */ + ADMIN: AutopilotOperator.ADMIN, +} as const; + +/** + * User operators for external operations + */ +export const USER_OPERATORS = { + /** Operations initiated by end users */ + USER: AutopilotOperator.USER, +} as const; + +/** + * All valid autopilot operators + */ +export const ALL_OPERATORS = { + ...SYSTEM_OPERATORS, + ...ADMIN_OPERATORS, + ...USER_OPERATORS, +} as const; + +/** + * Operator usage documentation + */ +export const OPERATOR_USAGE = { + [AutopilotOperator.SYSTEM]: 'General system operations and default fallback', + [AutopilotOperator.SYSTEM_SCHEDULER]: + 'Scheduled phase transitions executed by the scheduler service', + [AutopilotOperator.SYSTEM_NEW_CHALLENGE]: + 'Phase scheduling triggered by new challenge creation', + [AutopilotOperator.SYSTEM_RECOVERY]: + 'Phase processing during application recovery/bootstrap', + [AutopilotOperator.SYSTEM_SYNC]: + 'Phase synchronization during periodic sync operations', + [AutopilotOperator.SYSTEM_PHASE_CHAIN]: + 'Opening next phases in the transition chain after phase completion', + [AutopilotOperator.ADMIN]: + 'Manual administrative operations and command overrides', + [AutopilotOperator.USER]: + 'Operations initiated by end users through external interfaces', +} as const; diff --git a/src/kafka/dto/produce-message.dto.ts b/src/kafka/dto/produce-message.dto.ts index 2f052b4..d05e2c9 100644 --- a/src/kafka/dto/produce-message.dto.ts +++ b/src/kafka/dto/produce-message.dto.ts @@ -9,6 +9,7 @@ import { IsDateString, IsUUID, } from 'class-validator'; +import { AutopilotOperator } from '../../autopilot/interfaces/autopilot.interface'; export enum PhaseState { START = 'START', @@ -31,9 +32,9 @@ export class PhaseTransitionPayloadDto { @IsEnum(PhaseState) state: PhaseState; - @IsString() + @IsEnum(AutopilotOperator) @IsNotEmpty() - operator: string; + operator: AutopilotOperator | string; @IsString() @IsNotEmpty() @@ -61,9 +62,9 @@ export class ChallengeUpdatePayloadDto { @IsNotEmpty() status: string; - @IsString() + @IsEnum(AutopilotOperator) @IsNotEmpty() - operator: string; + operator: AutopilotOperator | string; @IsDateString() @IsOptional() @@ -75,9 +76,9 @@ export class CommandPayloadDto { @IsNotEmpty() command: string; - @IsString() + @IsEnum(AutopilotOperator) @IsNotEmpty() - operator: string; + operator: AutopilotOperator | string; @IsNumber() @IsOptional() diff --git a/src/kafka/templates/autopilot.template.ts b/src/kafka/templates/autopilot.template.ts index 06ccf04..4447a09 100644 --- a/src/kafka/templates/autopilot.template.ts +++ b/src/kafka/templates/autopilot.template.ts @@ -9,6 +9,7 @@ import { } from 'class-validator'; import { KafkaMessageTemplate } from './kafka.template'; import { KAFKA_TOPICS } from '../constants/topics'; +import { AutopilotOperator } from '../../autopilot/interfaces/autopilot.interface'; // Phase Transition Template export class PhaseTransitionPayload { @@ -27,9 +28,9 @@ export class PhaseTransitionPayload { @IsEnum(['START', 'END']) state: 'START' | 'END'; - @IsString() + @IsEnum(AutopilotOperator) @IsNotEmpty() - operator: string; + operator: AutopilotOperator | string; @IsString() @IsNotEmpty() @@ -64,9 +65,9 @@ export class ChallengeUpdatePayload { @IsNotEmpty() status: string; - @IsString() + @IsEnum(AutopilotOperator) @IsNotEmpty() - operator: string; + operator: AutopilotOperator | string; @IsDateString() @IsOptional() @@ -85,9 +86,9 @@ export class CommandPayload { @IsNotEmpty() command: string; - @IsString() + @IsEnum(AutopilotOperator) @IsNotEmpty() - operator: string; + operator: AutopilotOperator | string; @IsNumber() @IsOptional() diff --git a/src/recovery/recovery.service.ts b/src/recovery/recovery.service.ts index 5505ac2..1edf24a 100644 --- a/src/recovery/recovery.service.ts +++ b/src/recovery/recovery.service.ts @@ -1,8 +1,14 @@ import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common'; -import { PhaseTransitionPayload } from '../autopilot/interfaces/autopilot.interface'; +import { + PhaseTransitionPayload, + AutopilotOperator, +} from '../autopilot/interfaces/autopilot.interface'; import { AutopilotService } from '../autopilot/services/autopilot.service'; import { ChallengeApiService } from '../challenge/challenge-api.service'; -import { IChallenge } from '../challenge/interfaces/challenge.interface'; +import { + IChallenge, + IPhase, +} from '../challenge/interfaces/challenge.interface'; import { SchedulerService } from '../autopilot/services/scheduler.service'; @Injectable() @@ -19,6 +25,7 @@ export class RecoveryService implements OnApplicationBootstrap { this.logger.log('Starting recovery process...'); try { + // const activeChallenges: IChallenge[] = const activeChallenges: IChallenge[] = await this.challengeApiService.getAllActiveChallenges({ status: 'ACTIVE', @@ -45,35 +52,63 @@ export class RecoveryService implements OnApplicationBootstrap { continue; } - for (const phase of challenge.phases) { - const phaseEndDate = phase.scheduledEndDate; - if (!phaseEndDate) { + // Process all phases that need to be scheduled or triggered + const phasesToProcess = this.findAllPhasesToProcess(challenge.phases); + + if (phasesToProcess.length === 0) { + this.logger.log( + `No phases need to be processed for challenge ${challenge.id}`, + { projectId: challenge.projectId }, + ); + continue; + } + + const currentTime = new Date(); + + for (const phaseInfo of phasesToProcess) { + const { phase, action } = phaseInfo; + + // Determine schedule date and state based on action + let scheduleDate: string; + let state: 'START' | 'END'; + + if (action === 'start') { + scheduleDate = phase.scheduledStartDate; + state = 'START'; + } else { + scheduleDate = phase.scheduledEndDate; + state = 'END'; + } + + if (!scheduleDate) { this.logger.warn( - `Phase ${phase.id} for challenge ${challenge.id} has no scheduled end date. Skipping.`, + `Phase ${phase.id} for challenge ${challenge.id} has no scheduled ${action === 'start' ? 'start' : 'end'} date. Skipping.`, { phaseName: phase.name }, ); continue; } - const endTime = new Date(phaseEndDate).getTime(); - const now = Date.now(); const phaseData: PhaseTransitionPayload = { projectId: challenge.projectId, challengeId: challenge.id, phaseId: phase.id, phaseTypeName: phase.name, - state: 'END', - operator: 'system-recovery', + state, + operator: AutopilotOperator.SYSTEM_RECOVERY, projectStatus: challenge.status, - date: phaseEndDate, + date: scheduleDate, }; - if (endTime <= now) { + const scheduleTime = new Date(scheduleDate).getTime(); + if (scheduleTime <= currentTime.getTime()) { this.logger.warn( - `Phase ${phase.id} for challenge ${challenge.id} already ended — triggering immediate Kafka event`, + `Phase ${phase.id} (${phase.name}) for challenge ${challenge.id} should have ${action === 'start' ? 'started' : 'ended'} — triggering immediate Kafka event`, ); await this.schedulerService.triggerKafkaEvent(phaseData); } else { + this.logger.log( + `Scheduling phase ${phase.name} (${phase.id}) for challenge ${challenge.id} to ${state} at ${scheduleDate}`, + ); this.autopilotService.schedulePhaseTransition(phaseData); } } @@ -87,4 +122,166 @@ export class RecoveryService implements OnApplicationBootstrap { this.logger.error('Recovery process failed:', err.stack || err.message); } } + + /** + * Find all phases that should be processed during recovery. + * Returns phases with their required action (start or end). + */ + private findAllPhasesToProcess( + phases: IPhase[], + ): Array<{ phase: IPhase; action: 'start' | 'end' }> { + const phasesToProcess: Array<{ phase: IPhase; action: 'start' | 'end' }> = + []; + const now = new Date(); + + for (const phase of phases) { + // Skip phases that have already ended + if (phase.actualEndDate) { + continue; + } + + // Check if phase should be started + if (!phase.isOpen && !phase.actualStartDate) { + const startTime = new Date(phase.scheduledStartDate); + + // Check if it's time to start + if (startTime <= now) { + // Check predecessor requirements + if (phase.predecessor) { + const predecessor = phases.find( + (p) => p.phaseId === phase.predecessor, + ); + if (!predecessor || !predecessor.actualEndDate) { + // Predecessor hasn't ended yet, skip this phase + continue; + } + } + // Phase should be started + phasesToProcess.push({ phase, action: 'start' }); + } + } + // Check if phase should be ended + else if (phase.isOpen) { + // Phase is open, should only be scheduled to end if it hasn't been scheduled yet + // The scheduler will handle it when the time comes + // For recovery, we only care about overdue phases + const endTime = new Date(phase.scheduledEndDate); + if (endTime <= now) { + // Phase is overdue and should end immediately + phasesToProcess.push({ phase, action: 'end' }); + } + // Don't schedule future END transitions here - let sync service handle that + } + } + + return phasesToProcess; + } + + /** + * Find the next phase that should be processed during recovery. + * Similar logic to AutopilotService but also handles overdue phases. + * @deprecated Use findAllPhasesToProcess instead + */ + private findNextPhaseToProcess(phases: IPhase[]): IPhase | null { + const now = new Date(); + + // First, check for overdue phases (phases that should have ended but are still open) + const overduePhases = phases.filter( + (phase) => + phase.isOpen && + phase.scheduledEndDate && + !phase.actualEndDate && + new Date(phase.scheduledEndDate).getTime() <= now.getTime(), + ); + + if (overduePhases.length > 0) { + // Return the earliest overdue phase + return overduePhases.sort( + (a, b) => + new Date(a.scheduledEndDate).getTime() - + new Date(b.scheduledEndDate).getTime(), + )[0]; + } + + // Second, check for phases that should be open but aren't + const phasesToOpen = phases.filter((phase) => { + if (phase.isOpen || phase.actualEndDate) { + return false; // Already open or already ended + } + + const startTime = new Date(phase.scheduledStartDate); + if (startTime > now) { + return false; // Not time to start yet + } + + // Check if predecessor requirements are met + if (!phase.predecessor) { + return true; // No predecessor, ready to start + } + + const predecessor = phases.find((p) => p.phaseId === phase.predecessor); + return predecessor && predecessor.actualEndDate; // Predecessor has ended + }); + + if (phasesToOpen.length > 0) { + // Return the earliest phase that should be opened + return phasesToOpen.sort( + (a, b) => + new Date(a.scheduledStartDate).getTime() - + new Date(b.scheduledStartDate).getTime(), + )[0]; + } + + // Third, find currently open phases that haven't ended yet + const openPhases = phases.filter( + (phase) => + phase.isOpen && + phase.scheduledEndDate && + new Date(phase.scheduledEndDate).getTime() > now.getTime(), + ); + + if (openPhases.length > 0) { + // Return the earliest ending open phase + return openPhases.sort( + (a, b) => + new Date(a.scheduledEndDate).getTime() - + new Date(b.scheduledEndDate).getTime(), + )[0]; + } + + // Finally, look for future phases that need to be scheduled + const futurePhases = phases.filter( + (phase) => + !phase.actualStartDate && // hasn't started yet + !phase.actualEndDate && // hasn't ended yet + phase.scheduledStartDate && // has a scheduled start date + new Date(phase.scheduledStartDate) > now, // starts in the future + ); + + if (futurePhases.length === 0) { + return null; + } + + // Find phases that are ready to start (no predecessor or predecessor is closed) + const readyPhases = futurePhases.filter((phase) => { + if (!phase.predecessor) { + return true; // No predecessor, ready to start + } + + // Check if predecessor has ended + const predecessor = phases.find((p) => p.phaseId === phase.predecessor); + return predecessor && predecessor.actualEndDate; + }); + + if (readyPhases.length === 0) { + return null; + } + + // Return the earliest scheduled phase + return readyPhases.sort( + (a, b) => + new Date(a.scheduledStartDate).getTime() - + new Date(b.scheduledStartDate).getTime(), + )[0]; + } } diff --git a/src/sync/sync.service.ts b/src/sync/sync.service.ts index 09e79de..8acf460 100644 --- a/src/sync/sync.service.ts +++ b/src/sync/sync.service.ts @@ -3,7 +3,10 @@ import { Cron, CronExpression } from '@nestjs/schedule'; import { AutopilotService } from '../autopilot/services/autopilot.service'; import { ChallengeApiService } from '../challenge/challenge-api.service'; import { SchedulerService } from '../autopilot/services/scheduler.service'; -import { PhaseTransitionPayload } from '../autopilot/interfaces/autopilot.interface'; +import { + PhaseTransitionPayload, + AutopilotOperator, +} from '../autopilot/interfaces/autopilot.interface'; @Injectable() export class SyncService { @@ -32,19 +35,61 @@ export class SyncService { await this.challengeApiService.getAllActiveChallenges({ status: 'ACTIVE', }); + const scheduledJobs = this.schedulerService.getAllScheduledTransitionsWithData(); const scheduledPhaseKeys = new Set(scheduledJobs.keys()); const activePhaseKeys = new Set(); + const now = new Date(); // 1. Add/update schedules for active challenges for (const challenge of activeChallenges) { if (!challenge.phases) continue; for (const phase of challenge.phases) { - const phaseEndDate = phase.scheduledEndDate; - if (!phaseEndDate) continue; + // Skip phases that have already ended + if (phase.actualEndDate) { + continue; + } + + // Determine what to schedule based on phase state + let scheduleDate: string | undefined; + let state: 'START' | 'END'; + + if (!phase.isOpen && !phase.actualStartDate) { + // Phase hasn't started yet + const startTime = new Date(phase.scheduledStartDate); + + // Check if it's time to start or should be scheduled for future start + if (startTime <= now) { + // Check predecessor requirements + if (phase.predecessor) { + const predecessor = challenge.phases.find( + (p) => p.phaseId === phase.predecessor, + ); + if (!predecessor || !predecessor.actualEndDate) { + // Predecessor hasn't ended yet, skip this phase + continue; + } + } + // Should start now or soon + scheduleDate = phase.scheduledStartDate; + state = 'START'; + } else { + // Future phase - don't schedule yet, will be handled when predecessor ends + continue; + } + } else if (phase.isOpen) { + // Phase is currently open, schedule for end + scheduleDate = phase.scheduledEndDate; + state = 'END'; + } else { + // Phase is closed but not ended (shouldn't happen in normal flow) + continue; + } + + if (!scheduleDate) continue; const phaseKey = `${challenge.id}:${phase.id}`; activePhaseKeys.add(phaseKey); @@ -56,25 +101,26 @@ export class SyncService { challengeId: challenge.id, phaseId: phase.id, phaseTypeName: phase.name, - state: 'END', - operator: 'system-sync', + state, + operator: AutopilotOperator.SYSTEM_SYNC, projectStatus: challenge.status, - date: phaseEndDate, + date: scheduleDate, }; if (!scheduledJob) { this.logger.log( - `New active phase found: ${phaseKey}. Scheduling...`, + `New active phase found: ${phaseKey} (${state}). Scheduling for ${scheduleDate}...`, ); this.autopilotService.schedulePhaseTransition(phaseData); added++; } else if ( scheduledJob.date && - new Date(scheduledJob.date).getTime() !== - new Date(phaseEndDate).getTime() + (new Date(scheduledJob.date).getTime() !== + new Date(scheduleDate).getTime() || + scheduledJob.state !== state) ) { this.logger.log( - `Phase ${phaseKey} has updated timing. Rescheduling...`, + `Phase ${phaseKey} has updated timing or state. Rescheduling from ${scheduledJob.state} at ${scheduledJob.date} to ${state} at ${scheduleDate}...`, ); void this.autopilotService.reschedulePhaseTransition( challenge.id, diff --git a/start-local.sh b/start-local.sh index afaf891..4324aea 100755 --- a/start-local.sh +++ b/start-local.sh @@ -15,4 +15,6 @@ export LOG_LEVEL=${LOG_LEVEL:-debug} # Start the application using ts-node-dev echo "Starting application in development mode..." -npm run start:dev +mkdir -p logs +TIMESTAMP=$(date +"%Y%m%d_%H%M%S") +npm run start:dev 2>&1 | tee logs/app_${TIMESTAMP}.log diff --git a/test/autopilot.e2e-spec.ts b/test/autopilot.e2e-spec.ts index a2cca84..d1674f5 100644 --- a/test/autopilot.e2e-spec.ts +++ b/test/autopilot.e2e-spec.ts @@ -9,7 +9,10 @@ import { Auth0Service } from '../src/auth/auth0.service'; import { KAFKA_TOPICS } from '../src/kafka/constants/topics'; import { AutopilotConsumer } from '../src/kafka/consumers/autopilot.consumer'; import { RecoveryService } from '../src/recovery/recovery.service'; -import { ChallengeUpdatePayload } from 'src/autopilot/interfaces/autopilot.interface'; +import { + ChallengeUpdatePayload, + AutopilotOperator, +} from '../src/autopilot/interfaces/autopilot.interface'; import { AutopilotService } from '../src/autopilot/services/autopilot.service'; // --- Mock Data --- @@ -28,13 +31,25 @@ const mockChallenge = { phases: [ { id: 'p1a2b3c4-d5e6-f7a8-b9c0-d1e2f3a4b5c6', + phaseId: 'p1a2b3c4-d5e6-f7a8-b9c0-d1e2f3a4b5c6', name: 'Registration', + isOpen: true, + scheduledStartDate: mockPastPhaseDate, scheduledEndDate: mockFuturePhaseDate1, + actualStartDate: mockPastPhaseDate, + actualEndDate: null, + predecessor: null, }, { id: 'p2a3b4c5-d6e7-f8a9-b0c1-d2e3f4a5b6c7', + phaseId: 'p2a3b4c5-d6e7-f8a9-b0c1-d2e3f4a5b6c7', name: 'Submission', + isOpen: false, + scheduledStartDate: mockFuturePhaseDate1, scheduledEndDate: mockFuturePhaseDate2, + actualStartDate: null, + actualEndDate: null, + predecessor: 'p1a2b3c4-d5e6-f7a8-b9c0-d1e2f3a4b5c6', }, ], }; @@ -42,7 +57,11 @@ const mockChallenge = { const mockChallengeWithPastPhase = { ...mockChallenge, phases: [ - { ...mockChallenge.phases[0], scheduledEndDate: mockPastPhaseDate }, // This one is overdue + { + ...mockChallenge.phases[0], + scheduledEndDate: mockPastPhaseDate, + isOpen: true, // This phase is overdue but still open + }, mockChallenge.phases[1], // This one is in the future ], }; @@ -88,7 +107,54 @@ describe('Autopilot Service (e2e)', () => { getAllActiveChallenges: mockGetAllActiveChallenges, getChallenge: mockGetChallenge, getChallengeById: mockGetActiveChallenge, // Add the new method to the mock - advancePhase: jest.fn().mockResolvedValue({ success: true, message: 'Phase advanced' }), + getPhaseDetails: jest.fn().mockImplementation((challengeId, phaseId) => { + // Return mock phase details based on phaseId + const phase = mockChallenge.phases.find(p => p.id === phaseId); + if (phase) { + return Promise.resolve(phase); + } + // Default mock for test phases + return Promise.resolve({ + id: phaseId, + name: 'Test Phase', + isOpen: true, // Default to open for most tests + scheduledStartDate: mockPastPhaseDate, + scheduledEndDate: mockFuturePhaseDate1, + }); + }), + advancePhase: jest.fn().mockImplementation((challengeId, phaseId, operation) => { + if (operation === 'close') { + return Promise.resolve({ + success: true, + message: 'Phase closed', + next: { + operation: 'open', + phases: [ + { + id: 'p2a3b4c5-d6e7-f8a9-b0c1-d2e3f4a5b6c7', + name: 'Submission', + scheduledEndDate: mockFuturePhaseDate2, + } + ] + } + }); + } else if (operation === 'open') { + return Promise.resolve({ + success: true, + message: 'Phase opened', + updatedPhases: [ + { + id: 'p2a3b4c5-d6e7-f8a9-b0c1-d2e3f4a5b6c7', + name: 'Submission', + scheduledEndDate: mockFuturePhaseDate2, + isOpen: true, + actualStartDate: new Date().toISOString(), + } + ] + }); + } + return Promise.resolve({ success: false, message: 'Unknown operation' }); + }), }) .compile(); @@ -123,7 +189,7 @@ describe('Autopilot Service (e2e)', () => { }); describe('Challenge Creation and Scheduling', () => { - it('should schedule phases when a challenge.notification.create event is received', async () => { + it('should schedule only the next phase when a challenge.notification.create event is received', async () => { mockGetActiveChallenge.mockResolvedValue(mockChallenge); // Use the correct mocked method const scheduleSpy = jest.spyOn( schedulerService, @@ -134,7 +200,7 @@ describe('Autopilot Service (e2e)', () => { challengeId: mockChallenge.id, projectId: mockChallenge.projectId, status: 'ACTIVE', - operator: 'system', + operator: AutopilotOperator.SYSTEM, } as ChallengeUpdatePayload); await new Promise((resolve) => setTimeout(resolve, 100)); @@ -142,10 +208,56 @@ describe('Autopilot Service (e2e)', () => { expect(challengeApiService.getChallengeById).toHaveBeenCalledWith( mockChallenge.id, ); - expect(scheduleSpy).toHaveBeenCalledTimes(2); + // Should only schedule 1 phase (the next one), not all phases + expect(scheduleSpy).toHaveBeenCalledTimes(1); expect(scheduleSpy).toHaveBeenCalledWith( expect.objectContaining({ - phaseId: mockChallenge.phases[0].id, + phaseId: mockChallenge.phases[0].id, // Should schedule the currently open phase + }), + ); + }); + + it('should schedule the next ready phase when no phases are currently open', async () => { + const challengeWithNoOpenPhases = { + ...mockChallenge, + phases: [ + { + ...mockChallenge.phases[0], + isOpen: false, + actualStartDate: mockPastPhaseDate, + actualEndDate: mockPastPhaseDate, // This phase has ended + }, + { + ...mockChallenge.phases[1], + isOpen: false, + actualStartDate: null, + actualEndDate: null, // This phase is ready to start + }, + ], + }; + + mockGetActiveChallenge.mockResolvedValue(challengeWithNoOpenPhases); + const scheduleSpy = jest.spyOn( + schedulerService, + 'schedulePhaseTransition', + ); + + await autopilotConsumer.topicHandlers[KAFKA_TOPICS.CHALLENGE_CREATED]({ + challengeId: challengeWithNoOpenPhases.id, + projectId: challengeWithNoOpenPhases.projectId, + status: 'ACTIVE', + operator: AutopilotOperator.SYSTEM, + } as ChallengeUpdatePayload); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(challengeApiService.getChallengeById).toHaveBeenCalledWith( + challengeWithNoOpenPhases.id, + ); + expect(scheduleSpy).toHaveBeenCalledTimes(1); + expect(scheduleSpy).toHaveBeenCalledWith( + expect.objectContaining({ + phaseId: challengeWithNoOpenPhases.phases[1].id, // Should schedule the next ready phase }), ); }); @@ -172,7 +284,7 @@ describe('Autopilot Service (e2e)', () => { challengeId: updatedChallenge.id, projectId: updatedChallenge.projectId, status: 'ACTIVE', - operator: 'system', + operator: AutopilotOperator.SYSTEM, } as ChallengeUpdatePayload); await new Promise((resolve) => setTimeout(resolve, 100)); @@ -199,7 +311,7 @@ describe('Autopilot Service (e2e)', () => { date: mockChallenge.phases[0].scheduledEndDate, phaseTypeName: 'Registration', state: 'END', - operator: 'system', + operator: AutopilotOperator.SYSTEM, projectStatus: 'ACTIVE', }); @@ -213,7 +325,7 @@ describe('Autopilot Service (e2e)', () => { projectId: mockChallenge.projectId, challengeId: mockChallenge.id, phaseId: mockChallenge.phases[0].id, - operator: 'admin', + operator: AutopilotOperator.ADMIN, }); await new Promise((resolve) => setTimeout(resolve, 100)); @@ -226,7 +338,7 @@ describe('Autopilot Service (e2e)', () => { }); describe('Recovery Service', () => { - it('should schedule future phases and immediately trigger overdue phases on bootstrap', async () => { + it('should immediately trigger overdue phases on bootstrap', async () => { mockGetAllActiveChallenges.mockResolvedValue([ mockChallengeWithPastPhase, ]); @@ -240,19 +352,301 @@ describe('Autopilot Service (e2e)', () => { await new Promise((resolve) => setTimeout(resolve, 100)); expect(challengeApiService.getAllActiveChallenges).toHaveBeenCalled(); - expect(scheduleSpy).toHaveBeenCalledTimes(1); + // Should only trigger the overdue phase, not schedule future ones + expect(scheduleSpy).toHaveBeenCalledTimes(0); expect(triggerEventSpy).toHaveBeenCalledTimes(1); + expect(triggerEventSpy).toHaveBeenCalledWith( + expect.objectContaining({ + phaseId: mockChallengeWithPastPhase.phases[0].id, // The overdue phase + }), + ); + }); + + it('should schedule future phases when no overdue phases exist', async () => { + const challengeWithFuturePhase = { + ...mockChallenge, + phases: [ + { + ...mockChallenge.phases[0], + isOpen: false, + actualStartDate: mockPastPhaseDate, + actualEndDate: mockPastPhaseDate, // This phase has ended + }, + { + ...mockChallenge.phases[1], + isOpen: false, + actualStartDate: null, + actualEndDate: null, // This phase is ready to start + }, + ], + }; + + mockGetAllActiveChallenges.mockResolvedValue([challengeWithFuturePhase]); + const scheduleSpy = jest.spyOn( + schedulerService, + 'schedulePhaseTransition', + ); + const triggerEventSpy = jest.spyOn(schedulerService, 'triggerKafkaEvent'); + + await recoveryService.onApplicationBootstrap(); + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(challengeApiService.getAllActiveChallenges).toHaveBeenCalled(); + expect(scheduleSpy).toHaveBeenCalledTimes(1); + expect(triggerEventSpy).toHaveBeenCalledTimes(0); + expect(scheduleSpy).toHaveBeenCalledWith( expect.objectContaining({ - phaseId: mockChallengeWithPastPhase.phases[1].id, + phaseId: challengeWithFuturePhase.phases[1].id, // The next ready phase }), ); - expect(triggerEventSpy).toHaveBeenCalledWith( + }); + + it('should open and schedule next phases in the chain when a phase completes', async () => { + const scheduleSpy = jest.spyOn( + schedulerService, + 'schedulePhaseTransition', + ); + const openAndScheduleNextPhasesSpy = jest.spyOn( + autopilotService, + 'openAndScheduleNextPhases', + ); + + // Simulate a phase transition that triggers the chain + const phaseData = { + projectId: mockChallenge.projectId, + challengeId: mockChallenge.id, + phaseId: mockChallenge.phases[0].id, + phaseTypeName: mockChallenge.phases[0].name, + state: 'END' as const, + operator: AutopilotOperator.SYSTEM_SCHEDULER, + projectStatus: 'ACTIVE', + date: new Date().toISOString(), + }; + + await schedulerService.advancePhase(phaseData); + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(challengeApiService.advancePhase).toHaveBeenCalledWith( + mockChallenge.id, + mockChallenge.phases[0].id, + 'close', + ); + + // Should open and schedule the next phase in the chain + expect(openAndScheduleNextPhasesSpy).toHaveBeenCalledWith( + mockChallenge.id, + mockChallenge.projectId, + 'ACTIVE', + expect.arrayContaining([ + expect.objectContaining({ + id: 'p2a3b4c5-d6e7-f8a9-b0c1-d2e3f4a5b6c7', + name: 'Submission', + }) + ]) + ); + }); + + it('should handle complete phase chain flow from challenge creation to phase completion', async () => { + mockGetActiveChallenge.mockResolvedValue(mockChallenge); + const scheduleSpy = jest.spyOn( + schedulerService, + 'schedulePhaseTransition', + ); + + // 1. Create a new challenge - should schedule only the first phase + await autopilotConsumer.topicHandlers[KAFKA_TOPICS.CHALLENGE_CREATED]({ + challengeId: mockChallenge.id, + projectId: mockChallenge.projectId, + status: 'ACTIVE', + operator: AutopilotOperator.SYSTEM, + } as ChallengeUpdatePayload); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Should schedule the first phase (Registration) + expect(scheduleSpy).toHaveBeenCalledTimes(1); + expect(scheduleSpy).toHaveBeenCalledWith( expect.objectContaining({ - phaseId: mockChallengeWithPastPhase.phases[0].id, + phaseId: mockChallenge.phases[0].id, + phaseTypeName: 'Registration', }), ); + + // 2. Simulate the first phase completing - should trigger chain scheduling + const phaseData = { + projectId: mockChallenge.projectId, + challengeId: mockChallenge.id, + phaseId: mockChallenge.phases[0].id, + phaseTypeName: mockChallenge.phases[0].name, + state: 'END' as const, + operator: AutopilotOperator.SYSTEM_SCHEDULER, + projectStatus: 'ACTIVE', + date: new Date().toISOString(), + }; + + await schedulerService.advancePhase(phaseData); + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Should have called advancePhase on the API + expect(challengeApiService.advancePhase).toHaveBeenCalledWith( + mockChallenge.id, + mockChallenge.phases[0].id, + 'close', + ); + + // Should have opened the next phase first, then scheduled it for closure + expect(challengeApiService.advancePhase).toHaveBeenCalledWith( + mockChallenge.id, + 'p2a3b4c5-d6e7-f8a9-b0c1-d2e3f4a5b6c7', + 'open', + ); + + // Should have scheduled the next phase (Submission) for closure after opening it + expect(scheduleSpy).toHaveBeenCalledTimes(2); + expect(scheduleSpy).toHaveBeenLastCalledWith( + expect.objectContaining({ + phaseId: 'p2a3b4c5-d6e7-f8a9-b0c1-d2e3f4a5b6c7', + phaseTypeName: 'Submission', + operator: AutopilotOperator.SYSTEM_PHASE_CHAIN, + }), + ); + }); + + it('should open phases before scheduling them for closure', async () => { + const advancePhaseCallOrder: Array<{ operation: string; phaseId: string }> = []; + + // Track the order of advancePhase calls + const mockAdvancePhase = jest.fn().mockImplementation((challengeId, phaseId, operation) => { + advancePhaseCallOrder.push({ operation, phaseId }); + + if (operation === 'close') { + return Promise.resolve({ + success: true, + message: 'Phase closed', + next: { + operation: 'open', + phases: [ + { + id: 'next-phase-id', + name: 'Next Phase', + scheduledEndDate: mockFuturePhaseDate2, + } + ] + } + }); + } else if (operation === 'open') { + return Promise.resolve({ + success: true, + message: 'Phase opened', + updatedPhases: [ + { + id: 'next-phase-id', + name: 'Next Phase', + scheduledEndDate: mockFuturePhaseDate2, + isOpen: true, + actualStartDate: new Date().toISOString(), + } + ] + }); + } + return Promise.resolve({ success: false, message: 'Unknown operation' }); + }); + + challengeApiService.advancePhase = mockAdvancePhase; + + // Trigger phase advancement + const phaseData = { + projectId: mockChallenge.projectId, + challengeId: mockChallenge.id, + phaseId: 'current-phase-id', + phaseTypeName: 'Current Phase', + state: 'END' as const, + operator: AutopilotOperator.SYSTEM_SCHEDULER, + projectStatus: 'ACTIVE', + date: new Date().toISOString(), + }; + + await schedulerService.advancePhase(phaseData); + await new Promise((resolve) => setTimeout(resolve, 200)); + + // Verify the order: close first, then open + expect(advancePhaseCallOrder).toHaveLength(2); + expect(advancePhaseCallOrder[0]).toEqual({ + operation: 'close', + phaseId: 'current-phase-id' + }); + expect(advancePhaseCallOrder[1]).toEqual({ + operation: 'open', + phaseId: 'next-phase-id' + }); + }); + + it('should not try to close phases that are already closed', async () => { + // Mock a closed phase + const mockGetPhaseDetails = jest.fn().mockResolvedValue({ + id: 'closed-phase-id', + name: 'Closed Phase', + isOpen: false, // This phase is already closed + scheduledEndDate: mockFuturePhaseDate1, + }); + + challengeApiService.getPhaseDetails = mockGetPhaseDetails; + + const mockAdvancePhase = jest.fn(); + challengeApiService.advancePhase = mockAdvancePhase; + + // Trigger phase advancement for a closed phase + const phaseData = { + projectId: mockChallenge.projectId, + challengeId: mockChallenge.id, + phaseId: 'closed-phase-id', + phaseTypeName: 'Closed Phase', + state: 'END' as const, + operator: AutopilotOperator.SYSTEM_SCHEDULER, + projectStatus: 'ACTIVE', + date: new Date().toISOString(), + }; + + await schedulerService.advancePhase(phaseData); + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Should not call advancePhase since the phase is already closed + expect(mockAdvancePhase).not.toHaveBeenCalled(); + }); + + it('should not try to open phases that are already open', async () => { + // Mock an open phase + const mockGetPhaseDetails = jest.fn().mockResolvedValue({ + id: 'open-phase-id', + name: 'Open Phase', + isOpen: true, // This phase is already open + scheduledStartDate: mockPastPhaseDate, + }); + + challengeApiService.getPhaseDetails = mockGetPhaseDetails; + + const mockAdvancePhase = jest.fn(); + challengeApiService.advancePhase = mockAdvancePhase; + + // Trigger phase advancement for an open phase with START state + const phaseData = { + projectId: mockChallenge.projectId, + challengeId: mockChallenge.id, + phaseId: 'open-phase-id', + phaseTypeName: 'Open Phase', + state: 'START' as const, + operator: AutopilotOperator.SYSTEM_SCHEDULER, + projectStatus: 'ACTIVE', + date: new Date().toISOString(), + }; + + await schedulerService.advancePhase(phaseData); + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Should not call advancePhase since the phase is already open + expect(mockAdvancePhase).not.toHaveBeenCalled(); }); }); });