diff --git a/redisinsight/api/src/constants/redis-error-codes.ts b/redisinsight/api/src/constants/redis-error-codes.ts index a7123dd422..baf151099e 100644 --- a/redisinsight/api/src/constants/redis-error-codes.ts +++ b/redisinsight/api/src/constants/redis-error-codes.ts @@ -10,6 +10,7 @@ export enum RedisErrorCodes { ConnectionReset = 'ECONNRESET', Timeout = 'ETIMEDOUT', CommandSyntaxError = 'syntax error', + BusyGroup = 'BUSYGROUP', UnknownCommand = 'unknown command', } diff --git a/redisinsight/api/src/modules/browser/browser.module.ts b/redisinsight/api/src/modules/browser/browser.module.ts index 34a1d37850..d9e66d305e 100644 --- a/redisinsight/api/src/modules/browser/browser.module.ts +++ b/redisinsight/api/src/modules/browser/browser.module.ts @@ -4,6 +4,10 @@ import { SharedModule } from 'src/modules/shared/shared.module'; import { RedisConnectionMiddleware } from 'src/middleware/redis-connection.middleware'; import { StreamController } from 'src/modules/browser/controllers/stream/stream.controller'; import { StreamService } from 'src/modules/browser/services/stream/stream.service'; +import { ConsumerGroupController } from 'src/modules/browser/controllers/stream/consumer-group.controller'; +import { ConsumerGroupService } from 'src/modules/browser/services/stream/consumer-group.service'; +import { ConsumerController } from 'src/modules/browser/controllers/stream/consumer.controller'; +import { ConsumerService } from 'src/modules/browser/services/stream/consumer.service'; import { HashController } from './controllers/hash/hash.controller'; import { KeysController } from './controllers/keys/keys.controller'; import { KeysBusinessService } from './services/keys-business/keys-business.service'; @@ -32,6 +36,8 @@ import { BrowserToolClusterService } from './services/browser-tool-cluster/brows RejsonRlController, HashController, StreamController, + ConsumerGroupController, + ConsumerController, ], providers: [ KeysBusinessService, @@ -42,6 +48,8 @@ import { BrowserToolClusterService } from './services/browser-tool-cluster/brows RejsonRlBusinessService, HashBusinessService, StreamService, + ConsumerGroupService, + ConsumerService, BrowserToolService, BrowserToolClusterService, ], @@ -58,6 +66,9 @@ export class BrowserModule implements NestModule { RouterModule.resolvePath(SetController), RouterModule.resolvePath(ZSetController), RouterModule.resolvePath(RejsonRlController), + RouterModule.resolvePath(StreamController), + RouterModule.resolvePath(ConsumerGroupController), + RouterModule.resolvePath(ConsumerController), ); } } diff --git a/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts b/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts index 8b8a1436c2..c506704451 100644 --- a/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts +++ b/redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts @@ -80,6 +80,15 @@ export enum BrowserToolStreamCommands { XRevRange = 'xrevrange', XAdd = 'xadd', XDel = 'xdel', + XInfoGroups = 'xinfo groups', + XInfoConsumers = 'xinfo consumers', + XPending = 'xpending', + XAck = 'xack', + XClaim = 'xclaim', + XGroupCreate = 'xgroup create', + XGroupSetId = 'xgroup setid', + XGroupDestroy = 'xgroup destroy', + XGroupDelConsumer = 'xgroup delconsumer', } export enum BrowserToolTSCommands { diff --git a/redisinsight/api/src/modules/browser/controllers/stream/consumer-group.controller.ts b/redisinsight/api/src/modules/browser/controllers/stream/consumer-group.controller.ts new file mode 100644 index 0000000000..1a97b260e5 --- /dev/null +++ b/redisinsight/api/src/modules/browser/controllers/stream/consumer-group.controller.ts @@ -0,0 +1,89 @@ +import { + Body, + Controller, Delete, + Param, Patch, + Post, + UsePipes, + ValidationPipe, +} from '@nestjs/common'; +import { ApiTags } from '@nestjs/swagger'; +import { ApiRedisInstanceOperation } from 'src/decorators/api-redis-instance-operation.decorator'; +import { + ConsumerGroupDto, + CreateConsumerGroupsDto, + DeleteConsumerGroupsDto, + DeleteConsumerGroupsResponse, + UpdateConsumerGroupDto, +} from 'src/modules/browser/dto/stream.dto'; +import { ConsumerGroupService } from 'src/modules/browser/services/stream/consumer-group.service'; +import { KeyDto } from 'src/modules/browser/dto'; + +@ApiTags('Streams') +@Controller('streams/consumer-groups') +@UsePipes(new ValidationPipe({ transform: true })) +export class ConsumerGroupController { + constructor(private service: ConsumerGroupService) {} + + @Post('/get') + @ApiRedisInstanceOperation({ + description: 'Get consumer groups list', + statusCode: 200, + responses: [ + { + status: 200, + description: 'Returns stream consumer groups.', + type: ConsumerGroupDto, + isArray: true, + }, + ], + }) + async getGroups( + @Param('dbInstance') instanceId: string, + @Body() dto: KeyDto, + ): Promise { + return this.service.getGroups({ instanceId }, dto); + } + + @Post('') + @ApiRedisInstanceOperation({ + description: 'Create stream consumer group', + statusCode: 201, + }) + async createGroups( + @Param('dbInstance') instanceId: string, + @Body() dto: CreateConsumerGroupsDto, + ): Promise { + return this.service.createGroups({ instanceId }, dto); + } + + @Patch('') + @ApiRedisInstanceOperation({ + description: 'Modify last delivered ID of the Consumer Group', + statusCode: 200, + }) + async updateGroup( + @Param('dbInstance') instanceId: string, + @Body() dto: UpdateConsumerGroupDto, + ): Promise { + return this.service.updateGroup({ instanceId }, dto); + } + + @Delete('') + @ApiRedisInstanceOperation({ + description: 'Delete Consumer Group', + statusCode: 200, + responses: [ + { + status: 200, + description: 'Returns number of affected consumer groups.', + type: DeleteConsumerGroupsResponse, + }, + ], + }) + async deleteGroup( + @Param('dbInstance') instanceId: string, + @Body() dto: DeleteConsumerGroupsDto, + ): Promise { + return this.service.deleteGroup({ instanceId }, dto); + } +} diff --git a/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts b/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts new file mode 100644 index 0000000000..7f8eb12328 --- /dev/null +++ b/redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts @@ -0,0 +1,110 @@ +import { + Body, + Controller, Delete, + Param, + Post, + UsePipes, + ValidationPipe, +} from '@nestjs/common'; +import { ApiTags } from '@nestjs/swagger'; +import { ApiRedisInstanceOperation } from 'src/decorators/api-redis-instance-operation.decorator'; +import { + AckPendingEntriesDto, AckPendingEntriesResponse, ClaimPendingEntriesResponse, ClaimPendingEntryDto, + ConsumerDto, + ConsumerGroupDto, DeleteConsumersDto, + GetConsumersDto, GetPendingEntriesDto, PendingEntryDto, +} from 'src/modules/browser/dto/stream.dto'; +import { ConsumerService } from 'src/modules/browser/services/stream/consumer.service'; + +@ApiTags('Streams') +@Controller('streams/consumer-groups/consumers') +@UsePipes(new ValidationPipe({ transform: true })) +export class ConsumerController { + constructor(private service: ConsumerService) {} + + @Post('/get') + @ApiRedisInstanceOperation({ + description: 'Get consumers list in the group', + statusCode: 200, + responses: [ + { + status: 200, + type: ConsumerGroupDto, + isArray: true, + }, + ], + }) + async getConsumers( + @Param('dbInstance') instanceId: string, + @Body() dto: GetConsumersDto, + ): Promise { + return this.service.getConsumers({ instanceId }, dto); + } + + @Delete('') + @ApiRedisInstanceOperation({ + description: 'Delete Consumer(s) from the Consumer Group', + statusCode: 200, + }) + async deleteConsumers( + @Param('dbInstance') instanceId: string, + @Body() dto: DeleteConsumersDto, + ): Promise { + return this.service.deleteConsumers({ instanceId }, dto); + } + + @Post('/pending-messages/get') + @ApiRedisInstanceOperation({ + description: 'Get pending entries list', + statusCode: 200, + responses: [ + { + status: 200, + type: PendingEntryDto, + isArray: true, + }, + ], + }) + async getPendingEntries( + @Param('dbInstance') instanceId: string, + @Body() dto: GetPendingEntriesDto, + ): Promise { + return this.service.getPendingEntries({ instanceId }, dto); + } + + @Post('/pending-messages/ack') + @ApiRedisInstanceOperation({ + description: 'Ack pending entries', + statusCode: 200, + responses: [ + { + status: 200, + type: AckPendingEntriesResponse, + }, + ], + }) + async ackPendingEntries( + @Param('dbInstance') instanceId: string, + @Body() dto: AckPendingEntriesDto, + ): Promise { + return this.service.ackPendingEntries({ instanceId }, dto); + } + + @Post('/pending-messages/claim') + @ApiRedisInstanceOperation({ + description: 'Claim pending entries', + statusCode: 200, + responses: [ + { + status: 200, + type: ClaimPendingEntriesResponse, + }, + ], + }) + async claimPendingEntries( + @Param('dbInstance') instanceId: string, + @Body() dto: ClaimPendingEntryDto, + ): Promise { + return this.service.claimPendingEntries({ instanceId }, dto); + } +} diff --git a/redisinsight/api/src/modules/browser/dto/stream.dto.ts b/redisinsight/api/src/modules/browser/dto/stream.dto.ts index def52f4187..52d79496ee 100644 --- a/redisinsight/api/src/modules/browser/dto/stream.dto.ts +++ b/redisinsight/api/src/modules/browser/dto/stream.dto.ts @@ -1,9 +1,11 @@ -import { ApiProperty, ApiPropertyOptional, IntersectionType } from '@nestjs/swagger'; +import { + ApiProperty, ApiPropertyOptional, IntersectionType, +} from '@nestjs/swagger'; import { ArrayNotEmpty, IsArray, IsDefined, - IsEnum, IsInt, IsNotEmpty, IsString, Min, ValidateNested, isString, + IsEnum, IsInt, IsNotEmpty, IsString, Min, ValidateNested, isString, IsOptional, NotEquals, ValidateIf, IsBoolean, } from 'class-validator'; import { KeyDto, KeyWithExpireDto } from 'src/modules/browser/dto/keys.dto'; import { SortOrder } from 'src/constants'; @@ -183,3 +185,352 @@ export class CreateStreamDto extends IntersectionType( AddStreamEntriesDto, KeyWithExpireDto, ) {} + +export class ConsumerGroupDto { + @ApiProperty({ + type: String, + description: 'Consumer Group name', + example: 'group', + }) + name: string; + + @ApiProperty({ + type: Number, + description: 'Number of consumers', + example: 2, + }) + consumers: number = 0; + + @ApiProperty({ + type: Number, + description: 'Number of pending messages', + example: 2, + }) + pending: number = 0; + + @ApiProperty({ + type: String, + description: 'Smallest Id of the message that is pending in the group', + example: '1657892649-0', + }) + smallestPendingId: string; + + @ApiProperty({ + type: String, + description: 'Greatest Id of the message that is pending in the group', + example: '1657892680-0', + }) + greatestPendingId: string; + + @ApiProperty({ + type: String, + description: 'Id of last delivered message', + example: '1657892648-0', + }) + lastDeliveredId: string; +} + +export class CreateConsumerGroupDto { + @ApiProperty({ + type: String, + description: 'Consumer group name', + example: 'group', + }) + @IsNotEmpty() + @IsString() + name: string; + + @ApiProperty({ + type: String, + description: 'Id of last delivered message', + example: '1657892648-0', + }) + @IsNotEmpty() + @IsString() + lastDeliveredId: string; +} + +export class CreateConsumerGroupsDto extends KeyDto { + @ApiProperty({ + type: () => CreateConsumerGroupDto, + isArray: true, + description: 'List of consumer groups to create', + }) + @ValidateNested() + @IsArray() + @ArrayNotEmpty() + @Type(() => CreateConsumerGroupDto) + consumerGroups: CreateConsumerGroupDto[]; +} + +export class UpdateConsumerGroupDto extends IntersectionType( + KeyDto, + CreateConsumerGroupDto, +) {} + +export class DeleteConsumerGroupsDto extends KeyDto { + @ApiProperty({ + description: 'Consumer group names', + type: String, + isArray: true, + example: ['Group-1', 'Group-1'], + }) + @IsDefined() + @IsArray() + @ArrayNotEmpty() + @Type(() => String) + consumerGroups: string[]; +} + +export class DeleteConsumerGroupsResponse { + @ApiProperty({ + description: 'Number of deleted consumer groups', + type: Number, + }) + affected: number; +} + +export class ConsumerDto { + @ApiProperty({ + type: String, + description: 'The consumer\'s name', + example: 'consumer-2', + }) + name: string; + + @ApiProperty({ + type: Number, + description: 'The number of pending messages for the client, ' + + 'which are messages that were delivered but are yet to be acknowledged', + example: 2, + }) + pending: number = 0; + + @ApiProperty({ + type: Number, + description: 'The number of milliseconds that have passed since the consumer last interacted with the server', + example: 22442, + }) + idle: number = 0; +} + +export class GetConsumersDto extends KeyDto { + @ApiProperty({ + type: String, + description: 'Consumer group name', + example: 'group-1', + }) + @IsNotEmpty() + @IsString() + groupName: string; +} + +export class DeleteConsumersDto extends GetConsumersDto { + @ApiProperty({ + description: 'Names of consumers to delete', + type: String, + isArray: true, + example: ['consumer-1', 'consumer-2'], + }) + @IsDefined() + @IsArray() + @ArrayNotEmpty() + @Type(() => String) + consumerNames: string[]; +} + +export class PendingEntryDto { + @ApiProperty({ + type: String, + description: 'Entry ID', + example: '*', + }) + id: string; + + @ApiProperty({ + type: String, + description: 'Consumer name', + example: 'consumer-1', + }) + consumerName: string; + + @ApiProperty({ + type: Number, + description: 'The number of milliseconds that elapsed since the last time ' + + 'this message was delivered to this consumer', + example: 22442, + }) + idle: number = 0; + + @ApiProperty({ + type: Number, + description: 'The number of times this message was delivered', + example: 2, + }) + delivered: number = 0; +} + +export class GetPendingEntriesDto extends IntersectionType( + KeyDto, + GetConsumersDto, +) { + @ApiProperty({ + type: String, + description: 'Consumer name', + example: 'consumer-1', + }) + @IsNotEmpty() + @IsString() + consumerName: string; + + @ApiPropertyOptional({ + description: 'Specifying the start id', + type: String, + default: '-', + }) + @IsString() + start?: string = '-'; + + @ApiPropertyOptional({ + description: 'Specifying the end id', + type: String, + default: '+', + }) + @IsString() + end?: string = '+'; + + @ApiPropertyOptional({ + description: + 'Specifying the number of pending messages to return.', + type: Number, + minimum: 1, + default: 500, + }) + @IsInt() + @Min(1) + count?: number = 500; +} + +export class AckPendingEntriesDto extends GetConsumersDto { + @ApiProperty({ + description: 'Entries IDs', + type: String, + isArray: true, + example: ['1650985323741-0', '1650985323770-0'], + }) + @IsDefined() + @IsArray() + @ArrayNotEmpty() + @Type(() => String) + entries: string[]; +} + +export class AckPendingEntriesResponse { + @ApiProperty({ + description: 'Number of affected entries', + type: Number, + }) + affected: number; +} + +export class ClaimPendingEntryDto extends KeyDto { + @ApiProperty({ + type: String, + description: 'Consumer group name', + example: 'group-1', + }) + @IsNotEmpty() + @IsString() + groupName: string; + + @ApiProperty({ + type: String, + description: 'Consumer name', + example: 'consumer-1', + }) + @IsNotEmpty() + @IsString() + consumerName: string; + + @ApiProperty({ + description: 'Claim only if its idle time is greater the minimum idle time ', + type: Number, + minimum: 0, + default: 0, + }) + @IsInt() + @Min(0) + minIdleTime: number = 0; + + @ApiProperty({ + description: 'Entries IDs', + type: String, + isArray: true, + example: ['1650985323741-0', '1650985323770-0'], + }) + @IsDefined() + @IsArray() + @ArrayNotEmpty() + @Type(() => String) + entries: string[]; + + @ApiPropertyOptional({ + description: 'Set the idle time (last time it was delivered) of the message', + type: Number, + minimum: 0, + }) + @NotEquals(null) + @ValidateIf((object, value) => value !== undefined) + @IsInt() + @Min(0) + idle?: number; + + @ApiPropertyOptional({ + description: 'This is the same as IDLE but instead of a relative amount of milliseconds, ' + + 'it sets the idle time to a specific Unix time (in milliseconds)', + type: Number, + }) + @NotEquals(null) + @ValidateIf((object, value) => value !== undefined) + @IsInt() + time?: number; + + @ApiPropertyOptional({ + description: 'Set the retry counter to the specified value. ' + + 'This counter is incremented every time a message is delivered again. ' + + 'Normally XCLAIM does not alter this counter, which is just served to clients when the XPENDING command ' + + 'is called: this way clients can detect anomalies, like messages that are never processed ' + + 'for some reason after a big number of delivery attempts', + type: Number, + minimum: 0, + }) + @NotEquals(null) + @ValidateIf((object, value) => value !== undefined) + @IsInt() + @Min(0) + retryCount?: number; + + @ApiPropertyOptional({ + description: 'Creates the pending message entry in the PEL even if certain specified IDs are not already ' + + 'in the PEL assigned to a different client', + type: Boolean, + }) + @NotEquals(null) + @ValidateIf((object, value) => value !== undefined) + @IsBoolean() + force?: boolean; +} + +export class ClaimPendingEntriesResponse { + @ApiProperty({ + description: 'Entries IDs were affected by claim command', + type: String, + isArray: true, + example: ['1650985323741-0', '1650985323770-0'], + }) + @IsDefined() + @IsArray() + @ArrayNotEmpty() + @Type(() => String) + affected: string[]; +} diff --git a/redisinsight/api/src/modules/browser/services/stream/consumer-group.service.ts b/redisinsight/api/src/modules/browser/services/stream/consumer-group.service.ts new file mode 100644 index 0000000000..d9fa16881d --- /dev/null +++ b/redisinsight/api/src/modules/browser/services/stream/consumer-group.service.ts @@ -0,0 +1,311 @@ +import { + BadRequestException, ConflictException, Injectable, Logger, NotFoundException, +} from '@nestjs/common'; +import { IFindRedisClientInstanceByOptions } from 'src/modules/core/services/redis/redis.service'; +import { RedisErrorCodes } from 'src/constants'; +import { catchAclError, catchTransactionError, convertStringsArrayToObject } from 'src/utils'; +import { + BrowserToolCommands, + BrowserToolKeysCommands, BrowserToolStreamCommands, +} from 'src/modules/browser/constants/browser-tool-commands'; +import { BrowserToolService } from 'src/modules/browser/services/browser-tool/browser-tool.service'; +import { KeyDto } from 'src/modules/browser/dto'; +import ERROR_MESSAGES from 'src/constants/error-messages'; +import { + ConsumerGroupDto, + CreateConsumerGroupsDto, + DeleteConsumerGroupsDto, DeleteConsumerGroupsResponse, + UpdateConsumerGroupDto +} from 'src/modules/browser/dto/stream.dto'; + +@Injectable() +export class ConsumerGroupService { + private logger = new Logger('ConsumerGroupService'); + + constructor(private browserTool: BrowserToolService) {} + + /** + * Get consumer groups list for particular stream + * In addition fetch pending messages info for each group + * !May be slow on huge streams as 'XPENDING' command tagged with as @slow + * @param clientOptions + * @param dto + */ + async getGroups( + clientOptions: IFindRedisClientInstanceByOptions, + dto: KeyDto, + ): Promise { + try { + this.logger.log('Getting consumer groups list.'); + + const exists = await this.browserTool.execCommand( + clientOptions, + BrowserToolKeysCommands.Exists, + [dto.keyName], + ); + + if (!exists) { + return Promise.reject(new NotFoundException(ERROR_MESSAGES.KEY_NOT_EXIST)); + } + + const groups = ConsumerGroupService.formatReplyToDto(await this.browserTool.execCommand( + clientOptions, + BrowserToolStreamCommands.XInfoGroups, + [dto.keyName], + )); + + return await Promise.all(groups.map((group) => this.getGroupInfo( + clientOptions, + dto, + group, + ))); + } catch (error) { + if (error instanceof NotFoundException) { + throw error; + } + + if (error?.message.includes(RedisErrorCodes.WrongType)) { + throw new BadRequestException(error.message); + } + + throw catchAclError(error); + } + } + + /** + * Get consumer group pending info using 'XPENDING' command + * @param clientOptions + * @param dto + * @param group + */ + async getGroupInfo( + clientOptions: IFindRedisClientInstanceByOptions, + dto: KeyDto, + group: ConsumerGroupDto, + ): Promise { + const info = await this.browserTool.execCommand( + clientOptions, + BrowserToolStreamCommands.XPending, + [dto.keyName, group.name], + ); + + return { + ...group, + smallestPendingId: info?.[1] || null, + greatestPendingId: info?.[2] || null, + }; + } + + /** + * Create consumer group(s) + * @param clientOptions + * @param dto + */ + async createGroups( + clientOptions: IFindRedisClientInstanceByOptions, + dto: CreateConsumerGroupsDto, + ): Promise { + try { + this.logger.log('Creating consumer groups.'); + const { keyName, consumerGroups } = dto; + + const exists = await this.browserTool.execCommand( + clientOptions, + BrowserToolKeysCommands.Exists, + [keyName], + ); + + if (!exists) { + return Promise.reject(new NotFoundException(ERROR_MESSAGES.KEY_NOT_EXIST)); + } + + const toolCommands: Array<[ + toolCommand: BrowserToolCommands, + ...args: Array, + ]> = consumerGroups.map((group) => ( + [ + BrowserToolStreamCommands.XGroupCreate, + keyName, + group.name, + group.lastDeliveredId, + ] + )); + + const [ + transactionError, + transactionResults, + ] = await this.browserTool.execMulti(clientOptions, toolCommands); + catchTransactionError(transactionError, transactionResults); + + this.logger.log('Stream consumer group(s) created.'); + + return undefined; + } catch (error) { + if (error instanceof NotFoundException) { + throw error; + } + + if (error?.message.includes(RedisErrorCodes.WrongType)) { + throw new BadRequestException(error.message); + } + + if (error?.message.includes(RedisErrorCodes.BusyGroup)) { + throw new ConflictException(error.message); + } + + throw catchAclError(error); + } + } + + /** + * Updates last delivered id for Consumer Group + * @param clientOptions + * @param dto + */ + async updateGroup( + clientOptions: IFindRedisClientInstanceByOptions, + dto: UpdateConsumerGroupDto, + ): Promise { + try { + this.logger.log('Updating consumer group.'); + + const exists = await this.browserTool.execCommand( + clientOptions, + BrowserToolKeysCommands.Exists, + [dto.keyName], + ); + + if (!exists) { + return Promise.reject(new NotFoundException(ERROR_MESSAGES.KEY_NOT_EXIST)); + } + + await this.browserTool.execCommand( + clientOptions, + BrowserToolStreamCommands.XGroupSetId, + [dto.keyName, dto.name, dto.lastDeliveredId], + ); + + this.logger.log('Consumer group was updated.'); + + return undefined; + } catch (error) { + if (error instanceof NotFoundException) { + throw error; + } + + if (error?.message.includes(RedisErrorCodes.WrongType)) { + throw new BadRequestException(error.message); + } + + throw catchAclError(error); + } + } + + /** + * Delete consumer groups in batch + * @param clientOptions + * @param dto + */ + async deleteGroup( + clientOptions: IFindRedisClientInstanceByOptions, + dto: DeleteConsumerGroupsDto, + ): Promise { + try { + this.logger.log('Deleting consumer group.'); + + const exists = await this.browserTool.execCommand( + clientOptions, + BrowserToolKeysCommands.Exists, + [dto.keyName], + ); + + if (!exists) { + return Promise.reject(new NotFoundException(ERROR_MESSAGES.KEY_NOT_EXIST)); + } + + const toolCommands: Array<[ + toolCommand: BrowserToolCommands, + ...args: Array, + ]> = dto.consumerGroups.map((group) => ( + [ + BrowserToolStreamCommands.XGroupDestroy, + dto.keyName, + group, + ] + )); + + const [ + transactionError, + transactionResults, + ] = await this.browserTool.execMulti(clientOptions, toolCommands); + catchTransactionError(transactionError, transactionResults); + + this.logger.log('Consumer group(s) successfully deleted.'); + + return { + affected: toolCommands.length, + }; + } catch (error) { + if (error instanceof NotFoundException) { + throw error; + } + + if (error?.message.includes(RedisErrorCodes.WrongType)) { + throw new BadRequestException(error.message); + } + + throw catchAclError(error); + } + } + + /** + * Converts RESP response from Redis + * [ + * ['name', 'g1', 'consumers', 0, 'pending', 0, 'last-delivered-id', '1653034260278-0'], + * ['name', 'g2', 'consumers', 0, 'pending', 0, 'last-delivered-id', '1653034260278-0'], + * ... + * ] + * + * to DTO + * + * [ + * { + * name: 'g1', + * consumers: 0, + * pending: 0, + * lastDeliveredId: '1653034260278-0' + * }, + * { + * name: 'g2', + * consumers: 0, + * pending: 0, + * lastDeliveredId: '1653034260278-0' + * }, + * ... + * ] + * @param reply + */ + static formatReplyToDto(reply: Array>): ConsumerGroupDto[] { + return reply.map(ConsumerGroupService.formatArrayToDto); + } + + /** + * Format single reply entry to DTO + * @param entry + */ + static formatArrayToDto(entry: Array): ConsumerGroupDto { + if (!entry?.length) { + return null; + } + const entryObj = convertStringsArrayToObject(entry as string[]); + + return { + name: entryObj['name'], + consumers: entryObj['consumers'], + pending: entryObj['pending'], + lastDeliveredId: entryObj['last-delivered-id'], + smallestPendingId: null, + greatestPendingId: null, + }; + } +} diff --git a/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts b/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts new file mode 100644 index 0000000000..aea20b87ae --- /dev/null +++ b/redisinsight/api/src/modules/browser/services/stream/consumer.service.ts @@ -0,0 +1,357 @@ +import { + BadRequestException, Injectable, Logger, NotFoundException, +} from '@nestjs/common'; +import { IFindRedisClientInstanceByOptions } from 'src/modules/core/services/redis/redis.service'; +import { RedisErrorCodes } from 'src/constants'; +import {catchAclError, catchTransactionError, convertStringsArrayToObject} from 'src/utils'; +import { + BrowserToolCommands, + BrowserToolKeysCommands, BrowserToolStreamCommands, +} from 'src/modules/browser/constants/browser-tool-commands'; +import { BrowserToolService } from 'src/modules/browser/services/browser-tool/browser-tool.service'; +import ERROR_MESSAGES from 'src/constants/error-messages'; +import { + AckPendingEntriesDto, AckPendingEntriesResponse, ClaimPendingEntriesResponse, ClaimPendingEntryDto, + ConsumerDto, DeleteConsumersDto, + GetConsumersDto, GetPendingEntriesDto, PendingEntryDto, +} from 'src/modules/browser/dto/stream.dto'; + +@Injectable() +export class ConsumerService { + private logger = new Logger('ConsumerService'); + + constructor(private browserTool: BrowserToolService) {} + + /** + * Get consumers list inside particular group + * @param clientOptions + * @param dto + */ + async getConsumers( + clientOptions: IFindRedisClientInstanceByOptions, + dto: GetConsumersDto, + ): Promise { + try { + this.logger.log('Getting consumers list.'); + + const exists = await this.browserTool.execCommand( + clientOptions, + BrowserToolKeysCommands.Exists, + [dto.keyName], + ); + + if (!exists) { + return Promise.reject(new NotFoundException(ERROR_MESSAGES.KEY_NOT_EXIST)); + } + + return ConsumerService.formatReplyToDto(await this.browserTool.execCommand( + clientOptions, + BrowserToolStreamCommands.XInfoConsumers, + [dto.keyName, dto.groupName], + )); + } catch (error) { + if (error instanceof NotFoundException) { + throw error; + } + + if (error?.message.includes(RedisErrorCodes.WrongType)) { + throw new BadRequestException(error.message); + } + + throw catchAclError(error); + } + } + + /** + * Get consumers list inside particular group + * @param clientOptions + * @param dto + */ + async deleteConsumers( + clientOptions: IFindRedisClientInstanceByOptions, + dto: DeleteConsumersDto, + ): Promise { + try { + this.logger.log('Deleting consumers from the group.'); + + const exists = await this.browserTool.execCommand( + clientOptions, + BrowserToolKeysCommands.Exists, + [dto.keyName], + ); + + if (!exists) { + return Promise.reject(new NotFoundException(ERROR_MESSAGES.KEY_NOT_EXIST)); + } + + const toolCommands: Array<[ + toolCommand: BrowserToolCommands, + ...args: Array, + ]> = dto.consumerNames.map((consumerName) => ( + [ + BrowserToolStreamCommands.XGroupDelConsumer, + dto.keyName, + dto.groupName, + consumerName, + ] + )); + + const [ + transactionError, + transactionResults, + ] = await this.browserTool.execMulti(clientOptions, toolCommands); + catchTransactionError(transactionError, transactionResults); + + return undefined; + } catch (error) { + if (error instanceof NotFoundException) { + throw error; + } + + if (error?.message.includes(RedisErrorCodes.WrongType)) { + throw new BadRequestException(error.message); + } + + throw catchAclError(error); + } + } + + /** + * Get list of pending entries info for particular consumer + * @param clientOptions + * @param dto + */ + async getPendingEntries( + clientOptions: IFindRedisClientInstanceByOptions, + dto: GetPendingEntriesDto, + ): Promise { + try { + this.logger.log('Getting pending entries list.'); + + const exists = await this.browserTool.execCommand( + clientOptions, + BrowserToolKeysCommands.Exists, + [dto.keyName], + ); + + if (!exists) { + return Promise.reject(new NotFoundException(ERROR_MESSAGES.KEY_NOT_EXIST)); + } + + return ConsumerService.formatReplyToPendingEntriesDto(await this.browserTool.execCommand( + clientOptions, + BrowserToolStreamCommands.XPending, + [dto.keyName, dto.groupName, dto.start, dto.end, dto.count, dto.consumerName], + )); + } catch (error) { + if (error instanceof NotFoundException) { + throw error; + } + + if (error?.message.includes(RedisErrorCodes.WrongType)) { + throw new BadRequestException(error.message); + } + + throw catchAclError(error); + } + } + + /** + * Acknowledge pending entries + * @param clientOptions + * @param dto + */ + async ackPendingEntries( + clientOptions: IFindRedisClientInstanceByOptions, + dto: AckPendingEntriesDto, + ): Promise { + try { + this.logger.log('Acknowledging pending entries.'); + + const exists = await this.browserTool.execCommand( + clientOptions, + BrowserToolKeysCommands.Exists, + [dto.keyName], + ); + + if (!exists) { + return Promise.reject(new NotFoundException(ERROR_MESSAGES.KEY_NOT_EXIST)); + } + + const affected = await this.browserTool.execCommand( + clientOptions, + BrowserToolStreamCommands.XAck, + [dto.keyName, dto.groupName, ...dto.entries], + ); + + this.logger.log('Successfully acknowledged pending entries.'); + + return { + affected, + }; + } catch (error) { + if (error instanceof NotFoundException) { + throw error; + } + + if (error?.message.includes(RedisErrorCodes.WrongType)) { + throw new BadRequestException(error.message); + } + + throw catchAclError(error); + } + } + + /** + * Claim pending entries with additional parameters + * @param clientOptions + * @param dto + */ + async claimPendingEntries( + clientOptions: IFindRedisClientInstanceByOptions, + dto: ClaimPendingEntryDto, + ): Promise { + try { + this.logger.log('Claiming pending entries.'); + + const exists = await this.browserTool.execCommand( + clientOptions, + BrowserToolKeysCommands.Exists, + [dto.keyName], + ); + + if (!exists) { + return Promise.reject(new NotFoundException(ERROR_MESSAGES.KEY_NOT_EXIST)); + } + + const args = [dto.keyName, dto.groupName, dto.consumerName, dto.minIdleTime, ...dto.entries]; + + if (dto.idle !== undefined) { + args.push('idle', dto.idle); + } else if (dto.time !== undefined) { + args.push('time', dto.time); + } + + if (dto.retryCount !== undefined) { + args.push('retrycount', dto.retryCount); + } + + if (dto.force) { + args.push('force'); + } + + // Return just an array of IDs of messages successfully claimed, without returning the actual message. + args.push('justid'); + + const affected = await this.browserTool.execCommand( + clientOptions, + BrowserToolStreamCommands.XClaim, + args, + ); + + this.logger.log('Successfully claimed pending entries.'); + + return { + affected, + }; + } catch (error) { + if (error instanceof NotFoundException) { + throw error; + } + + if (error?.message.includes(RedisErrorCodes.WrongType)) { + throw new BadRequestException(error.message); + } + + throw catchAclError(error); + } + } + + /** + * Converts RESP response from Redis + * [ + * ['name', 'consumer-1', 'pending', 0, 'idle', 258741], + * ['name', 'consumer-2', 'pending', 0, 'idle', 258741], + * ... + * ] + * + * to DTO + * + * [ + * { + * name: 'consumer-1', + * pending: 0, + * idle: 258741, + * }, + * { + * name: 'consumer-2', + * pending: 0, + * idle: 258741, + * }, + * ... + * ] + * @param reply + */ + static formatReplyToDto(reply: Array>): ConsumerDto[] { + return reply.map(ConsumerService.formatArrayToDto); + } + + /** + * Format single reply entry to DTO + * @param entry + */ + static formatArrayToDto(entry: Array): ConsumerDto { + if (!entry?.length) { + return null; + } + + const entryObj = convertStringsArrayToObject(entry as string[]); + + return { + name: entryObj['name'], + pending: entryObj['pending'], + idle: entryObj['idle'], + }; + } + + /** + * Converts RESP response from Redis + * [ + * ['1567352639-0', 'consumer-1', 258741, 2], + * ... + * ] + * + * to DTO + * + * [ + * { + * id: '1567352639-0', + * name: 'consumer-1', + * idle: 258741, + * delivered: 2, + * }, + * ... + * ] + * @param reply + */ + static formatReplyToPendingEntriesDto(reply: Array>): PendingEntryDto[] { + return reply.map(ConsumerService.formatArrayToPendingEntryDto); + } + + /** + * Format single reply entry to DTO + * @param entry + */ + static formatArrayToPendingEntryDto(entry: Array): PendingEntryDto { + if (!entry?.length) { + return null; + } + + return { + id: `${entry[0]}`, + consumerName: `${entry[1]}`, + idle: +entry[2], + delivered: +entry[3], + }; + } +}