Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
d23166c
feat: 🎸 setup caller
streamich Nov 14, 2023
744344f
feat: 🎸 setup server
streamich Nov 14, 2023
eef322d
refactor: 💡 rename server folder
streamich Nov 14, 2023
6dfe015
style: 💄 run Prettier
streamich Nov 14, 2023
834acf8
feat: 🎸 wire in services into routes
streamich Nov 14, 2023
87b6fbb
test: 💍 setup route testing
streamich Nov 14, 2023
1774ea3
style: 💄 run Prettier
streamich Nov 14, 2023
5a5f6ba
chore: 🤖 fix build error
streamich Nov 14, 2023
4ff9e63
feat: 🎸 implement pubsub.subscribe method
streamich Nov 14, 2023
957f06c
feat: 🎸 improve pubsub service
streamich Nov 14, 2023
c773e72
test: 💍 add pubsub server tests
streamich Nov 14, 2023
544d45f
feat: 🎸 add publish method docs
streamich Nov 14, 2023
97f7704
feat: 🎸 start presence server implementation
streamich Nov 14, 2023
10e57ef
feat(json-crdt): 🎸 setup routes for presence service and add presence…
streamich Nov 14, 2023
35aa635
feat: 🎸 add "presence.listen" route
streamich Nov 14, 2023
a15b185
feat: 🎸 allow to inject services dependency
streamich Nov 14, 2023
d135b49
feat: 🎸 emit existing entries on presence.listen
streamich Nov 14, 2023
d4ff303
refactor: 💡 rename "pubsub.subscribe" to "pubsub.listen"
streamich Nov 14, 2023
d67516e
style: 💄 run Prettier
streamich Nov 14, 2023
4d6c751
fix: 🐛 correct build errors
streamich Nov 14, 2023
eb482c4
feat: 🎸 add ability to delete presence entries
streamich Nov 14, 2023
e0768e7
style: 💄 run Prettier
streamich Nov 14, 2023
3265c3e
feat: 🎸 start store implementation
streamich Nov 14, 2023
438fbce
feat: 🎸 start store service implementation
streamich Nov 15, 2023
5c15c70
style: 💄 run Prettier
streamich Nov 15, 2023
dfd7c0c
refactor: 💡 rename blocks service
streamich Nov 15, 2023
1edec3a
feat: 🎸 add blocks.create internal method
streamich Nov 15, 2023
f9e81c5
feat: 🎸 add ability to create blocks and retrieve them by ID
streamich Nov 15, 2023
a842aea
test: 💍 add tests for blocks.get method
streamich Nov 15, 2023
f791c7a
feat: 🎸 add blocks.remove method
streamich Nov 15, 2023
c798c03
feat: 🎸 improve blocks.edit method
streamich Nov 15, 2023
8f8aa13
style: 💄 run Prettier
streamich Nov 15, 2023
823c37e
feat: 🎸 add "blocks.listen" method
streamich Nov 15, 2023
2da0936
style: 💄 run Prettier
streamich Nov 15, 2023
6ab411f
style: 💄 suppress linter warnings
streamich Nov 15, 2023
fdd44ac
feat: 🎸 add util.info route
streamich Nov 15, 2023
0ef6bb3
style: 💄 run Prettier
streamich Nov 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion src/json-type/system/TypeRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,23 @@ export class TypeRouter<Routes extends RoutesBase> {
const router = new TypeRouter({system: this.system, routes: routes(this)});
return this.merge(router);
}

public fn<K extends string, R extends classes.FunctionType<any, any>>(
name: K,
type: R,
): TypeRouter<Routes & {[KK in K]: R}> {
this.routes[name] = <any>type;
return <any>this;
}

public fn$<K extends string, R extends classes.FunctionStreamingType<any, any>>(
name: K,
type: R,
): TypeRouter<Routes & {[KK in K]: R}> {
this.routes[name] = <any>type;
return <any>this;
}
}

export type RoutesBase = Record<string, classes.FunctionType<any, any>>;
export type RoutesBase = Record<string, classes.FunctionType<any, any> | classes.FunctionStreamingType<any, any>>;
type TypeRouterRoutes<R extends TypeRouter<any>> = R extends TypeRouter<infer R2> ? R2 : never;
1 change: 1 addition & 0 deletions src/reactive-rpc/__demos__/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const app = new RpcApp({
caller,
codecs,
maxRequestBodySize: 1024 * 1024,
augmentContext: (ctx) => ctx,
});

app.enableCors();
Expand Down
20 changes: 19 additions & 1 deletion src/reactive-rpc/common/rpc/caller/TypeRouterCaller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import {StaticRpcMethod, type StaticRpcMethodOptions} from '../methods/StaticRpc
import {StreamingRpcMethod, type StreamingRpcMethodOptions} from '../methods/StreamingRpcMethod';
import type {Schema, SchemaOf, TypeOf, TypeSystem} from '../../../../json-type';
import type {TypeRouter} from '../../../../json-type/system/TypeRouter';
import type {Value} from '../../messages/Value';
import type {Observable} from 'rxjs';

export interface TypedApiCallerOptions<Router extends TypeRouter<any>, Ctx = unknown>
extends Omit<RpcApiCallerOptions<Ctx>, 'getMethod'> {
Expand Down Expand Up @@ -68,9 +70,25 @@ export class TypeRouterCaller<Router extends TypeRouter<any>, Ctx = unknown> ext
id: K,
request: MethodReq<Routes<Router>[K]>,
ctx: Ctx,
): Promise<MethodRes<Routes<Router>[K]>> {
): Promise<Value<MethodRes<Routes<Router>[K]>>> {
return super.call(id as string, request, ctx) as any;
}

public async callSimple<K extends keyof Routes<Router>>(
id: K,
request: MethodReq<Routes<Router>[K]>,
ctx: Ctx = {} as any,
): Promise<MethodRes<Routes<Router>[K]>> {
return (await super.call(id as string, request, ctx)).data as any;
}

public call$<K extends keyof Routes<Router>>(
id: K,
request: Observable<MethodReq<Routes<Router>[K]>>,
ctx: Ctx,
): Observable<Value<MethodRes<Routes<Router>[K]>>> {
return super.call$(id as string, request, ctx) as any;
}
}

type Routes<Router> = Router extends TypeRouter<infer R> ? R : never;
Expand Down
8 changes: 4 additions & 4 deletions src/reactive-rpc/common/rpc/caller/error/RpcError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,21 @@ export class RpcError extends Error implements IRpcError {
return RpcError.internal();
}

public static fromCode(errno: RpcErrorCodes, message: string = '', meta: unknown = undefined) {
public static fromCode(errno: RpcErrorCodes, message: string = '', meta: unknown = undefined): RpcError {
const code = RpcErrorCodes[errno];
return new RpcError(message || code, code, errno, undefined, meta || undefined);
}

public static internal(message: string = 'Internal Server Error') {
public static internal(message: string = 'Internal Server Error'): RpcError {
return RpcError.fromCode(RpcErrorCodes.INTERNAL_ERROR, message);
}

/** @todo Rename to "badRequest". */
public static invalidRequest() {
public static invalidRequest(): RpcError {
return RpcError.fromCode(RpcErrorCodes.BAD_REQUEST, 'Bad Request');
}

public static validation(message: string, meta?: unknown) {
public static validation(message: string, meta?: unknown): RpcError {
return RpcError.fromCode(RpcErrorCodes.BAD_REQUEST, message, meta);
}

Expand Down
42 changes: 19 additions & 23 deletions src/reactive-rpc/server/uws/RpcApp.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {enableCors} from './util';
import {Match, Router} from '../../../util/router';
import {listToUint8} from '../../../util/buffers/concat';
import {IncomingBatchMessage, RpcMessageBatchProcessor} from '../../common/rpc/RpcMessageBatchProcessor';
import {RpcError, RpcErrorCodes, RpcErrorType} from '../../common/rpc/caller/error';
import {ConnectionContext} from '../context';
Expand All @@ -11,15 +10,7 @@ import {RpcMessageFormat} from '../../common/codec/constants';
import {RpcCodecs} from '../../common/codec/RpcCodecs';
import {type ReactiveRpcMessage, RpcMessageStreamProcessor, ReactiveRpcClientMessage} from '../../common';
import type {Codecs} from '../../../json-pack/codecs/Codecs';
import type {
TemplatedApp,
HttpRequest,
HttpResponse,
HttpMethodPermissive,
JsonRouteHandler,
WebSocket,
RpcWebSocket,
} from './types';
import type * as types from './types';
import type {RouteHandler} from './types';
import type {RpcCaller} from '../../common/rpc/caller/RpcCaller';
import type {JsonValueCodec} from '../../../json-pack/codecs/types';
Expand All @@ -31,15 +22,16 @@ const ERR_NOT_FOUND = RpcError.fromCode(RpcErrorCodes.NOT_FOUND, 'Not Found');
const ERR_INTERNAL = RpcError.internal();

export interface RpcAppOptions {
uws: TemplatedApp;
uws: types.TemplatedApp;
maxRequestBodySize: number;
codecs: Codecs;
caller: RpcCaller;
caller: RpcCaller<any>;
augmentContext: (ctx: ConnectionContext) => void;
}

export class RpcApp<Ctx extends ConnectionContext> {
public readonly codecs: RpcCodecs;
protected readonly app: TemplatedApp;
protected readonly app: types.TemplatedApp;
protected readonly maxRequestBodySize: number;
protected readonly router = new Router();
protected readonly batchProcessor: RpcMessageBatchProcessor<Ctx>;
Expand All @@ -55,12 +47,12 @@ export class RpcApp<Ctx extends ConnectionContext> {
enableCors(this.options.uws);
}

public routeRaw(method: HttpMethodPermissive, path: string, handler: RouteHandler<Ctx>): void {
method = method.toLowerCase() as HttpMethodPermissive;
public routeRaw(method: types.HttpMethodPermissive, path: string, handler: RouteHandler<Ctx>): void {
method = method.toLowerCase() as types.HttpMethodPermissive;
this.router.add(method + path, handler);
}

public route(method: HttpMethodPermissive, path: string, handler: JsonRouteHandler<Ctx>): void {
public route(method: types.HttpMethodPermissive, path: string, handler: types.JsonRouteHandler<Ctx>): void {
this.routeRaw(method, path, async (ctx: Ctx) => {
const result = await handler(ctx);
const res = ctx.res!;
Expand Down Expand Up @@ -112,6 +104,7 @@ export class RpcApp<Ctx extends ConnectionContext> {

public enableWsRpc(path: string = '/rpc'): this {
const maxBackpressure = 4 * 1024 * 1024;
const augmentContext = this.options.augmentContext;
this.app.ws(path, {
idleTimeout: 0,
maxPayloadLength: 4 * 1024 * 1024,
Expand All @@ -120,11 +113,12 @@ export class RpcApp<Ctx extends ConnectionContext> {
const secWebSocketProtocol = req.getHeader('sec-websocket-protocol');
const secWebSocketExtensions = req.getHeader('sec-websocket-extensions');
const ctx = ConnectionContext.fromReqRes(req, res, null, this);
augmentContext(ctx);
/* This immediately calls open handler, you must not use res after this call */
res.upgrade({ctx}, secWebSocketKey, secWebSocketProtocol, secWebSocketExtensions, context);
},
open: (ws_: WebSocket) => {
const ws = ws_ as RpcWebSocket<Ctx>;
open: (ws_: types.WebSocket) => {
const ws = ws_ as types.RpcWebSocket<Ctx>;
const ctx = ws.ctx;
const resCodec = ctx.resCodec;
const msgCodec = ctx.msgCodec;
Expand All @@ -144,8 +138,8 @@ export class RpcApp<Ctx extends ConnectionContext> {
bufferTime: 0,
});
},
message: (ws_: WebSocket, buf: ArrayBuffer, isBinary: boolean) => {
const ws = ws_ as RpcWebSocket<Ctx>;
message: (ws_: types.WebSocket, buf: ArrayBuffer, isBinary: boolean) => {
const ws = ws_ as types.RpcWebSocket<Ctx>;
const ctx = ws.ctx;
const reqCodec = ctx.reqCodec;
const msgCodec = ctx.msgCodec;
Expand All @@ -158,8 +152,8 @@ export class RpcApp<Ctx extends ConnectionContext> {
rpc.sendNotification('.err', RpcError.value(RpcError.invalidRequest()));
}
},
close: (ws_: WebSocket, code: number, message: ArrayBuffer) => {
const ws = ws_ as RpcWebSocket<Ctx>;
close: (ws_: types.WebSocket, code: number, message: ArrayBuffer) => {
const ws = ws_ as types.RpcWebSocket<Ctx>;
ws.rpc!.stop();
},
});
Expand All @@ -170,7 +164,8 @@ export class RpcApp<Ctx extends ConnectionContext> {
const matcher = this.router.compile();
const codecs = this.codecs;
let responseCodec: JsonValueCodec = codecs.value.json;
this.app.any('/*', async (res: HttpResponse, req: HttpRequest) => {
const augmentContext = this.options.augmentContext;
this.app.any('/*', async (res: types.HttpResponse, req: types.HttpRequest) => {
res.onAborted(() => {
res.aborted = true;
});
Expand All @@ -189,6 +184,7 @@ export class RpcApp<Ctx extends ConnectionContext> {
const params = match.params;
const ctx = ConnectionContext.fromReqRes(req, res, params, this) as Ctx;
responseCodec = ctx.resCodec;
augmentContext(ctx);
await handler(ctx);
} catch (err) {
if (err instanceof RpcError) {
Expand Down
Loading