diff --git a/docs/1.guide/5.pubsub.md b/docs/1.guide/5.pubsub.md index 38f2d08..18ae4cb 100644 --- a/docs/1.guide/5.pubsub.md +++ b/docs/1.guide/5.pubsub.md @@ -7,7 +7,7 @@ icon: simple-icons:googlepubsub CrossWS supports native pub-sub API integration. A [peer](/guide/peer) can be subscribed to a set of named channels using `peer.subscribe()`. Messages can be published tro a channel using `peer.publish(, )`. > [!IMPORTANT] -> Native pub/sub is currently only available for [Bun](/adapters/bun) and [Node.js with uWebSockets](http://localhost:4000/adapters/node#uwebsockets). +> Native pub/sub is currently only available for [Bun](/adapters/bun) and [Node.js](http://localhost:4000/adapters/node#uwebsockets). ```js import { defineHooks } from "crossws"; diff --git a/src/adapters/node.ts b/src/adapters/node.ts index 6b20787..ed4591f 100644 --- a/src/adapters/node.ts +++ b/src/adapters/node.ts @@ -111,9 +111,14 @@ class NodePeer extends Peer<{ node: { server: WebSocketServer; req: IncomingMessage; - ws: WebSocketT; + ws: WebSocketT & { _peer?: NodePeer }; }; }> { + constructor(ctx: NodePeer["ctx"]) { + super(ctx); + ctx.node.ws._peer = this; + } + get addr() { const socket = this.ctx.node.req.socket; if (!socket) { @@ -148,4 +153,14 @@ class NodePeer extends Peer<{ }); return 0; } + + publish(topic: string, message: any): void { + message = toBufferLike(message); + for (const client of this.ctx.node.server.clients) { + const peer = (client as WebSocketT & { _peer?: NodePeer })._peer; + if (peer && peer !== this && peer._subscriptions.has(topic)) { + peer.send(message); + } + } + } } diff --git a/src/peer.ts b/src/peer.ts index 045f400..a5d6b48 100644 --- a/src/peer.ts +++ b/src/peer.ts @@ -11,7 +11,7 @@ const ReadyStateMap = { } as const; export abstract class Peer implements WSRequest { - private _subscriptions: Set = new Set(); + _subscriptions: Set = new Set(); static _idCounter = 0; private _id: string;