Skip to content

Commit

Permalink
Ability to cancel subscription by throwing exception from supplier
Browse files Browse the repository at this point in the history
  • Loading branch information
vasyas committed Nov 22, 2023
1 parent f0936c9 commit e401a1e
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 47 deletions.
7 changes: 3 additions & 4 deletions packages/core/src/RpcSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -421,14 +421,13 @@ export class RpcSession {
try {
const ctx = this.createContext(messageId, topic.getTopicName())

// topic.subscribeSession will subscribe even when throwing the error, so lets add it to the list
this.subscriptions.push({topic, params})
this.listeners.subscribed(this.subscriptions.length)

const subscribeTopic = (p = params) => topic.subscribeSession(this, p, messageId, ctx)
const r = await this.localMiddleware(ctx, subscribeTopic, params, MessageType.Subscribe)

this.send(MessageType.Data, messageId, topic.getTopicName(), cloneParams(params), r)

this.subscriptions.push({topic, params})
this.listeners.subscribed(this.subscriptions.length)
} catch (e) {
log.error(`Unable to subscribe to topic ${topic.getTopicName()}`, e)
this.sendError(messageId, e)
Expand Down
7 changes: 6 additions & 1 deletion packages/core/src/local.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,12 @@ export class LocalTopicImpl<D, F, TD = D> extends TopicImpl implements Topic<D,
subscription.sessions.push(session)
this.subscriptions[key] = subscription

return await this.getData(filter, ctx, session.getConnectionContext())
try {
return await this.getData(filter, ctx, session.getConnectionContext())
} catch (e) {
this.unsubscribeSession(session, filter)
throw e
}
}

unsubscribeSession(session: RpcSession, filter: F) {
Expand Down
41 changes: 0 additions & 41 deletions packages/core/tests/topicBugs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,45 +115,4 @@ describe("Topic bugs", () => {

assert.equal(0, Object.keys(item["subscriptions"]).length)
})

it("exception in supplier leaves session referenced on unsubscribe", async () => {
const services = {
item: new LocalTopicImpl(async () => {
throw new Error()
}),
}

await startTestServer(services)

let ws

const client = await createRpcClient(async () => {
ws = new WebSocket(`ws://localhost:${TEST_PORT}`)
return wrapWebsocket(ws)
})

client.remote.item
.subscribe(() => {}, {})
.catch(e => {
// ignored
})

// pause the socket so that the server doesn't get the unsubscribe message
ws.send = () => {}

await new Promise(r => setTimeout(r, 20))

assert.equal(1, Object.keys(services.item["subscriptions"]).length)
assert.equal(1, Object.values(services.item["subscriptions"])[0].sessions.length)

const [session] = Object.values(services.item["subscriptions"])[0].sessions
assert.equal(1, session.subscriptions.length)

await client.disconnect()

// time to cleanup
await new Promise(r => setTimeout(r, 100))

assert.equal(0, Object.keys(services.item["subscriptions"]).length)
})
})
38 changes: 37 additions & 1 deletion packages/core/tests/topics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ import {createRpcClient, LocalTopicImpl, MessageType} from "../src"
import {groupReducer} from "../src/local"
import {RemoteTopicImpl} from "../src/remote"
import {createTestClient, startTestServer, TEST_PORT} from "./testUtils"
import {createNodeWebsocket} from "../../websocket/src/server"
import {createNodeWebsocket, wrapWebsocket} from "../../websocket/src/server"
import {RpcSession} from "../src/RpcSession"
import WebSocket from "ws"

describe("Topics", () => {
it("error in supplier breaks subscribe", async () => {
Expand Down Expand Up @@ -483,4 +485,38 @@ describe("Topics", () => {

assert.equal(Object.keys(server.testUnsub.item["subscriptions"]).length, 0)
})

it("exception during subscribe do not create subscription", async () => {
// throwing exception from supplier is an indication that topic do not want to subscribe this consumer

const services = {
item: new LocalTopicImpl(async () => {
throw new Error()
}),
}

const server = await startTestServer(services)

let ws

const client = await createRpcClient(async () => {
ws = new WebSocket(`ws://localhost:${TEST_PORT}`)
return wrapWebsocket(ws)
})

client.remote.item
.subscribe(() => {}, {})
.catch(e => {
// ignored
})

// pause the socket so that the server doesn't get the unsubscribe message
ws.send = () => {}

await new Promise(r => setTimeout(r, 20))

assert.equal(0, Object.keys(services.item["subscriptions"]).length)
const session = Object.values(server["__sessions"])[0] as RpcSession
assert.equal(0, session.subscriptions.length)
})
})

0 comments on commit e401a1e

Please sign in to comment.