From 43165cc8616382a06e715ee075a4c1c5707e4b96 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 10 Jul 2024 21:26:49 +0200 Subject: [PATCH 1/3] Fix websocket connection limit. --- .../src/router/ReactiveSocketRouter.ts | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/packages/rsocket-router/src/router/ReactiveSocketRouter.ts b/packages/rsocket-router/src/router/ReactiveSocketRouter.ts index 3dada5325..06cfcafc5 100644 --- a/packages/rsocket-router/src/router/ReactiveSocketRouter.ts +++ b/packages/rsocket-router/src/router/ReactiveSocketRouter.ts @@ -19,11 +19,7 @@ import { WebsocketServerTransport } from './transport/WebSocketServerTransport.j import { errors, logger } from '@powersync/lib-services-framework'; export class ReactiveSocketRouter { - protected activeConnections: number; - - constructor(protected options?: ReactiveSocketRouterOptions) { - this.activeConnections = 0; - } + constructor(protected options?: ReactiveSocketRouterOptions) {} reactiveStream(path: string, stream: IReactiveStreamInput): IReactiveStream { return { @@ -60,11 +56,16 @@ export class ReactiveSocketRouter { acceptor: { accept: async (payload) => { const { max_concurrent_connections } = this.options ?? {}; - if (max_concurrent_connections && this.activeConnections >= max_concurrent_connections) { - throw new errors.JourneyError({ - code: '429', + // wss.clients.size includes this connection, so we check for greater than + // TODO: Share connection limit between this and http stream connections + if (max_concurrent_connections && wss.clients.size > max_concurrent_connections) { + const err = new errors.JourneyError({ + status: 429, + code: 'SERVER_BUSY', description: `Maximum active concurrent connections limit has been reached` }); + logger.warn(err); + throw err; } // Throwing an exception in this context will be returned to the client side request @@ -80,16 +81,14 @@ export class ReactiveSocketRouter { requestStream: (payload, initialN, responder) => { const observer = new SocketRouterObserver(); + // TODO: Consider limiting the number of active streams per connection to prevent abuse handleReactiveStream(context, { payload, initialN, responder }, observer, params).catch((ex) => { logger.error(ex); responder.onError(ex); responder.onComplete(); }); - - this.activeConnections++; return { cancel: () => { - this.activeConnections--; observer.triggerCancel(); }, onExtension: () => observer.triggerExtension(), From b23e89308ae5c22e71201a9c13d6ca5975017d99 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 10 Jul 2024 21:34:21 +0200 Subject: [PATCH 2/3] Improve error logging. --- libs/lib-services/src/errors/framework-errors.ts | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/libs/lib-services/src/errors/framework-errors.ts b/libs/lib-services/src/errors/framework-errors.ts index e09cc0772..7c6e3abb3 100644 --- a/libs/lib-services/src/errors/framework-errors.ts +++ b/libs/lib-services/src/errors/framework-errors.ts @@ -30,8 +30,16 @@ export class JourneyError extends Error { return input instanceof JourneyError || input?.is_journey_error == true; } + private static errorMessage(data: ErrorData) { + let message = `[${data.code}] ${data.description}`; + if (data.details) { + message += `\n ${data.details}`; + } + return message; + } + constructor(data: ErrorData) { - super(`[${data.code}] ${data.description}\n ${data.details}`); + super(JourneyError.errorMessage(data)); this.errorData = data; if (data.stack) { From 909f71a614fca6d91a1abb8dfb303609207457fa Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 10 Jul 2024 21:35:14 +0200 Subject: [PATCH 3/3] Add changeset. --- .changeset/silly-impalas-walk.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changeset/silly-impalas-walk.md diff --git a/.changeset/silly-impalas-walk.md b/.changeset/silly-impalas-walk.md new file mode 100644 index 000000000..c7a879970 --- /dev/null +++ b/.changeset/silly-impalas-walk.md @@ -0,0 +1,7 @@ +--- +'@powersync/service-rsocket-router': patch +'@powersync/lib-services-framework': patch +'powersync-open-service': patch +--- + +Fix concurrent connection limiting for websockets