From f9d42056b281ddaeae59239199a4b7442ed73ffa Mon Sep 17 00:00:00 2001 From: tmorin Date: Tue, 7 Dec 2021 14:05:07 +0100 Subject: [PATCH] feat(ceb-messaging-core): received messages should be observable --- packages/ceb-messaging-core/src/command.ts | 4 ++++ packages/ceb-messaging-core/src/event.ts | 4 ++++ packages/ceb-messaging-core/src/message.ts | 2 +- packages/ceb-messaging-core/src/query.ts | 4 ++++ packages/ceb-messaging-simple/src/command.ts | 10 ++++++++++ packages/ceb-messaging-simple/src/event.ts | 4 ++++ packages/ceb-messaging-simple/src/query.ts | 5 +++++ 7 files changed, 32 insertions(+), 1 deletion(-) diff --git a/packages/ceb-messaging-core/src/command.ts b/packages/ceb-messaging-core/src/command.ts index adb3fe0c..aeb1f579 100644 --- a/packages/ceb-messaging-core/src/command.ts +++ b/packages/ceb-messaging-core/src/command.ts @@ -127,6 +127,10 @@ export interface CommandBus extends Disposable { * The map defines the internal events of an {@link CommandBus}. */ export type CommandBusNotificationMap = { + command_received: { + bus: CommandBus + command: Command + } command_handler_failed: { bus: CommandBus command: Command diff --git a/packages/ceb-messaging-core/src/event.ts b/packages/ceb-messaging-core/src/event.ts index 14c7c327..c786f31a 100644 --- a/packages/ceb-messaging-core/src/event.ts +++ b/packages/ceb-messaging-core/src/event.ts @@ -78,6 +78,10 @@ export interface EventBus extends Disposable { * The map defines the internal events of an {@link EventBus}. */ export type EventBusNotificationMap = { + event_received: { + bus: EventBus + event: Event + } event_listener_failed: { bus: EventBus event: Event diff --git a/packages/ceb-messaging-core/src/message.ts b/packages/ceb-messaging-core/src/message.ts index b7fe3abc..6a4bfadd 100644 --- a/packages/ceb-messaging-core/src/message.ts +++ b/packages/ceb-messaging-core/src/message.ts @@ -16,7 +16,7 @@ export type MessageType = string /** * The value of a header. */ -export type MessageHeaderValue = string | number | Array +export type MessageHeaderValue = string | number | boolean | Array /** * The headers of the a message. diff --git a/packages/ceb-messaging-core/src/query.ts b/packages/ceb-messaging-core/src/query.ts index 580961f0..c95ff438 100644 --- a/packages/ceb-messaging-core/src/query.ts +++ b/packages/ceb-messaging-core/src/query.ts @@ -78,6 +78,10 @@ export interface QueryBus extends Disposable { * The map defines the internal events of an {@link QueryBus}. */ export type QueryBusNotificationMap = { + query_received: { + bus: QueryBus + query: Query + } query_handler_failed: { bus: QueryBus query: Query diff --git a/packages/ceb-messaging-simple/src/command.ts b/packages/ceb-messaging-simple/src/command.ts index 658fb4bc..fc8ae831 100644 --- a/packages/ceb-messaging-simple/src/command.ts +++ b/packages/ceb-messaging-simple/src/command.ts @@ -36,6 +36,11 @@ export class SimpleCommandBus implements CommandBus, Disposable { command: C, options?: Partial ): Promise> { + this.emitter.emit("command_received", { + bus: this, + command, + }) + const handler = this.resolveHandler(command) const opts: ExecuteActionOptions = { @@ -58,6 +63,11 @@ export class SimpleCommandBus implements CommandBus, Disposable { } executeAndForget(command: C): void { + this.emitter.emit("command_received", { + bus: this, + command, + }) + const handler = this.resolveHandler(command) Promise.resolve((async () => handler(command))()) .then((output) => this.processHandlerOutput(output)) diff --git a/packages/ceb-messaging-simple/src/event.ts b/packages/ceb-messaging-simple/src/event.ts index 4a8c6a50..6814ff01 100644 --- a/packages/ceb-messaging-simple/src/event.ts +++ b/packages/ceb-messaging-simple/src/event.ts @@ -26,6 +26,10 @@ export class SimpleEventBus implements EventBus, Disposable { publish(...events: Array): void { events.forEach((event) => { + this.emitter.emit("event_received", { + bus: this, + event, + }) this.listeners.get(event.headers.messageType)?.forEach((listener) => { Promise.resolve((async () => listener(event))()).catch((error: Error) => this.emitter.emit("event_listener_failed", { diff --git a/packages/ceb-messaging-simple/src/query.ts b/packages/ceb-messaging-simple/src/query.ts index b91bd24c..ad82f1d1 100644 --- a/packages/ceb-messaging-simple/src/query.ts +++ b/packages/ceb-messaging-simple/src/query.ts @@ -31,6 +31,11 @@ export class SimpleQueryBus implements QueryBus, Disposable { query: Q, options?: Partial ): Promise> { + this.emitter.emit("query_received", { + bus: this, + query, + }) + const handler = this.resolveHandler(query) const opts: ExecuteActionOptions = {