Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/silly-impalas-walk.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/service-rsocket-router': patch
'@powersync/lib-services-framework': patch
'powersync-open-service': patch
---

Fix concurrent connection limiting for websockets
10 changes: 9 additions & 1 deletion libs/lib-services/src/errors/framework-errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,16 @@ export class JourneyError extends Error {
return input instanceof JourneyError || input?.is_journey_error == true;
}

private static errorMessage(data: ErrorData) {
let message = `[${data.code}] ${data.description}`;
if (data.details) {
message += `\n ${data.details}`;
}
return message;
}

constructor(data: ErrorData) {
super(`[${data.code}] ${data.description}\n ${data.details}`);
super(JourneyError.errorMessage(data));

this.errorData = data;
if (data.stack) {
Expand Down
21 changes: 10 additions & 11 deletions packages/rsocket-router/src/router/ReactiveSocketRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@ import { WebsocketServerTransport } from './transport/WebSocketServerTransport.j
import { errors, logger } from '@powersync/lib-services-framework';

export class ReactiveSocketRouter<C> {
protected activeConnections: number;

constructor(protected options?: ReactiveSocketRouterOptions<C>) {
this.activeConnections = 0;
}
constructor(protected options?: ReactiveSocketRouterOptions<C>) {}

reactiveStream<I, O>(path: string, stream: IReactiveStreamInput<I, O, C>): IReactiveStream<I, O, C> {
return {
Expand Down Expand Up @@ -60,11 +56,16 @@ export class ReactiveSocketRouter<C> {
acceptor: {
accept: async (payload) => {
const { max_concurrent_connections } = this.options ?? {};
if (max_concurrent_connections && this.activeConnections >= max_concurrent_connections) {
throw new errors.JourneyError({
code: '429',
// wss.clients.size includes this connection, so we check for greater than
// TODO: Share connection limit between this and http stream connections
if (max_concurrent_connections && wss.clients.size > max_concurrent_connections) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docs for wss.clients: https://github.com/powersync-ja/powersync-service/pull/36/files

The size of the set did seem to accurately reflect the number of concurrent connections in the testing I did.

const err = new errors.JourneyError({
status: 429,
code: 'SERVER_BUSY',
description: `Maximum active concurrent connections limit has been reached`
});
logger.warn(err);
throw err;
}

// Throwing an exception in this context will be returned to the client side request
Expand All @@ -80,16 +81,14 @@ export class ReactiveSocketRouter<C> {
requestStream: (payload, initialN, responder) => {
const observer = new SocketRouterObserver();

// TODO: Consider limiting the number of active streams per connection to prevent abuse
handleReactiveStream(context, { payload, initialN, responder }, observer, params).catch((ex) => {
logger.error(ex);
responder.onError(ex);
responder.onComplete();
});

this.activeConnections++;
return {
cancel: () => {
this.activeConnections--;
observer.triggerCancel();
},
onExtension: () => observer.triggerExtension(),
Expand Down