Skip to content

Commit

Permalink
fix: e2e issues with latest websocket changes (#200)
Browse files Browse the repository at this point in the history
  • Loading branch information
elribonazo committed Apr 25, 2024
1 parent 2ceeca4 commit 31689c0
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 67 deletions.
7 changes: 7 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@
],
"type": "node"
},
{
"command": "yarn test:sdk",
"name": "Run test:sdk",
"request": "launch",
"type": "node-terminal",
"cwd": "${workspaceFolder}/integration-tests/e2e-tests"
},
{
"name": "TESTS",
"type": "node",
Expand Down
12 changes: 6 additions & 6 deletions integration-tests/e2e-tests/src/steps/LifecycleSteps.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import {Before, BeforeAll} from "@cucumber/cucumber"
import {Actor, actorCalled, Cast, engage, TakeNotes} from "@serenity-js/core"
import {CallAnApi} from "@serenity-js/rest"
import {Utils} from "../Utils"
import {WalletSdk} from "../abilities/WalletSdk"
import {axiosInstance, CloudAgentConfiguration} from "../configuration/CloudAgentConfiguration"
import { Before, BeforeAll } from "@cucumber/cucumber"
import { Actor, actorCalled, Cast, engage, TakeNotes } from "@serenity-js/core"
import { CallAnApi } from "@serenity-js/rest"
import { Utils } from "../Utils"
import { WalletSdk } from "../abilities/WalletSdk"
import { axiosInstance, CloudAgentConfiguration } from "../configuration/CloudAgentConfiguration"

BeforeAll(async () => {
Utils.prepareNotes()
Expand Down
46 changes: 13 additions & 33 deletions src/prism-agent/connectionsManager/ConnectionsManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ export class ConnectionsManager implements ConnectionsManagerClass {
async processMessages(unreadMessages: {
attachmentId: string;
message: Message;
}[]): Promise<void> {
}[] = []): Promise<void> {
if (!this.mediationHandler.mediator) {
throw new AgentError.NoMediatorAvailableError();
}
Expand Down Expand Up @@ -268,45 +268,25 @@ export class ConnectionsManager implements ConnectionsManagerClass {
const currentMediator = this.mediationHandler.mediator.mediatorDID;
const resolvedMediator = await this.castor.resolveDID(currentMediator.toString());
const hasWebsocket = resolvedMediator.services.find(({ serviceEndpoint: { uri } }) =>
(
uri.startsWith("ws://") ||
uri.startsWith("wss://")
) && this.withWebsocketsExperiment
(
uri.startsWith("ws://") ||
uri.startsWith("wss://")
)
);
if (!hasWebsocket) {
const timeInterval = Math.max(iterationPeriod, 5) * 1000;
this.cancellable = new CancellableTask(async () => {
const unreadMessages = await this.mediationHandler.pickupUnreadMessages(10);
await this.processMessages(unreadMessages);
}, timeInterval);
} else {
//Connecting to websockets, do not repeat the task
if (hasWebsocket && this.withWebsocketsExperiment) {
this.cancellable = new CancellableTask(async (signal) => {
this.mediationHandler.listenUnreadMessages(
signal,
hasWebsocket.serviceEndpoint.uri,
async (messages) => {
const unreadMessages = messages.reduce<{
attachmentId: string;
message: Message;
}[]>((unreads, message) => {
const attachment = message.attachments.at(0);
if (!attachment) {
return unreads;
}
return [
...unreads,
{
message: message,
attachmentId: attachment.id
}
];
}, []);

await this.processMessages(unreadMessages);
}
(messages) => this.processMessages(messages)
);
});
} else {
const timeInterval = Math.max(iterationPeriod, 5) * 1000;
this.cancellable = new CancellableTask(async () => {
const unreadMessages = await this.mediationHandler.pickupUnreadMessages(10);
await this.processMessages(unreadMessages);
}, timeInterval);
}

this.cancellable.then().catch((err) => {
Expand Down
18 changes: 10 additions & 8 deletions src/prism-agent/helpers/Task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,16 @@ export class CancellableTask<T> {
}

private loopOnTaskEvery(task: Task<T>, reject: (reason?: Error) => void, signal: AbortSignal) {
task(signal)
.then(() => {
this.clearTimer();
this.timer = setTimeout(() => {
this.loopOnTaskEvery(task, reject, signal);
}, this.period);
})
.catch(reject);
if (!this.controller.signal.aborted) {
task(signal)
.then(() => {
this.clearTimer();
this.timer = setTimeout(() => {
this.loopOnTaskEvery(task, reject, signal);
}, this.period);
})
.catch(reject);
}
}

cancel() {
Expand Down
12 changes: 8 additions & 4 deletions src/prism-agent/mediator/BasicMediatorHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,18 @@ export class BasicMediatorHandler implements MediatorHandler {
listenUnreadMessages(
signal: AbortSignal,
serviceEndpointUri: string,
onMessage: EventCallback
onMessage: (messages: {
attachmentId: string;
message: Message;
}[]) => void | Promise<void>
) {
//Todo: we may want to abstract this to allow users to use their own native implementations for websockets
//Or potentially be TCP sockets directly, this can be used in electron and nodejs can establish tcp connections directly.
const socket = new WebSocket(serviceEndpointUri);
signal.addEventListener("abort", () => {
socket.close()
if (socket.readyState === socket.OPEN) {
socket.close()
}
});

socket.addEventListener("open", async () => {
Expand All @@ -229,8 +234,7 @@ export class BasicMediatorHandler implements MediatorHandler {
decryptMessage.piuri === ProtocolType.PickupStatus ||
decryptMessage.piuri === ProtocolType.PickupDelivery) {
const delivered = await new PickupRunner(decryptMessage, this.mercury).run()
const deliveredMessages = delivered.map(({ message }) => message);
onMessage(deliveredMessages)
await onMessage(delivered)
}
})

Expand Down
5 changes: 4 additions & 1 deletion src/prism-agent/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ export abstract class MediatorHandler {
abstract listenUnreadMessages(
signal: AbortSignal,
serviceEndpointUri: string,
onMessage: EventCallback
onMessage: (messages: {
attachmentId: string;
message: Message;
}[]) => void | Promise<void>
): void
}
58 changes: 45 additions & 13 deletions tests/agent/Agent.ConnectionsManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ async function createBasicMediationHandler(
handler: BasicMediatorHandler
}
> {

const seed = apollo.createRandomSeed().seed;
const keypair = apollo.createPrivateKey({
type: KeyTypes.EC,
Expand Down Expand Up @@ -66,24 +65,21 @@ async function createBasicMediationHandler(
}
}

jest.mock('isows', () => ({
WebSocket: jest.fn(() => ({
addEventListener: jest.fn(),
send: jest.fn(),
close: jest.fn(),
})),
}));

describe("ConnectionsManager tests", () => {

beforeEach(() => {
jest.mock('isows', () => ({
WebSocket: jest.fn(() => ({
addEventListener: jest.fn(),
send: jest.fn(),
close: jest.fn(),
})),
}));
})

afterEach(() => {
jest.restoreAllMocks();
});

it("Should use websockets if the mediator's did endpoint uri contains ws or wss and agent options have the opt in", async () => {
it.only("Should use websockets if the mediator's did endpoint uri contains ws or wss and agent options have the opt in", async () => {
const services = [
new Service(
"#didcomm-1",
Expand All @@ -92,7 +88,15 @@ describe("ConnectionsManager tests", () => {
)
];
const ConnectionsManager = jest.requireActual('../../src/prism-agent/connectionsManager/ConnectionsManager').ConnectionsManager;
const BasicMediatorHandler = jest.requireMock('../../src/prism-agent/mediator/BasicMediatorHandler').BasicMediatorHandler;
const BasicMediatorHandler = jest.requireActual('../../src/prism-agent/mediator/BasicMediatorHandler').BasicMediatorHandler;
jest.mock('isows', () => ({
WebSocket: jest.fn(() => ({
addEventListener: jest.fn(),
send: jest.fn(),
close: jest.fn(),
})),
}));

const { manager, handler } = await createBasicMediationHandler(
ConnectionsManager,
BasicMediatorHandler,
Expand Down Expand Up @@ -121,6 +125,13 @@ describe("ConnectionsManager tests", () => {
];
const ConnectionsManager = jest.requireActual('../../src/prism-agent/connectionsManager/ConnectionsManager').ConnectionsManager;
const BasicMediatorHandler = jest.requireMock('../../src/prism-agent/mediator/BasicMediatorHandler').BasicMediatorHandler;
jest.mock('isows', () => ({
WebSocket: jest.fn(() => ({
addEventListener: jest.fn(),
send: jest.fn(),
close: jest.fn(),
})),
}));
const { manager, handler } = await createBasicMediationHandler(
ConnectionsManager,
BasicMediatorHandler,
Expand All @@ -145,6 +156,13 @@ describe("ConnectionsManager tests", () => {
];
const ConnectionsManager = jest.requireActual('../../src/prism-agent/connectionsManager/ConnectionsManager').ConnectionsManager;
const BasicMediatorHandler = jest.requireMock('../../src/prism-agent/mediator/BasicMediatorHandler').BasicMediatorHandler;
jest.mock('isows', () => ({
WebSocket: jest.fn(() => ({
addEventListener: jest.fn(),
send: jest.fn(),
close: jest.fn(),
})),
}));
const { manager, handler } = await createBasicMediationHandler(
ConnectionsManager,
BasicMediatorHandler,
Expand Down Expand Up @@ -174,6 +192,13 @@ describe("ConnectionsManager tests", () => {
];
const ConnectionsManager = jest.requireActual('../../src/prism-agent/connectionsManager/ConnectionsManager').ConnectionsManager;
const BasicMediatorHandler = jest.requireMock('../../src/prism-agent/mediator/BasicMediatorHandler').BasicMediatorHandler;
jest.mock('isows', () => ({
WebSocket: jest.fn(() => ({
addEventListener: jest.fn(),
send: jest.fn(),
close: jest.fn(),
})),
}));
const { manager, handler } = await createBasicMediationHandler(
ConnectionsManager,
BasicMediatorHandler,
Expand Down Expand Up @@ -203,6 +228,13 @@ describe("ConnectionsManager tests", () => {
];
const ConnectionsManager = jest.requireActual('../../src/prism-agent/connectionsManager/ConnectionsManager').ConnectionsManager;
const BasicMediatorHandler = jest.requireMock('../../src/prism-agent/mediator/BasicMediatorHandler').BasicMediatorHandler;
jest.mock('isows', () => ({
WebSocket: jest.fn(() => ({
addEventListener: jest.fn(),
send: jest.fn(),
close: jest.fn(),
})),
}));
const { manager, handler } = await createBasicMediationHandler(
ConnectionsManager,
BasicMediatorHandler,
Expand Down
4 changes: 2 additions & 2 deletions tests/agent/mocks/ConnectionManagerMock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ export class ConnectionsManagerMock implements ConnectionsManagerClass {
pickupUnreadMessages: function (limit: number): Promise<{ attachmentId: string; message: Message; }[]> {
throw new Error("Mock pickupUnreadMessages Function not implemented.");
},
listenUnreadMessages: function (signal: AbortSignal, serviceEndpointUri: string, onMessage: EventCallback): void {
throw new Error("Mock listenUnreadMessages Function not implemented.");
listenUnreadMessages: function (signal: AbortSignal, serviceEndpointUri: string, onMessage: (messages: { attachmentId: string; message: Message; }[]) => void | Promise<void>): void {
throw new Error("Function not implemented.");
}
};

Expand Down

0 comments on commit 31689c0

Please sign in to comment.