From 3b95b697c06a7039b1acfc1f84bb7a9849a50fe5 Mon Sep 17 00:00:00 2001 From: lucap Date: Mon, 26 Jul 2021 17:58:56 +0200 Subject: [PATCH 1/6] use Redis for client events, to enable multi-server whisper --- src/channels/channel.ts | 4 +-- src/channels/presence-channel.ts | 31 ++++++++++++++++----- src/publishers/index.ts | 2 ++ src/publishers/publisher.ts | 3 +++ src/publishers/redis-publisher.ts | 45 +++++++++++++++++++++++++++++++ 5 files changed, 75 insertions(+), 10 deletions(-) create mode 100644 src/publishers/index.ts create mode 100644 src/publishers/publisher.ts create mode 100644 src/publishers/redis-publisher.ts diff --git a/src/channels/channel.ts b/src/channels/channel.ts index bc1dd6b3..1650d207 100644 --- a/src/channels/channel.ts +++ b/src/channels/channel.ts @@ -63,9 +63,7 @@ export class Channel { if (this.isClientEvent(data.event) && this.isPrivate(data.channel) && this.isInChannel(socket, data.channel)) { - this.io.sockets.connected[socket.id] - .broadcast.to(data.channel) - .emit(data.event, data.channel, data.data); + this.presence.clientEvent(data); } } } diff --git a/src/channels/presence-channel.ts b/src/channels/presence-channel.ts index 7b618720..c587796c 100644 --- a/src/channels/presence-channel.ts +++ b/src/channels/presence-channel.ts @@ -1,5 +1,8 @@ -import { Database } from './../database'; -import { Log } from './../log'; +import {Database} from './../database'; +import {Log} from './../log'; +import {RedisPublisher} from "../publishers/redis-publisher"; +import {Publisher} from "../publishers"; + var _ = require("lodash"); export class PresenceChannel { @@ -8,11 +11,14 @@ export class PresenceChannel { */ db: Database; + _redis: Publisher; + /** * Create a new Presence channel instance. */ constructor(private io, private options: any) { this.db = new Database(options); + this._redis = new RedisPublisher(options); } /** @@ -141,22 +147,33 @@ export class PresenceChannel { * On join event handler. */ onJoin(socket: any, channel: string, member: any): void { - this.io.sockets.connected[socket.id].broadcast - .to(channel) - .emit("presence:joining", channel, member); + this._redis.publish(channel, { + event: "presence:joining", + data: {member} + }); } /** * On leave emitter. */ onLeave(channel: string, member: any): void { - this.io.to(channel).emit("presence:leaving", channel, member); + this._redis.publish(channel, { + event: "presence:leaving", + data: {member} + }); } /** * On subscribed event emitter. */ onSubscribed(socket: any, channel: string, members: any[]) { - this.io.to(socket.id).emit("presence:subscribed", channel, members); + this._redis.publish(channel, { + event: "presence:subscribed", + data: {members} + }); + } + + clientEvent(data) { + this._redis.publish(data.channel, data); } } diff --git a/src/publishers/index.ts b/src/publishers/index.ts new file mode 100644 index 00000000..ee74e99a --- /dev/null +++ b/src/publishers/index.ts @@ -0,0 +1,2 @@ +export * from './redis-publisher'; +export * from './publisher'; diff --git a/src/publishers/publisher.ts b/src/publishers/publisher.ts new file mode 100644 index 00000000..9c6f5947 --- /dev/null +++ b/src/publishers/publisher.ts @@ -0,0 +1,3 @@ +export interface Publisher { + publish(channel: string, data: any): Promise; +} diff --git a/src/publishers/redis-publisher.ts b/src/publishers/redis-publisher.ts new file mode 100644 index 00000000..9d1bf3af --- /dev/null +++ b/src/publishers/redis-publisher.ts @@ -0,0 +1,45 @@ +var Redis = require('ioredis'); +import {Publisher} from './publisher'; + +export class RedisPublisher implements Publisher { + /** + * Redis pub/sub client. + * + * @type {object} + */ + private _redis: any; + + /** + * + * KeyPrefix for used in the redis Connection + * + * @type {String} + */ + private _keyPrefix: string; + + /** + * Create a new instance of subscriber. + * + * @param {any} options + */ + constructor(private options) { + this._keyPrefix = options.databaseConfig.redis.keyPrefix || ''; + this._redis = new Redis(options.databaseConfig.redis); + } + + /** + * Subscribe to events to broadcast. + * + * @return {Promise} + */ + publish(channel: string, data: any): Promise { + return new Promise((resolve, reject) => { + try { + this._redis.publish(channel, data); + resolve(); + } catch (e) { + reject(e); + } + }); + } +} From bc13f46f49686a432e972c6253c8e9a086e3539e Mon Sep 17 00:00:00 2001 From: lucap Date: Mon, 26 Jul 2021 18:27:27 +0200 Subject: [PATCH 2/6] KePrefix --- src/channels/presence-channel.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/channels/presence-channel.ts b/src/channels/presence-channel.ts index c587796c..69fb453c 100644 --- a/src/channels/presence-channel.ts +++ b/src/channels/presence-channel.ts @@ -13,12 +13,15 @@ export class PresenceChannel { _redis: Publisher; + _keyPrefix: string; + /** * Create a new Presence channel instance. */ constructor(private io, private options: any) { this.db = new Database(options); this._redis = new RedisPublisher(options); + this._keyPrefix = options.databaseConfig.redis.keyPrefix || ''; } /** @@ -148,7 +151,7 @@ export class PresenceChannel { */ onJoin(socket: any, channel: string, member: any): void { this._redis.publish(channel, { - event: "presence:joining", + event: `${this._keyPrefix}presence:joining`, data: {member} }); } @@ -158,7 +161,7 @@ export class PresenceChannel { */ onLeave(channel: string, member: any): void { this._redis.publish(channel, { - event: "presence:leaving", + event: `${this._keyPrefix}presence:leaving`, data: {member} }); } @@ -168,7 +171,7 @@ export class PresenceChannel { */ onSubscribed(socket: any, channel: string, members: any[]) { this._redis.publish(channel, { - event: "presence:subscribed", + event: `${this._keyPrefix}presence:subscribed`, data: {members} }); } From e41b4717001c210459401b8a2e7c836b28904e84 Mon Sep 17 00:00:00 2001 From: lucap Date: Tue, 27 Jul 2021 13:08:29 +0200 Subject: [PATCH 3/6] Create Publisher according to config --- src/channels/channel.ts | 8 +++---- src/channels/presence-channel.ts | 22 ++++++------------- src/channels/private-channel.ts | 1 - src/echo-server.ts | 17 ++++++++------- src/publishers/io-publisher.ts | 34 +++++++++++++++++++++++++++++ src/publishers/publisher-factory.ts | 15 +++++++++++++ src/publishers/publisher.ts | 2 +- src/publishers/redis-publisher.ts | 6 ++--- 8 files changed, 73 insertions(+), 32 deletions(-) create mode 100644 src/publishers/io-publisher.ts create mode 100644 src/publishers/publisher-factory.ts diff --git a/src/channels/channel.ts b/src/channels/channel.ts index 1650d207..fadff304 100644 --- a/src/channels/channel.ts +++ b/src/channels/channel.ts @@ -1,6 +1,6 @@ -import { PresenceChannel } from './presence-channel'; -import { PrivateChannel } from './private-channel'; -import { Log } from './../log'; +import {PresenceChannel} from './presence-channel'; +import {PrivateChannel} from './private-channel'; +import {Log} from './../log'; export class Channel { /** @@ -63,7 +63,7 @@ export class Channel { if (this.isClientEvent(data.event) && this.isPrivate(data.channel) && this.isInChannel(socket, data.channel)) { - this.presence.clientEvent(data); + this.presence.clientEvent({socket, ...data}); } } } diff --git a/src/channels/presence-channel.ts b/src/channels/presence-channel.ts index 69fb453c..4e98145b 100644 --- a/src/channels/presence-channel.ts +++ b/src/channels/presence-channel.ts @@ -1,7 +1,7 @@ import {Database} from './../database'; import {Log} from './../log'; -import {RedisPublisher} from "../publishers/redis-publisher"; import {Publisher} from "../publishers"; +import {PublisherFactory} from "../publishers/publisher-factory"; var _ = require("lodash"); @@ -11,17 +11,14 @@ export class PresenceChannel { */ db: Database; - _redis: Publisher; - - _keyPrefix: string; + publisher: Publisher; /** * Create a new Presence channel instance. */ constructor(private io, private options: any) { this.db = new Database(options); - this._redis = new RedisPublisher(options); - this._keyPrefix = options.databaseConfig.redis.keyPrefix || ''; + this.publisher = new PublisherFactory(io).create(options); } /** @@ -150,8 +147,7 @@ export class PresenceChannel { * On join event handler. */ onJoin(socket: any, channel: string, member: any): void { - this._redis.publish(channel, { - event: `${this._keyPrefix}presence:joining`, + this.publisher.publish(channel, "presence:joining",{ data: {member} }); } @@ -160,8 +156,7 @@ export class PresenceChannel { * On leave emitter. */ onLeave(channel: string, member: any): void { - this._redis.publish(channel, { - event: `${this._keyPrefix}presence:leaving`, + this.publisher.publish(channel, "presence:leaving", { data: {member} }); } @@ -170,13 +165,10 @@ export class PresenceChannel { * On subscribed event emitter. */ onSubscribed(socket: any, channel: string, members: any[]) { - this._redis.publish(channel, { - event: `${this._keyPrefix}presence:subscribed`, - data: {members} - }); + this.io.to(socket.id).emit("presence:subscribed", channel, members); } clientEvent(data) { - this._redis.publish(data.channel, data); + this.publisher.publish(data.channel, data.event, data); } } diff --git a/src/channels/private-channel.ts b/src/channels/private-channel.ts index 35c030cc..1f5c8e74 100644 --- a/src/channels/private-channel.ts +++ b/src/channels/private-channel.ts @@ -1,6 +1,5 @@ let request = require('request'); let url = require('url'); -import { Channel } from './channel'; import { Log } from './../log'; export class PrivateChannel { diff --git a/src/echo-server.ts b/src/echo-server.ts index 069335c2..9fe7bd99 100644 --- a/src/echo-server.ts +++ b/src/echo-server.ts @@ -1,11 +1,11 @@ -import { HttpSubscriber, RedisSubscriber, Subscriber } from './subscribers'; -import { Channel } from './channels'; -import { Server } from './server'; -import { HttpApi } from './api'; -import { Log } from './log'; -import * as fs from 'fs'; +import {HttpSubscriber, RedisSubscriber, Subscriber} from './subscribers'; +import {Channel} from './channels'; +import {Server} from './server'; +import {HttpApi} from './api'; +import {Log} from './log'; + const packageFile = require('../package.json'); -const { constants } = require('crypto'); +const {constants} = require('crypto'); /** * Echo server class. @@ -75,7 +75,8 @@ export class EchoServer { /** * Create a new instance. */ - constructor() { } + constructor() { + } /** * Start the Echo Server. diff --git a/src/publishers/io-publisher.ts b/src/publishers/io-publisher.ts new file mode 100644 index 00000000..7223f3b2 --- /dev/null +++ b/src/publishers/io-publisher.ts @@ -0,0 +1,34 @@ +import {Publisher} from "./publisher"; + +export class IoPublisher implements Publisher { + constructor(private io) { + } + + publish(channel: string, event: string, data: any): Promise { + if (event === "presence:leaving") { + this.io + .to(channel) + .emit(event, data.data.member); + + return Promise.resolve(undefined); + } + + if (event === "presence:joining") { + this.io + .sockets + .connected[data.socket.id] + .broadcast + .to(channel) + .emit(event, data.data.member); + return Promise.resolve(undefined); + } + + this.io.sockets + .connected[data.socket.id] + .broadcast + .to(channel) + .emit(event, channel, data.data); + + return Promise.resolve(undefined); + } +} diff --git a/src/publishers/publisher-factory.ts b/src/publishers/publisher-factory.ts new file mode 100644 index 00000000..477f7142 --- /dev/null +++ b/src/publishers/publisher-factory.ts @@ -0,0 +1,15 @@ +import {Publisher} from "./publisher"; +import {RedisPublisher} from "./redis-publisher"; +import {IoPublisher} from "./io-publisher"; + +export class PublisherFactory { + public constructor(private io) { + } + + public create(options: any): Publisher { + if (options.subscribers.redis === 'redis') { + return new RedisPublisher(options); + } + return new IoPublisher(this.io); + } +} diff --git a/src/publishers/publisher.ts b/src/publishers/publisher.ts index 9c6f5947..a484c77e 100644 --- a/src/publishers/publisher.ts +++ b/src/publishers/publisher.ts @@ -1,3 +1,3 @@ export interface Publisher { - publish(channel: string, data: any): Promise; + publish(channel: string, event: string, data: any): Promise; } diff --git a/src/publishers/redis-publisher.ts b/src/publishers/redis-publisher.ts index 9d1bf3af..68a74b59 100644 --- a/src/publishers/redis-publisher.ts +++ b/src/publishers/redis-publisher.ts @@ -15,7 +15,7 @@ export class RedisPublisher implements Publisher { * * @type {String} */ - private _keyPrefix: string; + private readonly _keyPrefix: string; /** * Create a new instance of subscriber. @@ -32,10 +32,10 @@ export class RedisPublisher implements Publisher { * * @return {Promise} */ - publish(channel: string, data: any): Promise { + publish(channel: string, event: string, data: any): Promise { return new Promise((resolve, reject) => { try { - this._redis.publish(channel, data); + this._redis.publish(this._keyPrefix + channel, JSON.stringify({event, ...data})); resolve(); } catch (e) { reject(e); From 47c93225e6e159b0679938d3c9afbad2e0e9c066 Mon Sep 17 00:00:00 2001 From: lucap Date: Tue, 27 Jul 2021 13:10:22 +0200 Subject: [PATCH 4/6] Fix options check --- src/publishers/publisher-factory.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/publishers/publisher-factory.ts b/src/publishers/publisher-factory.ts index 477f7142..f822fa12 100644 --- a/src/publishers/publisher-factory.ts +++ b/src/publishers/publisher-factory.ts @@ -7,7 +7,7 @@ export class PublisherFactory { } public create(options: any): Publisher { - if (options.subscribers.redis === 'redis') { + if (options.subscribers.redis) { return new RedisPublisher(options); } return new IoPublisher(this.io); From 149deed050afc2716e695f0a6622010a3c39537d Mon Sep 17 00:00:00 2001 From: lucap Date: Tue, 27 Jul 2021 13:13:51 +0200 Subject: [PATCH 5/6] Fix TypeError --- src/publishers/redis-publisher.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/publishers/redis-publisher.ts b/src/publishers/redis-publisher.ts index 68a74b59..4c3d97bf 100644 --- a/src/publishers/redis-publisher.ts +++ b/src/publishers/redis-publisher.ts @@ -1,6 +1,8 @@ var Redis = require('ioredis'); import {Publisher} from './publisher'; +var _ = require("lodash"); + export class RedisPublisher implements Publisher { /** * Redis pub/sub client. @@ -35,7 +37,7 @@ export class RedisPublisher implements Publisher { publish(channel: string, event: string, data: any): Promise { return new Promise((resolve, reject) => { try { - this._redis.publish(this._keyPrefix + channel, JSON.stringify({event, ...data})); + this._redis.publish(this._keyPrefix + channel, JSON.stringify({event, ..._.omit(data, ['socket'])})); resolve(); } catch (e) { reject(e); From 48f59fbd62eede74a02b37fed521e9d3b4c2395a Mon Sep 17 00:00:00 2001 From: lucap Date: Tue, 27 Jul 2021 14:26:53 +0200 Subject: [PATCH 6/6] Whitespace --- src/publishers/io-publisher.ts | 1 + src/publishers/publisher-factory.ts | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/publishers/io-publisher.ts b/src/publishers/io-publisher.ts index 7223f3b2..a8b784fd 100644 --- a/src/publishers/io-publisher.ts +++ b/src/publishers/io-publisher.ts @@ -20,6 +20,7 @@ export class IoPublisher implements Publisher { .broadcast .to(channel) .emit(event, data.data.member); + return Promise.resolve(undefined); } diff --git a/src/publishers/publisher-factory.ts b/src/publishers/publisher-factory.ts index f822fa12..816ddca0 100644 --- a/src/publishers/publisher-factory.ts +++ b/src/publishers/publisher-factory.ts @@ -3,8 +3,7 @@ import {RedisPublisher} from "./redis-publisher"; import {IoPublisher} from "./io-publisher"; export class PublisherFactory { - public constructor(private io) { - } + public constructor(private io) { } public create(options: any): Publisher { if (options.subscribers.redis) {