From 2fee597d15a51bd71b609de44b060ba348672b38 Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Thu, 25 Jul 2024 12:08:58 +0200 Subject: [PATCH 1/2] add utilities for route generation --- .changeset/fair-planes-flow.md | 5 + .../src/router/ReactiveSocketRouter.ts | 5 +- .../src/routes/configure-fastify.ts | 102 ++++++++++++++++++ .../src/routes/configure-rsocket.ts | 59 ++++++++++ .../src/routes/endpoints/socket-route.ts | 8 +- .../service-core/src/routes/route-register.ts | 4 +- .../service-core/src/routes/router-socket.ts | 10 +- packages/service-core/src/routes/router.ts | 2 + .../service-core/src/routes/routes-index.ts | 2 + packages/service-core/src/sync/sync-index.ts | 1 + service/src/runners/server.ts | 99 +---------------- 11 files changed, 187 insertions(+), 110 deletions(-) create mode 100644 .changeset/fair-planes-flow.md create mode 100644 packages/service-core/src/routes/configure-fastify.ts create mode 100644 packages/service-core/src/routes/configure-rsocket.ts diff --git a/.changeset/fair-planes-flow.md b/.changeset/fair-planes-flow.md new file mode 100644 index 000000000..e29d3ece5 --- /dev/null +++ b/.changeset/fair-planes-flow.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-core': minor +--- + +Added utility functions for registering routes diff --git a/packages/rsocket-router/src/router/ReactiveSocketRouter.ts b/packages/rsocket-router/src/router/ReactiveSocketRouter.ts index 06cfcafc5..9743df209 100644 --- a/packages/rsocket-router/src/router/ReactiveSocketRouter.ts +++ b/packages/rsocket-router/src/router/ReactiveSocketRouter.ts @@ -3,10 +3,12 @@ * to expose reactive websocket stream in an interface similar to * other Journey micro routers. */ +import { errors, logger } from '@powersync/lib-services-framework'; import * as http from 'http'; import { Payload, RSocketServer } from 'rsocket-core'; import * as ws from 'ws'; import { SocketRouterObserver } from './SocketRouterListener.js'; +import { WebsocketServerTransport } from './transport/WebSocketServerTransport.js'; import { CommonParams, IReactiveStream, @@ -15,8 +17,6 @@ import { ReactiveSocketRouterOptions, SocketResponder } from './types.js'; -import { WebsocketServerTransport } from './transport/WebSocketServerTransport.js'; -import { errors, logger } from '@powersync/lib-services-framework'; export class ReactiveSocketRouter { constructor(protected options?: ReactiveSocketRouterOptions) {} @@ -56,6 +56,7 @@ export class ReactiveSocketRouter { acceptor: { accept: async (payload) => { const { max_concurrent_connections } = this.options ?? {}; + logger.info(`Currently have ${wss.clients.size} active WebSocket connection(s)`); // wss.clients.size includes this connection, so we check for greater than // TODO: Share connection limit between this and http stream connections if (max_concurrent_connections && wss.clients.size > max_concurrent_connections) { diff --git a/packages/service-core/src/routes/configure-fastify.ts b/packages/service-core/src/routes/configure-fastify.ts new file mode 100644 index 000000000..d8934b3fa --- /dev/null +++ b/packages/service-core/src/routes/configure-fastify.ts @@ -0,0 +1,102 @@ +import type fastify from 'fastify'; +import { registerFastifyRoutes } from './route-register.js'; + +import * as system from '../system/system-index.js'; + +import { ADMIN_ROUTES } from './endpoints/admin.js'; +import { CHECKPOINT_ROUTES } from './endpoints/checkpointing.js'; +import { DEV_ROUTES } from './endpoints/dev.js'; +import { SYNC_RULES_ROUTES } from './endpoints/sync-rules.js'; +import { SYNC_STREAM_ROUTES } from './endpoints/sync-stream.js'; +import { createRequestQueueHook, CreateRequestQueueParams } from './hooks.js'; +import { RouteDefinition } from './router.js'; + +/** + * A list of route definitions to be registered as endpoints. + * Supplied concurrency limits will be applied to the grouped routes. + */ +export type RouteRegistrationOptions = { + routes: RouteDefinition[]; + queueOptions: CreateRequestQueueParams; +}; + +/** + * HTTP routes separated by API and Sync stream categories. + * This allows for separate concurrency limits. + */ +export type RouteDefinitions = { + api?: Partial; + syncStream?: Partial; +}; + +export type FastifyServerConfig = { + system: system.CorePowerSyncSystem; + routes?: RouteDefinitions; +}; + +export const DEFAULT_ROUTE_OPTIONS = { + api: { + routes: [...ADMIN_ROUTES, ...CHECKPOINT_ROUTES, ...DEV_ROUTES, ...SYNC_RULES_ROUTES], + queueOptions: { + concurrency: 10, + max_queue_depth: 20 + } + }, + syncStream: { + routes: [...SYNC_STREAM_ROUTES], + queueOptions: { + concurrency: 200, + max_queue_depth: 0 + } + } +}; + +/** + * Registers default routes on a Fastify server. Consumers can optionally configure + * concurrency queue limits or override routes. + */ +export function configureFastifyServer(server: fastify.FastifyInstance, options: FastifyServerConfig) { + const { system, routes = DEFAULT_ROUTE_OPTIONS } = options; + /** + * Fastify creates an encapsulated context for each `.register` call. + * Creating a separate context here to separate the concurrency limits for Admin APIs + * and Sync Streaming routes. + * https://github.com/fastify/fastify/blob/main/docs/Reference/Encapsulation.md + */ + server.register(async function (childContext) { + registerFastifyRoutes( + childContext, + async () => { + return { + user_id: undefined, + system: system + }; + }, + routes.api?.routes ?? DEFAULT_ROUTE_OPTIONS.api.routes + ); + // Limit the active concurrent requests + childContext.addHook( + 'onRequest', + createRequestQueueHook(routes.api?.queueOptions ?? DEFAULT_ROUTE_OPTIONS.api.queueOptions) + ); + }); + + // Create a separate context for concurrency queueing + server.register(async function (childContext) { + registerFastifyRoutes( + childContext, + async () => { + return { + user_id: undefined, + system: system + }; + }, + routes.syncStream?.routes ?? DEFAULT_ROUTE_OPTIONS.syncStream.routes + ); + // Limit the active concurrent requests + childContext.addHook( + 'onRequest', + createRequestQueueHook(routes.syncStream?.queueOptions ?? DEFAULT_ROUTE_OPTIONS.syncStream.queueOptions) + ); + }); +} diff --git a/packages/service-core/src/routes/configure-rsocket.ts b/packages/service-core/src/routes/configure-rsocket.ts new file mode 100644 index 000000000..a4535774c --- /dev/null +++ b/packages/service-core/src/routes/configure-rsocket.ts @@ -0,0 +1,59 @@ +import { deserialize } from 'bson'; +import * as http from 'http'; + +import { errors, logger } from '@powersync/lib-services-framework'; +import { ReactiveSocketRouter, RSocketRequestMeta } from '@powersync/service-rsocket-router'; + +import { CorePowerSyncSystem } from '../system/CorePowerSyncSystem.js'; +import { generateContext, getTokenFromHeader } from './auth.js'; +import { syncStreamReactive } from './endpoints/socket-route.js'; +import { RSocketContextMeta, SocketRouteGenerator } from './router-socket.js'; +import { Context } from './router.js'; + +export type RSockerRouterConfig = { + system: CorePowerSyncSystem; + server: http.Server; + routeGenerators?: SocketRouteGenerator[]; +}; + +export const DEFAULT_SOCKET_ROUTES = [syncStreamReactive]; + +export function configureRSocket(router: ReactiveSocketRouter, options: RSockerRouterConfig) { + const { routeGenerators = DEFAULT_SOCKET_ROUTES, server, system } = options; + + router.applyWebSocketEndpoints(server, { + contextProvider: async (data: Buffer) => { + const { token } = RSocketContextMeta.decode(deserialize(data) as any); + + if (!token) { + throw new errors.AuthorizationError('No token provided'); + } + + try { + const extracted_token = getTokenFromHeader(token); + if (extracted_token != null) { + const { context, errors: token_errors } = await generateContext(system, extracted_token); + if (context?.token_payload == null) { + throw new errors.AuthorizationError(token_errors ?? 'Authentication required'); + } + return { + token, + ...context, + token_errors: token_errors, + system + }; + } else { + throw new errors.AuthorizationError('No token provided'); + } + } catch (ex) { + logger.error(ex); + throw ex; + } + }, + endpoints: routeGenerators.map((generator) => generator(router)), + metaDecoder: async (meta: Buffer) => { + return RSocketRequestMeta.decode(deserialize(meta) as any); + }, + payloadDecoder: async (rawData?: Buffer) => rawData && deserialize(rawData) + }); +} diff --git a/packages/service-core/src/routes/endpoints/socket-route.ts b/packages/service-core/src/routes/endpoints/socket-route.ts index 799f94400..5c818b6bf 100644 --- a/packages/service-core/src/routes/endpoints/socket-route.ts +++ b/packages/service-core/src/routes/endpoints/socket-route.ts @@ -3,20 +3,14 @@ import { RequestParameters } from '@powersync/service-sync-rules'; import { serialize } from 'bson'; import { Metrics } from '../../metrics/Metrics.js'; +import { RequestTracker } from '../../sync/RequestTracker.js'; import { streamResponse } from '../../sync/sync.js'; import * as util from '../../util/util-index.js'; import { SocketRouteGenerator } from '../router-socket.js'; import { SyncRoutes } from './sync-stream.js'; -import { RequestTracker } from '../../sync/RequestTracker.js'; export const syncStreamReactive: SocketRouteGenerator = (router) => router.reactiveStream(SyncRoutes.STREAM, { - authorize: ({ context }) => { - return { - authorized: !!context.token_payload, - errors: ['Authentication required'].concat(context.token_errors ?? []) - }; - }, validator: schema.createTsCodecValidator(util.StreamingSyncRequest, { allowAdditional: true }), handler: async ({ context, params, responder, observer, initialN }) => { const { system } = context; diff --git a/packages/service-core/src/routes/route-register.ts b/packages/service-core/src/routes/route-register.ts index 3643396b6..8b798160b 100644 --- a/packages/service-core/src/routes/route-register.ts +++ b/packages/service-core/src/routes/route-register.ts @@ -1,6 +1,6 @@ -import fastify from 'fastify'; +import type fastify from 'fastify'; -import { errors, router, HTTPMethod, logger } from '@powersync/lib-services-framework'; +import { errors, HTTPMethod, logger, router } from '@powersync/lib-services-framework'; import { Context, ContextProvider, RequestEndpoint, RequestEndpointHandlerPayload } from './router.js'; export type FastifyEndpoint = RequestEndpoint & { diff --git a/packages/service-core/src/routes/router-socket.ts b/packages/service-core/src/routes/router-socket.ts index ca7954987..31b8a3adf 100644 --- a/packages/service-core/src/routes/router-socket.ts +++ b/packages/service-core/src/routes/router-socket.ts @@ -1,13 +1,13 @@ +import { IReactiveStream, ReactiveSocketRouter } from '@powersync/service-rsocket-router'; import * as t from 'ts-codec'; -import { ReactiveSocketRouter, IReactiveStream } from '@powersync/service-rsocket-router'; import { Context } from './router.js'; -export const RSocketContextMeta = t.object({ - token: t.string -}); - /** * Creates a socket route handler given a router instance */ export type SocketRouteGenerator = (router: ReactiveSocketRouter) => IReactiveStream; + +export const RSocketContextMeta = t.object({ + token: t.string +}); diff --git a/packages/service-core/src/routes/router.ts b/packages/service-core/src/routes/router.ts index 6eae50531..ac2a5f525 100644 --- a/packages/service-core/src/routes/router.ts +++ b/packages/service-core/src/routes/router.ts @@ -36,6 +36,8 @@ export type RequestEndpointHandlerPayload< request: Request; }; +export type RouteDefinition = RequestEndpoint; + /** * Helper function for making generics work well when defining routes */ diff --git a/packages/service-core/src/routes/routes-index.ts b/packages/service-core/src/routes/routes-index.ts index fe07904a6..b1c14c2a5 100644 --- a/packages/service-core/src/routes/routes-index.ts +++ b/packages/service-core/src/routes/routes-index.ts @@ -1,4 +1,6 @@ export * as auth from './auth.js'; +export * from './configure-fastify.js'; +export * from './configure-rsocket.js'; export * as endpoints from './endpoints/route-endpoints-index.js'; export * as hooks from './hooks.js'; export * from './route-register.js'; diff --git a/packages/service-core/src/sync/sync-index.ts b/packages/service-core/src/sync/sync-index.ts index 7591f759c..0d9d2f87c 100644 --- a/packages/service-core/src/sync/sync-index.ts +++ b/packages/service-core/src/sync/sync-index.ts @@ -1,6 +1,7 @@ export * from './BroadcastIterable.js'; export * from './LastValueSink.js'; export * from './merge.js'; +export * from './RequestTracker.js'; export * from './safeRace.js'; export * from './sync.js'; export * from './util.js'; diff --git a/service/src/runners/server.ts b/service/src/runners/server.ts index e4868303c..386d7da4a 100644 --- a/service/src/runners/server.ts +++ b/service/src/runners/server.ts @@ -1,12 +1,10 @@ -import { deserialize } from 'bson'; -import fastify from 'fastify'; import cors from '@fastify/cors'; +import { container, logger } from '@powersync/lib-services-framework'; import * as core from '@powersync/service-core'; -import { container, errors, logger } from '@powersync/lib-services-framework'; -import { RSocketRequestMeta } from '@powersync/service-rsocket-router'; +import fastify from 'fastify'; -import { PowerSyncSystem } from '../system/PowerSyncSystem.js'; import { SocketRouter } from '../routes/router.js'; +import { PowerSyncSystem } from '../system/PowerSyncSystem.js'; /** * Starts an API server */ @@ -18,60 +16,6 @@ export async function startServer(runnerConfig: core.utils.RunnerConfig) { const server = fastify.fastify(); - /** - * Fastify creates an encapsulated context for each `.register` call. - * Creating a separate context here to separate the concurrency limits for Admin APIs - * and Sync Streaming routes. - * https://github.com/fastify/fastify/blob/main/docs/Reference/Encapsulation.md - */ - server.register(async function (childContext) { - core.routes.registerFastifyRoutes( - childContext, - async () => { - return { - user_id: undefined, - system: system - }; - }, - [ - ...core.routes.endpoints.ADMIN_ROUTES, - ...core.routes.endpoints.CHECKPOINT_ROUTES, - ...core.routes.endpoints.DEV_ROUTES, - ...core.routes.endpoints.SYNC_RULES_ROUTES - ] - ); - // Limit the active concurrent requests - childContext.addHook( - 'onRequest', - core.routes.hooks.createRequestQueueHook({ - max_queue_depth: 20, - concurrency: 10 - }) - ); - }); - - // Create a separate context for concurrency queueing - server.register(async function (childContext) { - core.routes.registerFastifyRoutes( - childContext, - async () => { - return { - user_id: undefined, - system: system - }; - }, - [...core.routes.endpoints.SYNC_STREAM_ROUTES] - ); - // Limit the active concurrent requests - childContext.addHook( - 'onRequest', - core.routes.hooks.createRequestQueueHook({ - max_queue_depth: 0, - concurrency: 200 - }) - ); - }); - server.register(cors, { origin: '*', allowedHeaders: ['Content-Type', 'Authorization'], @@ -80,41 +24,8 @@ export async function startServer(runnerConfig: core.utils.RunnerConfig) { maxAge: 3600 }); - SocketRouter.applyWebSocketEndpoints(server.server, { - contextProvider: async (data: Buffer) => { - const { token } = core.routes.RSocketContextMeta.decode(deserialize(data) as any); - - if (!token) { - throw new errors.AuthorizationError('No token provided'); - } - - try { - const extracted_token = core.routes.auth.getTokenFromHeader(token); - if (extracted_token != null) { - const { context, errors: token_errors } = await core.routes.auth.generateContext(system, extracted_token); - if (context?.token_payload == null) { - throw new errors.AuthorizationError(token_errors ?? 'Authentication required'); - } - return { - token, - ...context, - token_errors: token_errors, - system - }; - } else { - throw new errors.AuthorizationError('No token provided'); - } - } catch (ex) { - logger.error(ex); - throw ex; - } - }, - endpoints: [core.routes.endpoints.syncStreamReactive(SocketRouter)], - metaDecoder: async (meta: Buffer) => { - return RSocketRequestMeta.decode(deserialize(meta) as any); - }, - payloadDecoder: async (rawData?: Buffer) => rawData && deserialize(rawData) - }); + core.routes.configureFastifyServer(server, { system }); + core.routes.configureRSocket(SocketRouter, { server: server.server, system }); logger.info('Starting system'); await system.start(); From 1ac7e5af4e07203cd57221862c747bc61d13e96c Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Thu, 25 Jul 2024 12:10:06 +0200 Subject: [PATCH 2/2] cleanup imports --- packages/service-core/src/routes/endpoints/socket-route.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/packages/service-core/src/routes/endpoints/socket-route.ts b/packages/service-core/src/routes/endpoints/socket-route.ts index 5c818b6bf..4ae52deff 100644 --- a/packages/service-core/src/routes/endpoints/socket-route.ts +++ b/packages/service-core/src/routes/endpoints/socket-route.ts @@ -3,8 +3,7 @@ import { RequestParameters } from '@powersync/service-sync-rules'; import { serialize } from 'bson'; import { Metrics } from '../../metrics/Metrics.js'; -import { RequestTracker } from '../../sync/RequestTracker.js'; -import { streamResponse } from '../../sync/sync.js'; +import * as sync from '../../sync/sync-index.js'; import * as util from '../../util/util-index.js'; import { SocketRouteGenerator } from '../router-socket.js'; import { SyncRoutes } from './sync-stream.js'; @@ -61,9 +60,9 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => }); Metrics.getInstance().concurrent_connections.add(1); - const tracker = new RequestTracker(); + const tracker = new sync.RequestTracker(); try { - for await (const data of streamResponse({ + for await (const data of sync.streamResponse({ storage, params: { ...params,