Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
feat(ceb-messaging-core): received messages should be observable
Browse files Browse the repository at this point in the history
  • Loading branch information
tmorin committed Dec 10, 2021
1 parent 3beaac0 commit f9d4205
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 1 deletion.
4 changes: 4 additions & 0 deletions packages/ceb-messaging-core/src/command.ts
Expand Up @@ -127,6 +127,10 @@ export interface CommandBus extends Disposable {
* The map defines the internal events of an {@link CommandBus}.
*/
export type CommandBusNotificationMap = {
command_received: {
bus: CommandBus
command: Command
}
command_handler_failed: {
bus: CommandBus
command: Command
Expand Down
4 changes: 4 additions & 0 deletions packages/ceb-messaging-core/src/event.ts
Expand Up @@ -78,6 +78,10 @@ export interface EventBus extends Disposable {
* The map defines the internal events of an {@link EventBus}.
*/
export type EventBusNotificationMap = {
event_received: {
bus: EventBus
event: Event
}
event_listener_failed: {
bus: EventBus
event: Event
Expand Down
2 changes: 1 addition & 1 deletion packages/ceb-messaging-core/src/message.ts
Expand Up @@ -16,7 +16,7 @@ export type MessageType = string
/**
* The value of a header.
*/
export type MessageHeaderValue = string | number | Array<string | number>
export type MessageHeaderValue = string | number | boolean | Array<string | number | boolean>

/**
* The headers of the a message.
Expand Down
4 changes: 4 additions & 0 deletions packages/ceb-messaging-core/src/query.ts
Expand Up @@ -78,6 +78,10 @@ export interface QueryBus extends Disposable {
* The map defines the internal events of an {@link QueryBus}.
*/
export type QueryBusNotificationMap = {
query_received: {
bus: QueryBus
query: Query
}
query_handler_failed: {
bus: QueryBus
query: Query
Expand Down
10 changes: 10 additions & 0 deletions packages/ceb-messaging-simple/src/command.ts
Expand Up @@ -36,6 +36,11 @@ export class SimpleCommandBus implements CommandBus, Disposable {
command: C,
options?: Partial<ExecuteActionOptions>
): Promise<CommandResult<R>> {
this.emitter.emit("command_received", {
bus: this,
command,
})

const handler = this.resolveHandler(command)

const opts: ExecuteActionOptions = {
Expand All @@ -58,6 +63,11 @@ export class SimpleCommandBus implements CommandBus, Disposable {
}

executeAndForget<C extends Command = Command>(command: C): void {
this.emitter.emit("command_received", {
bus: this,
command,
})

const handler = this.resolveHandler(command)
Promise.resolve((async () => handler(command))())
.then((output) => this.processHandlerOutput(output))
Expand Down
4 changes: 4 additions & 0 deletions packages/ceb-messaging-simple/src/event.ts
Expand Up @@ -26,6 +26,10 @@ export class SimpleEventBus implements EventBus, Disposable {

publish<E extends Event = Event>(...events: Array<E>): void {
events.forEach((event) => {
this.emitter.emit("event_received", {
bus: this,
event,
})
this.listeners.get(event.headers.messageType)?.forEach((listener) => {
Promise.resolve((async () => listener(event))()).catch((error: Error) =>
this.emitter.emit("event_listener_failed", {
Expand Down
5 changes: 5 additions & 0 deletions packages/ceb-messaging-simple/src/query.ts
Expand Up @@ -31,6 +31,11 @@ export class SimpleQueryBus implements QueryBus, Disposable {
query: Q,
options?: Partial<ExecuteActionOptions>
): Promise<QueryResult<R>> {
this.emitter.emit("query_received", {
bus: this,
query,
})

const handler = this.resolveHandler<Q, R>(query)

const opts: ExecuteActionOptions = {
Expand Down

0 comments on commit f9d4205

Please sign in to comment.