Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,30 @@ export class BulkActionsGateway
return this.service.abort(dto);
}

@SubscribeMessage(BulkActionsServerEvents.SubscribeBulkActionReport)
subscribeBulkActionReport(
@ConnectedSocket() socket: Socket,
@Body() dto: BulkActionIdDto,
) {
this.logger.debug('Subscribing to bulk action report.');
return this.service.subscribeToReport(socket, dto.id);
}

@SubscribeMessage(BulkActionsServerEvents.UnsubscribeBulkActionReport)
unsubscribeBulkActionReport(
@ConnectedSocket() socket: Socket,
@Body() dto: BulkActionIdDto,
) {
this.logger.debug('Unsubscribing from bulk action report.');
return this.service.unsubscribeFromReport(socket, dto.id);
}

@SubscribeMessage(BulkActionsServerEvents.StartBulkActionExecution)
startBulkActionExecution(@Body() dto: BulkActionIdDto) {
this.logger.debug('Starting bulk action execution.');
return this.service.startExecution(dto.id);
}

async handleConnection(socket: Socket): Promise<void> {
this.logger.debug(`Client connected: ${socket.id}`);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Socket } from 'socket.io';
import { Injectable } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { BulkActionsProvider } from 'src/modules/bulk-actions/providers/bulk-actions.provider';
import { CreateBulkActionDto } from 'src/modules/bulk-actions/dto/create-bulk-action.dto';
import { BulkActionIdDto } from 'src/modules/bulk-actions/dto/bulk-action-id.dto';
Expand All @@ -8,6 +8,8 @@ import { SessionMetadata } from 'src/common/models';

@Injectable()
export class BulkActionsService {
private logger: Logger = new Logger('BulkActionsService');

constructor(
private readonly bulkActionsProvider: BulkActionsProvider,
private readonly analytics: BulkActionsAnalytics,
Expand Down Expand Up @@ -41,6 +43,43 @@ export class BulkActionsService {
return bulkAction.getOverview();
}

async subscribeToReport(socket: Socket, bulkActionId: string) {
try {
const bulkAction = await this.bulkActionsProvider.get(bulkActionId);
bulkAction.subscribeToReport(socket);

bulkAction.emitReportReady();

return { status: 'subscribed' };
} catch (error) {
this.logger.error(
`Failed to subscribe to bulk action report ${bulkActionId}:`,
error,
);
throw error;
}
}

async unsubscribeFromReport(socket: Socket, bulkActionId: string) {
const bulkAction = await this.bulkActionsProvider.get(bulkActionId);
bulkAction.unsubscribeFromReport(socket);
return { status: 'unsubscribed' };
}

async startExecution(bulkActionId: string) {
try {
await this.bulkActionsProvider.startExecution(bulkActionId);

return { status: 'execution-started' };
} catch (error) {
this.logger.error(
`Failed to start execution for bulk action ${bulkActionId}:`,
error,
);
throw error;
}
}

disconnect(socketId: string) {
this.bulkActionsProvider.abortUsersBulkActions(socketId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ export enum BulkActionsServerEvents {
Create = 'create',
Get = 'get',
Abort = 'abort',
SubscribeBulkActionReport = 'subscribe-bulk-action-report',
UnsubscribeBulkActionReport = 'unsubscribe-bulk-action-report',
StartBulkActionExecution = 'start-bulk-action-execution',
}

export enum BulkActionsClientEvents {
ReportKeys = 'report-keys',
ReportComplete = 'report-complete',
ReportReady = 'report-ready',
}

export enum BulkActionType {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { BulkActionFilter } from 'src/modules/bulk-actions/models/bulk-action-filter';
import { BulkActionType } from 'src/modules/bulk-actions/constants';
import {
IsBoolean,
IsEnum,
IsNotEmpty,
IsNumber,
Expand Down Expand Up @@ -35,4 +36,8 @@ export class CreateBulkActionDto extends BulkActionIdDto {
@Min(0)
@Max(2147483647)
db?: number;

@IsOptional()
@IsBoolean()
enableReporting?: boolean;
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { BulkActionStatus } from 'src/modules/bulk-actions/constants';
import { BulkActionFilter } from 'src/modules/bulk-actions/models/bulk-action-filter';
import { Socket } from 'socket.io';
import { RedisString } from 'src/common/constants';

export interface IBulkAction {
getStatus(): BulkActionStatus;
getFilter(): BulkActionFilter;
changeState(): void;
getSocket(): Socket;
emitDeletedKeys(keys: RedisString[]): void;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ export class BulkActionSummary {

private keys: Array<RedisString> = [];

private hasMoreKeys: boolean = false;

private totalKeysProcessed: number = 0;

private readonly maxStoredKeys: number = BULK_ACTIONS_CONFIG.summaryKeysLimit;
Expand Down Expand Up @@ -52,10 +50,6 @@ export class BulkActionSummary {
const keysToStore = keys.slice(0, remaining);
this.keys.push(...keysToStore);
}

if (this.totalKeysProcessed > this.maxStoredKeys) {
this.hasMoreKeys = true;
}
}

getOverview(): IBulkActionSummaryOverview {
Expand Down
92 changes: 92 additions & 0 deletions redisinsight/api/src/modules/bulk-actions/models/bulk-action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { debounce } from 'lodash';
import {
BulkActionStatus,
BulkActionType,
BulkActionsClientEvents,
} from 'src/modules/bulk-actions/constants';
import { BulkActionFilter } from 'src/modules/bulk-actions/models/bulk-action-filter';
import { Socket } from 'socket.io';
Expand All @@ -14,6 +15,7 @@ import { IBulkActionOverview } from 'src/modules/bulk-actions/interfaces/bulk-ac
import { BulkActionsAnalytics } from 'src/modules/bulk-actions/bulk-actions.analytics';
import { RedisClient, RedisClientNodeRole } from 'src/modules/redis/client';
import { SessionMetadata } from 'src/common/models';
import { RedisString } from 'src/common/constants';

export class BulkAction implements IBulkAction {
private logger: Logger = new Logger('BulkAction');
Expand All @@ -30,13 +32,18 @@ export class BulkAction implements IBulkAction {

private readonly debounce: Function;

private reportSubscribers: Set<Socket> = new Set();

private totalKeysEmitted: number = 0;

constructor(
private readonly id: string,
private readonly databaseId: string,
private readonly type: BulkActionType,
private readonly filter: BulkActionFilter,
private readonly socket: Socket,
private readonly analytics: BulkActionsAnalytics,
private readonly enableReporting: boolean = true,
) {
this.debounce = debounce(this.sendOverview.bind(this), 1000, {
maxWait: 1000,
Expand Down Expand Up @@ -178,6 +185,7 @@ export class BulkAction implements IBulkAction {
if (!this.endTime) {
this.endTime = Date.now();
}
this.emitReportComplete();
// eslint-disable-next-line no-fallthrough
default:
this.changeState();
Expand Down Expand Up @@ -217,4 +225,88 @@ export class BulkAction implements IBulkAction {
this.logger.error('Unable to send overview', e, sessionMetadata);
}
}

subscribeToReport(socket: Socket): void {
this.reportSubscribers.add(socket);
}

unsubscribeFromReport(socket: Socket): void {
this.reportSubscribers.delete(socket);
}

emitDeletedKeys(keys: RedisString[]): void {
this.totalKeysEmitted += keys.length;

if (
this.enableReporting &&
this.reportSubscribers.size > 0 &&
keys.length > 0
) {
const keyStrings = keys.map((key) => Buffer.from(key).toString());

this.reportSubscribers.forEach((socket) => {
try {
socket.emit(BulkActionsClientEvents.ReportKeys, {
keys: keyStrings,
count: keyStrings.length,
totalEmitted: this.totalKeysEmitted,
});
} catch (error) {
this.logger.error(
`Failed to emit keys batch to socket ${socket.id}:`,
error,
);
this.reportSubscribers.delete(socket);
}
});
}
}

emitReportReady(): void {
if (this.reportSubscribers.size > 0) {
this.logger.debug(
`Emitting report ready to ${this.reportSubscribers.size} subscribers`,
);
this.reportSubscribers.forEach((socket) => {
try {
socket.emit(BulkActionsClientEvents.ReportReady, {
bulkActionId: this.id,
status: this.status,
});
} catch (error) {
this.logger.error(
`Failed to emit report ready to socket ${socket.id}:`,
error,
);
this.reportSubscribers.delete(socket);
}
});
}
}

emitReportComplete(): void {
if (this.reportSubscribers.size > 0) {
const overview = this.getOverview();
this.logger.debug(
`Emitting report completion to ${this.reportSubscribers.size} subscribers. Status: ${overview.status}`,
);
this.reportSubscribers.forEach((socket) => {
try {
socket.emit(BulkActionsClientEvents.ReportComplete, {
status: overview.status,
summary: overview.summary,
totalKeysEmitted: this.totalKeysEmitted,
});
} catch (error) {
this.logger.error(
`Failed to emit completion to socket ${socket.id}:`,
error,
);
this.reportSubscribers.delete(socket);
}
});
} else {
this.logger.debug(`Bulk action completed but no report subscribers`);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,21 @@ export abstract class AbstractBulkActionSimpleRunner extends AbstractBulkActionR
this.summary.addKeys(keys.filter((_, index) => !res[index][0]));

const errors = [];
const successfulKeys = [];

res.forEach(([err], i) => {
if (err) {
errors.push({ key: keys[i], error: err.message });
} else {
this.summary.addSuccess(1);
successfulKeys.push(keys[i]);
}
});

if (successfulKeys.length > 0) {
this.bulkAction.emitDeletedKeys(successfulKeys);
}

this.summary.addErrors(errors);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ export class BulkActionsProvider {
dto.filter,
socket,
this.analytics,
dto.enableReporting ?? true,
);

this.bulkActions.set(dto.id, bulkAction);
Expand All @@ -67,8 +68,6 @@ export class BulkActionsProvider {
BulkActionsProvider.getSimpleRunnerClass(dto),
);

bulkAction.start().catch();

return bulkAction;
}

Expand Down Expand Up @@ -141,4 +140,18 @@ export class BulkActionsProvider {

return aborted;
}

/**
* Start execution for a deferred bulk action
* @param id
*/
async startExecution(id: string): Promise<BulkAction> {
const bulkAction = this.get(id);

this.logger.debug(`Starting deferred execution for bulk action ${id}`);

bulkAction.start().catch();

return bulkAction;
}
}
Loading
Loading