Skip to content

Commit

Permalink
feat: pub/sub support for bun and uws
Browse files Browse the repository at this point in the history
  • Loading branch information
pi0 committed Feb 24, 2024
1 parent 91edb54 commit a486f45
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 48 deletions.
20 changes: 8 additions & 12 deletions playground/_shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ export function createDemo<T extends WebSocketAdapter>(
);
},
open(peer) {
peer.send(`Hello ${peer}`);
peer.send("Welcome to the server!");
peer.subscribe("welcome");
peer.publish("welcome", `New user joined! ${peer}`);
},
message(peer, message) {
if (message.text() === "ping") {
Expand All @@ -38,17 +40,11 @@ export function createDemo<T extends WebSocketAdapter>(
const resolve: CrossWSOptions["resolve"] = (info) => {
return {
open: (peer) => {
peer.send(
JSON.stringify(
{
url: info.url,
headers:
info.headers && Object.fromEntries(new Headers(info.headers)),
},
undefined,
2,
),
);
peer.send({
url: info.url,
headers:
info.headers && Object.fromEntries(new Headers(info.headers)),
});
},
};
};
Expand Down
45 changes: 45 additions & 0 deletions src/_utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
type BufferLike = string | Buffer | Uint8Array | ArrayBuffer;

export function toBufferLike(val: any): BufferLike {
if (val === undefined || val === null) {
return "";
}

if (typeof val === "string") {
return val;
}

if (isPlainObject(val)) {
return JSON.stringify(val);
}

return val;
}

// Forked from sindresorhus/is-plain-obj (MIT)
// Copyright (c) Sindre Sorhus <sindresorhus@gmail.com> (https://sindresorhus.com)
// From https://github.com/unjs/defu/blob/main/src/_utils.ts
export function isPlainObject(value: unknown): boolean {
if (value === null || typeof value !== "object") {
return false;
}
const prototype = Object.getPrototypeOf(value);

if (
prototype !== null &&
prototype !== Object.prototype &&
Object.getPrototypeOf(prototype) !== null
) {
return false;
}

if (Symbol.iterator in value) {
return false;
}

if (Symbol.toStringTag in value) {
return Object.prototype.toString.call(value) === "[object Module]";
}

return true;
}
27 changes: 21 additions & 6 deletions src/adapters/bun.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

import type { WebSocketHandler, ServerWebSocket, Server } from "bun";

import { WebSocketMessage } from "../message";
import { WebSocketError } from "../error";
import { WSMessage } from "../message";
import { WSPeer } from "../peer";
import { defineWebSocketAdapter } from "../adapter";
import { CrossWSOptions, createCrossWS } from "../crossws";
import { toBufferLike } from "../_utils";

export interface AdapterOptions extends CrossWSOptions {}

Expand Down Expand Up @@ -50,7 +50,7 @@ export default defineWebSocketAdapter<Adapter, AdapterOptions>(
message: (ws, message) => {
const peer = getWSPeer(ws);
crossws.$("bun:message", peer, ws, message);
crossws.message(peer, new WebSocketMessage(message));
crossws.message(peer, new WSMessage(message));
},
open: (ws) => {
const peer = getWSPeer(ws);
Expand Down Expand Up @@ -102,8 +102,23 @@ class BunWSPeer extends WSPeer<{
return this.ctx.bun.ws.data.req?.headers || new Headers();
}

send(message: string | ArrayBuffer) {
this.ctx.bun.ws.send(message);
return 0;
send(message: any, options?: { compress?: boolean }) {
return this.ctx.bun.ws.send(toBufferLike(message), options?.compress);
}

publish(topic: string, message: any, options?: { compress?: boolean }) {
return this.ctx.bun.ws.publish(
topic,
toBufferLike(message),
options?.compress,
);
}

subscribe(topic: string): void {
this.ctx.bun.ws.subscribe(topic);
}

unsubscribe(topic: string): void {
this.ctx.bun.ws.unsubscribe(topic);
}
}
9 changes: 5 additions & 4 deletions src/adapters/cloudflare.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import type * as _cf from "@cloudflare/workers-types";

import { WSPeer } from "../peer";
import { defineWebSocketAdapter } from "../adapter.js";
import { WebSocketMessage } from "../message";
import { WSMessage } from "../message";
import { WebSocketError } from "../error";
import { CrossWSOptions, createCrossWS } from "../crossws";
import { toBufferLike } from "../_utils";

type Env = Record<string, any>;

Expand Down Expand Up @@ -48,7 +49,7 @@ export default defineWebSocketAdapter<Adapter, AdapterOptions>(

server.addEventListener("message", (event) => {
crossws.$("cloudflare:message", peer, event);
crossws.message(peer, new WebSocketMessage(event.data));
crossws.message(peer, new WSMessage(event.data));
});

server.addEventListener("error", (event) => {
Expand Down Expand Up @@ -100,8 +101,8 @@ class CloudflarePeer extends WSPeer<{
return this.ctx.cloudflare.client.readyState as -1 | 0 | 1 | 2 | 3;
}

send(message: string | ArrayBuffer) {
this.ctx.cloudflare.server.send(message);
send(message: any) {
this.ctx.cloudflare.server.send(toBufferLike(message));
return 0;
}
}
14 changes: 9 additions & 5 deletions src/adapters/deno.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
// https://deno.land/api?s=Deno.upgradeWebSocket
// https://examples.deno.land/http-server-websocket

import { WebSocketMessage } from "../message";
import { WSMessage } from "../message";
import { WebSocketError } from "../error";
import { WSPeer } from "../peer";
import { defineWebSocketAdapter } from "../adapter.js";
import { CrossWSOptions, createCrossWS } from "../crossws";
import { toBufferLike } from "../_utils";

export interface AdapterOptions extends CrossWSOptions {}

Expand All @@ -18,6 +19,8 @@ declare global {
const Deno: typeof import("@deno/types").Deno;
}

type WebSocketUpgrade = import("@deno/types").Deno.WebSocketUpgrade;

export default defineWebSocketAdapter<Adapter, AdapterOptions>(
(hooks, options = {}) => {
const crossws = createCrossWS(hooks, options);
Expand All @@ -43,7 +46,7 @@ export default defineWebSocketAdapter<Adapter, AdapterOptions>(
});
upgrade.socket.addEventListener("message", (event) => {
crossws.$("deno:message", peer, event);
crossws.message(peer, new WebSocketMessage(event.data));
crossws.message(peer, new WSMessage(event.data));
});
upgrade.socket.addEventListener("close", () => {
crossws.$("deno:close", peer);
Expand All @@ -63,9 +66,10 @@ export default defineWebSocketAdapter<Adapter, AdapterOptions>(
);

class DenoWSPeer extends WSPeer<{
deno: { ws: any; req: Request };
deno: { ws: WebSocketUpgrade["socket"]; req: Request };
}> {
get id() {
// @ts-expect-error types missing
return this.ctx.deno.ws.remoteAddress;
}

Expand All @@ -81,8 +85,8 @@ class DenoWSPeer extends WSPeer<{
return this.ctx.deno.req.headers || new Headers();
}

send(message: string | ArrayBuffer) {
this.ctx.deno.ws.send(message);
send(message: any) {
this.ctx.deno.ws.send(toBufferLike(message));
return 0;
}
}
13 changes: 9 additions & 4 deletions src/adapters/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ import type {
WebSocket as WebSocketT,
} from "../../types/ws";
import { WSPeer } from "../peer";
import { WebSocketMessage } from "../message";
import { WSMessage } from "../message";
import { WebSocketError } from "../error";
import { defineWebSocketAdapter } from "../adapter";
import { CrossWSOptions, createCrossWS } from "../crossws";
import { toBufferLike } from "../_utils";

export interface AdapterOptions extends CrossWSOptions {
wss?: WebSocketServer;
Expand Down Expand Up @@ -46,7 +47,7 @@ export default defineWebSocketAdapter<Adapter, AdapterOptions>(
if (Array.isArray(data)) {
data = Buffer.concat(data);
}
crossws.message(peer, new WebSocketMessage(data, isBinary));
crossws.message(peer, new WSMessage(data, isBinary));
});
ws.on("error", (error: Error) => {
crossws.$("node:error", peer, error);
Expand Down Expand Up @@ -137,8 +138,12 @@ class NodeWSPeer extends WSPeer<{
return this.ctx.node.ws.readyState;
}

send(message: string, compress?: boolean) {
this.ctx.node.ws.send(message, { compress });
send(message: any, options?: { compress?: boolean; binary?: boolean }) {
this.ctx.node.ws.send(toBufferLike(message), {
compress: options?.compress,
binary: options?.binary,
...options,
});
return 0;
}
}
25 changes: 21 additions & 4 deletions src/adapters/uws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import type {
HttpResponse,
} from "uWebSockets.js";
import { WSPeer } from "../peer";
import { WebSocketMessage } from "../message";
import { WSMessage } from "../message";
import { defineWebSocketAdapter } from "../adapter";
import { CrossWSOptions, createCrossWS } from "../crossws";
import { toBufferLike } from "../_utils";

type UserData = {
_peer?: any;
Expand Down Expand Up @@ -67,7 +68,7 @@ export default defineWebSocketAdapter<Adapter, AdapterOptions>(
message(ws, message, isBinary) {
const peer = getWSPeer(ws);
crossws.$("uws:message", peer, ws, message, isBinary);
const msg = new WebSocketMessage(message, isBinary);
const msg = new WSMessage(message, isBinary);
crossws.message(peer, msg);
},
open(ws) {
Expand Down Expand Up @@ -166,8 +167,24 @@ class UWSWSPeer extends WSPeer<{
return this._headers;
}

send(message: string, compress?: boolean) {
this.ctx.uws.ws.send(message, false, compress);
send(message: any, options?: { compress?: boolean; binary?: boolean }) {
return this.ctx.uws.ws.send(
toBufferLike(message),
options?.binary,
options?.compress,
);
}

subscribe(topic: string): void {
this.ctx.uws.ws.subscribe(topic);
}

publish(
topic: string,
message: string,
options?: { compress?: boolean; binary?: boolean },
) {
this.ctx.uws.ws.publish(topic, message, options?.binary, options?.compress);
return 0;
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/hooks.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { WebSocketError } from "./error";
import type { WebSocketMessage } from "./message";
import type { WSMessage } from "./message";
import type { WSPeer, WSRequest } from "./peer";

type MaybePromise<T> = T | Promise<T>;
Expand Down Expand Up @@ -28,7 +28,7 @@ export interface WebSocketHooks {
upgrade: (req: WSRequest) => MaybePromise<void | { headers?: HeadersInit }>;

/** A message is received */
message: (peer: WSPeer, message: WebSocketMessage) => MaybePromise<void>;
message: (peer: WSPeer, message: WSMessage) => MaybePromise<void>;

/** A socket is opened */
open: (peer: WSPeer) => MaybePromise<void>;
Expand Down
16 changes: 11 additions & 5 deletions src/message.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
export class WebSocketMessage {
import { toBufferLike } from "./_utils";

export class WSMessage {
constructor(
public readonly rawData: string | ArrayBuffer | Uint8Array,
public readonly rawData: any,
public readonly isBinary?: boolean,
) {}

text(): string {
if (typeof this.rawData === "string") {
return this.rawData;
}
return new TextDecoder().decode(this.rawData);
const buff = toBufferLike(this.rawData);
if (typeof buff === "string") {
return buff;
}
return new TextDecoder().decode(buff);
}

toString() {
return `<WebSocketMessage: ${this.text()}>`;
return this.text();
}

[Symbol.for("nodejs.util.inspect.custom")]() {
return this.toString();
return this.text();
}
}
25 changes: 19 additions & 6 deletions src/peer.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { WSMessage } from "./message";

// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState
type ReadyState = 0 | 1 | 2 | 3;
const ReadyStateMap = {
"-1": "unkown",
"-1": "unknown",
0: "connecting",
1: "open",
2: "closing",
Expand All @@ -14,6 +16,8 @@ export interface WSRequest {
}

export abstract class WSPeer<AdapterContext = any> implements WSRequest {
_subscriptions: Set<string> = new Set();

constructor(public ctx: AdapterContext) {}

get id(): string | undefined {
Expand All @@ -32,13 +36,22 @@ export abstract class WSPeer<AdapterContext = any> implements WSRequest {
return -1;
}

abstract send(
message: string | ArrayBuffer | Uint8Array,
compress?: boolean,
): number;
abstract send(message: any, options?: { compress?: boolean }): number;

publish(topic: string, message: any, options?: { compress?: boolean }) {
// noop
}

subscribe(topic: string) {
this._subscriptions.add(topic);
}

unsubscribe(topic: string) {
this._subscriptions.delete(topic);
}

toString() {
return `${this.id || ""}${this.readyState === 1 ? "" : ` [${ReadyStateMap[this.readyState]}]`}`;
return `${this.id || ""}${this.readyState === 1 || this.readyState === -1 ? "" : ` [${ReadyStateMap[this.readyState]}]`}`;
}

[Symbol.for("nodejs.util.inspect.custom")]() {
Expand Down

0 comments on commit a486f45

Please sign in to comment.