Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: outbound message send via session #1335

Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a56036a
chore: wip outbound message send via session
Feb 17, 2023
b3c7a54
chore: make some progress add some logging
Feb 17, 2023
356a0b5
chore: wip - possibly working
Feb 17, 2023
ff98207
chore: wip broken
Feb 21, 2023
b8a9b47
chore: almost there
Feb 21, 2023
27cc421
chore: still need to tidy up but it worx
Feb 21, 2023
8e2148f
feat: use session based connection for sending without outbound
Feb 22, 2023
ec30c75
feat: added protocol v2 tests and renamed test files
jimezesinachi Feb 22, 2023
634a86f
chore: minor cosmetics of test naming
Feb 22, 2023
6f98f98
chore: code review feedback
Feb 23, 2023
fc573c0
chore: merge remote-tracking branch 'upstream/main' into feat/outboun…
Feb 23, 2023
a599cfb
chore: code review feedback
Feb 24, 2023
42104f1
chore: code review feedback working with debug log
Feb 28, 2023
d42fc89
chore: code review feedback
Feb 28, 2023
88e8172
chore: lint
Feb 28, 2023
aa5a5d7
chore: fix typing and logic for agentcontext optional
Feb 28, 2023
28bdc73
chore: remove non-applicable tests
Mar 1, 2023
d6dc513
Merge remote-tracking branch 'upstream/main' into feat/outbound-messa…
TimoGlastra Mar 8, 2023
32c9a29
chore: small tweaks
TimoGlastra Mar 8, 2023
1f8cc8b
chore: additional tweaks
TimoGlastra Mar 8, 2023
d918b3f
refactor: session logic
TimoGlastra Mar 8, 2023
c37d970
refactor: more tweaks
TimoGlastra Mar 8, 2023
8932ec2
keep aligned with previous implementations
TimoGlastra Mar 8, 2023
03e79f6
restore tests
TimoGlastra Mar 8, 2023
5f6dbb2
fix tests
TimoGlastra Mar 8, 2023
547372a
Merge branch 'main' into feat/outbound-message-send-via-session
morrieinmaas Mar 9, 2023
89019af
Merge branch 'main' into feat/outbound-message-send-via-session
TimoGlastra Mar 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions packages/core/src/agent/Agent.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import type { AgentDependencies } from './AgentDependencies'
import type { AgentModulesInput } from './AgentModules'
import type { AgentMessageReceivedEvent } from './Events'
import type { InboundTransport } from '../transport/InboundTransport'
import type { OutboundTransport } from '../transport/OutboundTransport'
import type { InitConfig } from '../types'
import type { AgentDependencies } from './AgentDependencies'
import type { AgentModulesInput } from './AgentModules'
import type { AgentMessageReceivedEvent } from './Events'
import type { Subscription } from 'rxjs'

import { Subject } from 'rxjs'
Expand Down Expand Up @@ -141,6 +141,10 @@ export class Agent<AgentModules extends AgentModulesInput = any> extends BaseAge
this.messageSender.registerOutboundTransport(outboundTransport)
}
morrieinmaas marked this conversation as resolved.
Show resolved Hide resolved

public async resetOutboundTransport() {
await this.messageSender.resetOutboundTransport()
}
morrieinmaas marked this conversation as resolved.
Show resolved Hide resolved

public get outboundTransports() {
return this.messageSender.outboundTransports
}
Expand Down
4 changes: 3 additions & 1 deletion packages/core/src/agent/Dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ class Dispatcher {
outboundMessage = await messageHandler.handle(messageContext)
} catch (error) {
const problemReportMessage = error.problemReport

if (problemReportMessage instanceof ProblemReportMessage && messageContext.connection) {
problemReportMessage.setThread({
threadId: message.threadId,
Expand All @@ -78,6 +77,9 @@ class Dispatcher {
}

if (outboundMessage) {
// Store the sessionId of the inbound message, if there is one, so messages can later be send without
// outbound transport.
outboundMessage.sessionIdFromInbound = messageContext.sessionId
if (outboundMessage.isOutboundServiceMessage()) {
await this.messageSender.sendMessageToService(outboundMessage)
} else {
Expand Down
31 changes: 23 additions & 8 deletions packages/core/src/agent/MessageReceiver.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import type { AgentMessage } from './AgentMessage'
import type { DecryptedMessageContext } from './EnvelopeService'
import type { TransportSession } from './TransportService'
import type { AgentContext } from './context'
import type { ConnectionRecord } from '../modules/connections'
import type { InboundTransport } from '../transport'
import type { EncryptedMessage, PlaintextMessage } from '../types'
import type { AgentMessage } from './AgentMessage'
import type { DecryptedMessageContext, EnvelopeKeys } from './EnvelopeService'
morrieinmaas marked this conversation as resolved.
Show resolved Hide resolved
import type { TransportSession } from './TransportService'
import type { AgentContext } from './context'

import { InjectionSymbols } from '../constants'
import { AriesFrameworkError } from '../error'
Expand Down Expand Up @@ -85,7 +85,7 @@ export class MessageReceiver {
if (this.isEncryptedMessage(inboundMessage)) {
await this.receiveEncryptedMessage(agentContext, inboundMessage as EncryptedMessage, session)
} else if (this.isPlaintextMessage(inboundMessage)) {
await this.receivePlaintextMessage(agentContext, inboundMessage, connection)
await this.receivePlaintextMessage(agentContext, inboundMessage, connection, session)
morrieinmaas marked this conversation as resolved.
Show resolved Hide resolved
} else {
throw new AriesFrameworkError('Unable to parse incoming message: unrecognized format')
}
Expand All @@ -98,10 +98,12 @@ export class MessageReceiver {
private async receivePlaintextMessage(
agentContext: AgentContext,
plaintextMessage: PlaintextMessage,
connection?: ConnectionRecord
connection?: ConnectionRecord,
session?: TransportSession
) {
const message = await this.transformAndValidate(agentContext, plaintextMessage)
const messageContext = new InboundMessageContext(message, { connection, agentContext })
const messageContext = new InboundMessageContext(message, { connection, agentContext, sessionId: session?.id })

await this.dispatcher.dispatch(messageContext)
}

Expand Down Expand Up @@ -150,11 +152,24 @@ export class MessageReceiver {
session.connectionId = connection?.id
messageContext.sessionId = session.id
this.transportService.saveSession(session)
} else if (senderKey && recipientKey && message.service?.serviceEndpoint && session) {
morrieinmaas marked this conversation as resolved.
Show resolved Hide resolved
const keys = {
recipientKeys: [senderKey],
routingKeys: [],
senderKey: recipientKey,
}
session.keys = keys
session.inboundMessage = message
// We allow unready connections to be attached to the session as we want to be able to
// use return routing to make connections. This is especially useful for creating connections
// with mediators when you don't have a public endpoint yet.
session.connectionId = connection?.id
messageContext.sessionId = session.id
this.transportService.saveSession(session)
} else if (session) {
// No need to wait for session to stay open if we're not actually going to respond to the message.
await session.close()
}

await this.dispatcher.dispatch(messageContext)
}

Expand Down
89 changes: 74 additions & 15 deletions packages/core/src/agent/MessageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export class MessageSender {
private didResolverService: DidResolverService
private didCommDocumentService: DidCommDocumentService
private eventEmitter: EventEmitter
public readonly outboundTransports: OutboundTransport[] = []
public outboundTransports: OutboundTransport[] = []
morrieinmaas marked this conversation as resolved.
Show resolved Hide resolved

public constructor(
envelopeService: EnvelopeService,
Expand All @@ -68,6 +68,15 @@ export class MessageSender {
this.outboundTransports.push(outboundTransport)
}

public async resetOutboundTransport() {
// NOTE: we could also make this void instead of an async fct.
morrieinmaas marked this conversation as resolved.
Show resolved Hide resolved
// Maybe void is a bit dirty?
this.outboundTransports.forEach(async (obt) => {
await obt.stop()
})
this.outboundTransports = []
}

public async packMessage(
agentContext: AgentContext,
{
Expand Down Expand Up @@ -185,7 +194,7 @@ export class MessageSender {
transportPriority?: TransportPriorityOptions
}
) {
const { agentContext, connection, outOfBand, sessionId, message } = outboundMessageContext
const { agentContext, connection, outOfBand, sessionId, message, sessionIdFromInbound } = outboundMessageContext
const errors: Error[] = []

if (!connection) {
Expand All @@ -203,8 +212,10 @@ export class MessageSender {

let session: TransportSession | undefined

if (sessionId) {
session = this.transportService.findSessionById(sessionId)
if (sessionIdFromInbound) {
session = this.transportService.findSessionById(sessionIdFromInbound)
} else if (sessionId) {
morrieinmaas marked this conversation as resolved.
Show resolved Hide resolved
this.transportService.findSessionById(sessionId)
morrieinmaas marked this conversation as resolved.
Show resolved Hide resolved
}
if (!session) {
// Try to send to already open session
Expand Down Expand Up @@ -348,7 +359,7 @@ export class MessageSender {
this.emitMessageSentEvent(outboundMessageContext, OutboundMessageSendStatus.SentToTransport)
} catch (error) {
this.logger.error(
`Message is undeliverable to service with id ${outboundMessageContext.serviceParams?.service.id}: ${error.message}`,
`Message is undeliverable to service with id ${outboundMessageContext.serviceParams?.service.id}: ${error}`,
morrieinmaas marked this conversation as resolved.
Show resolved Hide resolved
{
message: outboundMessageContext.message,
error,
Expand All @@ -364,14 +375,14 @@ export class MessageSender {
}

private async sendToService(outboundMessageContext: OutboundMessageContext) {
const { agentContext, message, serviceParams, connection } = outboundMessageContext
const { agentContext, message, serviceParams, connection, sessionIdFromInbound, sessionId } = outboundMessageContext

if (!serviceParams) {
throw new AriesFrameworkError('No service parameters found in outbound message context')
}
const { service, senderKey, returnRoute } = serviceParams

if (this.outboundTransports.length === 0) {
if (this.outboundTransports.length === 0 && !sessionIdFromInbound && !sessionId) {
throw new AriesFrameworkError('Agent has no outbound transport!')
}

Expand All @@ -388,7 +399,7 @@ export class MessageSender {

// Set return routing for message if requested
if (returnRoute) {
message.setReturnRouting(ReturnRouteTypes.all)
message.setReturnRouting(ReturnRouteTypes.all, message.threadId)
}

try {
Expand All @@ -408,13 +419,61 @@ export class MessageSender {
const outboundPackage = await this.packMessage(agentContext, { message, keys, endpoint: service.serviceEndpoint })
outboundPackage.endpoint = service.serviceEndpoint
outboundPackage.connectionId = connection?.id
for (const transport of this.outboundTransports) {
const protocolScheme = getProtocolScheme(service.serviceEndpoint)
if (!protocolScheme) {
this.logger.warn('Service does not have valid protocolScheme.')
} else if (transport.supportedSchemes.includes(protocolScheme)) {
await transport.sendMessage(outboundPackage)
return

morrieinmaas marked this conversation as resolved.
Show resolved Hide resolved
// Outbound transport is present
if (this.outboundTransports.length !== 0) {
for (const transport of this.outboundTransports) {
const protocolScheme = getProtocolScheme(service.serviceEndpoint)
if (!protocolScheme) {
this.logger.warn('Service does not have valid protocolScheme.')
morrieinmaas marked this conversation as resolved.
Show resolved Hide resolved
} else if (transport.supportedSchemes.includes(protocolScheme)) {
await transport.sendMessage(outboundPackage)
return
}
}
// No outbound transport: Try retrieving session-based connection instead
} else if (sessionId || sessionIdFromInbound) {
const session = sessionIdFromInbound
? this.transportService.findSessionById(sessionIdFromInbound)
: sessionId
? this.transportService.findSessionById(sessionId)
: undefined

if (session?.inboundMessage?.hasReturnRouting(message.threadId)) {
this.logger.debug(
`Found session with return routing for message '${message.id}' (connection '${connection?.id}'`
)
try {
await this.sendMessageToService(outboundMessageContext)
this.emitMessageSentEvent(outboundMessageContext, OutboundMessageSendStatus.SentToSession)
return
} catch (error) {
this.logger.debug(`Sending an outbound message via session failed with error: ${error.message}.`, error)
throw new MessageSendingError(
`Unable to send message to service: ${service.serviceEndpoint} ${JSON.stringify(
message.threadId
morrieinmaas marked this conversation as resolved.
Show resolved Hide resolved
)} ${JSON.stringify(session)}`,
{
outboundMessageContext,
}
)
}
} else if (outboundMessageContext.serviceParams?.service.serviceEndpoint && session) {
try {
await this.sendMessageToSession(agentContext, session, message)
this.emitMessageSentEvent(outboundMessageContext, OutboundMessageSendStatus.SentToSession)
return
} catch (error) {
this.logger.debug(`Sending an outbound message via session failed with error: ${error.message}.`, error)
morrieinmaas marked this conversation as resolved.
Show resolved Hide resolved
throw new MessageSendingError(
`Unable to send message to service: ${
outboundMessageContext.serviceParams?.service.serviceEndpoint
} with threadId ${JSON.stringify(message.threadId)} `,
morrieinmaas marked this conversation as resolved.
Show resolved Hide resolved
{
outboundMessageContext,
}
)
}
}
}
throw new MessageSendingError(`Unable to send message to service: ${service.serviceEndpoint}`, {
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/agent/TransportService.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { AgentMessage } from './AgentMessage'
import type { EnvelopeKeys } from './EnvelopeService'
import type { DidDocument } from '../modules/dids'
import type { EncryptedMessage } from '../types'
import type { AgentMessage } from './AgentMessage'
import type { EnvelopeKeys } from './EnvelopeService'

import { DID_COMM_TRANSPORT_QUEUE } from '../constants'
import { injectable } from '../plugins'
Expand Down
7 changes: 6 additions & 1 deletion packages/core/src/agent/models/OutboundMessageContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type { OutOfBandRecord } from '../../modules/oob'
import type { BaseRecord } from '../../storage/BaseRecord'
import type { AgentMessage } from '../AgentMessage'
import type { AgentContext } from '../context'
import type { InboundMessageContext } from './InboundMessageContext'

import { AriesFrameworkError } from '../../error'

Expand All @@ -17,6 +18,7 @@ export interface ServiceMessageParams {

export interface OutboundMessageContextParams {
agentContext: AgentContext
inboundMessageContext?: InboundMessageContext
associatedRecord?: BaseRecord<any, any, any>
connection?: ConnectionRecord
serviceParams?: ServiceMessageParams
Expand All @@ -26,15 +28,17 @@ export interface OutboundMessageContextParams {

export class OutboundMessageContext<T extends AgentMessage = AgentMessage> {
public message: T
public sessionIdFromInbound?: string | undefined
morrieinmaas marked this conversation as resolved.
Show resolved Hide resolved
public connection?: ConnectionRecord
public serviceParams?: ServiceMessageParams
public outOfBand?: OutOfBandRecord
public associatedRecord?: BaseRecord<any, any, any>
public sessionId?: string
public readonly agentContext: AgentContext

public constructor(message: T, context: OutboundMessageContextParams) {
public constructor(message: T, context: OutboundMessageContextParams, inboundContext?: InboundMessageContext) {
this.message = message
this.sessionIdFromInbound = inboundContext?.sessionId ? inboundContext.sessionId : undefined
morrieinmaas marked this conversation as resolved.
Show resolved Hide resolved
this.connection = context.connection
this.sessionId = context.sessionId
this.outOfBand = context.outOfBand
Expand Down Expand Up @@ -67,6 +71,7 @@ export class OutboundMessageContext<T extends AgentMessage = AgentMessage> {
return {
message: this.message,
outOfBand: this.outOfBand,
sessionIdFromInbound: this.sessionIdFromInbound,
associatedRecord: this.associatedRecord,
sessionId: this.sessionId,
serviceParams: this.serviceParams,
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/modules/proofs/ProofsApi.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { AgentMessage } from '../../agent/AgentMessage'
import type { Query } from '../../storage/StorageService'
import type {
AcceptProofOptions,
AcceptProofProposalOptions,
Expand All @@ -21,8 +23,6 @@ import type {
import type { ProofProtocol } from './protocol/ProofProtocol'
import type { ProofFormatsFromProtocols } from './protocol/ProofProtocolOptions'
import type { ProofExchangeRecord } from './repository/ProofExchangeRecord'
import type { AgentMessage } from '../../agent/AgentMessage'
import type { Query } from '../../storage/StorageService'

import { injectable } from 'tsyringe'

Expand Down
20 changes: 10 additions & 10 deletions packages/core/src/modules/proofs/protocol/BaseProofProtocol.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
import type { AgentMessage } from '../../../agent/AgentMessage'
morrieinmaas marked this conversation as resolved.
Show resolved Hide resolved
import type { FeatureRegistry } from '../../../agent/FeatureRegistry'
import type { AgentContext } from '../../../agent/context/AgentContext'
import type { InboundMessageContext } from '../../../agent/models/InboundMessageContext'
import type { DependencyManager } from '../../../plugins'
import type { Query } from '../../../storage/StorageService'
import type { ProblemReportMessage } from '../../problem-reports'
import type { ProofStateChangedEvent } from '../ProofEvents'
import type { ExtractProofFormats, ProofFormatService } from '../formats'
import type { ProofExchangeRecord } from '../repository'
import type { ProofProtocol } from './ProofProtocol'
import type {
CreateProofProposalOptions,
Expand All @@ -16,16 +26,6 @@ import type {
SelectCredentialsForRequestOptions,
SelectCredentialsForRequestReturn,
} from './ProofProtocolOptions'
import type { AgentMessage } from '../../../agent/AgentMessage'
import type { FeatureRegistry } from '../../../agent/FeatureRegistry'
import type { AgentContext } from '../../../agent/context/AgentContext'
import type { InboundMessageContext } from '../../../agent/models/InboundMessageContext'
import type { DependencyManager } from '../../../plugins'
import type { Query } from '../../../storage/StorageService'
import type { ProblemReportMessage } from '../../problem-reports'
import type { ProofStateChangedEvent } from '../ProofEvents'
import type { ExtractProofFormats, ProofFormatService } from '../formats'
import type { ProofExchangeRecord } from '../repository'

import { EventEmitter } from '../../../agent/EventEmitter'
import { DidCommMessageRepository } from '../../../storage'
Expand Down
Loading