Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: mediator updates #432

Merged
7 changes: 4 additions & 3 deletions packages/core/src/agent/MessageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ export class MessageSender {
await transport.sendMessage({
payload: packedMessage,
endpoint: service.serviceEndpoint,
connectionId: connection.id,
})
break
}
Expand Down Expand Up @@ -268,20 +269,20 @@ export class MessageSender {
const allServices = this.transportService.findDidCommServices(connection)

//Separate queue service out
const services = allServices.filter((s) => !isDidCommTransportQueue(s.serviceEndpoint))
let services = allServices.filter((s) => !isDidCommTransportQueue(s.serviceEndpoint))
const queueService = allServices.find((s) => isDidCommTransportQueue(s.serviceEndpoint))

//If restrictive will remove services not listed in schemes list
if (transportPriority?.restrictive) {
services.filter((service) => {
services = services.filter((service) => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JamesKEbert @burdettadam The services were not being reassigned meaning the sort and filter did not do anything. This should fix it.

This means that currently it will take the http endpoint, send the ping and receive the ping response after which the http endpoint is closed 😄

So if we could merge this rather sooner than later that would be great!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How embarrassing! Thanks for catching that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoooops--my bad!
As a side note, the array.sort() method does actually do its operation in place and returns too, so it doesn't necessarily have to be reassigned too, but doesn't hurt to reassign it. :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah you're right. So weird that filter does not do its operation in place, but sort does. Thanks

const serviceSchema = service.protocolScheme
return transportPriority.schemes.includes(serviceSchema)
})
}

//If transport priority is set we will sort services by our priority
if (transportPriority?.schemes) {
services.sort(function (a, b) {
services = services.sort(function (a, b) {
const aScheme = a.protocolScheme
const bScheme = b.protocolScheme
return transportPriority?.schemes.indexOf(aScheme) - transportPriority?.schemes.indexOf(bScheme)
Expand Down
34 changes: 19 additions & 15 deletions packages/core/src/modules/connections/services/ConnectionService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import type { ConnectionStateChangedEvent } from '../ConnectionEvents'
import type { CustomConnectionTags } from '../repository/ConnectionRecord'

import { validateOrReject } from 'class-validator'
import { firstValueFrom, ReplaySubject } from 'rxjs'
import { first, map, timeout } from 'rxjs/operators'
import { inject, scoped, Lifecycle } from 'tsyringe'

import { AgentConfig } from '../../../agent/AgentConfig'
Expand Down Expand Up @@ -573,28 +575,30 @@ export class ConnectionService {
return connectionRecord
}

public async returnWhenIsConnected(connectionId: string): Promise<ConnectionRecord> {
public async returnWhenIsConnected(connectionId: string, timeoutMs = 20000): Promise<ConnectionRecord> {
const isConnected = (connection: ConnectionRecord) => {
return connection.id === connectionId && connection.state === ConnectionState.Complete
}

const promise = new Promise<ConnectionRecord>((resolve) => {
const listener = ({ payload: { connectionRecord } }: ConnectionStateChangedEvent) => {
if (isConnected(connectionRecord)) {
this.eventEmitter.off<ConnectionStateChangedEvent>(ConnectionEventTypes.ConnectionStateChanged, listener)
resolve(connectionRecord)
}
}
const observable = this.eventEmitter.observable<ConnectionStateChangedEvent>(
ConnectionEventTypes.ConnectionStateChanged
)
const subject = new ReplaySubject<ConnectionRecord>(1)

this.eventEmitter.on<ConnectionStateChangedEvent>(ConnectionEventTypes.ConnectionStateChanged, listener)
})
observable
.pipe(
map((e) => e.payload.connectionRecord),
first(isConnected), // Do not wait for longer than specified timeout
timeout(timeoutMs)
)
.subscribe(subject)

// Check if already done
const connection = await this.connectionRepository.findById(connectionId)
if (connection && isConnected(connection)) return connection //TODO: check if this leaves trailing listeners behind?
const connection = await this.getById(connectionId)
if (isConnected(connection)) {
subject.next(connection)
}

// return listener
return promise
return firstValueFrom(subject)
}
}

Expand Down
76 changes: 66 additions & 10 deletions packages/core/src/modules/routing/RecipientModule.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import type { Logger } from '../../logger'
import type { OutboundWebSocketClosedEvent } from '../../transport'
import type { ConnectionRecord } from '../connections'
import type { MediationStateChangedEvent } from './RoutingEvents'
import type { MediationRecord } from './index'

import { firstValueFrom, interval, ReplaySubject } from 'rxjs'
import { filter, first, takeUntil, timeout } from 'rxjs/operators'
import { filter, first, takeUntil, throttleTime, timeout, delay, tap } from 'rxjs/operators'
import { Lifecycle, scoped } from 'tsyringe'

import { AgentConfig } from '../../agent/AgentConfig'
Expand All @@ -13,6 +14,7 @@ import { EventEmitter } from '../../agent/EventEmitter'
import { MessageSender } from '../../agent/MessageSender'
import { createOutboundMessage } from '../../agent/helpers'
import { AriesFrameworkError } from '../../error'
import { TransportEventTypes } from '../../transport'
import { ConnectionInvitationMessage } from '../connections'
import { ConnectionService } from '../connections/services'

Expand Down Expand Up @@ -71,6 +73,57 @@ export class RecipientModule {
}
}

private async openMediationWebSocket(mediator: MediationRecord) {
const { message, connectionRecord } = await this.connectionService.createTrustPing(mediator.connectionId)

const websocketSchemes = ['ws', 'wss']
const hasWebSocketTransport =
connectionRecord.didDoc.didCommServices.filter((s) => websocketSchemes.includes(s.protocolScheme)).length > 0

if (!hasWebSocketTransport) {
throw new AriesFrameworkError('Cannot open websocket to connection without websocket service endpoint')
}

await this.messageSender.sendMessage(createOutboundMessage(connectionRecord, message), {
transportPriority: {
schemes: websocketSchemes,
restrictive: true,
// TODO: add keepAlive: true to enforce through the public api
// we need to keep the socket alive. It already works this way, but would
// be good to make more explicit from the public facing API.
// This would also make it easier to change the internal API later on.
// keepAlive: true,
},
})
}

private async initiateImplicitPickup(mediator: MediationRecord) {
let interval = 50

// Listens to Outbound websocket closed events and will reopen the websocket connection
// in a recursive back off strategy if it matches the following criteria:
// - Agent is not shutdown
// - Socket was for current mediator connection id
this.eventEmitter
.observable<OutboundWebSocketClosedEvent>(TransportEventTypes.OutboundWebSocketClosedEvent)
.pipe(
// Stop when the agent shuts down
takeUntil(this.agentConfig.stop$),
filter((e) => e.payload.connectionId === mediator.connectionId),
// Make sure we're not reconnecting multiple times
throttleTime(interval),
// Increase the interval (recursive back-off)
tap(() => (interval *= 2)),
// Wait for interval time before reconnecting
delay(interval)
)
.subscribe(() => {
this.openMediationWebSocket(mediator)
})

await this.openMediationWebSocket(mediator)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't those methods be part of the transport layer rather than inside the core of the framework? Or, it's just temporary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What it does is sending a trust ping message with a restriction to only use websocket. The websocket will be kept alive autmoatically (I'd like to make the keep alive more explicit)

I'm not sure yet how we could nicely integrate this into the transport layer. Would be nice to not make it WebSocket specific (so just send a ping and keep the socket alive), but we need to specify somewhere which transports are capable of this, as e.g. HTTP isn't.

Maybe a transport should be able to specify whether it can send multiple messages (WebSocket is ∞, while HTTP is 1 request, 1 response). That way we wouldn't have to specify WS explicitly, but rather whether we want a transport that can stay open (for implicit pickup)

I'm going to think about it for a while. We can have a discussion about it in the WG call

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have opened #435. Please leave your thoughts :)


public async initiateMessagePickup(mediator: MediationRecord) {
const { mediatorPickupStrategy, mediatorPollingInterval } = this.agentConfig

Expand All @@ -92,8 +145,7 @@ export class RecipientModule {
// such as WebSockets to work
else if (mediatorPickupStrategy === MediatorPickupStrategy.Implicit) {
this.agentConfig.logger.info(`Starting implicit pickup of messages from mediator '${mediator.id}'`)
const { message, connectionRecord } = await this.connectionService.createTrustPing(mediatorConnection.id)
await this.messageSender.sendMessage(createOutboundMessage(connectionRecord, message))
await this.initiateImplicitPickup(mediator)
} else {
this.agentConfig.logger.info(
`Skipping pickup of messages from mediator '${mediator.id}' due to pickup strategy none`
Expand Down Expand Up @@ -189,10 +241,14 @@ export class RecipientModule {
// Also requests mediation and sets as default mediator
// Assumption: processInvitation is a URL-encoded invitation
const invitation = await ConnectionInvitationMessage.fromUrl(mediatorConnInvite)

// Check if invitation has been used already
if (!invitation || !invitation.recipientKeys || !invitation.recipientKeys[0]) {
throw new AriesFrameworkError(`Invalid mediation invitation. Invitation must have at least one recipient key.`)
}

let mediationRecord: MediationRecord | null = null

const connection = await this.connectionService.findByInvitationKey(invitation.recipientKeys[0])
if (!connection) {
this.logger.debug('Mediation Connection does not exist, creating connection')
Expand All @@ -209,25 +265,22 @@ export class RecipientModule {
const outbound = createOutboundMessage(connectionRecord, message)
await this.messageSender.sendMessage(outbound)

// TODO: add timeout to returnWhenIsConnected
const completedConnectionRecord = await this.connectionService.returnWhenIsConnected(connectionRecord.id)
this.logger.debug('Connection completed, requesting mediation')
const mediationRecord = await this.requestAndAwaitGrant(completedConnectionRecord, 60000) // TODO: put timeout as a config parameter
mediationRecord = await this.requestAndAwaitGrant(completedConnectionRecord, 60000) // TODO: put timeout as a config parameter
this.logger.debug('Mediation Granted, setting as default mediator')
await this.setDefaultMediator(mediationRecord)
this.logger.debug('Default mediator set')
return
} else if (connection && !connection.isReady) {
const connectionRecord = await this.connectionService.returnWhenIsConnected(connection.id)
const mediationRecord = await this.requestAndAwaitGrant(connectionRecord, 60000) // TODO: put timeout as a config parameter
mediationRecord = await this.requestAndAwaitGrant(connectionRecord, 60000) // TODO: put timeout as a config parameter
await this.setDefaultMediator(mediationRecord)
return
} else if (connection.isReady) {
} else {
this.agentConfig.logger.warn('Mediator Invitation in configuration has already been used to create a connection.')
const mediator = await this.findByConnectionId(connection.id)
if (!mediator) {
this.agentConfig.logger.warn('requesting mediation over connection.')
const mediationRecord = await this.requestAndAwaitGrant(connection, 60000) // TODO: put timeout as a config parameter
mediationRecord = await this.requestAndAwaitGrant(connection, 60000) // TODO: put timeout as a config parameter
await this.setDefaultMediator(mediationRecord)
} else {
this.agentConfig.logger.warn(
Expand All @@ -237,7 +290,10 @@ export class RecipientModule {
)
}
}

return mediationRecord
}

// Register handlers for the several messages for the mediator.
private registerHandlers(dispatcher: Dispatcher) {
dispatcher.registerHandler(new KeylistUpdateResponseHandler(this.mediationRecipientService))
Expand Down
104 changes: 104 additions & 0 deletions packages/core/src/modules/routing/__tests__/mediation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,108 @@ describe('mediator establishment', () => {

expect(basicMessage.content).toBe(message)
})

test('restart recipient agent and create connection through mediator after recipient agent is restarted', async () => {
const mediatorMessages = new Subject<SubjectMessage>()
const recipientMessages = new Subject<SubjectMessage>()
const senderMessages = new Subject<SubjectMessage>()

const subjectMap = {
'rxjs:mediator': mediatorMessages,
'rxjs:sender': senderMessages,
}

// Initialize mediator
mediatorAgent = new Agent(mediatorConfig.config, recipientConfig.agentDependencies)
mediatorAgent.registerOutboundTransport(new SubjectOutboundTransport(mediatorMessages, subjectMap))
mediatorAgent.registerInboundTransport(new SubjectInboundTransport(mediatorMessages))
await mediatorAgent.initialize()

// Create connection to use for recipient
const {
invitation: mediatorInvitation,
connectionRecord: { id: mediatorRecipientConnectionId },
} = await mediatorAgent.connections.createConnection({
autoAcceptConnection: true,
})

// Initialize recipient with mediation connections invitation
recipientAgent = new Agent(
{ ...recipientConfig.config, mediatorConnectionsInvite: mediatorInvitation.toUrl() },
recipientConfig.agentDependencies
)
recipientAgent.registerOutboundTransport(new SubjectOutboundTransport(recipientMessages, subjectMap))
recipientAgent.registerInboundTransport(new SubjectInboundTransport(recipientMessages))
await recipientAgent.initialize()

const recipientMediator = await recipientAgent.mediationRecipient.findDefaultMediator()
// eslint-disable-next-line @typescript-eslint/no-non-null-asserted-optional-chain, @typescript-eslint/no-non-null-assertion
const recipientMediatorConnection = await recipientAgent.connections.getById(recipientMediator?.connectionId!)

expect(recipientMediatorConnection).toBeInstanceOf(ConnectionRecord)
expect(recipientMediatorConnection?.isReady).toBe(true)

const mediatorRecipientConnection = await mediatorAgent.connections.getById(mediatorRecipientConnectionId)
expect(mediatorRecipientConnection.isReady).toBe(true)

// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
expect(mediatorRecipientConnection).toBeConnectedWith(recipientMediatorConnection!)
expect(recipientMediatorConnection).toBeConnectedWith(mediatorRecipientConnection)

expect(recipientMediator?.state).toBe(MediationState.Granted)

// Restart recipient agent
await recipientAgent.shutdown()
recipientAgent = new Agent(
{ ...recipientConfig.config, mediatorConnectionsInvite: mediatorInvitation.toUrl() },
recipientConfig.agentDependencies
)
recipientAgent.registerOutboundTransport(new SubjectOutboundTransport(recipientMessages, subjectMap))
recipientAgent.registerInboundTransport(new SubjectInboundTransport(recipientMessages))
await recipientAgent.initialize()

// Initialize sender agent
senderAgent = new Agent(senderConfig.config, senderConfig.agentDependencies)
senderAgent.registerOutboundTransport(new SubjectOutboundTransport(senderMessages, subjectMap))
senderAgent.registerInboundTransport(new SubjectInboundTransport(senderMessages))
await senderAgent.initialize()

const {
invitation: recipientInvitation,
connectionRecord: { id: recipientSenderConnectionId },
} = await recipientAgent.connections.createConnection({
autoAcceptConnection: true,
})

const endpoints = mediatorConfig.config.endpoints ?? []
expect(recipientInvitation.serviceEndpoint).toBe(endpoints[0])

let senderRecipientConnection = await senderAgent.connections.receiveInvitationFromUrl(
recipientInvitation.toUrl(),
{
autoAcceptConnection: true,
}
)

const recipientSenderConnection = await recipientAgent.connections.returnWhenIsConnected(
recipientSenderConnectionId
)

senderRecipientConnection = await senderAgent.connections.getById(senderRecipientConnection.id)

expect(recipientSenderConnection).toBeConnectedWith(senderRecipientConnection)
expect(senderRecipientConnection).toBeConnectedWith(recipientSenderConnection)

expect(recipientSenderConnection.isReady).toBe(true)
expect(senderRecipientConnection.isReady).toBe(true)

const message = 'hello, world'
await senderAgent.basicMessages.sendMessage(senderRecipientConnection, message)

const basicMessage = await waitForBasicMessage(recipientAgent, {
content: message,
})

expect(basicMessage.content).toBe(message)
})
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import type { TagsBase } from '../../../storage/BaseRecord'

import { BaseRecord } from '../../../storage/BaseRecord'
import { uuid } from '../../../utils/uuid'

export interface MediatorRoutingRecordProps {
id?: string
createdAt?: Date
routingKeys?: string[]
tags?: TagsBase
}

export class MediatorRoutingRecord extends BaseRecord implements MediatorRoutingRecordProps {
public routingKeys!: string[]

public static readonly type = 'MediatorRoutingRecord'
public readonly type = MediatorRoutingRecord.type

public constructor(props: MediatorRoutingRecordProps) {
super()

if (props) {
this.id = props.id ?? uuid()
this.createdAt = props.createdAt ?? new Date()
this.routingKeys = props.routingKeys || []
}
}

public getTags() {
return this._tags
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { inject, scoped, Lifecycle } from 'tsyringe'

import { InjectionSymbols } from '../../../constants'
import { Repository } from '../../../storage/Repository'
import { StorageService } from '../../../storage/StorageService'

import { MediatorRoutingRecord } from './MediatorRoutingRecord'

@scoped(Lifecycle.ContainerScoped)
export class MediatorRoutingRepository extends Repository<MediatorRoutingRecord> {
public readonly MEDIATOR_ROUTING_RECORD_ID = 'MEDIATOR_ROUTING_RECORD'

public constructor(@inject(InjectionSymbols.StorageService) storageService: StorageService<MediatorRoutingRecord>) {
super(MediatorRoutingRecord, storageService)
}
}
2 changes: 2 additions & 0 deletions packages/core/src/modules/routing/repository/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
export * from './MediationRepository'
export * from './MediatorRoutingRepository'
export * from './MediationRecord'
export * from './MediatorRoutingRecord'