Skip to content

Commit

Permalink
fix: ConnectionManager emit Messages (#190)
Browse files Browse the repository at this point in the history
  • Loading branch information
curtis-h authored and elribonazo committed Apr 25, 2024
1 parent 0163b4e commit e51359b
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 133 deletions.
163 changes: 62 additions & 101 deletions integration-tests/e2e-tests/src/abilities/WalletSdk.ts
Original file line number Diff line number Diff line change
@@ -1,196 +1,157 @@
import { Ability, Discardable, Initialisable, Interaction, Question, QuestionAdapter } from "@serenity-js/core"
import SDK from "@atala/prism-wallet-sdk"
import { Message } from "@atala/prism-wallet-sdk/build/typings/domain"
import axios from "axios"
import { CloudAgentConfiguration } from "../configuration/CloudAgentConfiguration"
import { Utils } from "../Utils"
import { InMemoryStore } from "../configuration/InMemoryStore"
const {
Agent,
ApiImpl,
Apollo,
BasicMediatorHandler,
Castor,
ConnectionsManager,
DIDCommWrapper,
Domain, ListenerKey,
Mercury,
PublicMediatorStore
} = SDK;
import { Ability, Discardable, Initialisable, Interaction, Question, QuestionAdapter } from "@serenity-js/core";
import SDK from "@atala/prism-wallet-sdk";
import { Message } from "@atala/prism-wallet-sdk/build/typings/domain";
import axios from "axios";
import { CloudAgentConfiguration } from "../configuration/CloudAgentConfiguration";
import { Utils } from "../Utils";
import { InMemoryStore } from "../configuration/InMemoryStore";

const { Agent, Apollo, Domain, ListenerKey, } = SDK;

export class WalletSdk extends Ability implements Initialisable, Discardable {
sdk!: SDK.Agent
messages: MessageQueue = new MessageQueue()
sdk!: SDK.Agent;
messages: MessageQueue = new MessageQueue();

static async withANewInstance(): Promise<Ability> {
const instance: SDK.Agent = await Utils.retry(2, async () => {
return await WalletSdkBuilder.createInstance()
})
return new WalletSdk(instance)
return await WalletSdkBuilder.createInstance();
});
return new WalletSdk(instance);
}

constructor(sdk: SDK.Agent) {
super()
this.sdk = sdk
super();
this.sdk = sdk;
}

static credentialOfferStackSize(): QuestionAdapter<number> {
return Question.about("credential offer stack", actor => {
return WalletSdk.as(actor).messages.credentialOfferStack.length
})
return WalletSdk.as(actor).messages.credentialOfferStack.length;
});
}

static issuedCredentialStackSize(): QuestionAdapter<number> {
return Question.about("issued credential stack", actor => {
return WalletSdk.as(actor).messages.issuedCredentialStack.length
})
return WalletSdk.as(actor).messages.issuedCredentialStack.length;
});
}

static proofOfRequestStackSize(): QuestionAdapter<number> {
return Question.about("proof of request stack", actor => {
return WalletSdk.as(actor).messages.proofRequestStack.length
})
return WalletSdk.as(actor).messages.proofRequestStack.length;
});
}

static execute(callback: (sdk: SDK.Agent, messages: {
credentialOfferStack: Message[],
issuedCredentialStack: Message[],
proofRequestStack: Message[]
credentialOfferStack: Message[];
issuedCredentialStack: Message[];
proofRequestStack: Message[];
}) => Promise<void>): Interaction {
return Interaction.where("#actor uses wallet sdk", async actor => {
await callback(WalletSdk.as(actor).sdk, {
credentialOfferStack: WalletSdk.as(actor).messages.credentialOfferStack,
issuedCredentialStack: WalletSdk.as(actor).messages.issuedCredentialStack,
proofRequestStack: WalletSdk.as(actor).messages.proofRequestStack
})
})
});
});
}

async discard(): Promise<void> {
await this.sdk.stop()
await this.sdk.stop();
}

async initialise(): Promise<void> {
this.sdk.addListener(
ListenerKey.MESSAGE, (messages: SDK.Domain.Message[]) => {
for (const message of messages) {
this.messages.enqueue(message)
this.messages.enqueue(message);
}
}
)
);

await this.sdk.start()
await this.sdk.start();
}

isInitialised(): boolean {
return this.sdk.state != "stopped"
return this.sdk.state != "stopped";
}
}

class WalletSdkBuilder {
private static async getMediatorDidThroughOob(): Promise<string> {
const response = await axios.get(CloudAgentConfiguration.mediatorOobUrl)
const encodedData = response.data.split("?_oob=")[1]
const oobData = JSON.parse(Buffer.from(encodedData, "base64").toString())
return oobData.from
const response = await axios.get(CloudAgentConfiguration.mediatorOobUrl);
const encodedData = response.data.split("?_oob=")[1];
const oobData = JSON.parse(Buffer.from(encodedData, "base64").toString());
return oobData.from;
}

static async createInstance() {
const apollo = new Apollo()
const castor = new Castor(apollo);
const store = new InMemoryStore()
const apollo = new Apollo();
const store = new InMemoryStore();
const pluto = new SDK.Pluto(store, apollo);
await pluto.start()

const api = new ApiImpl()
const didcomm = new DIDCommWrapper(apollo, castor, pluto)
const mercury = new Mercury(castor, didcomm, api)

const mediatorDID = Domain.DID.fromString(await WalletSdkBuilder.getMediatorDidThroughOob())
const mediatorStore = new PublicMediatorStore(pluto)

const mediatorHandler = new BasicMediatorHandler(
mediatorDID,
mercury,
mediatorStore,
)

const connectionsManager = new ConnectionsManager(
castor,
mercury,
pluto,
mediatorHandler,
)

const seed = apollo.createRandomSeed().seed
return new Agent(
apollo,
castor,
pluto,
mercury,
mediatorHandler,
connectionsManager,
seed,
)
const mediatorDID = Domain.DID.fromString(await WalletSdkBuilder.getMediatorDidThroughOob());

return Agent.initialize({ apollo, pluto, mediatorDID });
}
}

/**
* Helper class for message queueing processor
*/
class MessageQueue {
private processingId: NodeJS.Timeout | null = null
private queue: Message[] = []
private processingId: NodeJS.Timeout | null = null;
private queue: Message[] = [];

credentialOfferStack: Message[] = []
proofRequestStack: Message[] = []
issuedCredentialStack: Message[] = []
receivedMessages: string[] = []
credentialOfferStack: Message[] = [];
proofRequestStack: Message[] = [];
issuedCredentialStack: Message[] = [];
receivedMessages: string[] = [];

enqueue(message: Message) {
this.queue.push(message)
this.queue.push(message);

// auto start processing messages
if (!this.processingId) {
this.processMessages()
this.processMessages();
}
}

dequeue(): Message {
return this.queue.shift()!
return this.queue.shift()!;
}

// Check if the queue is empty
isEmpty(): boolean {
return this.queue.length === 0
return this.queue.length === 0;
}

// Get the number of messages in the queue
size(): number {
return this.queue.length
return this.queue.length;
}

processMessages() {
this.processingId = setInterval(() => {
if (!this.isEmpty()) {
const message: Message = this.dequeue()
const message: Message = this.dequeue();
// checks if sdk already received message
if (this.receivedMessages.includes(message.id)) {
return
return;
}

this.receivedMessages.push(message.id)
this.receivedMessages.push(message.id);

if (message.piuri.includes("/offer-credential")) {
this.credentialOfferStack.push(message)
this.credentialOfferStack.push(message);
} else if (message.piuri.includes("/present-proof")) {
this.proofRequestStack.push(message)
this.proofRequestStack.push(message);
} else if (message.piuri.includes("/issue-credential")) {
this.issuedCredentialStack.push(message)
this.issuedCredentialStack.push(message);
}
} else {
clearInterval(this.processingId!)
this.processingId = null
clearInterval(this.processingId!);
this.processingId = null;
}
}, 50)
}, 50);
}
}
53 changes: 21 additions & 32 deletions src/prism-agent/connectionsManager/ConnectionsManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ export class ConnectionsManager implements ConnectionsManagerClass {
async awaitMessageResponse(id: string): Promise<Message | undefined> {
console.log("Deprecated, use agent.addListener('THREAD-{{Your thread || messageId}}', fn), this method does not support live-mode.");
const messages = await this.mediationHandler.pickupUnreadMessages(10);
return messages
.find(({ message }) => message.thid === id)?.message
return messages.find(x => x.message.thid === id)?.message;
}

/**
Expand All @@ -133,53 +132,43 @@ export class ConnectionsManager implements ConnectionsManagerClass {
}

if (unreadMessages.length) {
const received = unreadMessages
.filter(({ message }) => message.direction === MessageDirection.RECEIVED)
const received = unreadMessages.filter(x => x.message.direction === MessageDirection.RECEIVED);
const messages = received.map(x => x.message);
const messageIds = received.map(x => x.attachmentId);

const messages = received
.map(({ message }) => message);

const messageIds = received
.map(({ attachmentId }) => attachmentId);

if (messages.length) {
if (messages.length > 0) {
await this.pluto.storeMessages(messages);
}

const revokeMessages = messages
.filter((message) => message.piuri === ProtocolType.PrismRevocation);
const revokeMessages = messages.filter(x => x.piuri === ProtocolType.PrismRevocation);
const allMessages = await this.pluto.getAllMessages();

const allMessages = await this.pluto.getAllMessages()
for (let message of revokeMessages) {
const revokeMessage = RevocationNotification.fromMessage(message)
const revokeMessage = RevocationNotification.fromMessage(message);
const threadId = revokeMessage.body.issueCredentialProtocolThreadId;

const matchingMessages = allMessages.filter(({ thid, piuri }) =>
thid === threadId &&
piuri === ProtocolType.DidcommIssueCredential
);

if (matchingMessages.length) {

if (matchingMessages.length > 0) {
for (let message of matchingMessages) {
const issueMessage = IssueCredential.fromMessage(message)
const issueMessage = IssueCredential.fromMessage(message);
const credential = await this.agentCredentials.processIssuedCredentialMessage(
issueMessage
)
await this.pluto.revokeCredential(credential)
this.events.emit(ListenerKey.REVOKE, credential)
);
await this.pluto.revokeCredential(credential);
this.events.emit(ListenerKey.REVOKE, credential);
}
}

}

if (messageIds.length) {
await this.mediationHandler.registerMessagesAsRead(messageIds);
}

this.events.emit(ListenerKey.MESSAGE, unreadMessages);


this.events.emit(ListenerKey.MESSAGE, messages);
}
}

Expand Down Expand Up @@ -272,16 +261,16 @@ export class ConnectionsManager implements ConnectionsManagerClass {
return;
}
const currentMediator = this.mediationHandler.mediator.mediatorDID;
const resolvedMediator = await this.castor.resolveDID(currentMediator.toString())
const resolvedMediator = await this.castor.resolveDID(currentMediator.toString());
const hasWebsocket = resolvedMediator.services.find(({ serviceEndpoint: { uri } }) =>
uri.startsWith("ws://") ||
uri.startsWith("wss://")
)
);
if (!hasWebsocket) {
const timeInterval = Math.max(iterationPeriod, 5) * 1000;
this.cancellable = new CancellableTask(async () => {
const unreadMessages = await this.mediationHandler.pickupUnreadMessages(10);
await this.processMessages(unreadMessages)
await this.processMessages(unreadMessages);
}, timeInterval);
} else {
//Connecting to websockets, do not repeat the task
Expand All @@ -304,13 +293,13 @@ export class ConnectionsManager implements ConnectionsManagerClass {
message: message,
attachmentId: attachment.id
}
]
];
}, []);

await this.processMessages(unreadMessages)
await this.processMessages(unreadMessages);
}
)
})
);
});
}

this.cancellable.then().catch((err) => {
Expand Down

0 comments on commit e51359b

Please sign in to comment.