diff --git a/src/channels/channel.ts b/src/channels/channel.ts index bc1dd6b3..fadff304 100644 --- a/src/channels/channel.ts +++ b/src/channels/channel.ts @@ -1,6 +1,6 @@ -import { PresenceChannel } from './presence-channel'; -import { PrivateChannel } from './private-channel'; -import { Log } from './../log'; +import {PresenceChannel} from './presence-channel'; +import {PrivateChannel} from './private-channel'; +import {Log} from './../log'; export class Channel { /** @@ -63,9 +63,7 @@ export class Channel { if (this.isClientEvent(data.event) && this.isPrivate(data.channel) && this.isInChannel(socket, data.channel)) { - this.io.sockets.connected[socket.id] - .broadcast.to(data.channel) - .emit(data.event, data.channel, data.data); + this.presence.clientEvent({socket, ...data}); } } } diff --git a/src/channels/presence-channel.ts b/src/channels/presence-channel.ts index 7b618720..4e98145b 100644 --- a/src/channels/presence-channel.ts +++ b/src/channels/presence-channel.ts @@ -1,5 +1,8 @@ -import { Database } from './../database'; -import { Log } from './../log'; +import {Database} from './../database'; +import {Log} from './../log'; +import {Publisher} from "../publishers"; +import {PublisherFactory} from "../publishers/publisher-factory"; + var _ = require("lodash"); export class PresenceChannel { @@ -8,11 +11,14 @@ export class PresenceChannel { */ db: Database; + publisher: Publisher; + /** * Create a new Presence channel instance. */ constructor(private io, private options: any) { this.db = new Database(options); + this.publisher = new PublisherFactory(io).create(options); } /** @@ -141,16 +147,18 @@ export class PresenceChannel { * On join event handler. */ onJoin(socket: any, channel: string, member: any): void { - this.io.sockets.connected[socket.id].broadcast - .to(channel) - .emit("presence:joining", channel, member); + this.publisher.publish(channel, "presence:joining",{ + data: {member} + }); } /** * On leave emitter. */ onLeave(channel: string, member: any): void { - this.io.to(channel).emit("presence:leaving", channel, member); + this.publisher.publish(channel, "presence:leaving", { + data: {member} + }); } /** @@ -159,4 +167,8 @@ export class PresenceChannel { onSubscribed(socket: any, channel: string, members: any[]) { this.io.to(socket.id).emit("presence:subscribed", channel, members); } + + clientEvent(data) { + this.publisher.publish(data.channel, data.event, data); + } } diff --git a/src/channels/private-channel.ts b/src/channels/private-channel.ts index 35c030cc..1f5c8e74 100644 --- a/src/channels/private-channel.ts +++ b/src/channels/private-channel.ts @@ -1,6 +1,5 @@ let request = require('request'); let url = require('url'); -import { Channel } from './channel'; import { Log } from './../log'; export class PrivateChannel { diff --git a/src/echo-server.ts b/src/echo-server.ts index 069335c2..9fe7bd99 100644 --- a/src/echo-server.ts +++ b/src/echo-server.ts @@ -1,11 +1,11 @@ -import { HttpSubscriber, RedisSubscriber, Subscriber } from './subscribers'; -import { Channel } from './channels'; -import { Server } from './server'; -import { HttpApi } from './api'; -import { Log } from './log'; -import * as fs from 'fs'; +import {HttpSubscriber, RedisSubscriber, Subscriber} from './subscribers'; +import {Channel} from './channels'; +import {Server} from './server'; +import {HttpApi} from './api'; +import {Log} from './log'; + const packageFile = require('../package.json'); -const { constants } = require('crypto'); +const {constants} = require('crypto'); /** * Echo server class. @@ -75,7 +75,8 @@ export class EchoServer { /** * Create a new instance. */ - constructor() { } + constructor() { + } /** * Start the Echo Server. diff --git a/src/publishers/index.ts b/src/publishers/index.ts new file mode 100644 index 00000000..ee74e99a --- /dev/null +++ b/src/publishers/index.ts @@ -0,0 +1,2 @@ +export * from './redis-publisher'; +export * from './publisher'; diff --git a/src/publishers/io-publisher.ts b/src/publishers/io-publisher.ts new file mode 100644 index 00000000..a8b784fd --- /dev/null +++ b/src/publishers/io-publisher.ts @@ -0,0 +1,35 @@ +import {Publisher} from "./publisher"; + +export class IoPublisher implements Publisher { + constructor(private io) { + } + + publish(channel: string, event: string, data: any): Promise { + if (event === "presence:leaving") { + this.io + .to(channel) + .emit(event, data.data.member); + + return Promise.resolve(undefined); + } + + if (event === "presence:joining") { + this.io + .sockets + .connected[data.socket.id] + .broadcast + .to(channel) + .emit(event, data.data.member); + + return Promise.resolve(undefined); + } + + this.io.sockets + .connected[data.socket.id] + .broadcast + .to(channel) + .emit(event, channel, data.data); + + return Promise.resolve(undefined); + } +} diff --git a/src/publishers/publisher-factory.ts b/src/publishers/publisher-factory.ts new file mode 100644 index 00000000..816ddca0 --- /dev/null +++ b/src/publishers/publisher-factory.ts @@ -0,0 +1,14 @@ +import {Publisher} from "./publisher"; +import {RedisPublisher} from "./redis-publisher"; +import {IoPublisher} from "./io-publisher"; + +export class PublisherFactory { + public constructor(private io) { } + + public create(options: any): Publisher { + if (options.subscribers.redis) { + return new RedisPublisher(options); + } + return new IoPublisher(this.io); + } +} diff --git a/src/publishers/publisher.ts b/src/publishers/publisher.ts new file mode 100644 index 00000000..a484c77e --- /dev/null +++ b/src/publishers/publisher.ts @@ -0,0 +1,3 @@ +export interface Publisher { + publish(channel: string, event: string, data: any): Promise; +} diff --git a/src/publishers/redis-publisher.ts b/src/publishers/redis-publisher.ts new file mode 100644 index 00000000..4c3d97bf --- /dev/null +++ b/src/publishers/redis-publisher.ts @@ -0,0 +1,47 @@ +var Redis = require('ioredis'); +import {Publisher} from './publisher'; + +var _ = require("lodash"); + +export class RedisPublisher implements Publisher { + /** + * Redis pub/sub client. + * + * @type {object} + */ + private _redis: any; + + /** + * + * KeyPrefix for used in the redis Connection + * + * @type {String} + */ + private readonly _keyPrefix: string; + + /** + * Create a new instance of subscriber. + * + * @param {any} options + */ + constructor(private options) { + this._keyPrefix = options.databaseConfig.redis.keyPrefix || ''; + this._redis = new Redis(options.databaseConfig.redis); + } + + /** + * Subscribe to events to broadcast. + * + * @return {Promise} + */ + publish(channel: string, event: string, data: any): Promise { + return new Promise((resolve, reject) => { + try { + this._redis.publish(this._keyPrefix + channel, JSON.stringify({event, ..._.omit(data, ['socket'])})); + resolve(); + } catch (e) { + reject(e); + } + }); + } +}