Skip to content

Commit c10689a

Browse files
committed
fix: enhance deduplication handling
1 parent 1be9e5c commit c10689a

File tree

1 file changed

+13
-2
lines changed

1 file changed

+13
-2
lines changed

src/scheduler/events/start-all-nodes-by-profile.events.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,25 +10,36 @@ import { QUEUES_NAMES } from '@queue/queue.enum';
1010
@Injectable()
1111
export class StartAllNodesByProfileQueueEvents implements OnModuleInit {
1212
private readonly logger = new Logger(StartAllNodesByProfileQueueEvents.name);
13+
private waitingProfiles = new Set<string>();
1314

1415
private queueEvents: QueueEvents;
1516

1617
constructor(private readonly nodesQueuesService: NodesQueuesService) {}
1718

1819
async onModuleInit() {
20+
this.logger.log('Initializing deduplication event listener.');
21+
1922
this.queueEvents = new QueueEvents(QUEUES_NAMES.NODES.START_ALL_BY_PROFILE, {
2023
connection: this.nodesQueuesService.queues.startAllNodesByProfile.opts.connection,
2124
});
2225

2326
this.queueEvents.on('deduplicated', async (event) => {
2427
const { jobId, deduplicationId } = event;
28+
const profileUuid = deduplicationId;
2529

26-
this.logger.log(`[deduplicated] ${deduplicationId} – retrying job ${jobId} in 10s`);
30+
if (this.waitingProfiles.has(profileUuid)) {
31+
this.logger.log(`[deduplicated] ${deduplicationId} – already waiting for retry`);
32+
return;
33+
}
2734

28-
const profileUuid = deduplicationId;
35+
this.waitingProfiles.add(profileUuid);
36+
37+
this.logger.log(`[deduplicated] ${deduplicationId} – retrying job ${jobId} in 10s`);
2938

3039
await sleep(10_000);
3140

41+
this.waitingProfiles.delete(profileUuid);
42+
3243
await this.nodesQueuesService.startAllNodesByProfile({
3344
profileUuid,
3445
emitter: 'RetryStartAllNodesByProfile',

0 commit comments

Comments
 (0)