diff --git a/redisinsight/api/src/app.module.ts b/redisinsight/api/src/app.module.ts index 9ad75e0129..fe24271c1c 100644 --- a/redisinsight/api/src/app.module.ts +++ b/redisinsight/api/src/app.module.ts @@ -12,6 +12,7 @@ import { PluginModule } from 'src/modules/plugin/plugin.module'; import { CommandsModule } from 'src/modules/commands/commands.module'; import { WorkbenchModule } from 'src/modules/workbench/workbench.module'; import { SlowLogModule } from 'src/modules/slow-log/slow-log.module'; +import { PubSubModule } from 'src/modules/pub-sub/pub-sub.module'; import { SharedModule } from './modules/shared/shared.module'; import { InstancesModule } from './modules/instances/instances.module'; import { BrowserModule } from './modules/browser/browser.module'; @@ -43,6 +44,7 @@ const PATH_CONFIG = config.get('dir_path'); PluginModule, CommandsModule, ProfilerModule, + PubSubModule, SlowLogModule, EventEmitterModule.forRoot(), ...(SERVER_CONFIG.staticContent diff --git a/redisinsight/api/src/app.routes.ts b/redisinsight/api/src/app.routes.ts index 97ccd5e98f..2c529122b7 100644 --- a/redisinsight/api/src/app.routes.ts +++ b/redisinsight/api/src/app.routes.ts @@ -6,6 +6,7 @@ import { RedisSentinelModule } from 'src/modules/redis-sentinel/redis-sentinel.m import { CliModule } from 'src/modules/cli/cli.module'; import { WorkbenchModule } from 'src/modules/workbench/workbench.module'; import { SlowLogModule } from 'src/modules/slow-log/slow-log.module'; +import { PubSubModule } from 'src/modules/pub-sub/pub-sub.module'; export const routes: Routes = [ { @@ -28,6 +29,10 @@ export const routes: Routes = [ path: '/:dbInstance', module: SlowLogModule, }, + { + path: '/:dbInstance', + module: PubSubModule, + }, ], }, { diff --git a/redisinsight/api/src/modules/cli/utils/getUnsupportedCommands.spec.ts b/redisinsight/api/src/modules/cli/utils/getUnsupportedCommands.spec.ts index 5617d3eb8c..887f5963ca 100644 --- a/redisinsight/api/src/modules/cli/utils/getUnsupportedCommands.spec.ts +++ b/redisinsight/api/src/modules/cli/utils/getUnsupportedCommands.spec.ts @@ -2,7 +2,7 @@ import { getUnsupportedCommands } from './getUnsupportedCommands'; describe('cli unsupported commands', () => { it('should return correct list', () => { - const expectedResult = ['monitor', 'subscribe', 'psubscribe', 'sync', 'psync', 'script debug']; + const expectedResult = ['monitor', 'subscribe', 'psubscribe', 'ssubscribe', 'sync', 'psync', 'script debug']; expect(getUnsupportedCommands()).toEqual(expectedResult); }); diff --git a/redisinsight/api/src/modules/cli/utils/getUnsupportedCommands.ts b/redisinsight/api/src/modules/cli/utils/getUnsupportedCommands.ts index 75fed48aa0..119cf4da1a 100644 --- a/redisinsight/api/src/modules/cli/utils/getUnsupportedCommands.ts +++ b/redisinsight/api/src/modules/cli/utils/getUnsupportedCommands.ts @@ -6,6 +6,7 @@ export enum CliToolUnsupportedCommands { Monitor = 'monitor', Subscribe = 'subscribe', PSubscribe = 'psubscribe', + SSubscribe = 'ssubscribe', Sync = 'sync', PSync = 'psync', ScriptDebug = 'script debug', diff --git a/redisinsight/api/src/modules/profiler/models/log-file.ts b/redisinsight/api/src/modules/profiler/models/log-file.ts index 04fb389ca1..a4f34b66c5 100644 --- a/redisinsight/api/src/modules/profiler/models/log-file.ts +++ b/redisinsight/api/src/modules/profiler/models/log-file.ts @@ -124,7 +124,7 @@ export class LogFile { this.writeStream?.close(); this.writeStream = null; const size = this.getFileSize(); - fs.unlink(this.filePath); + fs.unlinkSync(this.filePath); this.analyticsEvents.get(TelemetryEvents.ProfilerLogDeleted)(this.instanceId, size); } catch (e) { diff --git a/redisinsight/api/src/modules/pub-sub/constants/index.ts b/redisinsight/api/src/modules/pub-sub/constants/index.ts new file mode 100644 index 0000000000..666e2fdfb7 --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/constants/index.ts @@ -0,0 +1,28 @@ +export enum PubSubClientEvents { + Subscribe = 'subscribe', + Unsubscribe = 'unsubscribe', +} + +export enum PubSubServerEvents { + Exception = 'exception', +} + +export enum SubscriptionType { + Subscribe = 's', + PSubscribe = 'p', + SSubscribe = 'ss', +} + +export enum RedisClientStatus { + Connecting = 'connecting', + Connected = 'connected', + Error = 'error', + End = 'end', +} + +export enum RedisClientEvents { + Connected = 'connected', + ConnectionError = 'connection_error', + Message = 'message', + End = 'end', +} diff --git a/redisinsight/api/src/modules/pub-sub/decorators/client.decorator.ts b/redisinsight/api/src/modules/pub-sub/decorators/client.decorator.ts new file mode 100644 index 0000000000..2c78dd897e --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/decorators/client.decorator.ts @@ -0,0 +1,11 @@ +import { get } from 'lodash'; +import { createParamDecorator, ExecutionContext } from '@nestjs/common'; +import { UserClient } from 'src/modules/pub-sub/model/user-client'; + +export const Client = createParamDecorator( + (data: unknown, ctx: ExecutionContext): UserClient => { + const socket = ctx.switchToWs().getClient(); + + return new UserClient(socket.id, socket, get(socket, 'handshake.query.instanceId')); + }, +); diff --git a/redisinsight/api/src/modules/pub-sub/dto/index.ts b/redisinsight/api/src/modules/pub-sub/dto/index.ts new file mode 100644 index 0000000000..088b5a1e98 --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/dto/index.ts @@ -0,0 +1,3 @@ +export * from './subscribe.dto'; +export * from './subscription.dto'; +export * from './messages.response'; diff --git a/redisinsight/api/src/modules/pub-sub/dto/messages.response.ts b/redisinsight/api/src/modules/pub-sub/dto/messages.response.ts new file mode 100644 index 0000000000..e02ea99aa4 --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/dto/messages.response.ts @@ -0,0 +1,7 @@ +import { IMessage } from 'src/modules/pub-sub/interfaces/message.interface'; + +export class MessagesResponse { + messages: IMessage[]; + + count: number; +} diff --git a/redisinsight/api/src/modules/pub-sub/dto/publish.dto.ts b/redisinsight/api/src/modules/pub-sub/dto/publish.dto.ts new file mode 100644 index 0000000000..b352f54394 --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/dto/publish.dto.ts @@ -0,0 +1,24 @@ +import { + IsNotEmpty, IsString, +} from 'class-validator'; +import { ApiProperty } from '@nestjs/swagger'; + +export class PublishDto { + @ApiProperty({ + type: String, + description: 'Message to send', + example: '{"hello":"world"}', + }) + @IsNotEmpty() + @IsString() + message: string; + + @ApiProperty({ + type: String, + description: 'Chanel name', + example: 'channel-1', + }) + @IsNotEmpty() + @IsString() + channel: string; +} diff --git a/redisinsight/api/src/modules/pub-sub/dto/publish.response.ts b/redisinsight/api/src/modules/pub-sub/dto/publish.response.ts new file mode 100644 index 0000000000..85b3fbbd39 --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/dto/publish.response.ts @@ -0,0 +1,3 @@ +export class PublishResponse { + affected: number; +} diff --git a/redisinsight/api/src/modules/pub-sub/dto/subscribe.dto.ts b/redisinsight/api/src/modules/pub-sub/dto/subscribe.dto.ts new file mode 100644 index 0000000000..e1f32af388 --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/dto/subscribe.dto.ts @@ -0,0 +1,13 @@ +import { + ArrayNotEmpty, IsArray, ValidateNested, +} from 'class-validator'; +import { Type } from 'class-transformer'; +import { SubscriptionDto } from './subscription.dto'; + +export class SubscribeDto { + @IsArray() + @ArrayNotEmpty() + @ValidateNested({ each: true }) + @Type(() => SubscriptionDto) + subscriptions: SubscriptionDto[]; +} diff --git a/redisinsight/api/src/modules/pub-sub/dto/subscription.dto.ts b/redisinsight/api/src/modules/pub-sub/dto/subscription.dto.ts new file mode 100644 index 0000000000..d3695113a7 --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/dto/subscription.dto.ts @@ -0,0 +1,12 @@ +import { SubscriptionType } from 'src/modules/pub-sub/constants'; +import { IsEnum, IsNotEmpty, IsString } from 'class-validator'; + +export class SubscriptionDto { + @IsNotEmpty() + @IsString() + channel: string; + + @IsNotEmpty() + @IsEnum(SubscriptionType) + type: SubscriptionType; +} diff --git a/redisinsight/api/src/modules/pub-sub/errors/pub-sub-ws.exception.ts b/redisinsight/api/src/modules/pub-sub/errors/pub-sub-ws.exception.ts new file mode 100644 index 0000000000..9cd10c52f3 --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/errors/pub-sub-ws.exception.ts @@ -0,0 +1,26 @@ +import { HttpException } from '@nestjs/common'; +import { isString } from 'lodash'; + +export class PubSubWsException extends Error { + status: number; + + name: string; + + constructor(err: Error | string) { + super(); + this.status = 500; + this.message = 'Internal server error'; + this.name = this.constructor.name; + + if (isString(err)) { + this.message = err; + } else if (err instanceof HttpException) { + this.message = (err.getResponse())['message']; + this.status = err.getStatus(); + this.name = err.constructor.name; + } else if (err instanceof Error) { + this.message = err.message; + this.name = 'Error'; + } + } +} diff --git a/redisinsight/api/src/modules/pub-sub/filters/ack-ws-exception.filter.ts b/redisinsight/api/src/modules/pub-sub/filters/ack-ws-exception.filter.ts new file mode 100644 index 0000000000..77f853633d --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/filters/ack-ws-exception.filter.ts @@ -0,0 +1,18 @@ +import { + ArgumentsHost, Catch, HttpException, +} from '@nestjs/common'; +import { PubSubWsException } from 'src/modules/pub-sub/errors/pub-sub-ws.exception'; + +@Catch() +export class AckWsExceptionFilter { + public catch(exception: HttpException, host: ArgumentsHost) { + const callback = host.getArgByIndex(2); + this.handleError(callback, exception); + } + + public handleError(callback: any, exception: Error) { + if (callback && typeof callback === 'function') { + callback({ status: 'error', error: new PubSubWsException(exception) }); + } + } +} diff --git a/redisinsight/api/src/modules/pub-sub/interfaces/message.interface.ts b/redisinsight/api/src/modules/pub-sub/interfaces/message.interface.ts new file mode 100644 index 0000000000..5947222651 --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/interfaces/message.interface.ts @@ -0,0 +1,7 @@ +export interface IMessage { + message: string; + + channel: string; + + time: number; +} diff --git a/redisinsight/api/src/modules/pub-sub/interfaces/subscription.interface.ts b/redisinsight/api/src/modules/pub-sub/interfaces/subscription.interface.ts new file mode 100644 index 0000000000..f266a1920a --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/interfaces/subscription.interface.ts @@ -0,0 +1,16 @@ +import * as IORedis from 'ioredis'; +import { IMessage } from 'src/modules/pub-sub/interfaces/message.interface'; + +export interface ISubscription { + getId(): string; + + getChannel(): string; + + getType(): string; + + pushMessage(message: IMessage): void; + + subscribe(client: IORedis.Redis | IORedis.Cluster): Promise; + + unsubscribe(client: IORedis.Redis | IORedis.Cluster): Promise; +} diff --git a/redisinsight/api/src/modules/pub-sub/model/abstract.subscription.ts b/redisinsight/api/src/modules/pub-sub/model/abstract.subscription.ts new file mode 100644 index 0000000000..97ce4cac06 --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/model/abstract.subscription.ts @@ -0,0 +1,73 @@ +import { debounce } from 'lodash'; +import { SubscriptionType } from 'src/modules/pub-sub/constants'; +import { UserClient } from 'src/modules/pub-sub/model/user-client'; +import { MessagesResponse, SubscriptionDto } from 'src/modules/pub-sub/dto'; +import { ISubscription } from 'src/modules/pub-sub/interfaces/subscription.interface'; +import { IMessage } from 'src/modules/pub-sub/interfaces/message.interface'; +import * as IORedis from 'ioredis'; + +const EMIT_WAIT = 30; +const EMIT_MAX_WAIT = 100; +const MESSAGES_MAX = 5000; + +export abstract class AbstractSubscription implements ISubscription { + protected readonly id: string; + + protected readonly userClient: UserClient; + + protected readonly debounce: any; + + protected readonly channel: string; + + protected readonly type: SubscriptionType; + + protected messages: IMessage[] = []; + + constructor(userClient: UserClient, dto: SubscriptionDto) { + this.userClient = userClient; + this.channel = dto.channel; + this.type = dto.type; + this.id = `${this.type}:${this.channel}`; + this.debounce = debounce(() => { + if (this.messages.length) { + this.userClient.getSocket() + .emit(this.id, { + messages: this.messages.slice(0, MESSAGES_MAX), + count: this.messages.length, + } as MessagesResponse); + this.messages = []; + } + }, EMIT_WAIT, { + maxWait: EMIT_MAX_WAIT, + }); + } + + getId() { + return this.id; + } + + getChannel() { + return this.channel; + } + + getType() { + return this.type; + } + + abstract subscribe(client: IORedis.Redis | IORedis.Cluster): Promise; + + abstract unsubscribe(client: IORedis.Redis | IORedis.Cluster): Promise; + + pushMessage(message: IMessage) { + this.messages.push(message); + + this.debounce(); + } + + toString() { + return `${this.constructor.name}:${JSON.stringify({ + id: this.id, + mL: this.messages.length, + })}`; + } +} diff --git a/redisinsight/api/src/modules/pub-sub/model/pattern.subscription.ts b/redisinsight/api/src/modules/pub-sub/model/pattern.subscription.ts new file mode 100644 index 0000000000..23a381dd44 --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/model/pattern.subscription.ts @@ -0,0 +1,12 @@ +import { AbstractSubscription } from 'src/modules/pub-sub/model/abstract.subscription'; +import * as IORedis from 'ioredis'; + +export class PatternSubscription extends AbstractSubscription { + async subscribe(client: IORedis.Redis | IORedis.Cluster): Promise { + await client.psubscribe(this.channel); + } + + async unsubscribe(client: IORedis.Redis | IORedis.Cluster): Promise { + await client.punsubscribe(this.channel); + } +} diff --git a/redisinsight/api/src/modules/pub-sub/model/redis-client.spec.ts b/redisinsight/api/src/modules/pub-sub/model/redis-client.spec.ts new file mode 100644 index 0000000000..4c3916e957 --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/model/redis-client.spec.ts @@ -0,0 +1,133 @@ +import * as Redis from 'ioredis'; +import { RedisClient } from 'src/modules/pub-sub/model/redis-client'; +import { RedisClientEvents, RedisClientStatus } from 'src/modules/pub-sub/constants'; + +const getRedisClientFn = jest.fn(); + +const nodeClient = Object.create(Redis.prototype); +nodeClient.subscribe = jest.fn(); +nodeClient.psubscribe = jest.fn(); +nodeClient.unsubscribe = jest.fn(); +nodeClient.punsubscribe = jest.fn(); +nodeClient.status = 'ready'; +nodeClient.disconnect = jest.fn(); +nodeClient.quit = jest.fn(); + +describe('RedisClient', () => { + let redisClient: RedisClient; + + beforeEach(() => { + jest.resetAllMocks(); + redisClient = new RedisClient('databaseId', getRedisClientFn); + getRedisClientFn.mockResolvedValue(nodeClient); + nodeClient.subscribe.mockResolvedValue('OK'); + nodeClient.psubscribe.mockResolvedValue('OK'); + }); + + describe('getClient', () => { + let connectSpy; + + beforeEach(() => { + connectSpy = jest.spyOn(redisClient as any, 'connect'); + }); + + it('should connect and return client by default', async () => { + expect(await redisClient.getClient()).toEqual(nodeClient); + expect(connectSpy).toHaveBeenCalledTimes(1); + expect(redisClient['status']).toEqual(RedisClientStatus.Connected); + }); + it('should wait until first attempt of connection finish with success', async () => { + redisClient.getClient().then().catch(); + expect(redisClient['status']).toEqual(RedisClientStatus.Connecting); + expect(await redisClient.getClient()).toEqual(nodeClient); + expect(connectSpy).toHaveBeenCalledTimes(1); + expect(redisClient['status']).toEqual(RedisClientStatus.Connected); + }); + it('should wait until first attempt of connection finish with error', async () => { + try { + getRedisClientFn.mockRejectedValueOnce(new Error('Connection error')); + redisClient.getClient().then().catch(() => {}); + expect(redisClient['status']).toEqual(RedisClientStatus.Connecting); + expect(await redisClient.getClient()).toEqual(nodeClient); + fail(); + } catch (e) { + expect(connectSpy).toHaveBeenCalledTimes(1); + expect(redisClient['status']).toEqual(RedisClientStatus.Error); + } + }); + it('should return existing connection when status connected', async () => { + expect(await redisClient.getClient()).toEqual(nodeClient); + expect(connectSpy).toHaveBeenCalledTimes(1); + expect(redisClient['status']).toEqual(RedisClientStatus.Connected); + expect(await redisClient.getClient()).toEqual(nodeClient); + expect(connectSpy).toHaveBeenCalledTimes(1); + }); + it('should return create new connection when status end or error', async () => { + expect(await redisClient.getClient()).toEqual(nodeClient); + expect(connectSpy).toHaveBeenCalledTimes(1); + expect(redisClient['status']).toEqual(RedisClientStatus.Connected); + redisClient['status'] = RedisClientStatus.Error; + expect(await redisClient.getClient()).toEqual(nodeClient); + expect(connectSpy).toHaveBeenCalledTimes(2); + expect(redisClient['status']).toEqual(RedisClientStatus.Connected); + redisClient['status'] = RedisClientStatus.End; + expect(await redisClient.getClient()).toEqual(nodeClient); + expect(connectSpy).toHaveBeenCalledTimes(3); + expect(redisClient['status']).toEqual(RedisClientStatus.Connected); + }); + }); + + describe('connect', () => { + it('should connect and emit connected event', async () => { + expect(await new Promise((res) => { + redisClient['connect'](); + redisClient.on(RedisClientEvents.Connected, res); + })).toEqual(nodeClient); + }); + it('should emit message event (message source)', async () => { + await redisClient['connect'](); + const [id, message] = await new Promise((res) => { + redisClient.on('message', (i, m) => res([i, m])); + nodeClient.emit('message', 'channel-a', 'message-a'); + }); + + expect(id).toEqual('s:channel-a'); + expect(message.channel).toEqual('channel-a'); + expect(message.message).toEqual('message-a'); + }); + it('should emit message event (pmessage source)', async () => { + await redisClient['connect'](); + const [id, message] = await new Promise((res) => { + redisClient.on('message', (i, m) => res([i, m])); + nodeClient.emit('pmessage', '*', 'channel-aa', 'message-aa'); + }); + expect(id).toEqual('p:*'); + expect(message.channel).toEqual('channel-aa'); + expect(message.message).toEqual('message-aa'); + }); + it('should emit end event', async () => { + await redisClient['connect'](); + await new Promise((res) => { + redisClient.on('end', () => { + res(null); + }); + + nodeClient.emit('end'); + }); + }); + }); + + describe('destroy', () => { + it('should remove all listeners, disconnect, set client to null and emit end event', async () => { + const removeAllListenersSpy = jest.spyOn(nodeClient, 'removeAllListeners'); + + await redisClient['connect'](); + redisClient.destroy(); + + expect(redisClient['client']).toEqual(null); + expect(redisClient['status']).toEqual(RedisClientStatus.End); + expect(removeAllListenersSpy).toHaveBeenCalled(); + expect(nodeClient.quit).toHaveBeenCalled(); + }); + }); +}); diff --git a/redisinsight/api/src/modules/pub-sub/model/redis-client.ts b/redisinsight/api/src/modules/pub-sub/model/redis-client.ts new file mode 100644 index 0000000000..a11be02c0c --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/model/redis-client.ts @@ -0,0 +1,110 @@ +import * as IORedis from 'ioredis'; +import { Logger } from '@nestjs/common'; +import { EventEmitter2 } from '@nestjs/event-emitter'; +import { RedisClientEvents, RedisClientStatus } from 'src/modules/pub-sub/constants'; + +export class RedisClient extends EventEmitter2 { + private logger: Logger = new Logger('RedisClient'); + + private client: IORedis.Redis | IORedis.Cluster; + + private readonly databaseId: string; + + private readonly connectFn: () => Promise; + + private status: RedisClientStatus; + + constructor( + databaseId: string, + connectFn: () => Promise, + ) { + super(); + this.databaseId = databaseId; + this.connectFn = connectFn; + } + + /** + * Get existing client or wait until previous attempt fulfill or initiate new connection attempt + * based on current status + */ + async getClient(): Promise { + try { + this.logger.debug(`Get client ${this}`); + switch (this.status) { + case RedisClientStatus.Connected: + return this.client; + case RedisClientStatus.Connecting: + // wait until connect or error + break; + case RedisClientStatus.Error: + case RedisClientStatus.End: + default: + await this.connect(); + return this.client; + } + + return new Promise((resolve, reject) => { + this.once(RedisClientEvents.Connected, resolve); + this.once(RedisClientEvents.ConnectionError, reject); + }); + } catch (e) { + this.logger.error('Unable to connect to Redis', e); + this.status = RedisClientStatus.Error; + this.emit(RedisClientEvents.ConnectionError, e); + throw e; + } + } + + /** + * Connects to redis and change current status to Connected + * Also emit Connected event after success + * Also subscribe to needed channels + * @private + */ + private async connect() { + this.status = RedisClientStatus.Connecting; + this.client = await this.connectFn(); + this.status = RedisClientStatus.Connected; + this.emit(RedisClientEvents.Connected, this.client); + + this.client.on('message', (channel: string, message: string) => { + this.emit(RedisClientEvents.Message, `s:${channel}`, { + channel, + message, + time: Date.now(), + }); + }); + + this.client.on('pmessage', (pattern: string, channel: string, message: string) => { + this.emit(RedisClientEvents.Message, `p:${pattern}`, { + channel, + message, + time: Date.now(), + }); + }); + + this.client.on('end', () => { + this.status = RedisClientStatus.End; + this.emit(RedisClientEvents.End); + }); + } + + /** + * Unsubscribe all listeners and disconnect + * Remove client and set current state to End + */ + destroy() { + this.client?.removeAllListeners(); + this.client?.quit(); + this.client = null; + this.status = RedisClientStatus.End; + } + + toString() { + return `RedisClient:${JSON.stringify({ + databaseId: this.databaseId, + status: this.status, + clientStatus: this.client?.status, + })}`; + } +} diff --git a/redisinsight/api/src/modules/pub-sub/model/simple.subscription.ts b/redisinsight/api/src/modules/pub-sub/model/simple.subscription.ts new file mode 100644 index 0000000000..12893d2791 --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/model/simple.subscription.ts @@ -0,0 +1,12 @@ +import { AbstractSubscription } from 'src/modules/pub-sub/model/abstract.subscription'; +import * as IORedis from 'ioredis'; + +export class SimpleSubscription extends AbstractSubscription { + async subscribe(client: IORedis.Redis | IORedis.Cluster): Promise { + await client.subscribe(this.channel); + } + + async unsubscribe(client: IORedis.Redis | IORedis.Cluster): Promise { + await client.unsubscribe(this.channel); + } +} diff --git a/redisinsight/api/src/modules/pub-sub/model/user-client.ts b/redisinsight/api/src/modules/pub-sub/model/user-client.ts new file mode 100644 index 0000000000..42de3b7c75 --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/model/user-client.ts @@ -0,0 +1,27 @@ +import { Socket } from 'socket.io'; + +export class UserClient { + private readonly socket: Socket; + + private readonly id: string; + + private readonly databaseId: string; + + constructor(id: string, socket: Socket, databaseId: string) { + this.id = id; + this.socket = socket; + this.databaseId = databaseId; + } + + getId() { + return this.id; + } + + getDatabaseId() { + return this.databaseId; + } + + getSocket() { + return this.socket; + } +} diff --git a/redisinsight/api/src/modules/pub-sub/model/user-session.spec.ts b/redisinsight/api/src/modules/pub-sub/model/user-session.spec.ts new file mode 100644 index 0000000000..9f2cacbcf1 --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/model/user-session.spec.ts @@ -0,0 +1,126 @@ +import * as Redis from 'ioredis'; +import { mockSocket } from 'src/__mocks__'; +import { UserSession } from 'src/modules/pub-sub/model/user-session'; +import { UserClient } from 'src/modules/pub-sub/model/user-client'; +import { RedisClient } from 'src/modules/pub-sub/model/redis-client'; +import { SimpleSubscription } from 'src/modules/pub-sub/model/simple.subscription'; +import { SubscriptionType } from 'src/modules/pub-sub/constants'; +import { PatternSubscription } from 'src/modules/pub-sub/model/pattern.subscription'; + +const getRedisClientFn = jest.fn(); + +const nodeClient = Object.create(Redis.prototype); +nodeClient.subscribe = jest.fn(); +nodeClient.psubscribe = jest.fn(); +nodeClient.unsubscribe = jest.fn(); +nodeClient.punsubscribe = jest.fn(); +nodeClient.status = 'ready'; +nodeClient.disconnect = jest.fn(); +nodeClient.quit = jest.fn(); + +const mockUserClient = new UserClient('socketId', mockSocket, 'databaseId'); + +const mockRedisClient = new RedisClient('databaseId', getRedisClientFn); + +const mockSubscriptionDto = { + channel: 'channel-a', + type: SubscriptionType.Subscribe, +}; + +const mockPSubscriptionDto = { + channel: 'channel-a', + type: SubscriptionType.PSubscribe, +}; + +const mockSubscription = new SimpleSubscription(mockUserClient, mockSubscriptionDto); +const mockPSubscription = new PatternSubscription(mockUserClient, mockPSubscriptionDto); + +const mockMessage = { + channel: 'channel-a', + message: 'message-a', + time: 1234567890, +}; + +describe('UserSession', () => { + let userSession: UserSession; + + beforeEach(() => { + jest.resetAllMocks(); + userSession = new UserSession(mockUserClient, mockRedisClient); + getRedisClientFn.mockResolvedValue(nodeClient); + nodeClient.subscribe.mockResolvedValue('OK'); + nodeClient.psubscribe.mockResolvedValue('OK'); + }); + + describe('subscribe', () => { + it('should subscribe to a channel', async () => { + expect(userSession['subscriptions'].size).toEqual(0); + await userSession.subscribe(mockSubscription); + expect(userSession['subscriptions'].size).toEqual(1); + await userSession.subscribe(mockSubscription); + expect(userSession['subscriptions'].size).toEqual(1); + expect(userSession['subscriptions'].get(mockSubscription.getId())).toEqual(mockSubscription); + await userSession.subscribe(mockPSubscription); + expect(userSession['subscriptions'].size).toEqual(2); + await userSession.subscribe(mockPSubscription); + expect(userSession['subscriptions'].size).toEqual(2); + expect(userSession['subscriptions'].get(mockPSubscription.getId())).toEqual(mockPSubscription); + }); + }); + + describe('unsubscribe', () => { + it('should unsubscribe from a channel', async () => { + expect(userSession['subscriptions'].size).toEqual(0); + await userSession.subscribe(mockSubscription); + expect(userSession['subscriptions'].size).toEqual(1); + await userSession.subscribe(mockPSubscription); + expect(userSession['subscriptions'].size).toEqual(2); + await userSession.unsubscribe(mockSubscription); + expect(userSession['subscriptions'].size).toEqual(1); + await userSession.unsubscribe(mockSubscription); + expect(userSession['subscriptions'].size).toEqual(1); + await userSession.unsubscribe(mockPSubscription); + expect(userSession['subscriptions'].size).toEqual(0); + await userSession.unsubscribe(mockPSubscription); + expect(userSession['subscriptions'].size).toEqual(0); + }); + }); + + describe('handleMessage', () => { + let handleSimpleSpy; + let handlePatternSpy; + + beforeEach(async () => { + handleSimpleSpy = jest.spyOn(mockSubscription, 'pushMessage'); + handlePatternSpy = jest.spyOn(mockPSubscription, 'pushMessage'); + await userSession.subscribe(mockSubscription); + await userSession.subscribe(mockPSubscription); + }); + it('should handle message by particular subscription', async () => { + userSession.handleMessage('id', mockMessage); + expect(handleSimpleSpy).toHaveBeenCalledTimes(0); + expect(handlePatternSpy).toHaveBeenCalledTimes(0); + userSession.handleMessage(mockSubscription.getId(), mockMessage); + expect(handleSimpleSpy).toHaveBeenCalledTimes(1); + expect(handlePatternSpy).toHaveBeenCalledTimes(0); + userSession.handleMessage(mockPSubscription.getId(), mockMessage); + userSession.handleMessage(mockPSubscription.getId(), mockMessage); + expect(handleSimpleSpy).toHaveBeenCalledTimes(1); + expect(handlePatternSpy).toHaveBeenCalledTimes(2); + // wait until debounce process + await new Promise((res) => setTimeout(res, 200)); + }); + }); + + describe('handleDisconnect', () => { + beforeEach(async () => { + await userSession.subscribe(mockSubscription); + await userSession.subscribe(mockPSubscription); + }); + it('should handle message by particular subscription', async () => { + userSession.handleDisconnect(); + expect(userSession['subscriptions'].size).toEqual(0); + expect(nodeClient.quit).toHaveBeenCalled(); + }); + }); +}); diff --git a/redisinsight/api/src/modules/pub-sub/model/user-session.ts b/redisinsight/api/src/modules/pub-sub/model/user-session.ts new file mode 100644 index 0000000000..93de7c8599 --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/model/user-session.ts @@ -0,0 +1,127 @@ +import { RedisClient } from 'src/modules/pub-sub/model/redis-client'; +import { UserClient } from 'src/modules/pub-sub/model/user-client'; +import { ISubscription } from 'src/modules/pub-sub/interfaces/subscription.interface'; +import { IMessage } from 'src/modules/pub-sub/interfaces/message.interface'; +import { PubSubServerEvents, RedisClientEvents } from 'src/modules/pub-sub/constants'; +import { Logger } from '@nestjs/common'; +import ERROR_MESSAGES from 'src/constants/error-messages'; +import { PubSubWsException } from 'src/modules/pub-sub/errors/pub-sub-ws.exception'; + +export class UserSession { + private readonly logger: Logger = new Logger('UserSession'); + + private readonly id: string; + + private readonly userClient: UserClient; + + private readonly redisClient: RedisClient; + + private subscriptions: Map = new Map(); + + constructor(userClient: UserClient, redisClient: RedisClient) { + this.id = userClient.getId(); + this.userClient = userClient; + this.redisClient = redisClient; + redisClient.on(RedisClientEvents.Message, this.handleMessage.bind(this)); + redisClient.on(RedisClientEvents.End, this.handleDisconnect.bind(this)); + } + + getId() { return this.id; } + + getUserClient() { return this.userClient; } + + getRedisClient() { return this.redisClient; } + + /** + * Subscribe to a Pub/Sub channel and create Redis client connection if needed + * Also add subscription to the subscriptions list + * @param subscription + */ + async subscribe(subscription: ISubscription) { + this.logger.debug(`Subscribe ${subscription} ${this}. Getting Redis client...`); + + const client = await this.redisClient?.getClient(); + + if (!client) { throw new Error('There is no Redis client initialized'); } + + if (!this.subscriptions.has(subscription.getId())) { + this.subscriptions.set(subscription.getId(), subscription); + this.logger.debug(`Subscribe to Redis ${subscription} ${this}`); + await subscription.subscribe(client); + } + } + + /** + * Unsubscribe from a channel and remove from the list of subscriptions + * Also destroy redis client when no subscriptions left + * @param subscription + */ + async unsubscribe(subscription: ISubscription) { + this.logger.debug(`Unsubscribe ${subscription} ${this}`); + + this.subscriptions.delete(subscription.getId()); + + const client = await this.redisClient?.getClient(); + + if (client) { + this.logger.debug(`Unsubscribe from Redis ${subscription} ${this}`); + await subscription.unsubscribe(client); + + if (!this.subscriptions.size) { + this.logger.debug(`Unsubscribe: Destroy RedisClient ${this}`); + this.redisClient.destroy(); + } + } + } + + /** + * Redirect message to a proper subscription from the list using id + * ID is generated in this way: "p:channelName" where "p" - is a type of subscription + * Subscription types: s - "subscribe", p - "psubscribe", ss - "ssubscribe" + * @param id + * @param message + */ + handleMessage(id: string, message: IMessage) { + const subscription = this.subscriptions.get(id); + + if (subscription) { + subscription.pushMessage(message); + } + } + + /** + * Handle socket disconnection + * In this case we need to destroy entire session and cascade destroy other models inside + * to be sure that there is no open connections left + */ + handleDisconnect() { + this.logger.debug(`Handle disconnect ${this}`); + + this.userClient.getSocket().emit( + PubSubServerEvents.Exception, + new PubSubWsException(ERROR_MESSAGES.NO_CONNECTION_TO_REDIS_DB), + ); + + this.destroy(); + } + + /** + * Reset subscriptions map and call and destroy Redis client + */ + destroy() { + this.logger.debug(`Destroy ${this}`); + + this.subscriptions = new Map(); + this.redisClient.destroy(); + + this.logger.debug(`Destroyed ${this}`); + } + + toString() { + return `UserSession:${JSON.stringify({ + id: this.id, + subscriptionsSize: this.subscriptions.size, + subscriptions: [...this.subscriptions.keys()], + })}`; + } +} diff --git a/redisinsight/api/src/modules/pub-sub/providers/redis-client.provider.spec.ts b/redisinsight/api/src/modules/pub-sub/providers/redis-client.provider.spec.ts new file mode 100644 index 0000000000..cc2c6f2762 --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/providers/redis-client.provider.spec.ts @@ -0,0 +1,40 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { mockSocket } from 'src/__mocks__'; +import { UserClient } from 'src/modules/pub-sub/model/user-client'; +import { RedisClientProvider } from 'src/modules/pub-sub/providers/redis-client.provider'; +import { RedisService } from 'src/modules/core/services/redis/redis.service'; +import { InstancesBusinessService } from 'src/modules/shared/services/instances-business/instances-business.service'; +import { RedisClient } from 'src/modules/pub-sub/model/redis-client'; + +const mockUserClient = new UserClient('socketId', mockSocket, 'databaseId'); + +describe('RedisClientProvider', () => { + let service: RedisClientProvider; + + beforeEach(async () => { + jest.resetAllMocks(); + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + RedisClientProvider, + { + provide: RedisService, + useFactory: () => ({}), + }, + { + provide: InstancesBusinessService, + useFactory: () => ({}), + }, + ], + }).compile(); + + service = await module.get(RedisClientProvider); + }); + + describe('createClient', () => { + it('should create redis client', async () => { + const redisClient = service.createClient(mockUserClient.getId()); + expect(redisClient).toBeInstanceOf(RedisClient); + }); + }); +}); diff --git a/redisinsight/api/src/modules/pub-sub/providers/redis-client.provider.ts b/redisinsight/api/src/modules/pub-sub/providers/redis-client.provider.ts new file mode 100644 index 0000000000..5de688a4b0 --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/providers/redis-client.provider.ts @@ -0,0 +1,34 @@ +import { Injectable, ServiceUnavailableException } from '@nestjs/common'; +import { RedisService } from 'src/modules/core/services/redis/redis.service'; +import { InstancesBusinessService } from 'src/modules/shared/services/instances-business/instances-business.service'; +import { AppTool } from 'src/models'; +import { withTimeout } from 'src/utils/promise-with-timeout'; +import ERROR_MESSAGES from 'src/constants/error-messages'; +import config from 'src/utils/config'; +import { RedisClient } from 'src/modules/pub-sub/model/redis-client'; + +const serverConfig = config.get('server'); + +@Injectable() +export class RedisClientProvider { + constructor( + private redisService: RedisService, + private instancesBusinessService: InstancesBusinessService, + ) {} + + createClient(databaseId: string): RedisClient { + return new RedisClient(databaseId, this.getConnectFn(databaseId)); + } + + private getConnectFn(databaseId: string) { + return () => withTimeout( + this.instancesBusinessService.connectToInstance( + databaseId, + AppTool.Common, + false, + ), + serverConfig.requestTimeout, + new ServiceUnavailableException(ERROR_MESSAGES.NO_CONNECTION_TO_REDIS_DB), + ); + } +} diff --git a/redisinsight/api/src/modules/pub-sub/providers/subscription.provider.spec.ts b/redisinsight/api/src/modules/pub-sub/providers/subscription.provider.spec.ts new file mode 100644 index 0000000000..bd0a8d7012 --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/providers/subscription.provider.spec.ts @@ -0,0 +1,64 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { + mockSocket, +} from 'src/__mocks__'; +import { UserClient } from 'src/modules/pub-sub/model/user-client'; +import { SubscriptionProvider } from 'src/modules/pub-sub/providers/subscription.provider'; +import { SubscriptionType } from 'src/modules/pub-sub/constants'; +import { SimpleSubscription } from 'src/modules/pub-sub/model/simple.subscription'; +import { PatternSubscription } from 'src/modules/pub-sub/model/pattern.subscription'; +import { BadRequestException } from '@nestjs/common'; + +const mockUserClient = new UserClient('socketId', mockSocket, 'databaseId'); + +const mockSubscriptionDto = { + channel: 'channel-a', + type: SubscriptionType.Subscribe, +}; + +const mockPSubscriptionDto = { + channel: 'channel-a', + type: SubscriptionType.PSubscribe, +}; + +const mockSSubscriptionDto = { + channel: 'channel-a', + type: SubscriptionType.SSubscribe, +}; + +describe('SubscriptionProvider', () => { + let service: SubscriptionProvider; + + beforeEach(async () => { + jest.resetAllMocks(); + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + SubscriptionProvider, + + ], + }).compile(); + + service = await module.get(SubscriptionProvider); + }); + + describe('createSubscription', () => { + it('should create simple subscription', async () => { + const subscription = service.createSubscription(mockUserClient, mockSubscriptionDto); + expect(subscription).toBeInstanceOf(SimpleSubscription); + }); + it('should create pattern subscription', async () => { + const subscription = service.createSubscription(mockUserClient, mockPSubscriptionDto); + expect(subscription).toBeInstanceOf(PatternSubscription); + }); + it('should throw error since shard subscription is not supported yet', async () => { + try { + service.createSubscription(mockUserClient, mockSSubscriptionDto); + fail(); + } catch (e) { + expect(e).toBeInstanceOf(BadRequestException); + expect(e.message).toEqual('Unsupported Subscription type'); + } + }); + }); +}); diff --git a/redisinsight/api/src/modules/pub-sub/providers/subscription.provider.ts b/redisinsight/api/src/modules/pub-sub/providers/subscription.provider.ts new file mode 100644 index 0000000000..b724c1aafb --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/providers/subscription.provider.ts @@ -0,0 +1,22 @@ +import { BadRequestException, Injectable } from '@nestjs/common'; +import { SubscriptionDto } from 'src/modules/pub-sub/dto'; +import { SubscriptionType } from 'src/modules/pub-sub/constants'; +import { UserClient } from 'src/modules/pub-sub/model/user-client'; +import { PatternSubscription } from 'src/modules/pub-sub/model/pattern.subscription'; +import { SimpleSubscription } from 'src/modules/pub-sub/model/simple.subscription'; +import { ISubscription } from 'src/modules/pub-sub/interfaces/subscription.interface'; + +@Injectable() +export class SubscriptionProvider { + createSubscription(userClient: UserClient, dto: SubscriptionDto): ISubscription { + switch (dto.type) { + case SubscriptionType.PSubscribe: + return new PatternSubscription(userClient, dto); + case SubscriptionType.Subscribe: + return new SimpleSubscription(userClient, dto); + case SubscriptionType.SSubscribe: + default: + throw new BadRequestException('Unsupported Subscription type'); + } + } +} diff --git a/redisinsight/api/src/modules/pub-sub/providers/user-session.provider.spec.ts b/redisinsight/api/src/modules/pub-sub/providers/user-session.provider.spec.ts new file mode 100644 index 0000000000..b6b64eb77b --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/providers/user-session.provider.spec.ts @@ -0,0 +1,67 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { + mockSocket, + MockType, +} from 'src/__mocks__'; +import { UserSessionProvider } from 'src/modules/pub-sub/providers/user-session.provider'; +import { RedisClientProvider } from 'src/modules/pub-sub/providers/redis-client.provider'; +import { UserClient } from 'src/modules/pub-sub/model/user-client'; +import { RedisClient } from 'src/modules/pub-sub/model/redis-client'; + +const mockUserClient = new UserClient('socketId', mockSocket, 'databaseId'); +const mockUserClient2 = new UserClient('socketId2', mockSocket, 'databaseId'); +const getRedisClientFn = jest.fn(); +const mockRedisClient = new RedisClient('databaseId', getRedisClientFn); + +describe('UserSessionProvider', () => { + let service: UserSessionProvider; + let redisClientProvider: MockType; + + beforeEach(async () => { + jest.resetAllMocks(); + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + UserSessionProvider, + { + provide: RedisClientProvider, + useFactory: () => ({ + createClient: jest.fn(), + }), + }, + + ], + }).compile(); + + service = await module.get(UserSessionProvider); + redisClientProvider = await module.get(RedisClientProvider); + + redisClientProvider.createClient.mockReturnValue(mockRedisClient); + }); + + describe('getOrCreateUserSession', () => { + it('should create new UserSession and store it. Ignore the same session', async () => { + expect(service['sessions'].size).toEqual(0); + const userSession = await service.getOrCreateUserSession(mockUserClient); + expect(service['sessions'].size).toEqual(1); + expect(service.getUserSession(userSession.getId())).toEqual(userSession); + await service.getOrCreateUserSession(mockUserClient); + expect(service['sessions'].size).toEqual(1); + expect(service.getUserSession(userSession.getId())).toEqual(userSession); + }); + }); + describe('removeUserSession', () => { + it('should remove UserSession', async () => { + expect(service['sessions'].size).toEqual(0); + await service.getOrCreateUserSession(mockUserClient); + await service.getOrCreateUserSession(mockUserClient2); + expect(service['sessions'].size).toEqual(2); + await service.removeUserSession(mockUserClient.getId()); + expect(service['sessions'].size).toEqual(1); + await service.removeUserSession(mockUserClient.getId()); + expect(service['sessions'].size).toEqual(1); + await service.removeUserSession(mockUserClient2.getId()); + expect(service['sessions'].size).toEqual(0); + }); + }); +}); diff --git a/redisinsight/api/src/modules/pub-sub/providers/user-session.provider.ts b/redisinsight/api/src/modules/pub-sub/providers/user-session.provider.ts new file mode 100644 index 0000000000..8abb03ad76 --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/providers/user-session.provider.ts @@ -0,0 +1,49 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { UserSession } from 'src/modules/pub-sub/model/user-session'; +import { UserClient } from 'src/modules/pub-sub/model/user-client'; +import { RedisClientProvider } from 'src/modules/pub-sub/providers/redis-client.provider'; + +@Injectable() +export class UserSessionProvider { + private readonly logger: Logger = new Logger('UserSessionProvider'); + + private sessions: Map = new Map(); + + constructor(private readonly redisClientProvider: RedisClientProvider) {} + + getOrCreateUserSession(userClient: UserClient) { + let session = this.getUserSession(userClient.getId()); + + if (!session) { + session = new UserSession( + userClient, + this.redisClientProvider.createClient(userClient.getDatabaseId()), + ); + this.sessions.set(session.getId(), session); + this.logger.debug(`New session was added ${this}`); + } + + return session; + } + + getUserSession(id: string): UserSession { + return this.sessions.get(id); + } + + removeUserSession(id: string) { + this.logger.debug(`Removing user session ${id}`); + + this.sessions.delete(id); + + this.logger.debug(`User session was removed ${this}`); + } + + toString() { + return `UserSessionProvider:${ + JSON.stringify({ + sessionsSize: this.sessions.size, + sessions: [...this.sessions.keys()], + }) + }`; + } +} diff --git a/redisinsight/api/src/modules/pub-sub/pub-sub.controller.ts b/redisinsight/api/src/modules/pub-sub/pub-sub.controller.ts new file mode 100644 index 0000000000..8ab37fda43 --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/pub-sub.controller.ts @@ -0,0 +1,39 @@ +import { + Body, + Controller, Param, Post, UsePipes, ValidationPipe +} from '@nestjs/common'; +import { ApiTags } from '@nestjs/swagger'; +import { ApiRedisInstanceOperation } from 'src/decorators/api-redis-instance-operation.decorator'; +import { PubSubService } from 'src/modules/pub-sub/pub-sub.service'; +import { AppTool } from 'src/models'; +import { PublishDto } from 'src/modules/pub-sub/dto/publish.dto'; +import { PublishResponse } from 'src/modules/pub-sub/dto/publish.response'; + +@ApiTags('Pub/Sub') +@Controller('pub-sub') +@UsePipes(new ValidationPipe()) +export class PubSubController { + constructor(private service: PubSubService) {} + + @Post('messages') + @ApiRedisInstanceOperation({ + description: 'Publish message to a channel', + statusCode: 201, + responses: [ + { + status: 201, + description: 'Returns number of clients message ws delivered', + type: PublishResponse, + }, + ], + }) + async publish( + @Param('dbInstance') instanceId: string, + @Body() dto: PublishDto, + ): Promise { + return this.service.publish({ + instanceId, + tool: AppTool.Common, + }, dto); + } +} diff --git a/redisinsight/api/src/modules/pub-sub/pub-sub.gateway.ts b/redisinsight/api/src/modules/pub-sub/pub-sub.gateway.ts new file mode 100644 index 0000000000..09511ea581 --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/pub-sub.gateway.ts @@ -0,0 +1,52 @@ +import { Socket, Server } from 'socket.io'; +import { + OnGatewayConnection, + OnGatewayDisconnect, + SubscribeMessage, + WebSocketGateway, + WebSocketServer, +} from '@nestjs/websockets'; +import { + Body, Logger, UseFilters, UsePipes, ValidationPipe, +} from '@nestjs/common'; +import config from 'src/utils/config'; +import { PubSubService } from 'src/modules/pub-sub/pub-sub.service'; +import { Client } from 'src/modules/pub-sub/decorators/client.decorator'; +import { UserClient } from 'src/modules/pub-sub/model/user-client'; +import { SubscribeDto } from 'src/modules/pub-sub/dto'; +import { AckWsExceptionFilter } from 'src/modules/pub-sub/filters/ack-ws-exception.filter'; +import { PubSubClientEvents } from './constants'; + +const SOCKETS_CONFIG = config.get('sockets'); + +@UsePipes(new ValidationPipe()) +@UseFilters(AckWsExceptionFilter) +@WebSocketGateway({ namespace: 'pub-sub', cors: SOCKETS_CONFIG.cors, serveClient: SOCKETS_CONFIG.serveClient }) +export class PubSubGateway implements OnGatewayConnection, OnGatewayDisconnect { + @WebSocketServer() wss: Server; + + private logger: Logger = new Logger('PubSubGateway'); + + constructor(private service: PubSubService) {} + + @SubscribeMessage(PubSubClientEvents.Subscribe) + async subscribe(@Client() client: UserClient, @Body() dto: SubscribeDto): Promise { + await this.service.subscribe(client, dto); + return { status: 'ok' }; + } + + @SubscribeMessage(PubSubClientEvents.Unsubscribe) + async unsubscribe(@Client() client: UserClient, @Body() dto: SubscribeDto): Promise { + await this.service.unsubscribe(client, dto); + return { status: 'ok' }; + } + + async handleConnection(client: Socket): Promise { + this.logger.log(`Client connected: ${client.id}`); + } + + async handleDisconnect(client: Socket): Promise { + await this.service.handleDisconnect(client.id); + this.logger.log(`Client disconnected: ${client.id}`); + } +} diff --git a/redisinsight/api/src/modules/pub-sub/pub-sub.module.ts b/redisinsight/api/src/modules/pub-sub/pub-sub.module.ts new file mode 100644 index 0000000000..5822b3c33c --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/pub-sub.module.ts @@ -0,0 +1,21 @@ +import { Module } from '@nestjs/common'; +import { SharedModule } from 'src/modules/shared/shared.module'; +import { PubSubGateway } from 'src/modules/pub-sub/pub-sub.gateway'; +import { PubSubService } from 'src/modules/pub-sub/pub-sub.service'; +import { UserSessionProvider } from 'src/modules/pub-sub/providers/user-session.provider'; +import { SubscriptionProvider } from 'src/modules/pub-sub/providers/subscription.provider'; +import { RedisClientProvider } from 'src/modules/pub-sub/providers/redis-client.provider'; +import { PubSubController } from 'src/modules/pub-sub/pub-sub.controller'; + +@Module({ + imports: [SharedModule], + providers: [ + PubSubGateway, + PubSubService, + UserSessionProvider, + SubscriptionProvider, + RedisClientProvider, + ], + controllers: [PubSubController], +}) +export class PubSubModule {} diff --git a/redisinsight/api/src/modules/pub-sub/pub-sub.service.spec.ts b/redisinsight/api/src/modules/pub-sub/pub-sub.service.spec.ts new file mode 100644 index 0000000000..b22017ebc8 --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/pub-sub.service.spec.ts @@ -0,0 +1,224 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import * as Redis from 'ioredis'; +import { + mockLogFile, mockRedisShardObserver, mockSocket, mockStandaloneDatabaseEntity, + MockType +} from 'src/__mocks__'; +import { InstancesBusinessService } from 'src/modules/shared/services/instances-business/instances-business.service'; +import { RedisObserverProvider } from 'src/modules/profiler/providers/redis-observer.provider'; +import { IFindRedisClientInstanceByOptions, RedisService } from 'src/modules/core/services/redis/redis.service'; +import { mockRedisClientInstance } from 'src/modules/shared/services/base/redis-consumer.abstract.service.spec'; +import { RedisObserverStatus } from 'src/modules/profiler/constants'; +import { PubSubService } from 'src/modules/pub-sub/pub-sub.service'; +import { UserSessionProvider } from 'src/modules/pub-sub/providers/user-session.provider'; +import { SubscriptionProvider } from 'src/modules/pub-sub/providers/subscription.provider'; +import { UserClient } from 'src/modules/pub-sub/model/user-client'; +import { SubscriptionType } from 'src/modules/pub-sub/constants'; +import { RedisClientProvider } from 'src/modules/pub-sub/providers/redis-client.provider'; +import { UserSession } from 'src/modules/pub-sub/model/user-session'; +import { RedisClient } from 'src/modules/pub-sub/model/redis-client'; +import { ForbiddenException, NotFoundException } from '@nestjs/common'; + +const nodeClient = Object.create(Redis.prototype); +nodeClient.subscribe = jest.fn(); +nodeClient.psubscribe = jest.fn(); +nodeClient.unsubscribe = jest.fn(); +nodeClient.punsubscribe = jest.fn(); +nodeClient.status = 'ready'; +nodeClient.disconnect = jest.fn(); +nodeClient.publish = jest.fn(); + +const mockUserClient = new UserClient('socketId', mockSocket, 'databaseId'); + +const mockSubscriptionDto = { + channel: 'channel-a', + type: SubscriptionType.Subscribe, +}; + +const mockPSubscriptionDto = { + channel: 'channel-a', + type: SubscriptionType.PSubscribe, +}; + +const getRedisClientFn = jest.fn(); +const mockRedisClient = new RedisClient('databaseId', getRedisClientFn); +const mockUserSession = new UserSession(mockUserClient, mockRedisClient); + +const mockSubscribe = jest.fn(); +const mockUnsubscribe = jest.fn(); +mockUserSession['subscribe'] = mockSubscribe; +mockUserSession['unsubscribe'] = mockUnsubscribe; +mockUserSession['destroy'] = jest.fn(); + +const mockClientOptions: IFindRedisClientInstanceByOptions = { + instanceId: mockStandaloneDatabaseEntity.id, +}; + +const mockPublishDto = { + message: 'message-a', + channel: 'channel-a', +}; + +describe('PubSubService', () => { + let service: PubSubService; + let sessionProvider: MockType; + let redisService: MockType; + let databaseService: MockType; + + beforeEach(async () => { + jest.resetAllMocks(); + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + PubSubService, + UserSessionProvider, + SubscriptionProvider, + { + provide: UserSessionProvider, + useFactory: () => ({ + getOrCreateUserSession: jest.fn(), + getUserSession: jest.fn(), + removeUserSession: jest.fn(), + }), + }, + { + provide: RedisService, + useFactory: () => ({ + getClientInstance: jest.fn(), + isClientConnected: jest.fn(), + }), + }, + { + provide: InstancesBusinessService, + useFactory: () => ({ + connectToInstance: jest.fn(), + getOneById: jest.fn(), + }), + }, + ], + }).compile(); + + service = await module.get(PubSubService); + redisService = await module.get(RedisService); + databaseService = await module.get(InstancesBusinessService); + sessionProvider = await module.get(UserSessionProvider); + + getRedisClientFn.mockResolvedValue(nodeClient); + sessionProvider.getOrCreateUserSession.mockReturnValue(mockUserSession); + sessionProvider.getUserSession.mockReturnValue(mockUserSession); + sessionProvider.removeUserSession.mockReturnValue(undefined); + mockSubscribe.mockResolvedValue('OK'); + mockUnsubscribe.mockResolvedValue('OK'); + redisService.getClientInstance.mockReturnValue({ ...mockRedisClientInstance, client: nodeClient }); + redisService.isClientConnected.mockReturnValue(true); + databaseService.connectToInstance.mockResolvedValue(nodeClient); + nodeClient.publish.mockResolvedValue(2); + }); + + describe('subscribe', () => { + it('should subscribe to a single channel', async () => { + await service.subscribe(mockUserClient, { subscriptions: [mockSubscriptionDto] }); + expect(mockUserSession.subscribe).toHaveBeenCalledTimes(1); + }); + it('should subscribe to a multiple channels', async () => { + await service.subscribe(mockUserClient, { subscriptions: [mockSubscriptionDto, mockPSubscriptionDto] }); + expect(mockUserSession.subscribe).toHaveBeenCalledTimes(2); + }); + it('should handle HTTP error', async () => { + try { + mockSubscribe.mockRejectedValueOnce(new NotFoundException('Not Found')); + await service.subscribe(mockUserClient, { subscriptions: [mockSubscriptionDto] }); + fail(); + } catch (e) { + expect(e).toBeInstanceOf(NotFoundException); + } + }); + it('should handle acl error', async () => { + try { + mockSubscribe.mockRejectedValueOnce(new Error('NOPERM')); + await service.subscribe(mockUserClient, { subscriptions: [mockSubscriptionDto] }); + fail(); + } catch (e) { + expect(e).toBeInstanceOf(ForbiddenException); + } + }); + }); + + describe('unsubscribe', () => { + it('should unsubscribe from a single channel', async () => { + await service.unsubscribe(mockUserClient, { subscriptions: [mockSubscriptionDto] }); + expect(mockUserSession.unsubscribe).toHaveBeenCalledTimes(1); + }); + it('should unsubscribe from multiple channels', async () => { + await service.unsubscribe(mockUserClient, { subscriptions: [mockSubscriptionDto, mockPSubscriptionDto] }); + expect(mockUserSession.unsubscribe).toHaveBeenCalledTimes(2); + }); + it('should handle HTTP error', async () => { + try { + mockUnsubscribe.mockRejectedValueOnce(new NotFoundException('Not Found')); + await service.unsubscribe(mockUserClient, { subscriptions: [mockSubscriptionDto] }); + fail(); + } catch (e) { + expect(e).toBeInstanceOf(NotFoundException); + } + }); + it('should handle acl error', async () => { + try { + mockUnsubscribe.mockRejectedValueOnce(new Error('NOPERM')); + await service.unsubscribe(mockUserClient, { subscriptions: [mockSubscriptionDto] }); + fail(); + } catch (e) { + expect(e).toBeInstanceOf(ForbiddenException); + } + }); + }); + + describe('publish', () => { + it('should publish using existing client', async () => { + const res = await service.publish(mockClientOptions, mockPublishDto); + expect(res).toEqual({ affected: 2 }); + }); + it('should publish using new client', async () => { + redisService.isClientConnected.mockReturnValueOnce(false); + const res = await service.publish(mockClientOptions, mockPublishDto); + expect(res).toEqual({ affected: 2 }); + }); + it('should handle HTTP error', async () => { + try { + redisService.getClientInstance.mockImplementation(() => { + throw new NotFoundException('Not Found'); + }); + + await service.publish(mockClientOptions, mockPublishDto); + fail(); + } catch (e) { + expect(e).toBeInstanceOf(NotFoundException); + } + }); + it('should handle acl error', async () => { + try { + redisService.getClientInstance.mockImplementation(() => { + throw new Error('NOPERM'); + }); + + await service.publish(mockClientOptions, mockPublishDto); + fail(); + } catch (e) { + expect(e).toBeInstanceOf(ForbiddenException); + } + }); + }); + + describe('handleDisconnect', () => { + it('should not do anything if no sessions', async () => { + sessionProvider.getUserSession.mockReturnValueOnce(undefined); + await service.handleDisconnect(mockUserClient.getId()); + expect(sessionProvider.removeUserSession).toHaveBeenCalledTimes(0); + }); + it('should call session.destroy and remove session', async () => { + await service.handleDisconnect(mockUserClient.getId()); + expect(sessionProvider.removeUserSession).toHaveBeenCalledTimes(1); + expect(mockUserSession.destroy).toHaveBeenCalledTimes(1); + }); + }); +}); diff --git a/redisinsight/api/src/modules/pub-sub/pub-sub.service.ts b/redisinsight/api/src/modules/pub-sub/pub-sub.service.ts new file mode 100644 index 0000000000..1c5724a92e --- /dev/null +++ b/redisinsight/api/src/modules/pub-sub/pub-sub.service.ts @@ -0,0 +1,135 @@ +import { HttpException, Injectable, Logger } from '@nestjs/common'; +import { UserSessionProvider } from 'src/modules/pub-sub/providers/user-session.provider'; +import { UserClient } from 'src/modules/pub-sub/model/user-client'; +import { SubscribeDto } from 'src/modules/pub-sub/dto'; +import { SubscriptionProvider } from 'src/modules/pub-sub/providers/subscription.provider'; +import { IFindRedisClientInstanceByOptions, RedisService } from 'src/modules/core/services/redis/redis.service'; +import { PublishResponse } from 'src/modules/pub-sub/dto/publish.response'; +import { PublishDto } from 'src/modules/pub-sub/dto/publish.dto'; +import { InstancesBusinessService } from 'src/modules/shared/services/instances-business/instances-business.service'; +import { catchAclError } from 'src/utils'; + +@Injectable() +export class PubSubService { + private logger: Logger = new Logger('PubSubService'); + + constructor( + private readonly sessionProvider: UserSessionProvider, + private readonly subscriptionProvider: SubscriptionProvider, + private redisService: RedisService, + private instancesBusinessService: InstancesBusinessService, + ) {} + + /** + * Subscribe to multiple channels + * @param userClient + * @param dto + */ + async subscribe(userClient: UserClient, dto: SubscribeDto) { + try { + this.logger.log('Subscribing to channels(s)'); + + const session = await this.sessionProvider.getOrCreateUserSession(userClient); + await Promise.all(dto.subscriptions.map((subDto) => session.subscribe( + this.subscriptionProvider.createSubscription(userClient, subDto), + ))); + } catch (e) { + this.logger.error('Unable to create subscriptions', e); + + if (e instanceof HttpException) { + throw e; + } + + throw catchAclError(e); + } + } + + /** + * Unsubscribe from multiple channels + * @param userClient + * @param dto + */ + async unsubscribe(userClient: UserClient, dto: SubscribeDto) { + try { + this.logger.log('Unsubscribing from channels(s)'); + + const session = await this.sessionProvider.getOrCreateUserSession(userClient); + await Promise.all(dto.subscriptions.map((subDto) => session.unsubscribe( + this.subscriptionProvider.createSubscription(userClient, subDto), + ))); + } catch (e) { + this.logger.error('Unable to unsubscribe', e); + + if (e instanceof HttpException) { + throw e; + } + + throw catchAclError(e); + } + } + + /** + * Publish a message to a particular channel + * @param clientOptions + * @param dto + */ + async publish( + clientOptions: IFindRedisClientInstanceByOptions, + dto: PublishDto, + ): Promise { + try { + this.logger.log('Publishing message.'); + + const client = await this.getClient(clientOptions); + + return { + affected: await client.publish(dto.channel, dto.message), + }; + } catch (e) { + this.logger.error('Unable to publish a message', e); + + if (e instanceof HttpException) { + throw e; + } + + throw catchAclError(e); + } + } + + /** + * Get or create redis "common" client + * + * @param clientOptions + * @private + */ + private async getClient(clientOptions: IFindRedisClientInstanceByOptions) { + const { tool, instanceId } = clientOptions; + + const commonClient = this.redisService.getClientInstance({ instanceId, tool })?.client; + + if (commonClient && this.redisService.isClientConnected(commonClient)) { + return commonClient; + } + + return this.instancesBusinessService.connectToInstance( + clientOptions.instanceId, + clientOptions.tool, + true, + ); + } + + /** + * Handle Socket disconnection event + * Basically destroy the UserSession to remove Redis connection + * @param id + */ + async handleDisconnect(id: string) { + this.logger.log(`Handle disconnect event: ${id}`); + const session = this.sessionProvider.getUserSession(id); + + if (session) { + session.destroy(); + this.sessionProvider.removeUserSession(id); + } + } +} diff --git a/redisinsight/api/src/modules/workbench/utils/getUnsupportedCommands.spec.ts b/redisinsight/api/src/modules/workbench/utils/getUnsupportedCommands.spec.ts index 8839fb1ea3..69c2a2d35e 100644 --- a/redisinsight/api/src/modules/workbench/utils/getUnsupportedCommands.spec.ts +++ b/redisinsight/api/src/modules/workbench/utils/getUnsupportedCommands.spec.ts @@ -2,7 +2,7 @@ import { getUnsupportedCommands } from './getUnsupportedCommands'; describe('workbench unsupported commands', () => { it('should return correct list', () => { - const expectedResult = ['monitor', 'subscribe', 'psubscribe', 'sync', 'psync', 'script debug', 'select']; + const expectedResult = ['monitor', 'subscribe', 'psubscribe', 'ssubscribe', 'sync', 'psync', 'script debug', 'select']; expect(getUnsupportedCommands()).toEqual(expectedResult); }); diff --git a/redisinsight/api/src/modules/workbench/utils/getUnsupportedCommands.ts b/redisinsight/api/src/modules/workbench/utils/getUnsupportedCommands.ts index bdd082a51a..a2c9720ed2 100644 --- a/redisinsight/api/src/modules/workbench/utils/getUnsupportedCommands.ts +++ b/redisinsight/api/src/modules/workbench/utils/getUnsupportedCommands.ts @@ -6,6 +6,7 @@ export enum WorkbenchToolUnsupportedCommands { Monitor = 'monitor', Subscribe = 'subscribe', PSubscribe = 'psubscribe', + SSubscribe = 'ssubscribe', Sync = 'sync', PSync = 'psync', ScriptDebug = 'script debug', diff --git a/redisinsight/api/test/api/info/GET-info-cli-unsupported-commands.test.ts b/redisinsight/api/test/api/info/GET-info-cli-unsupported-commands.test.ts index bbdd4d97bf..5e9baceb92 100644 --- a/redisinsight/api/test/api/info/GET-info-cli-unsupported-commands.test.ts +++ b/redisinsight/api/test/api/info/GET-info-cli-unsupported-commands.test.ts @@ -27,7 +27,7 @@ describe('GET /info/cli-unsupported-commands', () => { name: 'Should return array with unsupported commands for CLI tool', statusCode: 200, responseSchema, - responseBody: ['monitor', 'subscribe', 'psubscribe', 'sync', 'psync', 'script debug'], + responseBody: ['monitor', 'subscribe', 'psubscribe', 'ssubscribe', 'sync', 'psync', 'script debug'], }, ].map(mainCheckFn); }); diff --git a/redisinsight/api/test/api/pub-sub/POST-instance-id-pub-sub-messages.test.ts b/redisinsight/api/test/api/pub-sub/POST-instance-id-pub-sub-messages.test.ts new file mode 100644 index 0000000000..7ce81b166c --- /dev/null +++ b/redisinsight/api/test/api/pub-sub/POST-instance-id-pub-sub-messages.test.ts @@ -0,0 +1,116 @@ +import { + describe, + it, + before, + deps, + Joi, + requirements, + generateInvalidDataTestCases, + validateInvalidDataTestCase, + validateApiCall +} from '../deps'; +const { server, request, constants, rte } = deps; + +// endpoint to test +const endpoint = (instanceId = constants.TEST_INSTANCE_ID) => + request(server).post(`/instance/${instanceId}/pub-sub/messages`); + +const dataSchema = Joi.object({ + channel: Joi.string().required(), + message: Joi.string().required(), +}).messages({ + 'any.required': '{#label} should not be empty', +}).strict(); + +const validInputData = { + channel: constants.TEST_PUB_SUB_CHANNEL_1, + message: constants.TEST_PUB_SUB_MESSAGE_1, +}; + +const responseSchema = Joi.object().keys({ + affected: Joi.number().integer().required().min(0), +}).required().strict(); + +const mainCheckFn = async (testCase) => { + it(testCase.name, async () => { + // additional checks before test run + if (testCase.before) { + await testCase.before(); + } + + await validateApiCall({ + endpoint, + ...testCase, + }); + + // additional checks after test pass + if (testCase.after) { + await testCase.after(); + } + }); +}; + +describe('POST /instance/:instanceId/pub-sub/messages', () => { + describe('Validation', () => { + generateInvalidDataTestCases(dataSchema, validInputData).map( + validateInvalidDataTestCase(endpoint, dataSchema), + ); + }); + + describe('Common', () => { + [ + { + name: 'Should send message', + data: { + ...validInputData, + }, + responseSchema, + statusCode: 201, + }, + { + name: 'Should return NotFound error if instance id does not exists', + endpoint: () => endpoint(constants.TEST_NOT_EXISTED_INSTANCE_ID), + data: { + ...validInputData, + }, + statusCode: 404, + responseBody: { + statusCode: 404, + error: 'Not Found', + message: 'Invalid database instance id.', + }, + }, + ].map(mainCheckFn); + }); + + describe('ACL', () => { + requirements('rte.acl'); + + before(async () => await rte.data.generateKeys(true)); + before(async () => rte.data.setAclUserRules('~* +@all')); + + [ + { + name: 'Should publish method', + endpoint: () => endpoint(constants.TEST_INSTANCE_ACL_ID), + statusCode: 201, + data: { + ...validInputData, + }, + }, + { + name: 'Should throw error if no permissions for "publish" command', + endpoint: () => endpoint(constants.TEST_INSTANCE_ACL_ID), + data: { + ...validInputData, + }, + statusCode: 403, + responseBody: { + statusCode: 403, + error: 'Forbidden', + }, + before: () => rte.data.setAclUserRules('~* +@all -publish') + }, + ].map(mainCheckFn); + }); +}); diff --git a/redisinsight/api/test/api/ws/pub-sub/pub-sub.test.ts b/redisinsight/api/test/api/ws/pub-sub/pub-sub.test.ts new file mode 100644 index 0000000000..74de5aa032 --- /dev/null +++ b/redisinsight/api/test/api/ws/pub-sub/pub-sub.test.ts @@ -0,0 +1,331 @@ +import { + describe, + it, + before, + deps, + expect, + requirements, + _, +} from '../../deps'; +import { Socket } from "socket.io-client"; +const { getSocket, constants, rte } = deps; + +const getClient = async (instanceId): Promise => { + return getSocket('pub-sub', { + query: { instanceId }, + }); +}; + +const subscription = { + channel: 'channel-a', + type: 's', +}; + +const subscriptionB = { + channel: 'channel-b', + type: 's', +}; + +const pSubscription = { + channel: '*', + type: 'p', +}; + +let client; + +describe('pub-sub', function () { + this.timeout(10000); + beforeEach(async () => { + client = await getClient(constants.TEST_INSTANCE_ID); + }); + + afterEach(async () => { + client.close(); + }); + + describe('Connection edge cases', () => { + it('should not crash on 100 concurrent pub-sub connections to the same db', async () => { + await Promise.all((new Array(100).fill(1)).map(() => new Promise((res, rej) => { + client.emit('subscribe', { subscriptions: [pSubscription, subscription] }, (ack) => { + expect(ack).to.eql({ status: 'ok' }); + res(ack); + }); + client.on('exception', rej); + }))); + }); + }); + + describe('Client creation', () => { + it('Should successfully create a client', async () => { + expect(client instanceof Socket).to.eql(true); + }); + it('Should successfully create a client even when incorrect instanceId provided', async () => { + const client = await getClient(constants.TEST_NOT_EXISTED_INSTANCE_ID); + expect(client instanceof Socket).to.eql(true); + await client.close(); + }); + }); + + describe('subscribe', () => { + it('Should successfully subscribe', async () => { + await new Promise((resolve) => { + client.emit('subscribe', { subscriptions: [pSubscription] }, (ack) => { + expect(ack).to.eql({ status: 'ok' }); + resolve(ack); + }) + }); + }); + it('Should return Not Found acknowledge when incorrect instanceId', async () => { + const client = await getClient(constants.TEST_NOT_EXISTED_INSTANCE_ID); + await new Promise((resolve, reject) => { + client.emit('subscribe', { subscriptions: [pSubscription] }, (ack) => { + try { + expect(ack.status).to.eql('error'); + expect(ack.error.status).to.eql(404); + expect(ack.error.message).to.eql('Invalid database instance id.'); + expect(ack.error.name).to.eql('NotFoundException'); + resolve(null); + } catch (e) { + reject(e); + } + }) + }); + }); + }); + + describe('on message', () => { + it('Should receive message on particular channel only', async () => { + await new Promise((resolve, reject) => { + client.emit('subscribe', { subscriptions: [subscription, subscriptionB] }, async (ack) => { + expect(ack).to.eql({ status: 'ok' }); + + client.on('s:channel-a', (data) => { + expect(data.count).to.be.eql(1); + expect(data.messages.length).to.be.eql(1); + const [message] = data.messages; + expect(message.channel).to.eq('channel-a'); + expect(message.message).to.eq('message-a'); + expect(message.time).to.be.a('number'); + resolve(null); + }); + + client.on('s:channel-b', (data) => { + reject(new Error('Should not receive message-a in this listener-b')) + }); + + await rte.data.sendCommand('publish', ['channel-c', 'message-c']); + await rte.data.sendCommand('publish', ['channel-a', 'message-a']); + }) + }); + }); + it('Should receive bunch of logs for many subscriptions', async () => { + const messages = { + 'channel-a': [], + 'channel-b': [], + '*': [], + }; + + client.on('s:channel-a', (data) => messages['channel-a'].push(...data.messages)); + client.on('s:channel-b', (data) => messages['channel-b'].push(...data.messages)); + client.on('p:*', (data) => messages['*'].push(...data.messages)); + + await new Promise((resolve) => { + client.emit('subscribe', { subscriptions: [subscription, subscriptionB, pSubscription] }, async (ack) => { + expect(ack).to.eql({ status: 'ok' }); + + client.on('s:channel-b', resolve); + + await rte.data.sendCommand('publish', ['channel-a', 'message-a']); + await rte.data.sendCommand('publish', ['channel-a', 'message-a']); + await rte.data.sendCommand('publish', ['channel-a', 'message-a']); + await rte.data.sendCommand('publish', ['channel-a', 'message-a']); + await rte.data.sendCommand('publish', ['channel-b', 'message-b']); + }) + }); + + expect(messages['channel-a'].length).to.eql(4); + messages['channel-a'].forEach(message => { + expect(message.channel).to.eql('channel-a'); + }); + expect(messages['channel-b'].length).to.eql(1); + expect(messages['*'].length).to.eql(5); + }); + }); + + describe('unsubscribe', () => { + it('Should still receive messages on subscriptions left', async () => { + const messages = { + 'channel-a': [], + 'channel-b': [], + '*': [], + }; + + client.on('s:channel-a', (data) => messages['channel-a'].push(...data.messages)); + client.on('s:channel-b', (data) => messages['channel-b'].push(...data.messages)); + client.on('p:*', (data) => messages['*'].push(...data.messages)); + + await new Promise((resolve) => { + client.emit('subscribe', { subscriptions: [subscription, subscriptionB, pSubscription] }, async (ack) => { + expect(ack).to.eql({ status: 'ok' }); + + client.on('s:channel-b', resolve); + + await rte.data.sendCommand('publish', ['channel-a', 'message-a']); + await rte.data.sendCommand('publish', ['channel-a', 'message-a']); + await rte.data.sendCommand('publish', ['channel-a', 'message-a']); + await rte.data.sendCommand('publish', ['channel-a', 'message-a']); + await rte.data.sendCommand('publish', ['channel-b', 'message-b']); + }) + }); + + + await new Promise((resolve) => { + client.emit('unsubscribe', { subscriptions: [subscription, pSubscription] }, async (ack) => { + expect(ack).to.eql({ status: 'ok' }); + + await rte.data.sendCommand('publish', ['channel-a', 'message-a']); + await rte.data.sendCommand('publish', ['channel-a', 'message-a']); + await rte.data.sendCommand('publish', ['channel-a', 'message-a']); + await rte.data.sendCommand('publish', ['channel-a', 'message-a']); + await rte.data.sendCommand('publish', ['channel-b', 'message-b']); + + client.on('s:channel-b', resolve); + }) + }); + + expect(messages['channel-a'].length).to.eql(4); + messages['channel-a'].forEach(message => { + expect(message.channel).to.eql('channel-a'); + }); + expect(messages['channel-b'].length).to.eql(2); + expect(messages['*'].length).to.eql(5); + }); + + it('Should receive bunch of messages when subscribed only', async () => { + const messages = { + 'channel-a': [], + 'channel-b': [], + '*': [], + }; + + client.on('s:channel-a', (data) => messages['channel-a'].push(...data.messages)); + client.on('s:channel-b', (data) => messages['channel-b'].push(...data.messages)); + client.on('p:*', (data) => messages['*'].push(...data.messages)); + + await new Promise((resolve) => { + client.emit('subscribe', { subscriptions: [subscription, subscriptionB, pSubscription] }, async (ack) => { + expect(ack).to.eql({ status: 'ok' }); + + client.on('s:channel-b', resolve); + + await rte.data.sendCommand('publish', ['channel-a', 'message-a']); + await rte.data.sendCommand('publish', ['channel-a', 'message-a']); + await rte.data.sendCommand('publish', ['channel-a', 'message-a']); + await rte.data.sendCommand('publish', ['channel-a', 'message-a']); + await rte.data.sendCommand('publish', ['channel-b', 'message-b']); + }) + }); + + + await new Promise((resolve) => { + client.emit('unsubscribe', { subscriptions: [subscription, subscriptionB, pSubscription] }, async (ack) => { + expect(ack).to.eql({ status: 'ok' }); + + await rte.data.sendCommand('publish', ['channel-a', 'message-a']); + await rte.data.sendCommand('publish', ['channel-a', 'message-a']); + await rte.data.sendCommand('publish', ['channel-a', 'message-a']); + await rte.data.sendCommand('publish', ['channel-a', 'message-a']); + await rte.data.sendCommand('publish', ['channel-b', 'message-b']); + + resolve(null); + }) + }); + + expect(messages['channel-a'].length).to.eql(4); + messages['channel-a'].forEach(message => { + expect(message.channel).to.eql('channel-a'); + }); + expect(messages['channel-b'].length).to.eql(1); + expect(messages['*'].length).to.eql(5); + }); + }); + + describe('ACL', () => { + requirements('rte.acl'); + before(async () => rte.data.setAclUserRules('~* +@all')); + + it('should throw an error on connect without permissions (subscribe)', async () => { + await rte.data.setAclUserRules('~* +@all -subscribe'); + + const client = await getClient(constants.TEST_INSTANCE_ACL_ID); + + expect(client instanceof Socket).to.eql(true); + + await new Promise((resolve, reject) => { + client.emit('subscribe', { subscriptions: [subscription] }, (ack) => { + expect(ack.status).to.eql('error'); + expect(ack.error.status).to.eql(403); + expect(ack.error.message).to.have.string('NOPERM'); + resolve(null); + }) + }); + }); + + it('should throw an error on connect without permissions (psubscribe)', async () => { + await rte.data.setAclUserRules('~* +@all -psubscribe'); + + const client = await getClient(constants.TEST_INSTANCE_ACL_ID); + + expect(client instanceof Socket).to.eql(true); + + await new Promise((resolve, reject) => { + client.emit('subscribe', { subscriptions: [pSubscription] }, (ack) => { + expect(ack.status).to.eql('error'); + expect(ack.error.status).to.eql(403); + expect(ack.error.message).to.have.string('NOPERM'); + resolve(null); + }) + }); + }); + + it('should throw an error on connect without permissions (unsubscribe)', async () => { + await rte.data.setAclUserRules('~* +@all -unsubscribe'); + + const client = await getClient(constants.TEST_INSTANCE_ACL_ID); + + expect(client instanceof Socket).to.eql(true); + + await new Promise((resolve) => { + client.emit('subscribe', { subscriptions: [subscription] }, (ack) => { + expect(ack).to.deep.eql({ status: 'ok' }); + client.emit('unsubscribe', { subscriptions: [subscription] }, (ack) => { + expect(ack.status).to.eql('error'); + expect(ack.error.status).to.eql(403); + expect(ack.error.message).to.have.string('NOPERM'); + resolve(null); + }); + }); + }); + }); + + it('should throw an error on connect without permissions (punsubscribe)', async () => { + await rte.data.setAclUserRules('~* +@all -punsubscribe'); + + const client = await getClient(constants.TEST_INSTANCE_ACL_ID); + + expect(client instanceof Socket).to.eql(true); + + await new Promise((resolve) => { + client.emit('subscribe', { subscriptions: [pSubscription] }, (ack) => { + expect(ack).to.deep.eql({ status: 'ok' }); + client.emit('unsubscribe', { subscriptions: [pSubscription] }, (ack) => { + expect(ack.status).to.eql('error'); + expect(ack.error.status).to.eql(403); + expect(ack.error.message).to.have.string('NOPERM'); + resolve(null); + }); + }); + }); + }); + }); +}); diff --git a/redisinsight/api/test/helpers/constants.ts b/redisinsight/api/test/helpers/constants.ts index 620e416051..375dffe955 100644 --- a/redisinsight/api/test/helpers/constants.ts +++ b/redisinsight/api/test/helpers/constants.ts @@ -222,5 +222,14 @@ export const constants = { // Plugins TEST_PLUGIN_VISUALIZATION_ID_1: uuidv4(), + // Pub/Sub + TEST_PUB_SUB_CHANNEL_1: 'channel-a', + TEST_PUB_SUB_CHANNEL_2: 'channel-b', + TEST_PUB_SUB_CHANNEL_3: 'channel-c', + TEST_PUB_SUB_P_CHANNEL_1: '*', + TEST_PUB_SUB_MESSAGE_1: 'message-a', + TEST_PUB_SUB_MESSAGE_2: 'message-b', + TEST_PUB_SUB_MESSAGE_3: 'message-c', + // etc... }