From e9b5b50f0f21b45b92c77f0059a29a908bf5c702 Mon Sep 17 00:00:00 2001 From: Artem Date: Mon, 23 May 2022 19:44:51 +0300 Subject: [PATCH 1/9] #RI-2931 BE implementation for create new consumer group(s) #RI-2933 BE implementation for consumer groups list --- .../api/src/constants/redis-error-codes.ts | 1 + .../api/src/modules/browser/browser.module.ts | 4 + .../constants/browser-tool-commands.ts | 3 + .../stream/consumer-group.controller.ts | 54 +++++ .../api/src/modules/browser/dto/stream.dto.ts | 80 ++++++- .../services/stream/consumer-group.service.ts | 205 ++++++++++++++++++ 6 files changed, 346 insertions(+), 1 deletion(-) create mode 100644 redisinsight/api/src/modules/browser/controllers/stream/consumer-group.controller.ts create mode 100644 redisinsight/api/src/modules/browser/services/stream/consumer-group.service.ts diff --git a/redisinsight/api/src/constants/redis-error-codes.ts b/redisinsight/api/src/constants/redis-error-codes.ts index a7123dd422..baf151099e 100644 --- a/redisinsight/api/src/constants/redis-error-codes.ts +++ b/redisinsight/api/src/constants/redis-error-codes.ts @@ -10,6 +10,7 @@ export enum RedisErrorCodes { ConnectionReset = 'ECONNRESET', Timeout = 'ETIMEDOUT', CommandSyntaxError = 'syntax error', + BusyGroup = 'BUSYGROUP', UnknownCommand = 'unknown command', } diff --git a/redisinsight/api/src/modules/browser/browser.module.ts b/redisinsight/api/src/modules/browser/browser.module.ts index 34a1d37850..5acd869a7b 100644 --- a/redisinsight/api/src/modules/browser/browser.module.ts +++ b/redisinsight/api/src/modules/browser/browser.module.ts @@ -4,6 +4,8 @@ import { SharedModule } from 'src/modules/shared/shared.module'; import { RedisConnectionMiddleware } from 'src/middleware/redis-connection.middleware'; import { StreamController } from 'src/modules/browser/controllers/stream/stream.controller'; import { StreamService } from 'src/modules/browser/services/stream/stream.service'; +import { ConsumerGroupController } from 'src/modules/browser/controllers/stream/consumer-group.controller'; +import { ConsumerGroupService } from 'src/modules/browser/services/stream/consumer-group.service'; import { HashController } from './controllers/hash/hash.controller'; import { KeysController } from './controllers/keys/keys.controller'; import { KeysBusinessService } from './services/keys-business/keys-business.service'; @@ -32,6 +34,7 @@ import { BrowserToolClusterService } from './services/browser-tool-cluster/brows RejsonRlController, HashController, StreamController, + ConsumerGroupController, ], providers: [ KeysBusinessService, @@ -42,6 +45,7 @@ import { BrowserToolClusterService } from './services/browser-tool-cluster/brows RejsonRlBusinessService, HashBusinessService, StreamService, + ConsumerGroupService, BrowserToolService, BrowserToolClusterService, ], diff --git a/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts b/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts index 8b8a1436c2..b06e984b60 100644 --- a/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts +++ b/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts @@ -80,6 +80,9 @@ export enum BrowserToolStreamCommands { XRevRange = 'xrevrange', XAdd = 'xadd', XDel = 'xdel', + XInfoGroups = 'xinfo groups', + XPending = 'xpending', + XGroupCreate = 'xgroup create', } export enum BrowserToolTSCommands { diff --git a/redisinsight/api/src/modules/browser/controllers/stream/consumer-group.controller.ts b/redisinsight/api/src/modules/browser/controllers/stream/consumer-group.controller.ts new file mode 100644 index 0000000000..0ab45b87bc --- /dev/null +++ b/redisinsight/api/src/modules/browser/controllers/stream/consumer-group.controller.ts @@ -0,0 +1,54 @@ +import { + Body, + Controller, + Param, + Post, + UsePipes, + ValidationPipe, +} from '@nestjs/common'; +import { ApiTags } from '@nestjs/swagger'; +import { ApiRedisInstanceOperation } from 'src/decorators/api-redis-instance-operation.decorator'; +import { + ConsumerGroupDto, CreateConsumerGroupsDto, + GetStreamEntriesResponse, +} from 'src/modules/browser/dto/stream.dto'; +import { ConsumerGroupService } from 'src/modules/browser/services/stream/consumer-group.service'; +import { KeyDto } from 'src/modules/browser/dto'; + +@ApiTags('Streams') +@Controller('streams/consumer-groups') +@UsePipes(new ValidationPipe({ transform: true })) +export class ConsumerGroupController { + constructor(private service: ConsumerGroupService) {} + + @Post('/get') + @ApiRedisInstanceOperation({ + description: 'Get stream entries', + statusCode: 200, + responses: [ + { + status: 200, + description: 'Returns ordered stream entries in defined range.', + type: GetStreamEntriesResponse, + }, + ], + }) + async getGroups( + @Param('dbInstance') instanceId: string, + @Body() dto: KeyDto, + ): Promise { + return this.service.getGroups({ instanceId }, dto); + } + + @Post('') + @ApiRedisInstanceOperation({ + description: 'Create stream consumer group', + statusCode: 201, + }) + async createGroups( + @Param('dbInstance') instanceId: string, + @Body() dto: CreateConsumerGroupsDto, + ): Promise { + return this.service.createGroups({ instanceId }, dto); + } +} diff --git a/redisinsight/api/src/modules/browser/dto/stream.dto.ts b/redisinsight/api/src/modules/browser/dto/stream.dto.ts index def52f4187..fdad16b5d3 100644 --- a/redisinsight/api/src/modules/browser/dto/stream.dto.ts +++ b/redisinsight/api/src/modules/browser/dto/stream.dto.ts @@ -1,4 +1,6 @@ -import { ApiProperty, ApiPropertyOptional, IntersectionType } from '@nestjs/swagger'; +import { + ApiProperty, ApiPropertyOptional, IntersectionType +} from '@nestjs/swagger'; import { ArrayNotEmpty, IsArray, @@ -183,3 +185,79 @@ export class CreateStreamDto extends IntersectionType( AddStreamEntriesDto, KeyWithExpireDto, ) {} + +export class ConsumerGroupDto { + @ApiProperty({ + type: String, + description: 'Consumer group name', + example: 'group', + }) + name: string; + + @ApiProperty({ + type: Number, + description: 'Number of consumers', + example: 2, + }) + consumers: number = 0; + + @ApiProperty({ + type: Number, + description: 'Number of pending messages', + example: 2, + }) + pending: number = 0; + + @ApiProperty({ + type: String, + description: 'Smallest Id of the message that is pending in the group', + example: '1657892649-0', + }) + smallestPendingId: string; + + @ApiProperty({ + type: String, + description: 'Greatest Id of the message that is pending in the group', + example: '1657892680-0', + }) + greatestPendingId: string; + + @ApiProperty({ + type: String, + description: 'Id of last delivered message', + example: '1657892648-0', + }) + lastDeliveredId: string; +} + +export class CreateConsumerGroupDto { + @ApiProperty({ + type: String, + description: 'Consumer group name', + example: 'group', + }) + @IsNotEmpty() + @IsString() + name: string; + + @ApiProperty({ + type: String, + description: 'Id of last delivered message', + example: '1657892648-0', + }) + @IsNotEmpty() + @IsString() + lastDeliveredId: string; +} + +export class CreateConsumerGroupsDto extends KeyDto { + @ApiProperty({ + type: () => CreateConsumerGroupDto, + isArray: true, + description: 'List of consumer groups to create', + }) + @ValidateNested() + @IsArray() + @Type(() => CreateConsumerGroupDto) + consumerGroups: CreateConsumerGroupDto[]; +} diff --git a/redisinsight/api/src/modules/browser/services/stream/consumer-group.service.ts b/redisinsight/api/src/modules/browser/services/stream/consumer-group.service.ts new file mode 100644 index 0000000000..b6d7590612 --- /dev/null +++ b/redisinsight/api/src/modules/browser/services/stream/consumer-group.service.ts @@ -0,0 +1,205 @@ +import { + BadRequestException, ConflictException, Injectable, Logger, NotFoundException, +} from '@nestjs/common'; +import { IFindRedisClientInstanceByOptions } from 'src/modules/core/services/redis/redis.service'; +import { RedisErrorCodes } from 'src/constants'; +import { catchAclError, catchTransactionError, convertStringsArrayToObject } from 'src/utils'; +import { + BrowserToolCommands, + BrowserToolKeysCommands, BrowserToolStreamCommands, +} from 'src/modules/browser/constants/browser-tool-commands'; +import { BrowserToolService } from 'src/modules/browser/services/browser-tool/browser-tool.service'; +import { KeyDto } from 'src/modules/browser/dto'; +import ERROR_MESSAGES from 'src/constants/error-messages'; +import { ConsumerGroupDto, CreateConsumerGroupsDto } from 'src/modules/browser/dto/stream.dto'; + +@Injectable() +export class ConsumerGroupService { + private logger = new Logger('ConsumerGroupService'); + + constructor(private browserTool: BrowserToolService) {} + + /** + * Get consumer groups list for particular stream + * In addition fetch pending messages info for each group + * !May be slow on huge streams as 'XPENDING' command tagged with as @slow + * @param clientOptions + * @param dto + */ + async getGroups( + clientOptions: IFindRedisClientInstanceByOptions, + dto: KeyDto, + ): Promise { + try { + this.logger.log('Getting consumer groups list.'); + + const exists = await this.browserTool.execCommand( + clientOptions, + BrowserToolKeysCommands.Exists, + [dto.keyName], + ); + + if (!exists) { + return Promise.reject(new NotFoundException(ERROR_MESSAGES.KEY_NOT_EXIST)); + } + + const groups = ConsumerGroupService.formatReplyToDto(await this.browserTool.execCommand( + clientOptions, + BrowserToolStreamCommands.XInfoGroups, + [dto.keyName], + )); + + return await Promise.all(groups.map((group) => this.getGroupInfo( + clientOptions, + dto, + group, + ))); + } catch (error) { + if (error instanceof NotFoundException) { + throw error; + } + + if (error?.message.includes(RedisErrorCodes.WrongType)) { + throw new BadRequestException(error.message); + } + + throw catchAclError(error); + } + } + + /** + * Get consumer group pending info using 'XPENDING' command + * @param clientOptions + * @param dto + * @param group + */ + async getGroupInfo( + clientOptions: IFindRedisClientInstanceByOptions, + dto: KeyDto, + group: ConsumerGroupDto, + ): Promise { + const info = await this.browserTool.execCommand( + clientOptions, + BrowserToolStreamCommands.XPending, + [dto.keyName, group.name], + ); + + return { + ...group, + smallestPendingId: info?.[1] || null, + greatestPendingId: info?.[2] || null, + }; + } + + /** + * Create consumer group(s) + * @param clientOptions + * @param dto + */ + async createGroups( + clientOptions: IFindRedisClientInstanceByOptions, + dto: CreateConsumerGroupsDto, + ): Promise { + try { + this.logger.log('Creating consumer groups.'); + const { keyName, consumerGroups } = dto; + + const exists = await this.browserTool.execCommand( + clientOptions, + BrowserToolKeysCommands.Exists, + [keyName], + ); + + if (!exists) { + return Promise.reject(new NotFoundException(ERROR_MESSAGES.KEY_NOT_EXIST)); + } + + const toolCommands: Array<[ + toolCommand: BrowserToolCommands, + ...args: Array, + ]> = consumerGroups.map((group) => ( + [ + BrowserToolStreamCommands.XGroupCreate, + keyName, + group.name, + group.lastDeliveredId, + ] + )); + + const [ + transactionError, + transactionResults, + ] = await this.browserTool.execMulti(clientOptions, toolCommands); + catchTransactionError(transactionError, transactionResults); + + this.logger.log('Stream consumer group(s) created.'); + + return undefined; + } catch (error) { + if (error instanceof NotFoundException) { + throw error; + } + + if (error?.message.includes(RedisErrorCodes.WrongType)) { + throw new BadRequestException(error.message); + } + + if (error?.message.includes(RedisErrorCodes.BusyGroup)) { + throw new ConflictException(error.message); + } + + throw catchAclError(error); + } + } + + /** + * Converts RESP response from Redis + * [ + * ['name', 'g1', 'consumers', 0, 'pending', 0, 'last-delivered-id', '1653034260278-0'], + * ['name', 'g2', 'consumers', 0, 'pending', 0, 'last-delivered-id', '1653034260278-0'], + * ... + * ] + * + * to DTO + * + * [ + * { + * name: 'g1', + * consumers: 0, + * pending: 0, + * lastDeliveredId: '1653034260278-0' + * }, + * { + * name: 'g2', + * consumers: 0, + * pending: 0, + * lastDeliveredId: '1653034260278-0' + * }, + * ... + * ] + * @param reply + */ + static formatReplyToDto(reply: Array>): ConsumerGroupDto[] { + return reply.map(ConsumerGroupService.formatArrayToDto); + } + + /** + * Format single reply entry to DTO + * @param entry + */ + static formatArrayToDto(entry: Array): ConsumerGroupDto { + if (!entry?.length) { + return null; + } + const entryObj = convertStringsArrayToObject(entry as string[]); + + return { + name: entryObj['name'], + consumers: entryObj['consumers'], + pending: entryObj['pending'], + lastDeliveredId: entryObj['last-delivered-id'], + smallestPendingId: null, + greatestPendingId: null, + }; + } +} From f16336bbcd938dded9f846b710631ce1b6bebb1a Mon Sep 17 00:00:00 2001 From: Artem Date: Mon, 23 May 2022 20:34:35 +0300 Subject: [PATCH 2/9] #RI-2937 BE Modify lastDeliveredId --- .../constants/browser-tool-commands.ts | 1 + .../stream/consumer-group.controller.ts | 22 +++++++-- .../api/src/modules/browser/dto/stream.dto.ts | 7 ++- .../services/stream/consumer-group.service.ts | 46 ++++++++++++++++++- 4 files changed, 69 insertions(+), 7 deletions(-) diff --git a/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts b/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts index b06e984b60..2700ca13f1 100644 --- a/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts +++ b/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts @@ -83,6 +83,7 @@ export enum BrowserToolStreamCommands { XInfoGroups = 'xinfo groups', XPending = 'xpending', XGroupCreate = 'xgroup create', + XGroupSetId = 'xgroup setid', } export enum BrowserToolTSCommands { diff --git a/redisinsight/api/src/modules/browser/controllers/stream/consumer-group.controller.ts b/redisinsight/api/src/modules/browser/controllers/stream/consumer-group.controller.ts index 0ab45b87bc..0f295b3fa9 100644 --- a/redisinsight/api/src/modules/browser/controllers/stream/consumer-group.controller.ts +++ b/redisinsight/api/src/modules/browser/controllers/stream/consumer-group.controller.ts @@ -1,7 +1,7 @@ import { Body, Controller, - Param, + Param, Patch, Post, UsePipes, ValidationPipe, @@ -9,8 +9,7 @@ import { import { ApiTags } from '@nestjs/swagger'; import { ApiRedisInstanceOperation } from 'src/decorators/api-redis-instance-operation.decorator'; import { - ConsumerGroupDto, CreateConsumerGroupsDto, - GetStreamEntriesResponse, + ConsumerGroupDto, CreateConsumerGroupsDto, UpdateConsumerGroupDto, } from 'src/modules/browser/dto/stream.dto'; import { ConsumerGroupService } from 'src/modules/browser/services/stream/consumer-group.service'; import { KeyDto } from 'src/modules/browser/dto'; @@ -28,8 +27,9 @@ export class ConsumerGroupController { responses: [ { status: 200, - description: 'Returns ordered stream entries in defined range.', - type: GetStreamEntriesResponse, + description: 'Returns stream consumer groups.', + type: ConsumerGroupDto, + isArray: true, }, ], }) @@ -51,4 +51,16 @@ export class ConsumerGroupController { ): Promise { return this.service.createGroups({ instanceId }, dto); } + + @Patch('') + @ApiRedisInstanceOperation({ + description: 'Modify last delivered ID of the Consumer Group', + statusCode: 200, + }) + async updateGroup( + @Param('dbInstance') instanceId: string, + @Body() dto: UpdateConsumerGroupDto, + ): Promise { + return this.service.updateGroup({ instanceId }, dto); + } } diff --git a/redisinsight/api/src/modules/browser/dto/stream.dto.ts b/redisinsight/api/src/modules/browser/dto/stream.dto.ts index fdad16b5d3..b7645877e8 100644 --- a/redisinsight/api/src/modules/browser/dto/stream.dto.ts +++ b/redisinsight/api/src/modules/browser/dto/stream.dto.ts @@ -1,5 +1,5 @@ import { - ApiProperty, ApiPropertyOptional, IntersectionType + ApiProperty, ApiPropertyOptional, IntersectionType, } from '@nestjs/swagger'; import { ArrayNotEmpty, @@ -261,3 +261,8 @@ export class CreateConsumerGroupsDto extends KeyDto { @Type(() => CreateConsumerGroupDto) consumerGroups: CreateConsumerGroupDto[]; } + +export class UpdateConsumerGroupDto extends IntersectionType( + KeyDto, + CreateConsumerGroupDto, +) {} diff --git a/redisinsight/api/src/modules/browser/services/stream/consumer-group.service.ts b/redisinsight/api/src/modules/browser/services/stream/consumer-group.service.ts index b6d7590612..27937c97d7 100644 --- a/redisinsight/api/src/modules/browser/services/stream/consumer-group.service.ts +++ b/redisinsight/api/src/modules/browser/services/stream/consumer-group.service.ts @@ -11,7 +11,7 @@ import { import { BrowserToolService } from 'src/modules/browser/services/browser-tool/browser-tool.service'; import { KeyDto } from 'src/modules/browser/dto'; import ERROR_MESSAGES from 'src/constants/error-messages'; -import { ConsumerGroupDto, CreateConsumerGroupsDto } from 'src/modules/browser/dto/stream.dto'; +import {ConsumerGroupDto, CreateConsumerGroupsDto, UpdateConsumerGroupDto} from 'src/modules/browser/dto/stream.dto'; @Injectable() export class ConsumerGroupService { @@ -152,6 +152,50 @@ export class ConsumerGroupService { } } + /** + * Updates last delivered id for Consumer Group + * @param clientOptions + * @param dto + */ + async updateGroup( + clientOptions: IFindRedisClientInstanceByOptions, + dto: UpdateConsumerGroupDto, + ): Promise { + try { + this.logger.log('Updating consumer group.'); + + const exists = await this.browserTool.execCommand( + clientOptions, + BrowserToolKeysCommands.Exists, + [dto.keyName], + ); + + if (!exists) { + return Promise.reject(new NotFoundException(ERROR_MESSAGES.KEY_NOT_EXIST)); + } + + await this.browserTool.execCommand( + clientOptions, + BrowserToolStreamCommands.XGroupSetId, + [dto.keyName, dto.name, dto.lastDeliveredId], + ); + + this.logger.log('Consumer group was updated.'); + + return undefined; + } catch (error) { + if (error instanceof NotFoundException) { + throw error; + } + + if (error?.message.includes(RedisErrorCodes.WrongType)) { + throw new BadRequestException(error.message); + } + + throw catchAclError(error); + } + } + /** * Converts RESP response from Redis * [ From 83b8e8413fbe1c319362166b100cc79666cf2336 Mon Sep 17 00:00:00 2001 From: Artem Date: Mon, 23 May 2022 20:53:44 +0300 Subject: [PATCH 3/9] #RI-2939 BE Delete Consumer Group --- .../constants/browser-tool-commands.ts | 1 + .../stream/consumer-group.controller.ts | 27 +++++++- .../api/src/modules/browser/dto/stream.dto.ts | 23 +++++++ .../services/stream/consumer-group.service.ts | 64 ++++++++++++++++++- 4 files changed, 112 insertions(+), 3 deletions(-) diff --git a/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts b/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts index 2700ca13f1..d645e9c656 100644 --- a/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts +++ b/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts @@ -84,6 +84,7 @@ export enum BrowserToolStreamCommands { XPending = 'xpending', XGroupCreate = 'xgroup create', XGroupSetId = 'xgroup setid', + XGroupDestroy = 'xgroup destroy', } export enum BrowserToolTSCommands { diff --git a/redisinsight/api/src/modules/browser/controllers/stream/consumer-group.controller.ts b/redisinsight/api/src/modules/browser/controllers/stream/consumer-group.controller.ts index 0f295b3fa9..8dd1df4b04 100644 --- a/redisinsight/api/src/modules/browser/controllers/stream/consumer-group.controller.ts +++ b/redisinsight/api/src/modules/browser/controllers/stream/consumer-group.controller.ts @@ -1,6 +1,6 @@ import { Body, - Controller, + Controller, Delete, Param, Patch, Post, UsePipes, @@ -9,7 +9,11 @@ import { import { ApiTags } from '@nestjs/swagger'; import { ApiRedisInstanceOperation } from 'src/decorators/api-redis-instance-operation.decorator'; import { - ConsumerGroupDto, CreateConsumerGroupsDto, UpdateConsumerGroupDto, + ConsumerGroupDto, + CreateConsumerGroupsDto, + DeleteConsumerGroupsDto, + DeleteConsumerGroupsResponse, + UpdateConsumerGroupDto, } from 'src/modules/browser/dto/stream.dto'; import { ConsumerGroupService } from 'src/modules/browser/services/stream/consumer-group.service'; import { KeyDto } from 'src/modules/browser/dto'; @@ -63,4 +67,23 @@ export class ConsumerGroupController { ): Promise { return this.service.updateGroup({ instanceId }, dto); } + + @Delete('') + @ApiRedisInstanceOperation({ + description: 'Delete Consumer Group', + statusCode: 200, + responses: [ + { + status: 200, + description: 'Returns number of affected consumer groups.', + type: DeleteConsumerGroupsResponse, + }, + ], + }) + async deleteGroup( + @Param('dbInstance') instanceId: string, + @Body() dto: DeleteConsumerGroupsDto, + ): Promise { + return this.service.deleteGroup({ instanceId }, dto); + } } diff --git a/redisinsight/api/src/modules/browser/dto/stream.dto.ts b/redisinsight/api/src/modules/browser/dto/stream.dto.ts index b7645877e8..e45936a39c 100644 --- a/redisinsight/api/src/modules/browser/dto/stream.dto.ts +++ b/redisinsight/api/src/modules/browser/dto/stream.dto.ts @@ -258,6 +258,7 @@ export class CreateConsumerGroupsDto extends KeyDto { }) @ValidateNested() @IsArray() + @ArrayNotEmpty() @Type(() => CreateConsumerGroupDto) consumerGroups: CreateConsumerGroupDto[]; } @@ -266,3 +267,25 @@ export class UpdateConsumerGroupDto extends IntersectionType( KeyDto, CreateConsumerGroupDto, ) {} + +export class DeleteConsumerGroupsDto extends KeyDto { + @ApiProperty({ + description: 'Consumer group names', + type: String, + isArray: true, + example: ['Group-1', 'Group-1'], + }) + @IsDefined() + @IsArray() + @ArrayNotEmpty() + @Type(() => String) + consumerGroups: string[]; +} + +export class DeleteConsumerGroupsResponse { + @ApiProperty({ + description: 'Number of deleted consumer groups', + type: Number, + }) + affected: number; +} diff --git a/redisinsight/api/src/modules/browser/services/stream/consumer-group.service.ts b/redisinsight/api/src/modules/browser/services/stream/consumer-group.service.ts index 27937c97d7..d9fa16881d 100644 --- a/redisinsight/api/src/modules/browser/services/stream/consumer-group.service.ts +++ b/redisinsight/api/src/modules/browser/services/stream/consumer-group.service.ts @@ -11,7 +11,12 @@ import { import { BrowserToolService } from 'src/modules/browser/services/browser-tool/browser-tool.service'; import { KeyDto } from 'src/modules/browser/dto'; import ERROR_MESSAGES from 'src/constants/error-messages'; -import {ConsumerGroupDto, CreateConsumerGroupsDto, UpdateConsumerGroupDto} from 'src/modules/browser/dto/stream.dto'; +import { + ConsumerGroupDto, + CreateConsumerGroupsDto, + DeleteConsumerGroupsDto, DeleteConsumerGroupsResponse, + UpdateConsumerGroupDto +} from 'src/modules/browser/dto/stream.dto'; @Injectable() export class ConsumerGroupService { @@ -196,6 +201,63 @@ export class ConsumerGroupService { } } + /** + * Delete consumer groups in batch + * @param clientOptions + * @param dto + */ + async deleteGroup( + clientOptions: IFindRedisClientInstanceByOptions, + dto: DeleteConsumerGroupsDto, + ): Promise { + try { + this.logger.log('Deleting consumer group.'); + + const exists = await this.browserTool.execCommand( + clientOptions, + BrowserToolKeysCommands.Exists, + [dto.keyName], + ); + + if (!exists) { + return Promise.reject(new NotFoundException(ERROR_MESSAGES.KEY_NOT_EXIST)); + } + + const toolCommands: Array<[ + toolCommand: BrowserToolCommands, + ...args: Array, + ]> = dto.consumerGroups.map((group) => ( + [ + BrowserToolStreamCommands.XGroupDestroy, + dto.keyName, + group, + ] + )); + + const [ + transactionError, + transactionResults, + ] = await this.browserTool.execMulti(clientOptions, toolCommands); + catchTransactionError(transactionError, transactionResults); + + this.logger.log('Consumer group(s) successfully deleted.'); + + return { + affected: toolCommands.length, + }; + } catch (error) { + if (error instanceof NotFoundException) { + throw error; + } + + if (error?.message.includes(RedisErrorCodes.WrongType)) { + throw new BadRequestException(error.message); + } + + throw catchAclError(error); + } + } + /** * Converts RESP response from Redis * [ From 6df022ab54eacb959e866ad02642c1e0ac5e8ae1 Mon Sep 17 00:00:00 2001 From: Artem Date: Tue, 24 May 2022 08:26:15 +0300 Subject: [PATCH 4/9] #RI-2935 BE Show Consumers List --- .../api/src/modules/browser/browser.module.ts | 7 ++ .../constants/browser-tool-commands.ts | 1 + .../stream/consumer-group.controller.ts | 2 +- .../controllers/stream/consumer.controller.ts | 43 +++++++ .../api/src/modules/browser/dto/stream.dto.ts | 35 ++++++ .../services/stream/consumer.service.ts | 109 ++++++++++++++++++ 6 files changed, 196 insertions(+), 1 deletion(-) create mode 100644 redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts create mode 100644 redisinsight/api/src/modules/browser/services/stream/consumer.service.ts diff --git a/redisinsight/api/src/modules/browser/browser.module.ts b/redisinsight/api/src/modules/browser/browser.module.ts index 5acd869a7b..d9e66d305e 100644 --- a/redisinsight/api/src/modules/browser/browser.module.ts +++ b/redisinsight/api/src/modules/browser/browser.module.ts @@ -6,6 +6,8 @@ import { StreamController } from 'src/modules/browser/controllers/stream/stream. import { StreamService } from 'src/modules/browser/services/stream/stream.service'; import { ConsumerGroupController } from 'src/modules/browser/controllers/stream/consumer-group.controller'; import { ConsumerGroupService } from 'src/modules/browser/services/stream/consumer-group.service'; +import { ConsumerController } from 'src/modules/browser/controllers/stream/consumer.controller'; +import { ConsumerService } from 'src/modules/browser/services/stream/consumer.service'; import { HashController } from './controllers/hash/hash.controller'; import { KeysController } from './controllers/keys/keys.controller'; import { KeysBusinessService } from './services/keys-business/keys-business.service'; @@ -35,6 +37,7 @@ import { BrowserToolClusterService } from './services/browser-tool-cluster/brows HashController, StreamController, ConsumerGroupController, + ConsumerController, ], providers: [ KeysBusinessService, @@ -46,6 +49,7 @@ import { BrowserToolClusterService } from './services/browser-tool-cluster/brows HashBusinessService, StreamService, ConsumerGroupService, + ConsumerService, BrowserToolService, BrowserToolClusterService, ], @@ -62,6 +66,9 @@ export class BrowserModule implements NestModule { RouterModule.resolvePath(SetController), RouterModule.resolvePath(ZSetController), RouterModule.resolvePath(RejsonRlController), + RouterModule.resolvePath(StreamController), + RouterModule.resolvePath(ConsumerGroupController), + RouterModule.resolvePath(ConsumerController), ); } } diff --git a/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts b/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts index d645e9c656..9bfe82ed28 100644 --- a/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts +++ b/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts @@ -81,6 +81,7 @@ export enum BrowserToolStreamCommands { XAdd = 'xadd', XDel = 'xdel', XInfoGroups = 'xinfo groups', + XInfoConsumers = 'xinfo consumers', XPending = 'xpending', XGroupCreate = 'xgroup create', XGroupSetId = 'xgroup setid', diff --git a/redisinsight/api/src/modules/browser/controllers/stream/consumer-group.controller.ts b/redisinsight/api/src/modules/browser/controllers/stream/consumer-group.controller.ts index 8dd1df4b04..1a97b260e5 100644 --- a/redisinsight/api/src/modules/browser/controllers/stream/consumer-group.controller.ts +++ b/redisinsight/api/src/modules/browser/controllers/stream/consumer-group.controller.ts @@ -26,7 +26,7 @@ export class ConsumerGroupController { @Post('/get') @ApiRedisInstanceOperation({ - description: 'Get stream entries', + description: 'Get consumer groups list', statusCode: 200, responses: [ { diff --git a/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts b/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts new file mode 100644 index 0000000000..0774ded8a7 --- /dev/null +++ b/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts @@ -0,0 +1,43 @@ +import { + Body, + Controller, + Param, + Post, + UsePipes, + ValidationPipe, +} from '@nestjs/common'; +import { ApiTags } from '@nestjs/swagger'; +import { ApiRedisInstanceOperation } from 'src/decorators/api-redis-instance-operation.decorator'; +import { + ConsumerDto, + ConsumerGroupDto, + GetConsumersDto, +} from 'src/modules/browser/dto/stream.dto'; +import { ConsumerService } from 'src/modules/browser/services/stream/consumer.service'; + +@ApiTags('Streams') +@Controller('streams/consumer-groups/consumers') +@UsePipes(new ValidationPipe({ transform: true })) +export class ConsumerController { + constructor(private service: ConsumerService) {} + + @Post('/get') + @ApiRedisInstanceOperation({ + description: 'Get stream entries', + statusCode: 200, + responses: [ + { + status: 200, + description: 'Returns stream consumer groups.', + type: ConsumerGroupDto, + isArray: true, + }, + ], + }) + async getConsumers( + @Param('dbInstance') instanceId: string, + @Body() dto: GetConsumersDto, + ): Promise { + return this.service.getConsumers({ instanceId }, dto); + } +} diff --git a/redisinsight/api/src/modules/browser/dto/stream.dto.ts b/redisinsight/api/src/modules/browser/dto/stream.dto.ts index e45936a39c..fbc88fee28 100644 --- a/redisinsight/api/src/modules/browser/dto/stream.dto.ts +++ b/redisinsight/api/src/modules/browser/dto/stream.dto.ts @@ -289,3 +289,38 @@ export class DeleteConsumerGroupsResponse { }) affected: number; } + +export class ConsumerDto { + @ApiProperty({ + type: String, + description: 'The consumer\'s name', + example: 'consumer-1', + }) + name: string; + + @ApiProperty({ + type: Number, + description: 'The number of pending messages for the client, ' + + 'which are messages that were delivered but are yet to be acknowledged', + example: 2, + }) + pending: number = 0; + + @ApiProperty({ + type: Number, + description: 'The number of milliseconds that have passed since the consumer last interacted with the server', + example: 22442, + }) + idle: number = 0; +} + +export class GetConsumersDto extends KeyDto { + @ApiProperty({ + type: String, + description: 'Consumer group name', + example: 'group-1', + }) + @IsNotEmpty() + @IsString() + groupName: string; +} diff --git a/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts b/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts new file mode 100644 index 0000000000..0845535246 --- /dev/null +++ b/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts @@ -0,0 +1,109 @@ +import { + BadRequestException, Injectable, Logger, NotFoundException, +} from '@nestjs/common'; +import { IFindRedisClientInstanceByOptions } from 'src/modules/core/services/redis/redis.service'; +import { RedisErrorCodes } from 'src/constants'; +import { catchAclError, convertStringsArrayToObject } from 'src/utils'; +import { + BrowserToolKeysCommands, BrowserToolStreamCommands, +} from 'src/modules/browser/constants/browser-tool-commands'; +import { BrowserToolService } from 'src/modules/browser/services/browser-tool/browser-tool.service'; +import ERROR_MESSAGES from 'src/constants/error-messages'; +import { + ConsumerDto, + GetConsumersDto, +} from 'src/modules/browser/dto/stream.dto'; + +@Injectable() +export class ConsumerService { + private logger = new Logger('ConsumerService'); + + constructor(private browserTool: BrowserToolService) {} + + /** + * Get consumers list inside particular group + * @param clientOptions + * @param dto + */ + async getConsumers( + clientOptions: IFindRedisClientInstanceByOptions, + dto: GetConsumersDto, + ): Promise { + try { + this.logger.log('Getting consumers list.'); + + const exists = await this.browserTool.execCommand( + clientOptions, + BrowserToolKeysCommands.Exists, + [dto.keyName], + ); + + if (!exists) { + return Promise.reject(new NotFoundException(ERROR_MESSAGES.KEY_NOT_EXIST)); + } + + return ConsumerService.formatReplyToDto(await this.browserTool.execCommand( + clientOptions, + BrowserToolStreamCommands.XInfoConsumers, + [dto.keyName, dto.groupName], + )); + } catch (error) { + if (error instanceof NotFoundException) { + throw error; + } + + if (error?.message.includes(RedisErrorCodes.WrongType)) { + throw new BadRequestException(error.message); + } + + throw catchAclError(error); + } + } + + /** + * Converts RESP response from Redis + * [ + * ['name', 'consumer-1', 'pending', 0, 'idle', 258741], + * ['name', 'consumer-2', 'pending', 0, 'idle', 258741], + * ... + * ] + * + * to DTO + * + * [ + * { + * name: 'consumer-1', + * pending: 0, + * idle: 258741, + * }, + * { + * name: 'consumer-2', + * pending: 0, + * idle: 258741, + * }, + * ... + * ] + * @param reply + */ + static formatReplyToDto(reply: Array>): ConsumerDto[] { + return reply.map(ConsumerService.formatArrayToDto); + } + + /** + * Format single reply entry to DTO + * @param entry + */ + static formatArrayToDto(entry: Array): ConsumerDto { + if (!entry?.length) { + return null; + } + + const entryObj = convertStringsArrayToObject(entry as string[]); + + return { + name: entryObj['name'], + pending: entryObj['pending'], + idle: entryObj['idle'], + }; + } +} From 9810719664cbb084b6c0e7f1fa8c0bfaea57a336 Mon Sep 17 00:00:00 2001 From: Artem Date: Tue, 24 May 2022 09:02:16 +0300 Subject: [PATCH 5/9] #RI-2943 BE for pending messages list --- .../controllers/stream/consumer.controller.ts | 24 +++++- .../api/src/modules/browser/dto/stream.dto.ts | 72 ++++++++++++++++ .../services/stream/consumer.service.ts | 83 ++++++++++++++++++- 3 files changed, 175 insertions(+), 4 deletions(-) diff --git a/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts b/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts index 0774ded8a7..8a6e137479 100644 --- a/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts +++ b/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts @@ -11,7 +11,7 @@ import { ApiRedisInstanceOperation } from 'src/decorators/api-redis-instance-ope import { ConsumerDto, ConsumerGroupDto, - GetConsumersDto, + GetConsumersDto, GetPendingMessagesDto, PendingMessageDto, } from 'src/modules/browser/dto/stream.dto'; import { ConsumerService } from 'src/modules/browser/services/stream/consumer.service'; @@ -23,12 +23,11 @@ export class ConsumerController { @Post('/get') @ApiRedisInstanceOperation({ - description: 'Get stream entries', + description: 'Get group consumers', statusCode: 200, responses: [ { status: 200, - description: 'Returns stream consumer groups.', type: ConsumerGroupDto, isArray: true, }, @@ -40,4 +39,23 @@ export class ConsumerController { ): Promise { return this.service.getConsumers({ instanceId }, dto); } + + @Post('/pending-messages/get') + @ApiRedisInstanceOperation({ + description: 'Get pending messages list', + statusCode: 200, + responses: [ + { + status: 200, + type: PendingMessageDto, + isArray: true, + }, + ], + }) + async getPendingMessages( + @Param('dbInstance') instanceId: string, + @Body() dto: GetPendingMessagesDto, + ): Promise { + return this.service.getPendingMessages({ instanceId }, dto); + } } diff --git a/redisinsight/api/src/modules/browser/dto/stream.dto.ts b/redisinsight/api/src/modules/browser/dto/stream.dto.ts index fbc88fee28..228e57d6cd 100644 --- a/redisinsight/api/src/modules/browser/dto/stream.dto.ts +++ b/redisinsight/api/src/modules/browser/dto/stream.dto.ts @@ -324,3 +324,75 @@ export class GetConsumersDto extends KeyDto { @IsString() groupName: string; } + +export class PendingMessageDto { + @ApiProperty({ + type: String, + description: 'Entry ID', + example: '*', + }) + id: string; + + @ApiProperty({ + type: String, + description: 'Consumer name', + example: 'consumer-1', + }) + consumerName: string; + + @ApiProperty({ + type: Number, + description: 'The number of milliseconds that elapsed since the last time ' + + 'this message was delivered to this consumer', + example: 22442, + }) + idle: number = 0; + + @ApiProperty({ + type: Number, + description: 'The number of times this message was delivered', + example: 2, + }) + delivered: number = 0; +} + +export class GetPendingMessagesDto extends IntersectionType( + KeyDto, + GetConsumersDto, +) { + @ApiProperty({ + type: String, + description: 'Consumer name', + example: 'consumer-1', + }) + @IsNotEmpty() + @IsString() + consumerName: string; + + @ApiPropertyOptional({ + description: 'Specifying the start id', + type: String, + default: '-', + }) + @IsString() + start?: string = '-'; + + @ApiPropertyOptional({ + description: 'Specifying the end id', + type: String, + default: '+', + }) + @IsString() + end?: string = '+'; + + @ApiPropertyOptional({ + description: + 'Specifying the number of pending messages to return.', + type: Number, + minimum: 1, + default: 500, + }) + @IsInt() + @Min(1) + count?: number = 500; +} diff --git a/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts b/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts index 0845535246..15a7783c55 100644 --- a/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts +++ b/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts @@ -11,7 +11,7 @@ import { BrowserToolService } from 'src/modules/browser/services/browser-tool/br import ERROR_MESSAGES from 'src/constants/error-messages'; import { ConsumerDto, - GetConsumersDto, + GetConsumersDto, GetPendingMessagesDto, PendingMessageDto, } from 'src/modules/browser/dto/stream.dto'; @Injectable() @@ -60,6 +60,46 @@ export class ConsumerService { } } + /** + * Get list of pending messages info for particular consumer + * @param clientOptions + * @param dto + */ + async getPendingMessages( + clientOptions: IFindRedisClientInstanceByOptions, + dto: GetPendingMessagesDto, + ): Promise { + try { + this.logger.log('Getting pending messages list.'); + + const exists = await this.browserTool.execCommand( + clientOptions, + BrowserToolKeysCommands.Exists, + [dto.keyName], + ); + + if (!exists) { + return Promise.reject(new NotFoundException(ERROR_MESSAGES.KEY_NOT_EXIST)); + } + + return ConsumerService.formatReplyToPendingMessagesDto(await this.browserTool.execCommand( + clientOptions, + BrowserToolStreamCommands.XPending, + [dto.keyName, dto.groupName, dto.start, dto.end, dto.count, dto.consumerName], + )); + } catch (error) { + if (error instanceof NotFoundException) { + throw error; + } + + if (error?.message.includes(RedisErrorCodes.WrongType)) { + throw new BadRequestException(error.message); + } + + throw catchAclError(error); + } + } + /** * Converts RESP response from Redis * [ @@ -106,4 +146,45 @@ export class ConsumerService { idle: entryObj['idle'], }; } + + /** + * Converts RESP response from Redis + * [ + * ['1567352639-0', 'consumer-1', 258741, 2], + * ... + * ] + * + * to DTO + * + * [ + * { + * id: '1567352639-0', + * name: 'consumer-1', + * idle: 258741, + * delivered: 2, + * }, + * ... + * ] + * @param reply + */ + static formatReplyToPendingMessagesDto(reply: Array>): PendingMessageDto[] { + return reply.map(ConsumerService.formatArrayToPendingMessageDto); + } + + /** + * Format single reply entry to DTO + * @param entry + */ + static formatArrayToPendingMessageDto(entry: Array): PendingMessageDto { + if (!entry?.length) { + return null; + } + + return { + id: `${entry[0]}`, + consumerName: `${entry[1]}`, + idle: +entry[2], + delivered: +entry[3], + }; + } } From 5f4f60322bd5148e88e208ea21325b7e5e19cd35 Mon Sep 17 00:00:00 2001 From: Artem Date: Tue, 24 May 2022 09:08:31 +0300 Subject: [PATCH 6/9] #RI-2943 BE rename PendingMessage -> PendingEntry --- .../browser/controllers/stream/consumer.controller.ts | 8 ++++---- redisinsight/api/src/modules/browser/dto/stream.dto.ts | 4 ++-- .../browser/services/stream/consumer.service.ts | 10 +++++----- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts b/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts index 8a6e137479..0993491832 100644 --- a/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts +++ b/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts @@ -11,7 +11,7 @@ import { ApiRedisInstanceOperation } from 'src/decorators/api-redis-instance-ope import { ConsumerDto, ConsumerGroupDto, - GetConsumersDto, GetPendingMessagesDto, PendingMessageDto, + GetConsumersDto, GetPendingEntriesDto, PendingEntryDto, } from 'src/modules/browser/dto/stream.dto'; import { ConsumerService } from 'src/modules/browser/services/stream/consumer.service'; @@ -47,15 +47,15 @@ export class ConsumerController { responses: [ { status: 200, - type: PendingMessageDto, + type: PendingEntryDto, isArray: true, }, ], }) async getPendingMessages( @Param('dbInstance') instanceId: string, - @Body() dto: GetPendingMessagesDto, - ): Promise { + @Body() dto: GetPendingEntriesDto, + ): Promise { return this.service.getPendingMessages({ instanceId }, dto); } } diff --git a/redisinsight/api/src/modules/browser/dto/stream.dto.ts b/redisinsight/api/src/modules/browser/dto/stream.dto.ts index 228e57d6cd..65461f90b5 100644 --- a/redisinsight/api/src/modules/browser/dto/stream.dto.ts +++ b/redisinsight/api/src/modules/browser/dto/stream.dto.ts @@ -325,7 +325,7 @@ export class GetConsumersDto extends KeyDto { groupName: string; } -export class PendingMessageDto { +export class PendingEntryDto { @ApiProperty({ type: String, description: 'Entry ID', @@ -356,7 +356,7 @@ export class PendingMessageDto { delivered: number = 0; } -export class GetPendingMessagesDto extends IntersectionType( +export class GetPendingEntriesDto extends IntersectionType( KeyDto, GetConsumersDto, ) { diff --git a/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts b/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts index 15a7783c55..3aad8be731 100644 --- a/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts +++ b/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts @@ -11,7 +11,7 @@ import { BrowserToolService } from 'src/modules/browser/services/browser-tool/br import ERROR_MESSAGES from 'src/constants/error-messages'; import { ConsumerDto, - GetConsumersDto, GetPendingMessagesDto, PendingMessageDto, + GetConsumersDto, GetPendingEntriesDto, PendingEntryDto, } from 'src/modules/browser/dto/stream.dto'; @Injectable() @@ -67,8 +67,8 @@ export class ConsumerService { */ async getPendingMessages( clientOptions: IFindRedisClientInstanceByOptions, - dto: GetPendingMessagesDto, - ): Promise { + dto: GetPendingEntriesDto, + ): Promise { try { this.logger.log('Getting pending messages list.'); @@ -167,7 +167,7 @@ export class ConsumerService { * ] * @param reply */ - static formatReplyToPendingMessagesDto(reply: Array>): PendingMessageDto[] { + static formatReplyToPendingMessagesDto(reply: Array>): PendingEntryDto[] { return reply.map(ConsumerService.formatArrayToPendingMessageDto); } @@ -175,7 +175,7 @@ export class ConsumerService { * Format single reply entry to DTO * @param entry */ - static formatArrayToPendingMessageDto(entry: Array): PendingMessageDto { + static formatArrayToPendingMessageDto(entry: Array): PendingEntryDto { if (!entry?.length) { return null; } From 448c64f1ab82a6679388e1c773041d1934c1198f Mon Sep 17 00:00:00 2001 From: Artem Date: Tue, 24 May 2022 09:30:19 +0300 Subject: [PATCH 7/9] #RI-2945 BE Ack Pending entries --- .../constants/browser-tool-commands.ts | 1 + .../controllers/stream/consumer.controller.ts | 24 +++++++- .../api/src/modules/browser/dto/stream.dto.ts | 26 +++++++- .../services/stream/consumer.service.ts | 61 ++++++++++++++++--- 4 files changed, 101 insertions(+), 11 deletions(-) diff --git a/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts b/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts index 9bfe82ed28..0b57d0c63c 100644 --- a/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts +++ b/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts @@ -83,6 +83,7 @@ export enum BrowserToolStreamCommands { XInfoGroups = 'xinfo groups', XInfoConsumers = 'xinfo consumers', XPending = 'xpending', + XAck = 'xack', XGroupCreate = 'xgroup create', XGroupSetId = 'xgroup setid', XGroupDestroy = 'xgroup destroy', diff --git a/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts b/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts index 0993491832..9c32641793 100644 --- a/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts +++ b/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts @@ -9,6 +9,7 @@ import { import { ApiTags } from '@nestjs/swagger'; import { ApiRedisInstanceOperation } from 'src/decorators/api-redis-instance-operation.decorator'; import { + AckPendingEntriesDto, AckPendingEntriesResponse, ConsumerDto, ConsumerGroupDto, GetConsumersDto, GetPendingEntriesDto, PendingEntryDto, @@ -52,10 +53,29 @@ export class ConsumerController { }, ], }) - async getPendingMessages( + async getPendingEntries( @Param('dbInstance') instanceId: string, @Body() dto: GetPendingEntriesDto, ): Promise { - return this.service.getPendingMessages({ instanceId }, dto); + return this.service.getPendingEntries({ instanceId }, dto); + } + + @Post('/pending-messages/ack') + @ApiRedisInstanceOperation({ + description: 'Get pending messages list', + statusCode: 200, + responses: [ + { + status: 200, + type: PendingEntryDto, + isArray: true, + }, + ], + }) + async ackPendingEntriers( + @Param('dbInstance') instanceId: string, + @Body() dto: AckPendingEntriesDto, + ): Promise { + return this.service.ackPendingEntries({ instanceId }, dto); } } diff --git a/redisinsight/api/src/modules/browser/dto/stream.dto.ts b/redisinsight/api/src/modules/browser/dto/stream.dto.ts index 65461f90b5..357ec8d173 100644 --- a/redisinsight/api/src/modules/browser/dto/stream.dto.ts +++ b/redisinsight/api/src/modules/browser/dto/stream.dto.ts @@ -189,7 +189,7 @@ export class CreateStreamDto extends IntersectionType( export class ConsumerGroupDto { @ApiProperty({ type: String, - description: 'Consumer group name', + description: 'Consumer Group name', example: 'group', }) name: string; @@ -294,7 +294,7 @@ export class ConsumerDto { @ApiProperty({ type: String, description: 'The consumer\'s name', - example: 'consumer-1', + example: 'consumer-2', }) name: string; @@ -396,3 +396,25 @@ export class GetPendingEntriesDto extends IntersectionType( @Min(1) count?: number = 500; } + +export class AckPendingEntriesDto extends GetConsumersDto { + @ApiProperty({ + description: 'Entries IDs', + type: String, + isArray: true, + example: ['1650985323741-0', '1650985323770-0'], + }) + @IsDefined() + @IsArray() + @ArrayNotEmpty() + @Type(() => String) + entries: string[]; +} + +export class AckPendingEntriesResponse { + @ApiProperty({ + description: 'Number of affected entries', + type: Number, + }) + affected: number; +} diff --git a/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts b/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts index 3aad8be731..dcad14a9c0 100644 --- a/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts +++ b/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts @@ -10,6 +10,7 @@ import { import { BrowserToolService } from 'src/modules/browser/services/browser-tool/browser-tool.service'; import ERROR_MESSAGES from 'src/constants/error-messages'; import { + AckPendingEntriesDto, AckPendingEntriesResponse, ConsumerDto, GetConsumersDto, GetPendingEntriesDto, PendingEntryDto, } from 'src/modules/browser/dto/stream.dto'; @@ -61,16 +62,16 @@ export class ConsumerService { } /** - * Get list of pending messages info for particular consumer + * Get list of pending entries info for particular consumer * @param clientOptions * @param dto */ - async getPendingMessages( + async getPendingEntries( clientOptions: IFindRedisClientInstanceByOptions, dto: GetPendingEntriesDto, ): Promise { try { - this.logger.log('Getting pending messages list.'); + this.logger.log('Getting pending entries list.'); const exists = await this.browserTool.execCommand( clientOptions, @@ -82,7 +83,7 @@ export class ConsumerService { return Promise.reject(new NotFoundException(ERROR_MESSAGES.KEY_NOT_EXIST)); } - return ConsumerService.formatReplyToPendingMessagesDto(await this.browserTool.execCommand( + return ConsumerService.formatReplyToPendingEntriesDto(await this.browserTool.execCommand( clientOptions, BrowserToolStreamCommands.XPending, [dto.keyName, dto.groupName, dto.start, dto.end, dto.count, dto.consumerName], @@ -100,6 +101,52 @@ export class ConsumerService { } } + /** + * Get list of pending entries info for particular consumer + * @param clientOptions + * @param dto + */ + async ackPendingEntries( + clientOptions: IFindRedisClientInstanceByOptions, + dto: AckPendingEntriesDto, + ): Promise { + try { + this.logger.log('Acknowledging pending entries.'); + + const exists = await this.browserTool.execCommand( + clientOptions, + BrowserToolKeysCommands.Exists, + [dto.keyName], + ); + + if (!exists) { + return Promise.reject(new NotFoundException(ERROR_MESSAGES.KEY_NOT_EXIST)); + } + + const affected = await this.browserTool.execCommand( + clientOptions, + BrowserToolStreamCommands.XAck, + [dto.keyName, dto.groupName, ...dto.entries], + ); + + this.logger.log('Successfully acknowledged pending entries.'); + + return { + affected, + }; + } catch (error) { + if (error instanceof NotFoundException) { + throw error; + } + + if (error?.message.includes(RedisErrorCodes.WrongType)) { + throw new BadRequestException(error.message); + } + + throw catchAclError(error); + } + } + /** * Converts RESP response from Redis * [ @@ -167,15 +214,15 @@ export class ConsumerService { * ] * @param reply */ - static formatReplyToPendingMessagesDto(reply: Array>): PendingEntryDto[] { - return reply.map(ConsumerService.formatArrayToPendingMessageDto); + static formatReplyToPendingEntriesDto(reply: Array>): PendingEntryDto[] { + return reply.map(ConsumerService.formatArrayToPendingEntryDto); } /** * Format single reply entry to DTO * @param entry */ - static formatArrayToPendingMessageDto(entry: Array): PendingEntryDto { + static formatArrayToPendingEntryDto(entry: Array): PendingEntryDto { if (!entry?.length) { return null; } From c3425ec893353be95bac68ea81c29f85526b6a62 Mon Sep 17 00:00:00 2001 From: Artem Date: Tue, 24 May 2022 11:20:57 +0300 Subject: [PATCH 8/9] #RI-2947 BE Claim Pending entries --- .../constants/browser-tool-commands.ts | 1 + .../controllers/stream/consumer.controller.ts | 31 ++++-- .../api/src/modules/browser/dto/stream.dto.ts | 104 +++++++++++++++++- .../services/stream/consumer.service.ts | 69 +++++++++++- 4 files changed, 195 insertions(+), 10 deletions(-) diff --git a/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts b/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts index 0b57d0c63c..e520a1802a 100644 --- a/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts +++ b/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts @@ -84,6 +84,7 @@ export enum BrowserToolStreamCommands { XInfoConsumers = 'xinfo consumers', XPending = 'xpending', XAck = 'xack', + XClaim = 'xclaim', XGroupCreate = 'xgroup create', XGroupSetId = 'xgroup setid', XGroupDestroy = 'xgroup destroy', diff --git a/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts b/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts index 9c32641793..fad006f4ee 100644 --- a/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts +++ b/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts @@ -9,7 +9,7 @@ import { import { ApiTags } from '@nestjs/swagger'; import { ApiRedisInstanceOperation } from 'src/decorators/api-redis-instance-operation.decorator'; import { - AckPendingEntriesDto, AckPendingEntriesResponse, + AckPendingEntriesDto, AckPendingEntriesResponse, ClaimPendingEntriesResponse, ClaimPendingEntryDto, ConsumerDto, ConsumerGroupDto, GetConsumersDto, GetPendingEntriesDto, PendingEntryDto, @@ -24,7 +24,7 @@ export class ConsumerController { @Post('/get') @ApiRedisInstanceOperation({ - description: 'Get group consumers', + description: 'Get consumers list in the group', statusCode: 200, responses: [ { @@ -43,7 +43,7 @@ export class ConsumerController { @Post('/pending-messages/get') @ApiRedisInstanceOperation({ - description: 'Get pending messages list', + description: 'Get pending entries list', statusCode: 200, responses: [ { @@ -62,20 +62,37 @@ export class ConsumerController { @Post('/pending-messages/ack') @ApiRedisInstanceOperation({ - description: 'Get pending messages list', + description: 'Ack pending entries', statusCode: 200, responses: [ { status: 200, - type: PendingEntryDto, - isArray: true, + type: AckPendingEntriesResponse, }, ], }) - async ackPendingEntriers( + async ackPendingEntries( @Param('dbInstance') instanceId: string, @Body() dto: AckPendingEntriesDto, ): Promise { return this.service.ackPendingEntries({ instanceId }, dto); } + + @Post('/pending-messages/claim') + @ApiRedisInstanceOperation({ + description: 'Claim pending entries', + statusCode: 200, + responses: [ + { + status: 200, + type: ClaimPendingEntriesResponse, + }, + ], + }) + async claimPendingEntries( + @Param('dbInstance') instanceId: string, + @Body() dto: ClaimPendingEntryDto, + ): Promise { + return this.service.claimPendingEntries({ instanceId }, dto); + } } diff --git a/redisinsight/api/src/modules/browser/dto/stream.dto.ts b/redisinsight/api/src/modules/browser/dto/stream.dto.ts index 357ec8d173..7b1f4effa8 100644 --- a/redisinsight/api/src/modules/browser/dto/stream.dto.ts +++ b/redisinsight/api/src/modules/browser/dto/stream.dto.ts @@ -5,7 +5,7 @@ import { ArrayNotEmpty, IsArray, IsDefined, - IsEnum, IsInt, IsNotEmpty, IsString, Min, ValidateNested, isString, + IsEnum, IsInt, IsNotEmpty, IsString, Min, ValidateNested, isString, IsOptional, NotEquals, ValidateIf, IsBoolean, } from 'class-validator'; import { KeyDto, KeyWithExpireDto } from 'src/modules/browser/dto/keys.dto'; import { SortOrder } from 'src/constants'; @@ -418,3 +418,105 @@ export class AckPendingEntriesResponse { }) affected: number; } + +export class ClaimPendingEntryDto extends KeyDto { + @ApiProperty({ + type: String, + description: 'Consumer group name', + example: 'group-1', + }) + @IsNotEmpty() + @IsString() + groupName: string; + + @ApiProperty({ + type: String, + description: 'Consumer name', + example: 'consumer-1', + }) + @IsNotEmpty() + @IsString() + consumerName: string; + + @ApiProperty({ + description: 'Claim only if its idle time is greater the minimum idle time ', + type: Number, + minimum: 0, + default: 0, + }) + @IsInt() + @Min(0) + minIdleTime: number = 0; + + @ApiProperty({ + description: 'Entries IDs', + type: String, + isArray: true, + example: ['1650985323741-0', '1650985323770-0'], + }) + @IsDefined() + @IsArray() + @ArrayNotEmpty() + @Type(() => String) + entries: string[]; + + @ApiPropertyOptional({ + description: 'Set the idle time (last time it was delivered) of the message', + type: Number, + minimum: 0, + }) + @NotEquals(null) + @ValidateIf((object, value) => value !== undefined) + @IsInt() + @Min(0) + idle?: number; + + @ApiPropertyOptional({ + description: 'This is the same as IDLE but instead of a relative amount of milliseconds, ' + + 'it sets the idle time to a specific Unix time (in milliseconds)', + type: Number, + }) + @NotEquals(null) + @ValidateIf((object, value) => value !== undefined) + @IsInt() + time?: number; + + @ApiPropertyOptional({ + description: 'Set the retry counter to the specified value. ' + + 'This counter is incremented every time a message is delivered again. ' + + 'Normally XCLAIM does not alter this counter, which is just served to clients when the XPENDING command ' + + 'is called: this way clients can detect anomalies, like messages that are never processed ' + + 'for some reason after a big number of delivery attempts', + type: Number, + minimum: 0, + }) + @NotEquals(null) + @ValidateIf((object, value) => value !== undefined) + @IsInt() + @Min(0) + retryCount?: number; + + @ApiPropertyOptional({ + description: 'Creates the pending message entry in the PEL even if certain specified IDs are not already ' + + 'in the PEL assigned to a different client', + type: Boolean, + }) + @NotEquals(null) + @ValidateIf((object, value) => value !== undefined) + @IsBoolean() + force?: boolean; +} + +export class ClaimPendingEntriesResponse { + @ApiProperty({ + description: 'Entries IDs were affected by claim command', + type: String, + isArray: true, + example: ['1650985323741-0', '1650985323770-0'], + }) + @IsDefined() + @IsArray() + @ArrayNotEmpty() + @Type(() => String) + affected: string[]; +} diff --git a/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts b/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts index dcad14a9c0..2f98d1f159 100644 --- a/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts +++ b/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts @@ -10,7 +10,7 @@ import { import { BrowserToolService } from 'src/modules/browser/services/browser-tool/browser-tool.service'; import ERROR_MESSAGES from 'src/constants/error-messages'; import { - AckPendingEntriesDto, AckPendingEntriesResponse, + AckPendingEntriesDto, AckPendingEntriesResponse, ClaimPendingEntriesResponse, ClaimPendingEntryDto, ConsumerDto, GetConsumersDto, GetPendingEntriesDto, PendingEntryDto, } from 'src/modules/browser/dto/stream.dto'; @@ -102,7 +102,7 @@ export class ConsumerService { } /** - * Get list of pending entries info for particular consumer + * Acknowledge pending entries * @param clientOptions * @param dto */ @@ -147,6 +147,71 @@ export class ConsumerService { } } + /** + * Claim pending entries with additional parameters + * @param clientOptions + * @param dto + */ + async claimPendingEntries( + clientOptions: IFindRedisClientInstanceByOptions, + dto: ClaimPendingEntryDto, + ): Promise { + try { + this.logger.log('Claiming pending entries.'); + + const exists = await this.browserTool.execCommand( + clientOptions, + BrowserToolKeysCommands.Exists, + [dto.keyName], + ); + + if (!exists) { + return Promise.reject(new NotFoundException(ERROR_MESSAGES.KEY_NOT_EXIST)); + } + + const args = [dto.keyName, dto.groupName, dto.consumerName, dto.minIdleTime, ...dto.entries]; + + if (dto.idle !== undefined) { + args.push('idle', dto.idle); + } else if (dto.time !== undefined) { + args.push('time', dto.time); + } + + if (dto.retryCount !== undefined) { + args.push('retrycount', dto.retryCount); + } + + if (dto.force) { + args.push('force'); + } + + // Return just an array of IDs of messages successfully claimed, without returning the actual message. + args.push('justid'); + + const affected = await this.browserTool.execCommand( + clientOptions, + BrowserToolStreamCommands.XClaim, + args, + ); + + this.logger.log('Successfully claimed pending entries.'); + + return { + affected, + }; + } catch (error) { + if (error instanceof NotFoundException) { + throw error; + } + + if (error?.message.includes(RedisErrorCodes.WrongType)) { + throw new BadRequestException(error.message); + } + + throw catchAclError(error); + } + } + /** * Converts RESP response from Redis * [ From f1af09802439bcb3742a8e876c67b89fbc7bbed3 Mon Sep 17 00:00:00 2001 From: Artem Date: Tue, 24 May 2022 11:31:45 +0300 Subject: [PATCH 9/9] #RI-2941 BE Delete consumers from the group --- .../constants/browser-tool-commands.ts | 1 + .../controllers/stream/consumer.controller.ts | 16 ++++- .../api/src/modules/browser/dto/stream.dto.ts | 14 +++++ .../services/stream/consumer.service.ts | 59 ++++++++++++++++++- 4 files changed, 86 insertions(+), 4 deletions(-) diff --git a/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts b/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts index e520a1802a..c506704451 100644 --- a/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts +++ b/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts @@ -88,6 +88,7 @@ export enum BrowserToolStreamCommands { XGroupCreate = 'xgroup create', XGroupSetId = 'xgroup setid', XGroupDestroy = 'xgroup destroy', + XGroupDelConsumer = 'xgroup delconsumer', } export enum BrowserToolTSCommands { diff --git a/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts b/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts index fad006f4ee..7f8eb12328 100644 --- a/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts +++ b/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts @@ -1,6 +1,6 @@ import { Body, - Controller, + Controller, Delete, Param, Post, UsePipes, @@ -11,7 +11,7 @@ import { ApiRedisInstanceOperation } from 'src/decorators/api-redis-instance-ope import { AckPendingEntriesDto, AckPendingEntriesResponse, ClaimPendingEntriesResponse, ClaimPendingEntryDto, ConsumerDto, - ConsumerGroupDto, + ConsumerGroupDto, DeleteConsumersDto, GetConsumersDto, GetPendingEntriesDto, PendingEntryDto, } from 'src/modules/browser/dto/stream.dto'; import { ConsumerService } from 'src/modules/browser/services/stream/consumer.service'; @@ -41,6 +41,18 @@ export class ConsumerController { return this.service.getConsumers({ instanceId }, dto); } + @Delete('') + @ApiRedisInstanceOperation({ + description: 'Delete Consumer(s) from the Consumer Group', + statusCode: 200, + }) + async deleteConsumers( + @Param('dbInstance') instanceId: string, + @Body() dto: DeleteConsumersDto, + ): Promise { + return this.service.deleteConsumers({ instanceId }, dto); + } + @Post('/pending-messages/get') @ApiRedisInstanceOperation({ description: 'Get pending entries list', diff --git a/redisinsight/api/src/modules/browser/dto/stream.dto.ts b/redisinsight/api/src/modules/browser/dto/stream.dto.ts index 7b1f4effa8..52d79496ee 100644 --- a/redisinsight/api/src/modules/browser/dto/stream.dto.ts +++ b/redisinsight/api/src/modules/browser/dto/stream.dto.ts @@ -325,6 +325,20 @@ export class GetConsumersDto extends KeyDto { groupName: string; } +export class DeleteConsumersDto extends GetConsumersDto { + @ApiProperty({ + description: 'Names of consumers to delete', + type: String, + isArray: true, + example: ['consumer-1', 'consumer-2'], + }) + @IsDefined() + @IsArray() + @ArrayNotEmpty() + @Type(() => String) + consumerNames: string[]; +} + export class PendingEntryDto { @ApiProperty({ type: String, diff --git a/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts b/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts index 2f98d1f159..aea20b87ae 100644 --- a/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts +++ b/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts @@ -3,15 +3,16 @@ import { } from '@nestjs/common'; import { IFindRedisClientInstanceByOptions } from 'src/modules/core/services/redis/redis.service'; import { RedisErrorCodes } from 'src/constants'; -import { catchAclError, convertStringsArrayToObject } from 'src/utils'; +import {catchAclError, catchTransactionError, convertStringsArrayToObject} from 'src/utils'; import { + BrowserToolCommands, BrowserToolKeysCommands, BrowserToolStreamCommands, } from 'src/modules/browser/constants/browser-tool-commands'; import { BrowserToolService } from 'src/modules/browser/services/browser-tool/browser-tool.service'; import ERROR_MESSAGES from 'src/constants/error-messages'; import { AckPendingEntriesDto, AckPendingEntriesResponse, ClaimPendingEntriesResponse, ClaimPendingEntryDto, - ConsumerDto, + ConsumerDto, DeleteConsumersDto, GetConsumersDto, GetPendingEntriesDto, PendingEntryDto, } from 'src/modules/browser/dto/stream.dto'; @@ -61,6 +62,60 @@ export class ConsumerService { } } + /** + * Get consumers list inside particular group + * @param clientOptions + * @param dto + */ + async deleteConsumers( + clientOptions: IFindRedisClientInstanceByOptions, + dto: DeleteConsumersDto, + ): Promise { + try { + this.logger.log('Deleting consumers from the group.'); + + const exists = await this.browserTool.execCommand( + clientOptions, + BrowserToolKeysCommands.Exists, + [dto.keyName], + ); + + if (!exists) { + return Promise.reject(new NotFoundException(ERROR_MESSAGES.KEY_NOT_EXIST)); + } + + const toolCommands: Array<[ + toolCommand: BrowserToolCommands, + ...args: Array, + ]> = dto.consumerNames.map((consumerName) => ( + [ + BrowserToolStreamCommands.XGroupDelConsumer, + dto.keyName, + dto.groupName, + consumerName, + ] + )); + + const [ + transactionError, + transactionResults, + ] = await this.browserTool.execMulti(clientOptions, toolCommands); + catchTransactionError(transactionError, transactionResults); + + return undefined; + } catch (error) { + if (error instanceof NotFoundException) { + throw error; + } + + if (error?.message.includes(RedisErrorCodes.WrongType)) { + throw new BadRequestException(error.message); + } + + throw catchAclError(error); + } + } + /** * Get list of pending entries info for particular consumer * @param clientOptions