From e401a1ea6ef666a29afc0b1bb934cae91b321da7 Mon Sep 17 00:00:00 2001 From: vasyas Date: Wed, 22 Nov 2023 18:22:19 +0100 Subject: [PATCH] Ability to cancel subscription by throwing exception from supplier --- packages/core/src/RpcSession.ts | 7 +++--- packages/core/src/local.ts | 7 +++++- packages/core/tests/topicBugs.ts | 41 -------------------------------- packages/core/tests/topics.ts | 38 ++++++++++++++++++++++++++++- 4 files changed, 46 insertions(+), 47 deletions(-) diff --git a/packages/core/src/RpcSession.ts b/packages/core/src/RpcSession.ts index 003df04..13efc9f 100644 --- a/packages/core/src/RpcSession.ts +++ b/packages/core/src/RpcSession.ts @@ -421,14 +421,13 @@ export class RpcSession { try { const ctx = this.createContext(messageId, topic.getTopicName()) - // topic.subscribeSession will subscribe even when throwing the error, so lets add it to the list - this.subscriptions.push({topic, params}) - this.listeners.subscribed(this.subscriptions.length) - const subscribeTopic = (p = params) => topic.subscribeSession(this, p, messageId, ctx) const r = await this.localMiddleware(ctx, subscribeTopic, params, MessageType.Subscribe) this.send(MessageType.Data, messageId, topic.getTopicName(), cloneParams(params), r) + + this.subscriptions.push({topic, params}) + this.listeners.subscribed(this.subscriptions.length) } catch (e) { log.error(`Unable to subscribe to topic ${topic.getTopicName()}`, e) this.sendError(messageId, e) diff --git a/packages/core/src/local.ts b/packages/core/src/local.ts index 0bdc692..ad78723 100644 --- a/packages/core/src/local.ts +++ b/packages/core/src/local.ts @@ -92,7 +92,12 @@ export class LocalTopicImpl extends TopicImpl implements Topic { assert.equal(0, Object.keys(item["subscriptions"]).length) }) - - it("exception in supplier leaves session referenced on unsubscribe", async () => { - const services = { - item: new LocalTopicImpl(async () => { - throw new Error() - }), - } - - await startTestServer(services) - - let ws - - const client = await createRpcClient(async () => { - ws = new WebSocket(`ws://localhost:${TEST_PORT}`) - return wrapWebsocket(ws) - }) - - client.remote.item - .subscribe(() => {}, {}) - .catch(e => { - // ignored - }) - - // pause the socket so that the server doesn't get the unsubscribe message - ws.send = () => {} - - await new Promise(r => setTimeout(r, 20)) - - assert.equal(1, Object.keys(services.item["subscriptions"]).length) - assert.equal(1, Object.values(services.item["subscriptions"])[0].sessions.length) - - const [session] = Object.values(services.item["subscriptions"])[0].sessions - assert.equal(1, session.subscriptions.length) - - await client.disconnect() - - // time to cleanup - await new Promise(r => setTimeout(r, 100)) - - assert.equal(0, Object.keys(services.item["subscriptions"]).length) - }) }) diff --git a/packages/core/tests/topics.ts b/packages/core/tests/topics.ts index 5c23138..32e76da 100644 --- a/packages/core/tests/topics.ts +++ b/packages/core/tests/topics.ts @@ -3,7 +3,9 @@ import {createRpcClient, LocalTopicImpl, MessageType} from "../src" import {groupReducer} from "../src/local" import {RemoteTopicImpl} from "../src/remote" import {createTestClient, startTestServer, TEST_PORT} from "./testUtils" -import {createNodeWebsocket} from "../../websocket/src/server" +import {createNodeWebsocket, wrapWebsocket} from "../../websocket/src/server" +import {RpcSession} from "../src/RpcSession" +import WebSocket from "ws" describe("Topics", () => { it("error in supplier breaks subscribe", async () => { @@ -483,4 +485,38 @@ describe("Topics", () => { assert.equal(Object.keys(server.testUnsub.item["subscriptions"]).length, 0) }) + + it("exception during subscribe do not create subscription", async () => { + // throwing exception from supplier is an indication that topic do not want to subscribe this consumer + + const services = { + item: new LocalTopicImpl(async () => { + throw new Error() + }), + } + + const server = await startTestServer(services) + + let ws + + const client = await createRpcClient(async () => { + ws = new WebSocket(`ws://localhost:${TEST_PORT}`) + return wrapWebsocket(ws) + }) + + client.remote.item + .subscribe(() => {}, {}) + .catch(e => { + // ignored + }) + + // pause the socket so that the server doesn't get the unsubscribe message + ws.send = () => {} + + await new Promise(r => setTimeout(r, 20)) + + assert.equal(0, Object.keys(services.item["subscriptions"]).length) + const session = Object.values(server["__sessions"])[0] as RpcSession + assert.equal(0, session.subscriptions.length) + }) })