Skip to content

Commit

Permalink
fix: mediator updates (#432)
Browse files Browse the repository at this point in the history
* fix: persist mediator routing keys
* fix: services were not reassigned
* test: add mediation recipient restart test
* fix: add timeout to return when is connected
* refactor: extract pickup logic from ws transport
* feat: return mediation record from provision
* refactor: connection id instead of record

Signed-off-by: Timo Glastra <timo@animo.id>
  • Loading branch information
TimoGlastra committed Aug 21, 2021
1 parent 56cb9f2 commit 163cda1
Show file tree
Hide file tree
Showing 13 changed files with 337 additions and 68 deletions.
7 changes: 4 additions & 3 deletions packages/core/src/agent/MessageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ export class MessageSender {
await transport.sendMessage({
payload: packedMessage,
endpoint: service.serviceEndpoint,
connectionId: connection.id,
})
break
}
Expand Down Expand Up @@ -268,20 +269,20 @@ export class MessageSender {
const allServices = this.transportService.findDidCommServices(connection)

//Separate queue service out
const services = allServices.filter((s) => !isDidCommTransportQueue(s.serviceEndpoint))
let services = allServices.filter((s) => !isDidCommTransportQueue(s.serviceEndpoint))
const queueService = allServices.find((s) => isDidCommTransportQueue(s.serviceEndpoint))

//If restrictive will remove services not listed in schemes list
if (transportPriority?.restrictive) {
services.filter((service) => {
services = services.filter((service) => {
const serviceSchema = service.protocolScheme
return transportPriority.schemes.includes(serviceSchema)
})
}

//If transport priority is set we will sort services by our priority
if (transportPriority?.schemes) {
services.sort(function (a, b) {
services = services.sort(function (a, b) {
const aScheme = a.protocolScheme
const bScheme = b.protocolScheme
return transportPriority?.schemes.indexOf(aScheme) - transportPriority?.schemes.indexOf(bScheme)
Expand Down
34 changes: 19 additions & 15 deletions packages/core/src/modules/connections/services/ConnectionService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import type { ConnectionStateChangedEvent } from '../ConnectionEvents'
import type { CustomConnectionTags } from '../repository/ConnectionRecord'

import { validateOrReject } from 'class-validator'
import { firstValueFrom, ReplaySubject } from 'rxjs'
import { first, map, timeout } from 'rxjs/operators'
import { inject, scoped, Lifecycle } from 'tsyringe'

import { AgentConfig } from '../../../agent/AgentConfig'
Expand Down Expand Up @@ -573,28 +575,30 @@ export class ConnectionService {
return connectionRecord
}

public async returnWhenIsConnected(connectionId: string): Promise<ConnectionRecord> {
public async returnWhenIsConnected(connectionId: string, timeoutMs = 20000): Promise<ConnectionRecord> {
const isConnected = (connection: ConnectionRecord) => {
return connection.id === connectionId && connection.state === ConnectionState.Complete
}

const promise = new Promise<ConnectionRecord>((resolve) => {
const listener = ({ payload: { connectionRecord } }: ConnectionStateChangedEvent) => {
if (isConnected(connectionRecord)) {
this.eventEmitter.off<ConnectionStateChangedEvent>(ConnectionEventTypes.ConnectionStateChanged, listener)
resolve(connectionRecord)
}
}
const observable = this.eventEmitter.observable<ConnectionStateChangedEvent>(
ConnectionEventTypes.ConnectionStateChanged
)
const subject = new ReplaySubject<ConnectionRecord>(1)

this.eventEmitter.on<ConnectionStateChangedEvent>(ConnectionEventTypes.ConnectionStateChanged, listener)
})
observable
.pipe(
map((e) => e.payload.connectionRecord),
first(isConnected), // Do not wait for longer than specified timeout
timeout(timeoutMs)
)
.subscribe(subject)

// Check if already done
const connection = await this.connectionRepository.findById(connectionId)
if (connection && isConnected(connection)) return connection //TODO: check if this leaves trailing listeners behind?
const connection = await this.getById(connectionId)
if (isConnected(connection)) {
subject.next(connection)
}

// return listener
return promise
return firstValueFrom(subject)
}
}

Expand Down
76 changes: 66 additions & 10 deletions packages/core/src/modules/routing/RecipientModule.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import type { Logger } from '../../logger'
import type { OutboundWebSocketClosedEvent } from '../../transport'
import type { ConnectionRecord } from '../connections'
import type { MediationStateChangedEvent } from './RoutingEvents'
import type { MediationRecord } from './index'

import { firstValueFrom, interval, ReplaySubject } from 'rxjs'
import { filter, first, takeUntil, timeout } from 'rxjs/operators'
import { filter, first, takeUntil, throttleTime, timeout, delay, tap } from 'rxjs/operators'
import { Lifecycle, scoped } from 'tsyringe'

import { AgentConfig } from '../../agent/AgentConfig'
Expand All @@ -13,6 +14,7 @@ import { EventEmitter } from '../../agent/EventEmitter'
import { MessageSender } from '../../agent/MessageSender'
import { createOutboundMessage } from '../../agent/helpers'
import { AriesFrameworkError } from '../../error'
import { TransportEventTypes } from '../../transport'
import { ConnectionInvitationMessage } from '../connections'
import { ConnectionService } from '../connections/services'

Expand Down Expand Up @@ -71,6 +73,57 @@ export class RecipientModule {
}
}

private async openMediationWebSocket(mediator: MediationRecord) {
const { message, connectionRecord } = await this.connectionService.createTrustPing(mediator.connectionId)

const websocketSchemes = ['ws', 'wss']
const hasWebSocketTransport =
connectionRecord.didDoc.didCommServices.filter((s) => websocketSchemes.includes(s.protocolScheme)).length > 0

if (!hasWebSocketTransport) {
throw new AriesFrameworkError('Cannot open websocket to connection without websocket service endpoint')
}

await this.messageSender.sendMessage(createOutboundMessage(connectionRecord, message), {
transportPriority: {
schemes: websocketSchemes,
restrictive: true,
// TODO: add keepAlive: true to enforce through the public api
// we need to keep the socket alive. It already works this way, but would
// be good to make more explicit from the public facing API.
// This would also make it easier to change the internal API later on.
// keepAlive: true,
},
})
}

private async initiateImplicitPickup(mediator: MediationRecord) {
let interval = 50

// Listens to Outbound websocket closed events and will reopen the websocket connection
// in a recursive back off strategy if it matches the following criteria:
// - Agent is not shutdown
// - Socket was for current mediator connection id
this.eventEmitter
.observable<OutboundWebSocketClosedEvent>(TransportEventTypes.OutboundWebSocketClosedEvent)
.pipe(
// Stop when the agent shuts down
takeUntil(this.agentConfig.stop$),
filter((e) => e.payload.connectionId === mediator.connectionId),
// Make sure we're not reconnecting multiple times
throttleTime(interval),
// Increase the interval (recursive back-off)
tap(() => (interval *= 2)),
// Wait for interval time before reconnecting
delay(interval)
)
.subscribe(() => {
this.openMediationWebSocket(mediator)
})

await this.openMediationWebSocket(mediator)
}

public async initiateMessagePickup(mediator: MediationRecord) {
const { mediatorPickupStrategy, mediatorPollingInterval } = this.agentConfig

Expand All @@ -92,8 +145,7 @@ export class RecipientModule {
// such as WebSockets to work
else if (mediatorPickupStrategy === MediatorPickupStrategy.Implicit) {
this.agentConfig.logger.info(`Starting implicit pickup of messages from mediator '${mediator.id}'`)
const { message, connectionRecord } = await this.connectionService.createTrustPing(mediatorConnection.id)
await this.messageSender.sendMessage(createOutboundMessage(connectionRecord, message))
await this.initiateImplicitPickup(mediator)
} else {
this.agentConfig.logger.info(
`Skipping pickup of messages from mediator '${mediator.id}' due to pickup strategy none`
Expand Down Expand Up @@ -189,10 +241,14 @@ export class RecipientModule {
// Also requests mediation and sets as default mediator
// Assumption: processInvitation is a URL-encoded invitation
const invitation = await ConnectionInvitationMessage.fromUrl(mediatorConnInvite)

// Check if invitation has been used already
if (!invitation || !invitation.recipientKeys || !invitation.recipientKeys[0]) {
throw new AriesFrameworkError(`Invalid mediation invitation. Invitation must have at least one recipient key.`)
}

let mediationRecord: MediationRecord | null = null

const connection = await this.connectionService.findByInvitationKey(invitation.recipientKeys[0])
if (!connection) {
this.logger.debug('Mediation Connection does not exist, creating connection')
Expand All @@ -209,25 +265,22 @@ export class RecipientModule {
const outbound = createOutboundMessage(connectionRecord, message)
await this.messageSender.sendMessage(outbound)

// TODO: add timeout to returnWhenIsConnected
const completedConnectionRecord = await this.connectionService.returnWhenIsConnected(connectionRecord.id)
this.logger.debug('Connection completed, requesting mediation')
const mediationRecord = await this.requestAndAwaitGrant(completedConnectionRecord, 60000) // TODO: put timeout as a config parameter
mediationRecord = await this.requestAndAwaitGrant(completedConnectionRecord, 60000) // TODO: put timeout as a config parameter
this.logger.debug('Mediation Granted, setting as default mediator')
await this.setDefaultMediator(mediationRecord)
this.logger.debug('Default mediator set')
return
} else if (connection && !connection.isReady) {
const connectionRecord = await this.connectionService.returnWhenIsConnected(connection.id)
const mediationRecord = await this.requestAndAwaitGrant(connectionRecord, 60000) // TODO: put timeout as a config parameter
mediationRecord = await this.requestAndAwaitGrant(connectionRecord, 60000) // TODO: put timeout as a config parameter
await this.setDefaultMediator(mediationRecord)
return
} else if (connection.isReady) {
} else {
this.agentConfig.logger.warn('Mediator Invitation in configuration has already been used to create a connection.')
const mediator = await this.findByConnectionId(connection.id)
if (!mediator) {
this.agentConfig.logger.warn('requesting mediation over connection.')
const mediationRecord = await this.requestAndAwaitGrant(connection, 60000) // TODO: put timeout as a config parameter
mediationRecord = await this.requestAndAwaitGrant(connection, 60000) // TODO: put timeout as a config parameter
await this.setDefaultMediator(mediationRecord)
} else {
this.agentConfig.logger.warn(
Expand All @@ -237,7 +290,10 @@ export class RecipientModule {
)
}
}

return mediationRecord
}

// Register handlers for the several messages for the mediator.
private registerHandlers(dispatcher: Dispatcher) {
dispatcher.registerHandler(new KeylistUpdateResponseHandler(this.mediationRecipientService))
Expand Down
104 changes: 104 additions & 0 deletions packages/core/src/modules/routing/__tests__/mediation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,108 @@ describe('mediator establishment', () => {

expect(basicMessage.content).toBe(message)
})

test('restart recipient agent and create connection through mediator after recipient agent is restarted', async () => {
const mediatorMessages = new Subject<SubjectMessage>()
const recipientMessages = new Subject<SubjectMessage>()
const senderMessages = new Subject<SubjectMessage>()

const subjectMap = {
'rxjs:mediator': mediatorMessages,
'rxjs:sender': senderMessages,
}

// Initialize mediator
mediatorAgent = new Agent(mediatorConfig.config, recipientConfig.agentDependencies)
mediatorAgent.registerOutboundTransport(new SubjectOutboundTransport(mediatorMessages, subjectMap))
mediatorAgent.registerInboundTransport(new SubjectInboundTransport(mediatorMessages))
await mediatorAgent.initialize()

// Create connection to use for recipient
const {
invitation: mediatorInvitation,
connectionRecord: { id: mediatorRecipientConnectionId },
} = await mediatorAgent.connections.createConnection({
autoAcceptConnection: true,
})

// Initialize recipient with mediation connections invitation
recipientAgent = new Agent(
{ ...recipientConfig.config, mediatorConnectionsInvite: mediatorInvitation.toUrl() },
recipientConfig.agentDependencies
)
recipientAgent.registerOutboundTransport(new SubjectOutboundTransport(recipientMessages, subjectMap))
recipientAgent.registerInboundTransport(new SubjectInboundTransport(recipientMessages))
await recipientAgent.initialize()

const recipientMediator = await recipientAgent.mediationRecipient.findDefaultMediator()
// eslint-disable-next-line @typescript-eslint/no-non-null-asserted-optional-chain, @typescript-eslint/no-non-null-assertion
const recipientMediatorConnection = await recipientAgent.connections.getById(recipientMediator?.connectionId!)

expect(recipientMediatorConnection).toBeInstanceOf(ConnectionRecord)
expect(recipientMediatorConnection?.isReady).toBe(true)

const mediatorRecipientConnection = await mediatorAgent.connections.getById(mediatorRecipientConnectionId)
expect(mediatorRecipientConnection.isReady).toBe(true)

// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
expect(mediatorRecipientConnection).toBeConnectedWith(recipientMediatorConnection!)
expect(recipientMediatorConnection).toBeConnectedWith(mediatorRecipientConnection)

expect(recipientMediator?.state).toBe(MediationState.Granted)

// Restart recipient agent
await recipientAgent.shutdown()
recipientAgent = new Agent(
{ ...recipientConfig.config, mediatorConnectionsInvite: mediatorInvitation.toUrl() },
recipientConfig.agentDependencies
)
recipientAgent.registerOutboundTransport(new SubjectOutboundTransport(recipientMessages, subjectMap))
recipientAgent.registerInboundTransport(new SubjectInboundTransport(recipientMessages))
await recipientAgent.initialize()

// Initialize sender agent
senderAgent = new Agent(senderConfig.config, senderConfig.agentDependencies)
senderAgent.registerOutboundTransport(new SubjectOutboundTransport(senderMessages, subjectMap))
senderAgent.registerInboundTransport(new SubjectInboundTransport(senderMessages))
await senderAgent.initialize()

const {
invitation: recipientInvitation,
connectionRecord: { id: recipientSenderConnectionId },
} = await recipientAgent.connections.createConnection({
autoAcceptConnection: true,
})

const endpoints = mediatorConfig.config.endpoints ?? []
expect(recipientInvitation.serviceEndpoint).toBe(endpoints[0])

let senderRecipientConnection = await senderAgent.connections.receiveInvitationFromUrl(
recipientInvitation.toUrl(),
{
autoAcceptConnection: true,
}
)

const recipientSenderConnection = await recipientAgent.connections.returnWhenIsConnected(
recipientSenderConnectionId
)

senderRecipientConnection = await senderAgent.connections.getById(senderRecipientConnection.id)

expect(recipientSenderConnection).toBeConnectedWith(senderRecipientConnection)
expect(senderRecipientConnection).toBeConnectedWith(recipientSenderConnection)

expect(recipientSenderConnection.isReady).toBe(true)
expect(senderRecipientConnection.isReady).toBe(true)

const message = 'hello, world'
await senderAgent.basicMessages.sendMessage(senderRecipientConnection, message)

const basicMessage = await waitForBasicMessage(recipientAgent, {
content: message,
})

expect(basicMessage.content).toBe(message)
})
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import type { TagsBase } from '../../../storage/BaseRecord'

import { BaseRecord } from '../../../storage/BaseRecord'
import { uuid } from '../../../utils/uuid'

export interface MediatorRoutingRecordProps {
id?: string
createdAt?: Date
routingKeys?: string[]
tags?: TagsBase
}

export class MediatorRoutingRecord extends BaseRecord implements MediatorRoutingRecordProps {
public routingKeys!: string[]

public static readonly type = 'MediatorRoutingRecord'
public readonly type = MediatorRoutingRecord.type

public constructor(props: MediatorRoutingRecordProps) {
super()

if (props) {
this.id = props.id ?? uuid()
this.createdAt = props.createdAt ?? new Date()
this.routingKeys = props.routingKeys || []
}
}

public getTags() {
return this._tags
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { inject, scoped, Lifecycle } from 'tsyringe'

import { InjectionSymbols } from '../../../constants'
import { Repository } from '../../../storage/Repository'
import { StorageService } from '../../../storage/StorageService'

import { MediatorRoutingRecord } from './MediatorRoutingRecord'

@scoped(Lifecycle.ContainerScoped)
export class MediatorRoutingRepository extends Repository<MediatorRoutingRecord> {
public readonly MEDIATOR_ROUTING_RECORD_ID = 'MEDIATOR_ROUTING_RECORD'

public constructor(@inject(InjectionSymbols.StorageService) storageService: StorageService<MediatorRoutingRecord>) {
super(MediatorRoutingRecord, storageService)
}
}
2 changes: 2 additions & 0 deletions packages/core/src/modules/routing/repository/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
export * from './MediationRepository'
export * from './MediatorRoutingRepository'
export * from './MediationRecord'
export * from './MediatorRoutingRecord'

0 comments on commit 163cda1

Please sign in to comment.