Skip to content

Commit

Permalink
feat: outbound message send via session (openwallet-foundation#1335)
Browse files Browse the repository at this point in the history
Co-authored-by: Jim Ezesinachi <ezesinachijim@gmail.com>
Co-authored-by: Timo Glastra <timo@animo.id>
Signed-off-by: Moriarty <moritz@animo.id>
  • Loading branch information
3 people committed Mar 11, 2023
1 parent 19cefa5 commit 582c711
Show file tree
Hide file tree
Showing 21 changed files with 665 additions and 403 deletions.
4 changes: 2 additions & 2 deletions packages/action-menu/tests/action-menu.e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { Agent } from '@aries-framework/core'
import { getAgentOptions, makeConnection, testLogger, setupSubjectTransports, indySdk } from '../../core/tests'
import { IndySdkModule } from '../../indy-sdk/src'

import { waitForActionMenuRecord } from './helpers'

import {
ActionMenu,
ActionMenuModule,
Expand All @@ -13,8 +15,6 @@ import {
ActionMenuState,
} from '@aries-framework/action-menu'

import { waitForActionMenuRecord } from './helpers'

const modules = {
actionMenu: new ActionMenuModule(),
indySdk: new IndySdkModule({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,106 @@ describe('V1 Proofs - Connectionless - Indy', () => {

await waitForProofExchangeRecordSubject(aliceReplay, {
state: ProofState.Done,
threadId: message.threadId,
})

await waitForProofExchangeRecordSubject(faberReplay, {
state: ProofState.Done,
threadId: message.threadId,
})
})

test('Faber starts with connection-less proof requests to Alice with auto-accept enabled and without an outbound transport', async () => {
const {
holderAgent: aliceAgent,
issuerAgent: faberAgent,
holderReplay: aliceReplay,
credentialDefinitionId,
issuerReplay: faberReplay,
issuerHolderConnectionId: faberConnectionId,
} = await setupAnonCredsTests({
issuerName: 'Faber v1 connection-less Proofs - Always',
holderName: 'Alice v1 connection-less Proofs - Always',
autoAcceptProofs: AutoAcceptProof.Always,
attributeNames: ['name', 'age'],
})

await issueLegacyAnonCredsCredential({
issuerAgent: faberAgent,
holderAgent: aliceAgent,
issuerReplay: faberReplay,
holderReplay: aliceReplay,
issuerHolderConnectionId: faberConnectionId,
offer: {
credentialDefinitionId,
attributes: [
{
name: 'name',
value: 'John',
},
{
name: 'age',
value: '99',
},
],
},
})

agents = [aliceAgent, faberAgent]

const { message, proofRecord: faberProofExchangeRecord } = await faberAgent.proofs.createRequest({
protocolVersion: 'v1',
proofFormats: {
indy: {
name: 'test-proof-request',
version: '1.0',
requested_attributes: {
name: {
name: 'name',
restrictions: [
{
cred_def_id: credentialDefinitionId,
},
],
},
},
requested_predicates: {
age: {
name: 'age',
p_type: '>=',
p_value: 50,
restrictions: [
{
cred_def_id: credentialDefinitionId,
},
],
},
},
},
},
autoAcceptProof: AutoAcceptProof.ContentApproved,
})

const { message: requestMessage } = await faberAgent.oob.createLegacyConnectionlessInvitation({
recordId: faberProofExchangeRecord.id,
message,
domain: 'https://a-domain.com',
})

for (const transport of faberAgent.outboundTransports) {
await faberAgent.unregisterOutboundTransport(transport)
}

await aliceAgent.receiveMessage(requestMessage.toJSON())

await waitForProofExchangeRecordSubject(aliceReplay, {
state: ProofState.Done,
threadId: requestMessage.threadId,
})

await waitForProofExchangeRecordSubject(faberReplay, {
state: ProofState.Done,
threadId: requestMessage.threadId,
})
})

Expand Down Expand Up @@ -364,14 +460,6 @@ describe('V1 Proofs - Connectionless - Indy', () => {
},
})

const aliceProofExchangeRecordPromise = waitForProofExchangeRecordSubject(aliceReplay, {
state: ProofState.Done,
})

const faberProofExchangeRecordPromise = waitForProofExchangeRecordSubject(faberReplay, {
state: ProofState.Done,
})

// eslint-disable-next-line prefer-const
let { message, proofRecord: faberProofExchangeRecord } = await faberAgent.proofs.createRequest({
protocolVersion: 'v1',
Expand Down Expand Up @@ -427,9 +515,15 @@ describe('V1 Proofs - Connectionless - Indy', () => {

await aliceAgent.receiveMessage(requestMessage.toJSON())

await aliceProofExchangeRecordPromise
await waitForProofExchangeRecordSubject(aliceReplay, {
state: ProofState.Done,
threadId: requestMessage.threadId,
})

await faberProofExchangeRecordPromise
await waitForProofExchangeRecordSubject(faberReplay, {
state: ProofState.Done,
threadId: requestMessage.threadId,
})

await aliceAgent.mediationRecipient.stopMessagePickup()
await faberAgent.mediationRecipient.stopMessagePickup()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export class V1PresentationHandler implements MessageHandler {
serviceParams: {
service: recipientService.resolvedDidCommService,
senderKey: ourService.resolvedDidCommService.recipientKeys[0],
returnRoute: true,
},
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export class V1RequestPresentationHandler implements MessageHandler {
serviceParams: {
service: recipientService.resolvedDidCommService,
senderKey: message.service.resolvedDidCommService.recipientKeys[0],
returnRoute: true,
},
})
}
Expand Down
8 changes: 8 additions & 0 deletions packages/core/src/agent/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ export class Agent<AgentModules extends AgentModulesInput = any> extends BaseAge
this.messageReceiver.registerInboundTransport(inboundTransport)
}

public async unregisterInboundTransport(inboundTransport: InboundTransport) {
await this.messageReceiver.unregisterInboundTransport(inboundTransport)
}

public get inboundTransports() {
return this.messageReceiver.inboundTransports
}
Expand All @@ -144,6 +148,10 @@ export class Agent<AgentModules extends AgentModulesInput = any> extends BaseAge
this.messageSender.registerOutboundTransport(outboundTransport)
}

public async unregisterOutboundTransport(outboundTransport: OutboundTransport) {
await this.messageSender.unregisterOutboundTransport(outboundTransport)
}

public get outboundTransports() {
return this.messageSender.outboundTransports
}
Expand Down
7 changes: 6 additions & 1 deletion packages/core/src/agent/Dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class Dispatcher {
outboundMessage = new OutboundMessageContext(problemReportMessage, {
agentContext,
connection: messageContext.connection,
inboundMessageContext: messageContext,
})
} else {
this.logger.error(`Error handling message with type ${message.type}`, {
Expand All @@ -83,10 +84,14 @@ class Dispatcher {
}

if (outboundMessage) {
// set the inbound message context, if not already defined
if (!outboundMessage.inboundMessageContext) {
outboundMessage.inboundMessageContext = messageContext
}

if (outboundMessage.isOutboundServiceMessage()) {
await this.messageSender.sendMessageToService(outboundMessage)
} else {
outboundMessage.sessionId = messageContext.sessionId
await this.messageSender.sendMessage(outboundMessage)
}
}
Expand Down
14 changes: 12 additions & 2 deletions packages/core/src/agent/MessageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export class MessageReceiver {
private connectionService: ConnectionService
private messageHandlerRegistry: MessageHandlerRegistry
private agentContextProvider: AgentContextProvider
public readonly inboundTransports: InboundTransport[] = []
private _inboundTransports: InboundTransport[] = []

public constructor(
envelopeService: EnvelopeService,
Expand All @@ -54,10 +54,20 @@ export class MessageReceiver {
this.messageHandlerRegistry = messageHandlerRegistry
this.agentContextProvider = agentContextProvider
this.logger = logger
this._inboundTransports = []
}

public get inboundTransports() {
return this._inboundTransports
}

public registerInboundTransport(inboundTransport: InboundTransport) {
this.inboundTransports.push(inboundTransport)
this._inboundTransports.push(inboundTransport)
}

public async unregisterInboundTransport(inboundTransport: InboundTransport) {
this._inboundTransports = this._inboundTransports.filter((transport) => transport !== inboundTransport)
await inboundTransport.stop()
}

/**
Expand Down
64 changes: 49 additions & 15 deletions packages/core/src/agent/MessageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export class MessageSender {
private didResolverService: DidResolverService
private didCommDocumentService: DidCommDocumentService
private eventEmitter: EventEmitter
public readonly outboundTransports: OutboundTransport[] = []
private _outboundTransports: OutboundTransport[] = []

public constructor(
envelopeService: EnvelopeService,
Expand All @@ -61,11 +61,20 @@ export class MessageSender {
this.didResolverService = didResolverService
this.didCommDocumentService = didCommDocumentService
this.eventEmitter = eventEmitter
this.outboundTransports = []
this._outboundTransports = []
}

public get outboundTransports() {
return this._outboundTransports
}

public registerOutboundTransport(outboundTransport: OutboundTransport) {
this.outboundTransports.push(outboundTransport)
this._outboundTransports.push(outboundTransport)
}

public async unregisterOutboundTransport(outboundTransport: OutboundTransport) {
this._outboundTransports = this.outboundTransports.filter((transport) => transport !== outboundTransport)
await outboundTransport.stop()
}

public async packMessage(
Expand Down Expand Up @@ -185,7 +194,7 @@ export class MessageSender {
transportPriority?: TransportPriorityOptions
}
) {
const { agentContext, connection, outOfBand, sessionId, message } = outboundMessageContext
const { agentContext, connection, outOfBand, message } = outboundMessageContext
const errors: Error[] = []

if (!connection) {
Expand All @@ -201,17 +210,9 @@ export class MessageSender {
connectionId: connection.id,
})

let session: TransportSession | undefined

if (sessionId) {
session = this.transportService.findSessionById(sessionId)
}
if (!session) {
// Try to send to already open session
session = this.transportService.findSessionByConnectionId(connection.id)
}
const session = this.findSessionForOutboundContext(outboundMessageContext)

if (session?.inboundMessage?.hasReturnRouting(message.threadId)) {
if (session) {
this.logger.debug(`Found session with return routing for message '${message.id}' (connection '${connection.id}'`)
try {
await this.sendMessageToSession(agentContext, session, message)
Expand Down Expand Up @@ -343,6 +344,20 @@ export class MessageSender {
}

public async sendMessageToService(outboundMessageContext: OutboundMessageContext) {
const session = this.findSessionForOutboundContext(outboundMessageContext)

if (session) {
this.logger.debug(`Found session with return routing for message '${outboundMessageContext.message.id}'`)
try {
await this.sendMessageToSession(outboundMessageContext.agentContext, session, outboundMessageContext.message)
this.emitMessageSentEvent(outboundMessageContext, OutboundMessageSendStatus.SentToSession)
return
} catch (error) {
this.logger.debug(`Sending an outbound message via session failed with error: ${error.message}.`, error)
}
}

// If there is no session try sending to service instead
try {
await this.sendToService(outboundMessageContext)
this.emitMessageSentEvent(outboundMessageContext, OutboundMessageSendStatus.SentToTransport)
Expand Down Expand Up @@ -411,7 +426,7 @@ export class MessageSender {
for (const transport of this.outboundTransports) {
const protocolScheme = getProtocolScheme(service.serviceEndpoint)
if (!protocolScheme) {
this.logger.warn('Service does not have valid protocolScheme.')
this.logger.warn('Service does not have a protocol scheme.')
} else if (transport.supportedSchemes.includes(protocolScheme)) {
await transport.sendMessage(outboundPackage)
return
Expand All @@ -422,6 +437,25 @@ export class MessageSender {
})
}

private findSessionForOutboundContext(outboundContext: OutboundMessageContext) {
let session: TransportSession | undefined = undefined

// Use session id from outbound context if present, or use the session from the inbound message context
const sessionId = outboundContext.sessionId ?? outboundContext.inboundMessageContext?.sessionId

// Try to find session by id
if (sessionId) {
session = this.transportService.findSessionById(sessionId)
}

// Try to find session by connection id
if (!session && outboundContext.connection?.id) {
session = this.transportService.findSessionByConnectionId(outboundContext.connection.id)
}

return session && session.inboundMessage?.hasAnyReturnRoute() ? session : null
}

private async retrieveServicesByConnection(
agentContext: AgentContext,
connection: ConnectionRecord,
Expand Down
4 changes: 4 additions & 0 deletions packages/core/src/agent/models/OutboundMessageContext.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import type { InboundMessageContext } from './InboundMessageContext'
import type { Key } from '../../crypto'
import type { ConnectionRecord } from '../../modules/connections'
import type { ResolvedDidCommService } from '../../modules/didcomm'
Expand All @@ -17,6 +18,7 @@ export interface ServiceMessageParams {

export interface OutboundMessageContextParams {
agentContext: AgentContext
inboundMessageContext?: InboundMessageContext
associatedRecord?: BaseRecord<any, any, any>
connection?: ConnectionRecord
serviceParams?: ServiceMessageParams
Expand All @@ -31,6 +33,7 @@ export class OutboundMessageContext<T extends AgentMessage = AgentMessage> {
public outOfBand?: OutOfBandRecord
public associatedRecord?: BaseRecord<any, any, any>
public sessionId?: string
public inboundMessageContext?: InboundMessageContext
public readonly agentContext: AgentContext

public constructor(message: T, context: OutboundMessageContextParams) {
Expand All @@ -40,6 +43,7 @@ export class OutboundMessageContext<T extends AgentMessage = AgentMessage> {
this.outOfBand = context.outOfBand
this.serviceParams = context.serviceParams
this.associatedRecord = context.associatedRecord
this.inboundMessageContext = context.inboundMessageContext
this.agentContext = context.agentContext
}

Expand Down
Loading

0 comments on commit 582c711

Please sign in to comment.