Skip to content

Commit

Permalink
feat: Add new WebSocket link function based on universal message-base…
Browse files Browse the repository at this point in the history
…d linker
  • Loading branch information
pojntfx committed Dec 29, 2023
1 parent 8a9efb6 commit e022fe4
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 48 deletions.
3 changes: 2 additions & 1 deletion ts/ltsrpc-example-websocket-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ await new Promise<void>((res, rej) => {
socket.addEventListener("error", rej);
});

const remote = linkWebSocket(
const { remote, close } = linkWebSocket(
socket,

{
Expand All @@ -39,6 +39,7 @@ const remote = linkWebSocket(
(v) => v,
(v) => v
);
socket.addEventListener("close", close);

console.log("Connected to", raddr);

Expand Down
184 changes: 137 additions & 47 deletions ts/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,75 @@ interface ICallResponse {
err: string;
}

export interface IRequestResponseReader<T> {
on(event: "request", listener: (message: T) => void): this;
on(event: "response", listener: (message: T) => void): this;
}

class WebSocketRequestResponseReader<T> implements IRequestResponseReader<T> {
private socket: WebSocket;

private handler: (event: MessageEvent<string | Buffer>) => any;

private requestListener?: (message: T) => void;

private responseListener?: (message: T) => void;

constructor(socket: WebSocket, parse: (text: string) => any) {
this.socket = socket;

this.handler = (event: MessageEvent<string | Buffer>) => {
const msg = unmarshalMessage<T>(event.data as string, parse);

if (msg.request) {
this.requestListener?.(msg.request);
} else if (msg.response) {
this.responseListener?.(msg.response);
}
};
}

open() {
this.socket.addEventListener("message", this.handler);
}

close() {
this.socket.removeEventListener("message", this.handler);
}

on(event: "request" | "response", listener: (message: T) => void): this {
if (event === "request") {
this.requestListener = listener;
} else if (event === "response") {
this.responseListener = listener;
}

return this;
}
}

/**
* Expose local functions and link remote ones to a WebSocket
* Expose local functions and link remote ones to a message-based transport
* @param local Local functions to explose
* @param remote Remote functions to implement
* @param writeRequest Function to write a request
* @param writeResponse Function to write a response
* @param requestResponseReader Emitter to read requests and responses
* @param stringify Function to marshal a message
* @param parse Function to unmarshal a message
* @returns Remote functions
*/
export const linkWebSocket = <L extends ILocal, R extends IRemote, T>(
socket: WebSocket,

export const linkMessage = <L extends ILocal, R extends IRemote, T>(
local: L,
remote: R,

stringify: (value: any) => string,
parse: (text: string) => any,
writeRequest: (text: T) => Promise<any>,
writeResponse: (text: T) => Promise<any>,

stringifyNested: (value: any) => T,
parseNested: (text: T) => any
requestResponseReader: IRequestResponseReader<T>,

stringify: (value: any) => T,
parse: (text: T) => any
) => {
const broker = new EventEmitter();

Expand Down Expand Up @@ -88,60 +142,96 @@ export const linkWebSocket = <L extends ILocal, R extends IRemote, T>(
};
broker.addListener(`rpc:${id}`, returnListener);

socket.send(
marshalMessage<T>(
marshalRequest<T>(id, functionName, rest, stringifyNested),
undefined,
stringify
)
);
writeRequest(
marshalRequest<T>(id, functionName, rest, stringify)
).catch(rej);
});
}

socket.addEventListener("message", async (event) => {
const msg = unmarshalMessage<T>(event.data as string, parse);
requestResponseReader.on("request", async (message) => {
const { call, functionName, args } = unmarshalRequest<T>(message, parse);

if (msg.request) {
const { call, functionName, args } = unmarshalRequest<T>(
msg.request,
parseNested
);
let res: T;
try {
const ctx: ILocalContext = { remoteID: "1" }; // TODO: Use remote-unique ID here

let res: T;
try {
const ctx: ILocalContext = { remoteID: "1" }; // TODO: Use remote-unique ID here
const rv = await (local as any)[functionName](ctx, ...args);

const rv = await (local as any)[functionName](ctx, ...args);
res = marshalResponse<T>(call, rv, "", stringify);
} catch (e) {
res = marshalResponse<T>(
call,
undefined,
(e as Error).message,
stringify
);
}

res = marshalResponse<T>(call, rv, "", stringifyNested);
} catch (e) {
res = marshalResponse<T>(
call,
undefined,
(e as Error).message,
stringifyNested
);
}
await writeResponse(res);
});

socket.send(marshalMessage<T>(undefined, res, stringify));
} else if (msg.response) {
const { call, value, err } = unmarshalResponse<T>(
msg.response,
parseNested
);
requestResponseReader.on("response", async (message) => {
const { call, value, err } = unmarshalResponse<T>(message, parse);

const callResponse: ICallResponse = {
value,
err,
};
const callResponse: ICallResponse = {
value,
err,
};

broker.emit(`rpc:${call}`, callResponse);
}
broker.emit(`rpc:${call}`, callResponse);
});

return r;
};

/**
* Expose local functions and link remote ones to a WebSocket
* @param socket Socket to link functions to
* @param local Local functions to expose
* @param remote Remote functions to implement
* @param stringify Function to marshal a message
* @param parse Function to unmarshal a message
* @param stringifyNested Function to marshal a nested message
* @param parseNested Function to unmarshal a nested message
* @returns Remote functions
*/
export const linkWebSocket = <L extends ILocal, R extends IRemote, T>(
socket: WebSocket,

local: L,
remote: R,

stringify: (value: any) => string,
parse: (text: string) => any,

stringifyNested: (value: any) => T,
parseNested: (text: T) => any
) => {
const requestResponseReceiver = new WebSocketRequestResponseReader<T>(
socket,
parse
);
requestResponseReceiver.open();

return {
remote: linkMessage(
local,
remote,

async (text: T) =>
socket.send(marshalMessage<T>(text, undefined, stringify)),
async (text: T) =>
socket.send(marshalMessage<T>(undefined, text, stringify)),

requestResponseReceiver,

stringifyNested,
parseNested
),
close: requestResponseReceiver.close,
};
};

/**
* Expose local functions and link remote ones to a TCP socket
* @param socket TCP socket to use
Expand Down

0 comments on commit e022fe4

Please sign in to comment.