Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions redisinsight/api/src/__mocks__/analytics.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
import { EventEmitter2 } from '@nestjs/event-emitter';
import { BulkActionsAnalyticsService } from 'src/modules/bulk-actions/bulk-actions-analytics.service';

const mockEmitter = new EventEmitter2();

class AnalyticsService extends BulkActionsAnalyticsService {
constructor(protected eventEmitter: EventEmitter2) {
super(eventEmitter);
}
}

export const mockInstancesAnalyticsService = () => ({
sendInstanceListReceivedEvent: jest.fn(),
sendInstanceAddedEvent: jest.fn(),
Expand Down Expand Up @@ -29,6 +40,8 @@ export const mockSettingsAnalyticsService = () => ({
sendSettingsUpdatedEvent: jest.fn(),
});

export const mockBulActionsAnalyticsService = new AnalyticsService(mockEmitter);

export const mockPubSubAnalyticsService = () => ({
sendMessagePublishedEvent: jest.fn(),
sendChannelSubscribeEvent: jest.fn(),
Expand Down
2 changes: 2 additions & 0 deletions redisinsight/api/src/constants/telemetry-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ export enum TelemetryEvents {
// Bulk Actions
BulkActionsStarted = 'BULK_ACTIONS_STARTED',
BulkActionsStopped = 'BULK_ACTIONS_STOPPED',
BulkActionsSucceed = 'BULK_ACTIONS_SUCCEED',
BulkActionsFailed = 'BULK_ACTIONS_FAILED',
}

export enum CommandType {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Injectable } from '@nestjs/common';
import { HttpException, Injectable } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { TelemetryEvents } from 'src/constants';
import { TelemetryBaseService } from 'src/modules/analytics/telemetry.base.service';
import { getRangeForNumber, BULK_ACTIONS_BREAKPOINTS } from 'src/utils';
import { CommandExecutionStatus } from 'src/modules/cli/dto/cli.dto';
import { RedisError, ReplyError } from 'src/models';
import { IBulkActionOverview } from 'src/modules/bulk-actions/interfaces/bulk-action-overview.interface';
Expand All @@ -20,6 +21,8 @@ export class BulkActionsAnalyticsService extends TelemetryBaseService {
super(eventEmitter);
this.events.set(TelemetryEvents.BulkActionsStarted, this.sendActionStarted.bind(this));
this.events.set(TelemetryEvents.BulkActionsStopped, this.sendActionStopped.bind(this));
this.events.set(TelemetryEvents.BulkActionsSucceed, this.sendActionSucceed.bind(this));
this.events.set(TelemetryEvents.BulkActionsFailed, this.sendActionFailed.bind(this));
}

sendActionStarted(overview: IBulkActionOverview): void {
Expand All @@ -28,15 +31,17 @@ export class BulkActionsAnalyticsService extends TelemetryBaseService {
TelemetryEvents.BulkActionsStarted,
{
databaseId: overview.databaseId,
type: overview.type,
action: overview.type,
duration: overview.duration,
filter: {
match: overview.filter?.match === '*' ? '*' : 'PATTERN',
type: overview.filter?.type,
},
progress: {
scanned: overview.progress?.scanned,
scannedRange: getRangeForNumber(overview.progress?.scanned, BULK_ACTIONS_BREAKPOINTS),
total: overview.progress?.total,
totalRange: getRangeForNumber(overview.progress?.total, BULK_ACTIONS_BREAKPOINTS),
},
},
);
Expand All @@ -51,20 +56,25 @@ export class BulkActionsAnalyticsService extends TelemetryBaseService {
TelemetryEvents.BulkActionsStopped,
{
databaseId: overview.databaseId,
type: overview.type,
action: overview.type,
duration: overview.duration,
filter: {
match: overview.filter?.match === '*' ? '*' : 'PATTERN',
type: overview.filter?.type,
},
progress: {
scanned: overview.progress?.scanned,
scannedRange: getRangeForNumber(overview.progress?.scanned, BULK_ACTIONS_BREAKPOINTS),
total: overview.progress?.total,
totalRange: getRangeForNumber(overview.progress?.total, BULK_ACTIONS_BREAKPOINTS),
},
summary: {
processed: overview.summary?.processed,
processedRange: getRangeForNumber(overview.summary?.processed, BULK_ACTIONS_BREAKPOINTS),
succeed: overview.summary?.succeed,
succeedRange: getRangeForNumber(overview.summary?.succeed, BULK_ACTIONS_BREAKPOINTS),
failed: overview.summary.failed,
failedRange: getRangeForNumber(overview.summary.failed, BULK_ACTIONS_BREAKPOINTS),
},
},
);
Expand All @@ -73,6 +83,48 @@ export class BulkActionsAnalyticsService extends TelemetryBaseService {
}
}

sendActionSucceed(overview: IBulkActionOverview): void {
try {
this.sendEvent(
TelemetryEvents.BulkActionsSucceed,
{
databaseId: overview.databaseId,
action: overview.type,
duration: overview.duration,
filter: {
match: overview.filter?.match === '*' ? '*' : 'PATTERN',
type: overview.filter?.type,
},
summary: {
processed: overview.summary?.processed,
processedRange: getRangeForNumber(overview.summary?.processed, BULK_ACTIONS_BREAKPOINTS),
succeed: overview.summary?.succeed,
succeedRange: getRangeForNumber(overview.summary?.succeed, BULK_ACTIONS_BREAKPOINTS),
failed: overview.summary.failed,
failedRange: getRangeForNumber(overview.summary.failed, BULK_ACTIONS_BREAKPOINTS),
},
},
);
} catch (e) {
// continue regardless of error
}
}

sendActionFailed(overview: IBulkActionOverview, error: HttpException | Error): void {
try {
this.sendEvent(
TelemetryEvents.BulkActionsFailed,
{
databaseId: overview.databaseId,
action: overview.type,
error,
},
);
} catch (e) {
// continue regardless of error
}
}

getEventsEmitters(): Map<TelemetryEvents, Function> {
return this.events;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as MockedSocket from 'socket.io-mock';
import { Test, TestingModule } from '@nestjs/testing';
import {
MockType,
mockBulActionsAnalyticsService,
} from 'src/__mocks__';
import { BulkActionsProvider } from 'src/modules/bulk-actions/providers/bulk-actions.provider';
import { RedisDataType } from 'src/modules/browser/dto';
Expand Down Expand Up @@ -43,6 +44,7 @@ const mockBulkAction = new BulkAction(
mockCreateBulkActionDto.type,
mockBulkActionFilter,
mockSocket1,
mockBulActionsAnalyticsService,
);
const mockOverview = 'mocked overview...';

Expand All @@ -51,6 +53,7 @@ mockBulkAction['getOverview'] = jest.fn().mockReturnValue(mockOverview);
describe('BulkActionsService', () => {
let service: BulkActionsService;
let bulkActionProvider: MockType<BulkActionsProvider>;
let analyticsService: MockType<BulkActionsAnalyticsService>;

beforeEach(async () => {
jest.clearAllMocks();
Expand All @@ -72,19 +75,23 @@ describe('BulkActionsService', () => {
useFactory: () => ({
sendActionStarted: jest.fn(),
sendActionStopped: jest.fn(),
sendActionSucceed: jest.fn(),
sendActionFailed: jest.fn(),
}),
},
],
}).compile();

service = module.get(BulkActionsService);
bulkActionProvider = module.get(BulkActionsProvider);
analyticsService = module.get(BulkActionsAnalyticsService);
});

describe('create', () => {
it('should create and return overview', async () => {
expect(await service.create(mockCreateBulkActionDto, mockSocket1)).toEqual(mockOverview);
expect(bulkActionProvider.create).toHaveBeenCalledTimes(1);
expect(analyticsService.sendActionStarted).toHaveBeenCalledTimes(1);
});
});
describe('get', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,8 @@ export class BulkActionsService {

async abort(dto: BulkActionIdDto) {
const bulkAction = await this.bulkActionsProvider.abort(dto.id);
const overview = bulkAction.getOverview();

this.analyticsService.sendActionStopped(overview);

return overview;
return bulkAction.getOverview();
}

disconnect(socketId: string) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { BulkActionsAnalyticsService } from 'src/modules/bulk-actions/bulk-actio
import * as fs from 'fs-extra';
import config from 'src/utils/config';
import { join } from 'path';
import { wrapHttpError } from 'src/common/utils';

const PATH_CONFIG = config.get('dir_path');

Expand All @@ -34,6 +35,13 @@ const mockSummary: BulkActionSummary = Object.assign(new BulkActionSummary(), {
errors: [],
});

const mockEmptySummary: BulkActionSummary = Object.assign(new BulkActionSummary(), {
processed: 0,
succeed: 0,
failed: 0,
errors: [],
});

const mockSummaryWithErrors = Object.assign(new BulkActionSummary(), {
processed: 100,
succeed: 99,
Expand All @@ -44,14 +52,25 @@ const mockSummaryWithErrors = Object.assign(new BulkActionSummary(), {
const mockImportResult: IBulkActionOverview = {
id: 'empty',
databaseId: mockClientMetadata.databaseId,
type: BulkActionType.Import,
type: BulkActionType.Upload,
summary: mockSummary.getOverview(),
progress: null,
filter: null,
status: BulkActionStatus.Completed,
duration: 100,
};

const mockEmptyImportResult: IBulkActionOverview = {
id: 'empty',
databaseId: mockClientMetadata.databaseId,
type: BulkActionType.Upload,
summary: mockEmptySummary.getOverview(),
progress: null,
filter: null,
status: BulkActionStatus.Completed,
duration: 0,
};

const mockUploadImportFileDto = {
file: {
originalname: 'filename',
Expand Down Expand Up @@ -88,6 +107,8 @@ describe('BulkImportService', () => {
useFactory: () => ({
sendActionStarted: jest.fn(),
sendActionStopped: jest.fn(),
sendActionSucceed: jest.fn(),
sendActionFailed: jest.fn(),
}),
},
],
Expand Down Expand Up @@ -135,7 +156,7 @@ describe('BulkImportService', () => {
...mockImportResult,
duration: jasmine.anything(),
});
expect(analytics.sendActionStopped).toHaveBeenCalledWith({
expect(analytics.sendActionSucceed).toHaveBeenCalledWith({
...mockImportResult,
duration: jasmine.anything(),
});
Expand Down Expand Up @@ -220,6 +241,10 @@ describe('BulkImportService', () => {
fail();
} catch (e) {
expect(mockIORedisClient.disconnect).not.toHaveBeenCalled();
expect(analytics.sendActionFailed).toHaveBeenCalledWith(
{ ...mockEmptyImportResult },
wrapHttpError(e),
);
expect(e).toBeInstanceOf(NotFoundException);
}
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { join, resolve } from 'path';
import { join } from 'path';
import * as fs from 'fs-extra';
import { BadRequestException, Injectable, Logger } from '@nestjs/common';
import { Readable } from 'stream';
Expand Down Expand Up @@ -70,7 +70,7 @@ export class BulkImportService {
const result: IBulkActionOverview = {
id: 'empty',
databaseId: clientMetadata.databaseId,
type: BulkActionType.Import,
type: BulkActionType.Upload,
summary: {
processed: 0,
succeed: 0,
Expand Down Expand Up @@ -115,6 +115,7 @@ export class BulkImportService {
rl.on('error', (error) => {
result.summary.errors.push(error);
result.status = BulkActionStatus.Failed;
this.analyticsService.sendActionFailed(result, error);
res(null);
});
rl.on('close', () => {
Expand All @@ -133,15 +134,19 @@ export class BulkImportService {
result.summary.processed += parseErrors;
result.summary.failed += parseErrors;

this.analyticsService.sendActionStopped(result);
if (result.status === BulkActionStatus.Completed) {
this.analyticsService.sendActionSucceed(result);
}

client.disconnect();

return result;
} catch (e) {
this.logger.error('Unable to process an import file', e);
const exception = wrapHttpError(e);
this.analyticsService.sendActionFailed(result, exception);
client?.disconnect();
throw wrapHttpError(e);
throw exception;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export enum BulkActionsServerEvents {

export enum BulkActionType {
Delete = 'delete',
Import = 'import',
Upload = 'upload',
}

export enum BulkActionStatus {
Expand Down
Loading