Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions redisinsight/api/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { PluginModule } from 'src/modules/plugin/plugin.module';
import { CommandsModule } from 'src/modules/commands/commands.module';
import { WorkbenchModule } from 'src/modules/workbench/workbench.module';
import { SlowLogModule } from 'src/modules/slow-log/slow-log.module';
import { PubSubModule } from 'src/modules/pub-sub/pub-sub.module';
import { SharedModule } from './modules/shared/shared.module';
import { InstancesModule } from './modules/instances/instances.module';
import { BrowserModule } from './modules/browser/browser.module';
Expand Down Expand Up @@ -43,6 +44,7 @@ const PATH_CONFIG = config.get('dir_path');
PluginModule,
CommandsModule,
ProfilerModule,
PubSubModule,
SlowLogModule,
EventEmitterModule.forRoot(),
...(SERVER_CONFIG.staticContent
Expand Down
5 changes: 5 additions & 0 deletions redisinsight/api/src/app.routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { RedisSentinelModule } from 'src/modules/redis-sentinel/redis-sentinel.m
import { CliModule } from 'src/modules/cli/cli.module';
import { WorkbenchModule } from 'src/modules/workbench/workbench.module';
import { SlowLogModule } from 'src/modules/slow-log/slow-log.module';
import { PubSubModule } from 'src/modules/pub-sub/pub-sub.module';

export const routes: Routes = [
{
Expand All @@ -28,6 +29,10 @@ export const routes: Routes = [
path: '/:dbInstance',
module: SlowLogModule,
},
{
path: '/:dbInstance',
module: PubSubModule,
},
],
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { getUnsupportedCommands } from './getUnsupportedCommands';

describe('cli unsupported commands', () => {
it('should return correct list', () => {
const expectedResult = ['monitor', 'subscribe', 'psubscribe', 'sync', 'psync', 'script debug'];
const expectedResult = ['monitor', 'subscribe', 'psubscribe', 'ssubscribe', 'sync', 'psync', 'script debug'];

expect(getUnsupportedCommands()).toEqual(expectedResult);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export enum CliToolUnsupportedCommands {
Monitor = 'monitor',
Subscribe = 'subscribe',
PSubscribe = 'psubscribe',
SSubscribe = 'ssubscribe',
Sync = 'sync',
PSync = 'psync',
ScriptDebug = 'script debug',
Expand Down
2 changes: 1 addition & 1 deletion redisinsight/api/src/modules/profiler/models/log-file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ export class LogFile {
this.writeStream?.close();
this.writeStream = null;
const size = this.getFileSize();
fs.unlink(this.filePath);
fs.unlinkSync(this.filePath);

this.analyticsEvents.get(TelemetryEvents.ProfilerLogDeleted)(this.instanceId, size);
} catch (e) {
Expand Down
28 changes: 28 additions & 0 deletions redisinsight/api/src/modules/pub-sub/constants/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
export enum PubSubClientEvents {
Subscribe = 'subscribe',
Unsubscribe = 'unsubscribe',
}

export enum PubSubServerEvents {
Exception = 'exception',
}

export enum SubscriptionType {
Subscribe = 's',
PSubscribe = 'p',
SSubscribe = 'ss',
}

export enum RedisClientStatus {
Connecting = 'connecting',
Connected = 'connected',
Error = 'error',
End = 'end',
}

export enum RedisClientEvents {
Connected = 'connected',
ConnectionError = 'connection_error',
Message = 'message',
End = 'end',
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { get } from 'lodash';
import { createParamDecorator, ExecutionContext } from '@nestjs/common';
import { UserClient } from 'src/modules/pub-sub/model/user-client';

export const Client = createParamDecorator(
(data: unknown, ctx: ExecutionContext): UserClient => {
const socket = ctx.switchToWs().getClient();

return new UserClient(socket.id, socket, get(socket, 'handshake.query.instanceId'));
},
);
3 changes: 3 additions & 0 deletions redisinsight/api/src/modules/pub-sub/dto/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export * from './subscribe.dto';
export * from './subscription.dto';
export * from './messages.response';
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { IMessage } from 'src/modules/pub-sub/interfaces/message.interface';

export class MessagesResponse {
messages: IMessage[];

count: number;
}
24 changes: 24 additions & 0 deletions redisinsight/api/src/modules/pub-sub/dto/publish.dto.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import {
IsNotEmpty, IsString,
} from 'class-validator';
import { ApiProperty } from '@nestjs/swagger';

export class PublishDto {
@ApiProperty({
type: String,
description: 'Message to send',
example: '{"hello":"world"}',
})
@IsNotEmpty()
@IsString()
message: string;

@ApiProperty({
type: String,
description: 'Chanel name',
example: 'channel-1',
})
@IsNotEmpty()
@IsString()
channel: string;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export class PublishResponse {
affected: number;
}
13 changes: 13 additions & 0 deletions redisinsight/api/src/modules/pub-sub/dto/subscribe.dto.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import {
ArrayNotEmpty, IsArray, ValidateNested,
} from 'class-validator';
import { Type } from 'class-transformer';
import { SubscriptionDto } from './subscription.dto';

export class SubscribeDto {
@IsArray()
@ArrayNotEmpty()
@ValidateNested({ each: true })
@Type(() => SubscriptionDto)
subscriptions: SubscriptionDto[];
}
12 changes: 12 additions & 0 deletions redisinsight/api/src/modules/pub-sub/dto/subscription.dto.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { SubscriptionType } from 'src/modules/pub-sub/constants';
import { IsEnum, IsNotEmpty, IsString } from 'class-validator';

export class SubscriptionDto {
@IsNotEmpty()
@IsString()
channel: string;

@IsNotEmpty()
@IsEnum(SubscriptionType)
type: SubscriptionType;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { HttpException } from '@nestjs/common';
import { isString } from 'lodash';

export class PubSubWsException extends Error {
status: number;

name: string;

constructor(err: Error | string) {
super();
this.status = 500;
this.message = 'Internal server error';
this.name = this.constructor.name;

if (isString(err)) {
this.message = err;
} else if (err instanceof HttpException) {
this.message = (err.getResponse())['message'];
this.status = err.getStatus();
this.name = err.constructor.name;
} else if (err instanceof Error) {
this.message = err.message;
this.name = 'Error';
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import {
ArgumentsHost, Catch, HttpException,
} from '@nestjs/common';
import { PubSubWsException } from 'src/modules/pub-sub/errors/pub-sub-ws.exception';

@Catch()
export class AckWsExceptionFilter {
public catch(exception: HttpException, host: ArgumentsHost) {
const callback = host.getArgByIndex(2);
this.handleError(callback, exception);
}

public handleError(callback: any, exception: Error) {
if (callback && typeof callback === 'function') {
callback({ status: 'error', error: new PubSubWsException(exception) });
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export interface IMessage {
message: string;

channel: string;

time: number;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import * as IORedis from 'ioredis';
import { IMessage } from 'src/modules/pub-sub/interfaces/message.interface';

export interface ISubscription {
getId(): string;

getChannel(): string;

getType(): string;

pushMessage(message: IMessage): void;

subscribe(client: IORedis.Redis | IORedis.Cluster): Promise<void>;

unsubscribe(client: IORedis.Redis | IORedis.Cluster): Promise<void>;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { debounce } from 'lodash';
import { SubscriptionType } from 'src/modules/pub-sub/constants';
import { UserClient } from 'src/modules/pub-sub/model/user-client';
import { MessagesResponse, SubscriptionDto } from 'src/modules/pub-sub/dto';
import { ISubscription } from 'src/modules/pub-sub/interfaces/subscription.interface';
import { IMessage } from 'src/modules/pub-sub/interfaces/message.interface';
import * as IORedis from 'ioredis';

const EMIT_WAIT = 30;
const EMIT_MAX_WAIT = 100;
const MESSAGES_MAX = 5000;

export abstract class AbstractSubscription implements ISubscription {
protected readonly id: string;

protected readonly userClient: UserClient;

protected readonly debounce: any;

protected readonly channel: string;

protected readonly type: SubscriptionType;

protected messages: IMessage[] = [];

constructor(userClient: UserClient, dto: SubscriptionDto) {
this.userClient = userClient;
this.channel = dto.channel;
this.type = dto.type;
this.id = `${this.type}:${this.channel}`;
this.debounce = debounce(() => {
if (this.messages.length) {
this.userClient.getSocket()
.emit(this.id, {
messages: this.messages.slice(0, MESSAGES_MAX),
count: this.messages.length,
} as MessagesResponse);
this.messages = [];
}
}, EMIT_WAIT, {
maxWait: EMIT_MAX_WAIT,
});
}

getId() {
return this.id;
}

getChannel() {
return this.channel;
}

getType() {
return this.type;
}

abstract subscribe(client: IORedis.Redis | IORedis.Cluster): Promise<void>;

abstract unsubscribe(client: IORedis.Redis | IORedis.Cluster): Promise<void>;

pushMessage(message: IMessage) {
this.messages.push(message);

this.debounce();
}

toString() {
return `${this.constructor.name}:${JSON.stringify({
id: this.id,
mL: this.messages.length,
})}`;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { AbstractSubscription } from 'src/modules/pub-sub/model/abstract.subscription';
import * as IORedis from 'ioredis';

export class PatternSubscription extends AbstractSubscription {
async subscribe(client: IORedis.Redis | IORedis.Cluster): Promise<void> {
await client.psubscribe(this.channel);
}

async unsubscribe(client: IORedis.Redis | IORedis.Cluster): Promise<void> {
await client.punsubscribe(this.channel);
}
}
Loading