Skip to content

Commit

Permalink
feat: basic pubsub support for node
Browse files Browse the repository at this point in the history
  • Loading branch information
pi0 committed Feb 26, 2024
1 parent 9f7e1f0 commit 4bd61ca
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 3 deletions.
2 changes: 1 addition & 1 deletion docs/1.guide/5.pubsub.md
Expand Up @@ -7,7 +7,7 @@ icon: simple-icons:googlepubsub
CrossWS supports native pub-sub API integration. A [peer](/guide/peer) can be subscribed to a set of named channels using `peer.subscribe(<name>)`. Messages can be published tro a channel using `peer.publish(<name>, <message>)`.

> [!IMPORTANT]
> Native pub/sub is currently only available for [Bun](/adapters/bun) and [Node.js with uWebSockets](http://localhost:4000/adapters/node#uwebsockets).
> Native pub/sub is currently only available for [Bun](/adapters/bun) and [Node.js](http://localhost:4000/adapters/node#uwebsockets).
```js
import { defineHooks } from "crossws";
Expand Down
17 changes: 16 additions & 1 deletion src/adapters/node.ts
Expand Up @@ -111,9 +111,14 @@ class NodePeer extends Peer<{
node: {
server: WebSocketServer;
req: IncomingMessage;
ws: WebSocketT;
ws: WebSocketT & { _peer?: NodePeer };
};
}> {
constructor(ctx: NodePeer["ctx"]) {
super(ctx);
ctx.node.ws._peer = this;
}

get addr() {
const socket = this.ctx.node.req.socket;
if (!socket) {
Expand Down Expand Up @@ -148,4 +153,14 @@ class NodePeer extends Peer<{
});
return 0;
}

publish(topic: string, message: any): void {
message = toBufferLike(message);
for (const client of this.ctx.node.server.clients) {
const peer = (client as WebSocketT & { _peer?: NodePeer })._peer;
if (peer && peer !== this && peer._subscriptions.has(topic)) {
peer.send(message);
}
}
}
}
2 changes: 1 addition & 1 deletion src/peer.ts
Expand Up @@ -11,7 +11,7 @@ const ReadyStateMap = {
} as const;

export abstract class Peer<AdapterContext = any> implements WSRequest {
private _subscriptions: Set<string> = new Set();
_subscriptions: Set<string> = new Set();

static _idCounter = 0;
private _id: string;
Expand Down

0 comments on commit 4bd61ca

Please sign in to comment.