From 93276debeff1e56c9803e7700875c4254a48236b Mon Sep 17 00:00:00 2001 From: Timo Glastra Date: Fri, 18 Aug 2023 16:00:50 +0200 Subject: [PATCH] fix: create message subscription first (#1549) Signed-off-by: Timo Glastra --- packages/core/src/agent/Agent.ts | 41 ++++++++++++++++---------------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/packages/core/src/agent/Agent.ts b/packages/core/src/agent/Agent.ts index 125b3377cf..952e610c82 100644 --- a/packages/core/src/agent/Agent.ts +++ b/packages/core/src/agent/Agent.ts @@ -148,6 +148,27 @@ export class Agent extends BaseAge } public async initialize() { + const stop$ = this.dependencyManager.resolve>(InjectionSymbols.Stop$) + + // Listen for new messages (either from transports or somewhere else in the framework / extensions) + // We create this before doing any other initialization, so the initialization could already receive messages + this.messageSubscription = this.eventEmitter + .observable(AgentEventTypes.AgentMessageReceived) + .pipe( + takeUntil(stop$), + concatMap((e) => + this.messageReceiver + .receiveMessage(e.payload.message, { + connection: e.payload.connection, + contextCorrelationId: e.payload.contextCorrelationId, + }) + .catch((error) => { + this.logger.error('Failed to process message', { error }) + }) + ) + ) + .subscribe() + await super.initialize() for (const [, module] of Object.entries(this.dependencyManager.registeredModules) as [string, Module][]) { @@ -179,26 +200,6 @@ export class Agent extends BaseAge await this.mediator.initialize() await this.mediationRecipient.initialize() - const stop$ = this.dependencyManager.resolve>(InjectionSymbols.Stop$) - - // Listen for new messages (either from transports or somewhere else in the framework / extensions) - this.messageSubscription = this.eventEmitter - .observable(AgentEventTypes.AgentMessageReceived) - .pipe( - takeUntil(stop$), - concatMap((e) => - this.messageReceiver - .receiveMessage(e.payload.message, { - connection: e.payload.connection, - contextCorrelationId: e.payload.contextCorrelationId, - }) - .catch((error) => { - this.logger.error('Failed to process message', { error }) - }) - ) - ) - .subscribe() - this._isInitialized = true }