diff --git a/playground/_shared.ts b/playground/_shared.ts index efd75b0..bb27fb6 100644 --- a/playground/_shared.ts +++ b/playground/_shared.ts @@ -18,7 +18,9 @@ export function createDemo( ); }, open(peer) { - peer.send(`Hello ${peer}`); + peer.send("Welcome to the server!"); + peer.subscribe("welcome"); + peer.publish("welcome", `New user joined! ${peer}`); }, message(peer, message) { if (message.text() === "ping") { @@ -38,17 +40,11 @@ export function createDemo( const resolve: CrossWSOptions["resolve"] = (info) => { return { open: (peer) => { - peer.send( - JSON.stringify( - { - url: info.url, - headers: - info.headers && Object.fromEntries(new Headers(info.headers)), - }, - undefined, - 2, - ), - ); + peer.send({ + url: info.url, + headers: + info.headers && Object.fromEntries(new Headers(info.headers)), + }); }, }; }; diff --git a/src/_utils.ts b/src/_utils.ts new file mode 100644 index 0000000..ee87f0f --- /dev/null +++ b/src/_utils.ts @@ -0,0 +1,45 @@ +type BufferLike = string | Buffer | Uint8Array | ArrayBuffer; + +export function toBufferLike(val: any): BufferLike { + if (val === undefined || val === null) { + return ""; + } + + if (typeof val === "string") { + return val; + } + + if (isPlainObject(val)) { + return JSON.stringify(val); + } + + return val; +} + +// Forked from sindresorhus/is-plain-obj (MIT) +// Copyright (c) Sindre Sorhus (https://sindresorhus.com) +// From https://github.com/unjs/defu/blob/main/src/_utils.ts +export function isPlainObject(value: unknown): boolean { + if (value === null || typeof value !== "object") { + return false; + } + const prototype = Object.getPrototypeOf(value); + + if ( + prototype !== null && + prototype !== Object.prototype && + Object.getPrototypeOf(prototype) !== null + ) { + return false; + } + + if (Symbol.iterator in value) { + return false; + } + + if (Symbol.toStringTag in value) { + return Object.prototype.toString.call(value) === "[object Module]"; + } + + return true; +} diff --git a/src/adapters/bun.ts b/src/adapters/bun.ts index 91f5b96..a511e1d 100644 --- a/src/adapters/bun.ts +++ b/src/adapters/bun.ts @@ -2,11 +2,11 @@ import type { WebSocketHandler, ServerWebSocket, Server } from "bun"; -import { WebSocketMessage } from "../message"; -import { WebSocketError } from "../error"; +import { WSMessage } from "../message"; import { WSPeer } from "../peer"; import { defineWebSocketAdapter } from "../adapter"; import { CrossWSOptions, createCrossWS } from "../crossws"; +import { toBufferLike } from "../_utils"; export interface AdapterOptions extends CrossWSOptions {} @@ -50,7 +50,7 @@ export default defineWebSocketAdapter( message: (ws, message) => { const peer = getWSPeer(ws); crossws.$("bun:message", peer, ws, message); - crossws.message(peer, new WebSocketMessage(message)); + crossws.message(peer, new WSMessage(message)); }, open: (ws) => { const peer = getWSPeer(ws); @@ -102,8 +102,23 @@ class BunWSPeer extends WSPeer<{ return this.ctx.bun.ws.data.req?.headers || new Headers(); } - send(message: string | ArrayBuffer) { - this.ctx.bun.ws.send(message); - return 0; + send(message: any, options?: { compress?: boolean }) { + return this.ctx.bun.ws.send(toBufferLike(message), options?.compress); + } + + publish(topic: string, message: any, options?: { compress?: boolean }) { + return this.ctx.bun.ws.publish( + topic, + toBufferLike(message), + options?.compress, + ); + } + + subscribe(topic: string): void { + this.ctx.bun.ws.subscribe(topic); + } + + unsubscribe(topic: string): void { + this.ctx.bun.ws.unsubscribe(topic); } } diff --git a/src/adapters/cloudflare.ts b/src/adapters/cloudflare.ts index de25685..935bb03 100644 --- a/src/adapters/cloudflare.ts +++ b/src/adapters/cloudflare.ts @@ -4,9 +4,10 @@ import type * as _cf from "@cloudflare/workers-types"; import { WSPeer } from "../peer"; import { defineWebSocketAdapter } from "../adapter.js"; -import { WebSocketMessage } from "../message"; +import { WSMessage } from "../message"; import { WebSocketError } from "../error"; import { CrossWSOptions, createCrossWS } from "../crossws"; +import { toBufferLike } from "../_utils"; type Env = Record; @@ -48,7 +49,7 @@ export default defineWebSocketAdapter( server.addEventListener("message", (event) => { crossws.$("cloudflare:message", peer, event); - crossws.message(peer, new WebSocketMessage(event.data)); + crossws.message(peer, new WSMessage(event.data)); }); server.addEventListener("error", (event) => { @@ -100,8 +101,8 @@ class CloudflarePeer extends WSPeer<{ return this.ctx.cloudflare.client.readyState as -1 | 0 | 1 | 2 | 3; } - send(message: string | ArrayBuffer) { - this.ctx.cloudflare.server.send(message); + send(message: any) { + this.ctx.cloudflare.server.send(toBufferLike(message)); return 0; } } diff --git a/src/adapters/deno.ts b/src/adapters/deno.ts index 5a576e8..2f3423d 100644 --- a/src/adapters/deno.ts +++ b/src/adapters/deno.ts @@ -2,11 +2,12 @@ // https://deno.land/api?s=Deno.upgradeWebSocket // https://examples.deno.land/http-server-websocket -import { WebSocketMessage } from "../message"; +import { WSMessage } from "../message"; import { WebSocketError } from "../error"; import { WSPeer } from "../peer"; import { defineWebSocketAdapter } from "../adapter.js"; import { CrossWSOptions, createCrossWS } from "../crossws"; +import { toBufferLike } from "../_utils"; export interface AdapterOptions extends CrossWSOptions {} @@ -18,6 +19,8 @@ declare global { const Deno: typeof import("@deno/types").Deno; } +type WebSocketUpgrade = import("@deno/types").Deno.WebSocketUpgrade; + export default defineWebSocketAdapter( (hooks, options = {}) => { const crossws = createCrossWS(hooks, options); @@ -43,7 +46,7 @@ export default defineWebSocketAdapter( }); upgrade.socket.addEventListener("message", (event) => { crossws.$("deno:message", peer, event); - crossws.message(peer, new WebSocketMessage(event.data)); + crossws.message(peer, new WSMessage(event.data)); }); upgrade.socket.addEventListener("close", () => { crossws.$("deno:close", peer); @@ -63,9 +66,10 @@ export default defineWebSocketAdapter( ); class DenoWSPeer extends WSPeer<{ - deno: { ws: any; req: Request }; + deno: { ws: WebSocketUpgrade["socket"]; req: Request }; }> { get id() { + // @ts-expect-error types missing return this.ctx.deno.ws.remoteAddress; } @@ -81,8 +85,8 @@ class DenoWSPeer extends WSPeer<{ return this.ctx.deno.req.headers || new Headers(); } - send(message: string | ArrayBuffer) { - this.ctx.deno.ws.send(message); + send(message: any) { + this.ctx.deno.ws.send(toBufferLike(message)); return 0; } } diff --git a/src/adapters/node.ts b/src/adapters/node.ts index 02d23b3..ea1ac69 100644 --- a/src/adapters/node.ts +++ b/src/adapters/node.ts @@ -11,10 +11,11 @@ import type { WebSocket as WebSocketT, } from "../../types/ws"; import { WSPeer } from "../peer"; -import { WebSocketMessage } from "../message"; +import { WSMessage } from "../message"; import { WebSocketError } from "../error"; import { defineWebSocketAdapter } from "../adapter"; import { CrossWSOptions, createCrossWS } from "../crossws"; +import { toBufferLike } from "../_utils"; export interface AdapterOptions extends CrossWSOptions { wss?: WebSocketServer; @@ -46,7 +47,7 @@ export default defineWebSocketAdapter( if (Array.isArray(data)) { data = Buffer.concat(data); } - crossws.message(peer, new WebSocketMessage(data, isBinary)); + crossws.message(peer, new WSMessage(data, isBinary)); }); ws.on("error", (error: Error) => { crossws.$("node:error", peer, error); @@ -137,8 +138,12 @@ class NodeWSPeer extends WSPeer<{ return this.ctx.node.ws.readyState; } - send(message: string, compress?: boolean) { - this.ctx.node.ws.send(message, { compress }); + send(message: any, options?: { compress?: boolean; binary?: boolean }) { + this.ctx.node.ws.send(toBufferLike(message), { + compress: options?.compress, + binary: options?.binary, + ...options, + }); return 0; } } diff --git a/src/adapters/uws.ts b/src/adapters/uws.ts index 76de1dc..1c314b9 100644 --- a/src/adapters/uws.ts +++ b/src/adapters/uws.ts @@ -8,9 +8,10 @@ import type { HttpResponse, } from "uWebSockets.js"; import { WSPeer } from "../peer"; -import { WebSocketMessage } from "../message"; +import { WSMessage } from "../message"; import { defineWebSocketAdapter } from "../adapter"; import { CrossWSOptions, createCrossWS } from "../crossws"; +import { toBufferLike } from "../_utils"; type UserData = { _peer?: any; @@ -67,7 +68,7 @@ export default defineWebSocketAdapter( message(ws, message, isBinary) { const peer = getWSPeer(ws); crossws.$("uws:message", peer, ws, message, isBinary); - const msg = new WebSocketMessage(message, isBinary); + const msg = new WSMessage(message, isBinary); crossws.message(peer, msg); }, open(ws) { @@ -166,8 +167,24 @@ class UWSWSPeer extends WSPeer<{ return this._headers; } - send(message: string, compress?: boolean) { - this.ctx.uws.ws.send(message, false, compress); + send(message: any, options?: { compress?: boolean; binary?: boolean }) { + return this.ctx.uws.ws.send( + toBufferLike(message), + options?.binary, + options?.compress, + ); + } + + subscribe(topic: string): void { + this.ctx.uws.ws.subscribe(topic); + } + + publish( + topic: string, + message: string, + options?: { compress?: boolean; binary?: boolean }, + ) { + this.ctx.uws.ws.publish(topic, message, options?.binary, options?.compress); return 0; } } diff --git a/src/hooks.ts b/src/hooks.ts index 950c723..0b4906f 100644 --- a/src/hooks.ts +++ b/src/hooks.ts @@ -1,5 +1,5 @@ import { WebSocketError } from "./error"; -import type { WebSocketMessage } from "./message"; +import type { WSMessage } from "./message"; import type { WSPeer, WSRequest } from "./peer"; type MaybePromise = T | Promise; @@ -28,7 +28,7 @@ export interface WebSocketHooks { upgrade: (req: WSRequest) => MaybePromise; /** A message is received */ - message: (peer: WSPeer, message: WebSocketMessage) => MaybePromise; + message: (peer: WSPeer, message: WSMessage) => MaybePromise; /** A socket is opened */ open: (peer: WSPeer) => MaybePromise; diff --git a/src/message.ts b/src/message.ts index 43c6a0f..10d4363 100644 --- a/src/message.ts +++ b/src/message.ts @@ -1,6 +1,8 @@ -export class WebSocketMessage { +import { toBufferLike } from "./_utils"; + +export class WSMessage { constructor( - public readonly rawData: string | ArrayBuffer | Uint8Array, + public readonly rawData: any, public readonly isBinary?: boolean, ) {} @@ -8,14 +10,18 @@ export class WebSocketMessage { if (typeof this.rawData === "string") { return this.rawData; } - return new TextDecoder().decode(this.rawData); + const buff = toBufferLike(this.rawData); + if (typeof buff === "string") { + return buff; + } + return new TextDecoder().decode(buff); } toString() { - return ``; + return this.text(); } [Symbol.for("nodejs.util.inspect.custom")]() { - return this.toString(); + return this.text(); } } diff --git a/src/peer.ts b/src/peer.ts index c7f4021..9db9426 100644 --- a/src/peer.ts +++ b/src/peer.ts @@ -1,7 +1,9 @@ +import { WSMessage } from "./message"; + // https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState type ReadyState = 0 | 1 | 2 | 3; const ReadyStateMap = { - "-1": "unkown", + "-1": "unknown", 0: "connecting", 1: "open", 2: "closing", @@ -14,6 +16,8 @@ export interface WSRequest { } export abstract class WSPeer implements WSRequest { + _subscriptions: Set = new Set(); + constructor(public ctx: AdapterContext) {} get id(): string | undefined { @@ -32,13 +36,22 @@ export abstract class WSPeer implements WSRequest { return -1; } - abstract send( - message: string | ArrayBuffer | Uint8Array, - compress?: boolean, - ): number; + abstract send(message: any, options?: { compress?: boolean }): number; + + publish(topic: string, message: any, options?: { compress?: boolean }) { + // noop + } + + subscribe(topic: string) { + this._subscriptions.add(topic); + } + + unsubscribe(topic: string) { + this._subscriptions.delete(topic); + } toString() { - return `${this.id || ""}${this.readyState === 1 ? "" : ` [${ReadyStateMap[this.readyState]}]`}`; + return `${this.id || ""}${this.readyState === 1 || this.readyState === -1 ? "" : ` [${ReadyStateMap[this.readyState]}]`}`; } [Symbol.for("nodejs.util.inspect.custom")]() {