Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion packages/restate-sdk-examples/src/example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,17 @@ const Greeter: GreeterService = { name: "greeter" };
const counter = restate.object({
name: "counter",
handlers: {
count: async (ctx: restate.ObjectContext): Promise<number> => {
count: async (ctx: restate.ObjectContext) => {
const seen = (await ctx.get<number>("seen")) ?? 0;
ctx.set("seen", seen + 1);
return seen;
},

get: restate.handlers.shared(
async (ctx: restate.ObjectSharedContext): Promise<number> => {
return (await ctx.get("count")) ?? 0;
}
),
},
});

Expand Down
30 changes: 30 additions & 0 deletions packages/restate-sdk/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,36 @@ export interface ObjectContext extends Context, KeyValueStore {
key: string;
}

/**
* The context that gives access to all Restate-backed operations, for example
* - sending reliable messages / RPC through Restate
* - execute non-deterministic closures and memoize their result
* - sleeps and delayed calls
* - awakeables
* - ...
*
* This context can be used only within a shared virtual objects.
*
*/
export interface ObjectSharedContext extends Context {
key: string;

/**
* Get/retrieve state from the Restate runtime.
* Note that state objects are serialized with `Buffer.from(JSON.stringify(theObject))`
* and deserialized with `JSON.parse(value.toString()) as T`.
*
* @param name key of the state to retrieve
* @returns a Promise that is resolved with the value of the state key
*
* @example
* const state = await ctx.get<string>("STATE");
*/
get<T>(name: string): Promise<T | null>;

stateKeys(): Promise<Array<string>>;
}

export interface Rand {
/**
* Equivalent of JS `Math.random()` but deterministic; seeded by the invocation ID of the current invocation,
Expand Down
44 changes: 19 additions & 25 deletions packages/restate-sdk/src/endpoint/endpoint_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,21 @@
/* eslint-disable @typescript-eslint/no-explicit-any */

import { RestateEndpoint, ServiceBundle } from "../public_api";
import { ServiceDefinition, VirtualObjectDefinition } from "../types/rpc";
import {
HandlerWrapper,
Service,
ServiceDefinition,
VirtualObject,
VirtualObjectDefinition,
} from "../types/rpc";
import { rlog } from "../logger";
import http2, { Http2ServerRequest, Http2ServerResponse } from "http2";
import { Http2Handler } from "./http2_handler";
import { LambdaHandler } from "./lambda_handler";
import {
Component,
ServiceComponent,
ServiceHandlerFunction,
VirtualObjectHandlerFunction,
VritualObjectComponent,
VirtualObjectComponent,
} from "../types/components";

import * as discovery from "../types/discovery";
Expand Down Expand Up @@ -70,13 +74,13 @@ export class EndpointImpl implements RestateEndpoint {
if (isServiceDefinition(definition)) {
const { name, service } = definition;
if (!service) {
throw new TypeError(`no service implemention found.`);
throw new TypeError(`no service implementation found.`);
}
this.bindServiceComponent(name, service);
} else if (isObjectDefinition(definition)) {
const { name, object } = definition;
if (!object) {
throw new TypeError(`no object implemention found.`);
throw new TypeError(`no object implementation found.`);
}
this.bindVirtualObjectComponent(name, object);
} else {
Expand Down Expand Up @@ -179,43 +183,33 @@ export class EndpointImpl implements RestateEndpoint {
return deployment;
}

private bindServiceComponent(name: string, router: RpcRouter) {
private bindServiceComponent(name: string, router: Service<any>) {
if (name.indexOf("/") !== -1) {
throw new Error("service name must not contain any slash '/'");
}
const component = new ServiceComponent(name);

for (const [route, handler] of Object.entries(router)) {
/* eslint-disable @typescript-eslint/no-explicit-any */
const fn = handler as ServiceHandlerFunction<any, any>;
component.add({
name: route,
fn: fn.bind(router),
});
const wrapper = HandlerWrapper.fromHandler(handler);
wrapper.bindInstance(router);
component.add(route, wrapper);
}

this.addComponent(component);
}

private bindVirtualObjectComponent(name: string, router: RpcRouter) {
private bindVirtualObjectComponent(name: string, router: VirtualObject<any>) {
if (name.indexOf("/") !== -1) {
throw new Error("service name must not contain any slash '/'");
}
const component = new VritualObjectComponent(name);
const component = new VirtualObjectComponent(name);

for (const [route, handler] of Object.entries(router)) {
/* eslint-disable @typescript-eslint/no-explicit-any */
const fn = handler as VirtualObjectHandlerFunction<any, any>;
component.add({
name: route,
fn: fn.bind(router),
});
const wrapper = HandlerWrapper.fromHandler(handler);
wrapper.bindInstance(router);
component.add(route, wrapper);
}

this.addComponent(component);
}
}

export type RpcRouter = {
[key: string]: Function;
};
9 changes: 8 additions & 1 deletion packages/restate-sdk/src/public_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@
* https://github.com/restatedev/sdk-typescript/blob/main/LICENSE
*/

export { Context, ObjectContext, CombineablePromise, Rand } from "./context";
export {
Context,
ObjectContext,
ObjectSharedContext,
CombineablePromise,
Rand,
} from "./context";
export {
service,
object,
Expand All @@ -19,6 +25,7 @@ export {
VirtualObjectDefinition,
Client,
SendClient,
handlers,
} from "./types/rpc";

export { endpoint, ServiceBundle, RestateEndpoint } from "./endpoint";
Expand Down
118 changes: 61 additions & 57 deletions packages/restate-sdk/src/types/components.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
/* eslint-disable @typescript-eslint/ban-types */
/* eslint-disable @typescript-eslint/no-explicit-any */

import { Context, ObjectContext } from "../context";
import * as d from "./discovery";
import { ContextImpl } from "../context_impl";
import { deserializeJson, serializeJson } from "../utils/serde";
import { HandlerKind, HandlerWrapper } from "./rpc";

//
// Interfaces
Expand All @@ -36,16 +35,6 @@ export interface ComponentHandler {
// Service
//

export type ServiceHandlerFunction<I, O> = (
ctx: Context,
param: I
) => Promise<O>;

export type ServiceHandlerOpts<I, O> = {
name: string;
fn: ServiceHandlerFunction<I, O>;
};

export class ServiceComponent implements Component {
private readonly handlers: Map<string, ServiceHandler> = new Map();

Expand All @@ -55,17 +44,29 @@ export class ServiceComponent implements Component {
return this.componentName;
}

add<I, O>(opts: ServiceHandlerOpts<I, O>) {
const c = new ServiceHandler(opts, this);
this.handlers.set(opts.name, c);
add(name: string, handlerWrapper: HandlerWrapper) {
const serviceHandler = new ServiceHandler(name, handlerWrapper, this);
this.handlers.set(name, serviceHandler);
}

discovery(): d.Service {
const handlers: d.Handler[] = [...this.handlers.keys()].map((name) => {
return {
name,
};
});
const handlers: d.Handler[] = [...this.handlers.entries()].map(
([name, serviceHandler]) => {
return {
name,
input: {
required: false,
contentType:
serviceHandler.handlerWrapper.accept ?? "application/json",
},
output: {
setContentTypeIfEmpty: true,
contentType:
serviceHandler.handlerWrapper.contentType ?? "application/json",
},
};
}
);

return {
name: this.componentName,
Expand All @@ -82,18 +83,20 @@ export class ServiceComponent implements Component {
export class ServiceHandler implements ComponentHandler {
private readonly handlerName: string;
private readonly parent: ServiceComponent;
private readonly fn: ServiceHandlerFunction<any, any>;
public readonly handlerWrapper: HandlerWrapper;

constructor(opts: ServiceHandlerOpts<any, any>, parent: ServiceComponent) {
this.handlerName = opts.name;
constructor(
name: string,
handlerWrapper: HandlerWrapper,
parent: ServiceComponent
) {
this.handlerName = name;
this.parent = parent;
this.fn = opts.fn;
this.handlerWrapper = handlerWrapper;
}

async invoke(context: ContextImpl, input: Uint8Array): Promise<Uint8Array> {
const req = deserializeJson(input);
const res = await this.fn(context, req);
return serializeJson(res);
invoke(context: ContextImpl, input: Uint8Array): Promise<Uint8Array> {
return this.handlerWrapper.invoke(context, input);
}

name(): string {
Expand All @@ -108,36 +111,39 @@ export class ServiceHandler implements ComponentHandler {
// Virtual Object
//

export type VirtualObjectHandlerFunction<I, O> = (
ctx: ObjectContext,
param: I
) => Promise<O>;

export type VirtualObjectHandlerOpts<I, O> = {
name: string;
fn: VirtualObjectHandlerFunction<I, O>;
};

export class VritualObjectComponent implements Component {
private readonly opts: Map<string, VirtualObjectHandlerOpts<any, any>> =
new Map();
export class VirtualObjectComponent implements Component {
private readonly handlers: Map<string, HandlerWrapper> = new Map();

constructor(public readonly componentName: string) {}

name(): string {
return this.componentName;
}

add<I, O>(opts: VirtualObjectHandlerOpts<I, O>) {
this.opts.set(opts.name, opts as VirtualObjectHandlerOpts<any, any>);
add(name: string, wrapper: HandlerWrapper) {
this.handlers.set(name, wrapper);
}

discovery(): d.Service {
const handlers: d.Handler[] = [...this.opts.keys()].map((name) => {
return {
name,
};
});
const handlers: d.Handler[] = [...this.handlers.entries()].map(
([name, opts]) => {
return {
name,
input: {
required: false,
contentType: opts.accept ?? "application/json",
},
output: {
setContentTypeIfEmpty: true,
contentType: opts.contentType ?? "application/json",
},
ty:
opts.kind == HandlerKind.EXCLUSIVE
? d.ServiceHandlerType.EXCLUSIVE
: d.ServiceHandlerType.SHARED,
};
}
);

return {
name: this.componentName,
Expand All @@ -147,19 +153,19 @@ export class VritualObjectComponent implements Component {
}

handlerMatching(url: UrlPathComponents): ComponentHandler | undefined {
const opts = this.opts.get(url.handlerName);
if (!opts) {
const wrapper = this.handlers.get(url.handlerName);
if (!wrapper) {
return undefined;
}
return new VirtualObjectHandler(url.handlerName, this, opts);
return new VirtualObjectHandler(url.handlerName, this, wrapper);
}
}

export class VirtualObjectHandler implements ComponentHandler {
constructor(
private readonly componentName: string,
private readonly parent: VritualObjectComponent,
private readonly opts: VirtualObjectHandlerOpts<any, any>
private readonly parent: VirtualObjectComponent,
private readonly handlerWrapper: HandlerWrapper
) {}

name(): string {
Expand All @@ -169,10 +175,8 @@ export class VirtualObjectHandler implements ComponentHandler {
return this.parent;
}

async invoke(context: ContextImpl, input: Uint8Array): Promise<Uint8Array> {
const req = deserializeJson(input);
const res = await this.opts.fn(context, req);
return serializeJson(res);
invoke(context: ContextImpl, input: Uint8Array): Promise<Uint8Array> {
return this.handlerWrapper.invoke(context, input);
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/restate-sdk/src/types/discovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export enum ServiceHandlerType {

type InputPayload = {
contentType: string;
empty: "ALLOW" | "DISALLOW" | "REQUIRE";
required: boolean;

/* eslint-disable-next-line @typescript-eslint/no-explicit-any */
jsonSchema?: any; // You should specify the type of jsonSchema if known
Expand Down
Loading