Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
feat(ceb-messaging-simple): add support for internal event like `erro…
Browse files Browse the repository at this point in the history
…r` or `dispose`
  • Loading branch information
tmorin committed Nov 5, 2021
1 parent d3ba020 commit 87bca25
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 7 deletions.
2 changes: 1 addition & 1 deletion packages/ceb-messaging-simple/src/bus.spec.ts
Expand Up @@ -44,7 +44,7 @@ describe("messaging/simple/bus", function () {
bus = new InMemorySimpleBus()
})
afterEach(async function () {
await bus?.destroy()
await bus?.dispose()
})
describe("action", function () {
it("should execute a command and wait for result", async function () {
Expand Down
22 changes: 17 additions & 5 deletions packages/ceb-messaging-simple/src/bus.ts
Expand Up @@ -19,6 +19,7 @@ import {
Subscription,
SubscriptionListener
} from "@tmorin/ceb-messaging-core";
import {AbstractBus} from "@tmorin/ceb-messaging-core/src";

class HandlerEntry<A extends AbstractSimpleAction = any, R extends AbstractSimpleResult = any> implements Handler {
constructor(
Expand Down Expand Up @@ -98,21 +99,32 @@ function getGlobalBus() {
/**
* An very simple implementation of a {@link Bus}.
*/
export class InMemorySimpleBus implements Bus {
export class InMemorySimpleBus extends AbstractBus implements Bus {

/**
* A global instance.
*/
public static readonly GLOBAL: InMemorySimpleBus = getGlobalBus()

constructor(
/**
* @ignore
*/
private readonly handlers: Map<MessageType, HandlerEntry> = new Map(),
private readonly subscriptions: Map<string, Set<SubscriptionEntry>> = new Map()
/**
* @ignore
*/
private readonly subscriptions: Map<string, Set<SubscriptionEntry>> = new Map(),
) {
super()
}

destroy() {
async dispose(): Promise<void> {
await super.dispose();
this.handlers.forEach(handler => handler.cancel())
this.handlers.clear()
this.subscriptions.forEach(entries => entries.forEach(entry => entry.unsubscribe()))
this.subscriptions.clear()
}

subscribe<E extends MessageEvent>(
Expand All @@ -128,7 +140,7 @@ export class InMemorySimpleBus implements Bus {
try {
value.handle(event)
} catch (error: any) {
console.error("a subscription failed to handle the event %o", event, error)
this.emit("error", error)
}
})
}
Expand Down Expand Up @@ -173,7 +185,7 @@ export class InMemorySimpleBus implements Bus {
entry: HandlerEntry<A>,
action: A
): Promise<void> {
entry.handle(action).catch(error => console.error("InMemorySimpleBus - the action failed", error))
entry.handle(action).catch(error => this.emit("error", error))
}

}
2 changes: 1 addition & 1 deletion packages/ceb-messaging-simple/src/inversion.ts
Expand Up @@ -25,6 +25,6 @@ export class SimpleModule extends AbstractModule {
}

async dispose(): Promise<void> {
await this.bus.destroy()
await this.bus.dispose()
}
}

0 comments on commit 87bca25

Please sign in to comment.