Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 70 additions & 4 deletions packages/commandkit/src/app/handlers/AppEventsHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { CommandKitErrorCodes, isErrorType } from '../../utils/error-codes';
export type EventListener = {
handler: ListenerFunction;
once: boolean;
parallel: boolean;
};

/**
Expand Down Expand Up @@ -111,6 +112,7 @@ export class AppEventsHandler {
listeners.push({
handler: handler.default,
once: !!handler.once,
parallel: !!handler.parallel,
});
}

Expand Down Expand Up @@ -164,6 +166,20 @@ export class AppEventsHandler {
const onceListeners = listeners.filter((listener) => listener.once);
const onListeners = listeners.filter((listener) => !listener.once);

// Further separate into parallel and sequential groups
const onParallelListeners = onListeners.filter(
(listener) => listener.parallel,
);
const onSequentialListeners = onListeners.filter(
(listener) => !listener.parallel,
);
const onceParallelListeners = onceListeners.filter(
(listener) => listener.parallel,
);
const onceSequentialListeners = onceListeners.filter(
(listener) => !listener.parallel,
);

// Initialize set to track executed once listeners
const executedOnceListeners = new Set<ListenerFunction>();

Expand Down Expand Up @@ -200,7 +216,24 @@ export class AppEventsHandler {
variables: new Map(),
},
async () => {
for (const listener of onListeners) {
// Execute parallel listeners first using Promise.all
if (onParallelListeners.length > 0) {
await Promise.all(
onParallelListeners.map(async (listener) => {
try {
await listener.handler(...args, client, this.commandkit);
} catch (e) {
// Log errors but don't stop other parallel listeners
Logger.error`Error handling event ${name}${
namespace ? ` of namespace ${namespace}` : ''
}: ${e}`;
}
}),
);
}

// Execute sequential listeners in order
for (const listener of onSequentialListeners) {
try {
await listener.handler(...args, client, this.commandkit);
} catch (e) {
Expand Down Expand Up @@ -248,7 +281,40 @@ export class AppEventsHandler {
})
.catch(Object);

for (const listener of onceListeners) {
// Execute parallel once listeners first using Promise.all
if (onceParallelListeners.length > 0) {
await Promise.all(
onceParallelListeners.map(async (listener) => {
return runInEventWorkerContext(
{
event: name,
namespace: namespace ?? null,
data: data.event,
commandkit: this.commandkit,
arguments: args,
variables: new Map(),
},
async () => {
try {
// Skip if already executed
if (executedOnceListeners.has(listener.handler)) return;

await listener.handler(...args, client, this.commandkit);
executedOnceListeners.add(listener.handler);
} catch (e) {
// Log errors but don't stop other parallel listeners
Logger.error`Error handling event ${name}${
namespace ? ` of namespace ${namespace}` : ''
}: ${e}`;
}
},
);
}),
);
}

// Execute sequential once listeners in order
for (const listener of onceSequentialListeners) {
if (broken) break; // Stop executing remaining listeners if propagation was stopped

await runInEventWorkerContext(
Expand Down Expand Up @@ -307,7 +373,7 @@ export class AppEventsHandler {
...data,
mainListener:
onListeners.length > 0
? { handler: mainHandler, once: false }
? { handler: mainHandler, once: false, parallel: false }
: undefined,
executedOnceListeners,
});
Expand All @@ -332,7 +398,7 @@ export class AppEventsHandler {
Logger.info(
`🔌 Registered event ${name}${
namespace ? ` of namespace ${namespace}` : ''
} (${onListeners.length} regular, ${onceListeners.length} once-only)`,
} (${onSequentialListeners.length} sequential, ${onParallelListeners.length} parallel, ${onceSequentialListeners.length} once-sequential, ${onceParallelListeners.length} once-parallel)`,
);
}
}
Expand Down