Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions redisinsight/api/src/constants/redis-error-codes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export enum RedisErrorCodes {
ConnectionReset = 'ECONNRESET',
Timeout = 'ETIMEDOUT',
CommandSyntaxError = 'syntax error',
BusyGroup = 'BUSYGROUP',
UnknownCommand = 'unknown command',
}

Expand Down
11 changes: 11 additions & 0 deletions redisinsight/api/src/modules/browser/browser.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import { SharedModule } from 'src/modules/shared/shared.module';
import { RedisConnectionMiddleware } from 'src/middleware/redis-connection.middleware';
import { StreamController } from 'src/modules/browser/controllers/stream/stream.controller';
import { StreamService } from 'src/modules/browser/services/stream/stream.service';
import { ConsumerGroupController } from 'src/modules/browser/controllers/stream/consumer-group.controller';
import { ConsumerGroupService } from 'src/modules/browser/services/stream/consumer-group.service';
import { ConsumerController } from 'src/modules/browser/controllers/stream/consumer.controller';
import { ConsumerService } from 'src/modules/browser/services/stream/consumer.service';
import { HashController } from './controllers/hash/hash.controller';
import { KeysController } from './controllers/keys/keys.controller';
import { KeysBusinessService } from './services/keys-business/keys-business.service';
Expand Down Expand Up @@ -32,6 +36,8 @@ import { BrowserToolClusterService } from './services/browser-tool-cluster/brows
RejsonRlController,
HashController,
StreamController,
ConsumerGroupController,
ConsumerController,
],
providers: [
KeysBusinessService,
Expand All @@ -42,6 +48,8 @@ import { BrowserToolClusterService } from './services/browser-tool-cluster/brows
RejsonRlBusinessService,
HashBusinessService,
StreamService,
ConsumerGroupService,
ConsumerService,
BrowserToolService,
BrowserToolClusterService,
],
Expand All @@ -58,6 +66,9 @@ export class BrowserModule implements NestModule {
RouterModule.resolvePath(SetController),
RouterModule.resolvePath(ZSetController),
RouterModule.resolvePath(RejsonRlController),
RouterModule.resolvePath(StreamController),
RouterModule.resolvePath(ConsumerGroupController),
RouterModule.resolvePath(ConsumerController),
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ export enum BrowserToolStreamCommands {
XRevRange = 'xrevrange',
XAdd = 'xadd',
XDel = 'xdel',
XInfoGroups = 'xinfo groups',
XInfoConsumers = 'xinfo consumers',
XPending = 'xpending',
XAck = 'xack',
XClaim = 'xclaim',
XGroupCreate = 'xgroup create',
XGroupSetId = 'xgroup setid',
XGroupDestroy = 'xgroup destroy',
XGroupDelConsumer = 'xgroup delconsumer',
}

export enum BrowserToolTSCommands {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import {
Body,
Controller, Delete,
Param, Patch,
Post,
UsePipes,
ValidationPipe,
} from '@nestjs/common';
import { ApiTags } from '@nestjs/swagger';
import { ApiRedisInstanceOperation } from 'src/decorators/api-redis-instance-operation.decorator';
import {
ConsumerGroupDto,
CreateConsumerGroupsDto,
DeleteConsumerGroupsDto,
DeleteConsumerGroupsResponse,
UpdateConsumerGroupDto,
} from 'src/modules/browser/dto/stream.dto';
import { ConsumerGroupService } from 'src/modules/browser/services/stream/consumer-group.service';
import { KeyDto } from 'src/modules/browser/dto';

@ApiTags('Streams')
@Controller('streams/consumer-groups')
@UsePipes(new ValidationPipe({ transform: true }))
export class ConsumerGroupController {
constructor(private service: ConsumerGroupService) {}

@Post('/get')
@ApiRedisInstanceOperation({
description: 'Get consumer groups list',
statusCode: 200,
responses: [
{
status: 200,
description: 'Returns stream consumer groups.',
type: ConsumerGroupDto,
isArray: true,
},
],
})
async getGroups(
@Param('dbInstance') instanceId: string,
@Body() dto: KeyDto,
): Promise<ConsumerGroupDto[]> {
return this.service.getGroups({ instanceId }, dto);
}

@Post('')
@ApiRedisInstanceOperation({
description: 'Create stream consumer group',
statusCode: 201,
})
async createGroups(
@Param('dbInstance') instanceId: string,
@Body() dto: CreateConsumerGroupsDto,
): Promise<void> {
return this.service.createGroups({ instanceId }, dto);
}

@Patch('')
@ApiRedisInstanceOperation({
description: 'Modify last delivered ID of the Consumer Group',
statusCode: 200,
})
async updateGroup(
@Param('dbInstance') instanceId: string,
@Body() dto: UpdateConsumerGroupDto,
): Promise<void> {
return this.service.updateGroup({ instanceId }, dto);
}

@Delete('')
@ApiRedisInstanceOperation({
description: 'Delete Consumer Group',
statusCode: 200,
responses: [
{
status: 200,
description: 'Returns number of affected consumer groups.',
type: DeleteConsumerGroupsResponse,
},
],
})
async deleteGroup(
@Param('dbInstance') instanceId: string,
@Body() dto: DeleteConsumerGroupsDto,
): Promise<DeleteConsumerGroupsResponse> {
return this.service.deleteGroup({ instanceId }, dto);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import {
Body,
Controller, Delete,
Param,
Post,
UsePipes,
ValidationPipe,
} from '@nestjs/common';
import { ApiTags } from '@nestjs/swagger';
import { ApiRedisInstanceOperation } from 'src/decorators/api-redis-instance-operation.decorator';
import {
AckPendingEntriesDto, AckPendingEntriesResponse, ClaimPendingEntriesResponse, ClaimPendingEntryDto,
ConsumerDto,
ConsumerGroupDto, DeleteConsumersDto,
GetConsumersDto, GetPendingEntriesDto, PendingEntryDto,
} from 'src/modules/browser/dto/stream.dto';
import { ConsumerService } from 'src/modules/browser/services/stream/consumer.service';

@ApiTags('Streams')
@Controller('streams/consumer-groups/consumers')
@UsePipes(new ValidationPipe({ transform: true }))
export class ConsumerController {
constructor(private service: ConsumerService) {}

@Post('/get')
@ApiRedisInstanceOperation({
description: 'Get consumers list in the group',
statusCode: 200,
responses: [
{
status: 200,
type: ConsumerGroupDto,
isArray: true,
},
],
})
async getConsumers(
@Param('dbInstance') instanceId: string,
@Body() dto: GetConsumersDto,
): Promise<ConsumerDto[]> {
return this.service.getConsumers({ instanceId }, dto);
}

@Delete('')
@ApiRedisInstanceOperation({
description: 'Delete Consumer(s) from the Consumer Group',
statusCode: 200,
})
async deleteConsumers(
@Param('dbInstance') instanceId: string,
@Body() dto: DeleteConsumersDto,
): Promise<void> {
return this.service.deleteConsumers({ instanceId }, dto);
}

@Post('/pending-messages/get')
@ApiRedisInstanceOperation({
description: 'Get pending entries list',
statusCode: 200,
responses: [
{
status: 200,
type: PendingEntryDto,
isArray: true,
},
],
})
async getPendingEntries(
@Param('dbInstance') instanceId: string,
@Body() dto: GetPendingEntriesDto,
): Promise<PendingEntryDto[]> {
return this.service.getPendingEntries({ instanceId }, dto);
}

@Post('/pending-messages/ack')
@ApiRedisInstanceOperation({
description: 'Ack pending entries',
statusCode: 200,
responses: [
{
status: 200,
type: AckPendingEntriesResponse,
},
],
})
async ackPendingEntries(
@Param('dbInstance') instanceId: string,
@Body() dto: AckPendingEntriesDto,
): Promise<AckPendingEntriesResponse> {
return this.service.ackPendingEntries({ instanceId }, dto);
}

@Post('/pending-messages/claim')
@ApiRedisInstanceOperation({
description: 'Claim pending entries',
statusCode: 200,
responses: [
{
status: 200,
type: ClaimPendingEntriesResponse,
},
],
})
async claimPendingEntries(
@Param('dbInstance') instanceId: string,
@Body() dto: ClaimPendingEntryDto,
): Promise<ClaimPendingEntriesResponse> {
return this.service.claimPendingEntries({ instanceId }, dto);
}
}
Loading