Skip to content

Commit

Permalink
fix: WebSocket priority in Message Pick Up V2 (#1888)
Browse files Browse the repository at this point in the history
Signed-off-by: Ariel Gentile <gentilester@gmail.com>
  • Loading branch information
genaris committed Jun 8, 2024
1 parent 2de96bb commit 648d375
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 8 deletions.
9 changes: 6 additions & 3 deletions packages/core/src/modules/message-pickup/MessagePickupApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,9 @@ export class MessagePickupApi<MPPs extends MessagePickupProtocol[] = [V1MessageP
.subscribe(replaySubject)
}

await this.messageSender.sendMessage(outboundMessageContext)
// For picking up messages we prefer a long-lived transport session, so we will set a higher priority to
// WebSocket endpoints. However, it is not extrictly required.
await this.messageSender.sendMessage(outboundMessageContext, { transportPriority: { schemes: ['wss', 'ws'] } })

if (options.awaitCompletion) {
await firstValueFrom(replaySubject)
Expand All @@ -255,18 +257,19 @@ export class MessagePickupApi<MPPs extends MessagePickupProtocol[] = [V1MessageP
*/
public async setLiveDeliveryMode(options: SetLiveDeliveryModeOptions): Promise<SetLiveDeliveryModeReturnType> {
const connectionRecord = await this.connectionService.getById(this.agentContext, options.connectionId)

const protocol = this.getProtocol(options.protocolVersion)
const { message } = await protocol.setLiveDeliveryMode(this.agentContext, {
connectionRecord,
liveDelivery: options.liveDelivery,
})

// Live mode requires a long-lived transport session, so we'll require WebSockets to send this message
await this.messageSender.sendMessage(
new OutboundMessageContext(message, {
agentContext: this.agentContext,
connection: connectionRecord,
})
}),
{ transportPriority: { schemes: ['wss', 'ws'], restrictive: options.liveDelivery } }
)
}
}
23 changes: 18 additions & 5 deletions packages/core/src/modules/routing/MediationRecipientApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,16 @@ export class MediationRecipientApi {
})
}

private async openMediationWebSocket(mediator: MediationRecord) {
/**
* Implicit mode consists simply on initiating a long-lived session to a mediator and wait for the
* messages to arrive automatically.
*
* In order to do initiate this session, we open a suitable connection (using WebSocket transport) and
* send a Trust Ping message.
*
* @param mediator
*/
private async initiateImplicitMode(mediator: MediationRecord) {
const connection = await this.connectionService.getById(this.agentContext, mediator.connectionId)
const { message, connectionRecord } = await this.connectionService.createTrustPing(this.agentContext, connection, {
responseRequested: false,
Expand Down Expand Up @@ -218,7 +227,7 @@ export class MediationRecipientApi {
protocolVersion: 'v2',
})
} else {
await this.openMediationWebSocket(mediator)
await this.initiateImplicitMode(mediator)
}
} catch (error) {
this.logger.warn('Unable to re-open websocket connection to mediator', { error })
Expand Down Expand Up @@ -252,15 +261,19 @@ export class MediationRecipientApi {
case MediatorPickupStrategy.PickUpV2: {
const stopConditions$ = merge(this.stop$, this.stopMessagePickup$).pipe()
// PickUpV1/PickUpV2 means polling every X seconds with batch message
this.logger.info(`Starting explicit (batch) pickup of messages from mediator '${mediatorRecord.id}'`)
const protocolVersion = mediatorPickupStrategy === MediatorPickupStrategy.PickUpV2 ? 'v2' : 'v1'

this.logger.info(
`Starting explicit pickup of messages from mediator '${mediatorRecord.id}' using ${protocolVersion}`
)
const subscription = interval(mediatorPollingInterval)
.pipe(takeUntil(stopConditions$))
.subscribe({
next: async () => {
await this.messagePickupApi.pickupMessages({
connectionId: mediatorConnection.id,
batchSize: this.config.maximumMessagePickup,
protocolVersion: mediatorPickupStrategy === MediatorPickupStrategy.PickUpV2 ? 'v2' : 'v1',
protocolVersion,
})
},
complete: () => this.logger.info(`Stopping pickup of messages from mediator '${mediatorRecord.id}'`),
Expand Down Expand Up @@ -290,7 +303,7 @@ export class MediationRecipientApi {
// such as WebSockets to work
this.logger.info(`Starting implicit pickup of messages from mediator '${mediatorRecord.id}'`)
await this.monitorMediatorWebSocketEvents(mediatorRecord, mediatorPickupStrategy)
await this.openMediationWebSocket(mediatorRecord)
await this.initiateImplicitMode(mediatorRecord)
break
default:
this.logger.info(`Skipping pickup of messages from mediator '${mediatorRecord.id}' due to pickup strategy none`)
Expand Down

0 comments on commit 648d375

Please sign in to comment.