diff --git a/packages/commandkit/src/app/handlers/AppEventsHandler.ts b/packages/commandkit/src/app/handlers/AppEventsHandler.ts index 2415b44a..5518f826 100644 --- a/packages/commandkit/src/app/handlers/AppEventsHandler.ts +++ b/packages/commandkit/src/app/handlers/AppEventsHandler.ts @@ -14,6 +14,7 @@ import { CommandKitErrorCodes, isErrorType } from '../../utils/error-codes'; export type EventListener = { handler: ListenerFunction; once: boolean; + parallel: boolean; }; /** @@ -111,6 +112,7 @@ export class AppEventsHandler { listeners.push({ handler: handler.default, once: !!handler.once, + parallel: !!handler.parallel, }); } @@ -164,6 +166,20 @@ export class AppEventsHandler { const onceListeners = listeners.filter((listener) => listener.once); const onListeners = listeners.filter((listener) => !listener.once); + // Further separate into parallel and sequential groups + const onParallelListeners = onListeners.filter( + (listener) => listener.parallel, + ); + const onSequentialListeners = onListeners.filter( + (listener) => !listener.parallel, + ); + const onceParallelListeners = onceListeners.filter( + (listener) => listener.parallel, + ); + const onceSequentialListeners = onceListeners.filter( + (listener) => !listener.parallel, + ); + // Initialize set to track executed once listeners const executedOnceListeners = new Set(); @@ -200,7 +216,24 @@ export class AppEventsHandler { variables: new Map(), }, async () => { - for (const listener of onListeners) { + // Execute parallel listeners first using Promise.all + if (onParallelListeners.length > 0) { + await Promise.all( + onParallelListeners.map(async (listener) => { + try { + await listener.handler(...args, client, this.commandkit); + } catch (e) { + // Log errors but don't stop other parallel listeners + Logger.error`Error handling event ${name}${ + namespace ? ` of namespace ${namespace}` : '' + }: ${e}`; + } + }), + ); + } + + // Execute sequential listeners in order + for (const listener of onSequentialListeners) { try { await listener.handler(...args, client, this.commandkit); } catch (e) { @@ -248,7 +281,40 @@ export class AppEventsHandler { }) .catch(Object); - for (const listener of onceListeners) { + // Execute parallel once listeners first using Promise.all + if (onceParallelListeners.length > 0) { + await Promise.all( + onceParallelListeners.map(async (listener) => { + return runInEventWorkerContext( + { + event: name, + namespace: namespace ?? null, + data: data.event, + commandkit: this.commandkit, + arguments: args, + variables: new Map(), + }, + async () => { + try { + // Skip if already executed + if (executedOnceListeners.has(listener.handler)) return; + + await listener.handler(...args, client, this.commandkit); + executedOnceListeners.add(listener.handler); + } catch (e) { + // Log errors but don't stop other parallel listeners + Logger.error`Error handling event ${name}${ + namespace ? ` of namespace ${namespace}` : '' + }: ${e}`; + } + }, + ); + }), + ); + } + + // Execute sequential once listeners in order + for (const listener of onceSequentialListeners) { if (broken) break; // Stop executing remaining listeners if propagation was stopped await runInEventWorkerContext( @@ -307,7 +373,7 @@ export class AppEventsHandler { ...data, mainListener: onListeners.length > 0 - ? { handler: mainHandler, once: false } + ? { handler: mainHandler, once: false, parallel: false } : undefined, executedOnceListeners, }); @@ -332,7 +398,7 @@ export class AppEventsHandler { Logger.info( `🔌 Registered event ${name}${ namespace ? ` of namespace ${namespace}` : '' - } (${onListeners.length} regular, ${onceListeners.length} once-only)`, + } (${onSequentialListeners.length} sequential, ${onParallelListeners.length} parallel, ${onceSequentialListeners.length} once-sequential, ${onceParallelListeners.length} once-parallel)`, ); } }