Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
5a3e034
#RI-4380 BE Upload data in bulk base implementation
Apr 9, 2023
22c4d67
add tests + change endpoint url + fix bug
Apr 12, 2023
0f40f21
add tests + change endpoint url + fix bug
Apr 12, 2023
621bc95
Merge pull request #1930 from RedisInsight/be/feature/RI-4290-upload_…
Apr 12, 2023
bf8efa2
#RI-4381 - add bulk upload
rsergeenko Apr 14, 2023
a5cac30
BE add analytics events
Apr 17, 2023
10b602f
#RI-4381 - update styles
rsergeenko Apr 17, 2023
11fbab9
#RI-4381 - fix pr comments
rsergeenko Apr 17, 2023
20a7e84
Merge pull request #1954 from RedisInsight/fe/feature/RI-4381_bulk-up…
rsergeenko Apr 17, 2023
a0c5d65
Merge branch 'main' into feature/RI-4290-upload_data_in_bulk
Apr 18, 2023
bac5482
change max file size validation error message
Apr 19, 2023
579e2ea
#RI-4416 - add error notification
rsergeenko Apr 19, 2023
92e2d8d
#RI-4421,#RI-4424 use pipeline.call to not fail with cannot apply com…
Apr 19, 2023
8d7fa48
Merge pull request #1977 from RedisInsight/fe/bugfix/upload-data-in-bulk
rsergeenko Apr 19, 2023
cf9ac0a
#RI-4416 - add file size validation
rsergeenko Apr 19, 2023
9a82110
Merge pull request #1979 from RedisInsight/fe/bugfix/RI-4416_add-size…
rsergeenko Apr 19, 2023
b38268a
add tests for bulk upload
vlad-dargel Apr 19, 2023
98c6701
add additional tests for 10_000 keys unit + integration
Apr 20, 2023
db3b375
Merge pull request #1982 from RedisInsight/e2e/feature/RI-4290_bulk-u…
vlad-dargel Apr 20, 2023
4f85347
#RI-4326 - add highlighting for bulk upload
rsergeenko Apr 20, 2023
5e382b4
Merge pull request #1985 from RedisInsight/fe/feature/RI-4326_bulk-up…
rsergeenko Apr 20, 2023
2d84bdc
#RI-4428 - update telemetry events
rsergeenko Apr 20, 2023
8ccbec8
Merge pull request #1989 from RedisInsight/fe/bugfix/RI-4428_RI-4417
rsergeenko Apr 20, 2023
def29a1
#RI-4428 - add action to first open
rsergeenko Apr 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions redisinsight/api/src/__mocks__/redis.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import IORedis from 'ioredis';

const getRedisCommanderMockFunctions = () => ({
export const mockIORedisClientExec = jest.fn();
const getRedisCommanderMockFunctions = jest.fn(() => ({
sendCommand: jest.fn(),
info: jest.fn(),
monitor: jest.fn(),
Expand All @@ -12,9 +13,11 @@ const getRedisCommanderMockFunctions = () => ({
unsubscribe: jest.fn(),
punsubscribe: jest.fn(),
publish: jest.fn(),
pipeline: jest.fn().mockReturnThis(),
exec: mockIORedisClientExec,
cluster: jest.fn(),
quit: jest.fn(),
});
}));

export const mockIORedisClient = {
...Object.create(IORedis.prototype),
Expand Down
5 changes: 5 additions & 0 deletions redisinsight/api/src/app.routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { SlowLogModule } from 'src/modules/slow-log/slow-log.module';
import { PubSubModule } from 'src/modules/pub-sub/pub-sub.module';
import { ClusterMonitorModule } from 'src/modules/cluster-monitor/cluster-monitor.module';
import { DatabaseAnalysisModule } from 'src/modules/database-analysis/database-analysis.module';
import { BulkActionsModule } from 'src/modules/bulk-actions/bulk-actions.module';

export const routes: Routes = [
{
Expand Down Expand Up @@ -39,6 +40,10 @@ export const routes: Routes = [
path: '/:dbInstance',
module: DatabaseAnalysisModule,
},
{
path: '/:dbInstance',
module: BulkActionsModule,
},
],
},
];
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ export class BulkActionsAnalyticsService extends TelemetryBaseService {
type: overview.type,
duration: overview.duration,
filter: {
match: overview.filter.match === '*' ? '*' : 'PATTERN',
type: overview.filter.type,
match: overview.filter?.match === '*' ? '*' : 'PATTERN',
type: overview.filter?.type,
},
progress: {
scanned: overview.progress.scanned,
total: overview.progress.total,
scanned: overview.progress?.scanned,
total: overview.progress?.total,
},
},
);
Expand All @@ -54,16 +54,16 @@ export class BulkActionsAnalyticsService extends TelemetryBaseService {
type: overview.type,
duration: overview.duration,
filter: {
match: overview.filter.match === '*' ? '*' : 'PATTERN',
type: overview.filter.type,
match: overview.filter?.match === '*' ? '*' : 'PATTERN',
type: overview.filter?.type,
},
progress: {
scanned: overview.progress.scanned,
total: overview.progress.total,
scanned: overview.progress?.scanned,
total: overview.progress?.total,
},
summary: {
processed: overview.summary.processed,
succeed: overview.summary.succeed,
processed: overview.summary?.processed,
succeed: overview.summary?.succeed,
failed: overview.summary.failed,
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ import { BulkActionsService } from 'src/modules/bulk-actions/bulk-actions.servic
import { BulkActionsProvider } from 'src/modules/bulk-actions/providers/bulk-actions.provider';
import { BulkActionsGateway } from 'src/modules/bulk-actions/bulk-actions.gateway';
import { BulkActionsAnalyticsService } from 'src/modules/bulk-actions/bulk-actions-analytics.service';
import { BulkImportController } from 'src/modules/bulk-actions/bulk-import.controller';
import { BulkImportService } from 'src/modules/bulk-actions/bulk-import.service';

// Registers the bulk-actions feature: the gateway and services driving
// bulk actions, analytics reporting, and the HTTP controller/service pair
// handling bulk file import.
@Module({
  controllers: [BulkImportController],
  providers: [
    BulkActionsGateway,
    BulkActionsService,
    BulkActionsProvider,
    BulkActionsAnalyticsService,
    BulkImportService,
  ],
})
export class BulkActionsModule {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import {
Body,
ClassSerializerInterceptor,
Controller, HttpCode, Post,
UseInterceptors, UsePipes, ValidationPipe,
} from '@nestjs/common';
import {
ApiConsumes, ApiTags,
} from '@nestjs/swagger';
import { ApiEndpoint } from 'src/decorators/api-endpoint.decorator';
import { FormDataRequest } from 'nestjs-form-data';
import { BulkImportService } from 'src/modules/bulk-actions/bulk-import.service';
import { UploadImportFileDto } from 'src/modules/bulk-actions/dto/upload-import-file.dto';
import { ClientMetadataParam } from 'src/common/decorators';
import { ClientMetadata } from 'src/common/models';
import { IBulkActionOverview } from 'src/modules/bulk-actions/interfaces/bulk-action-overview.interface';

/**
 * HTTP endpoint for importing data in bulk from an uploaded file.
 *
 * Mounted under the per-database route (see app.routes.ts), so the full
 * path is /:dbInstance/bulk-actions/import.
 */
@UsePipes(new ValidationPipe({ transform: true }))
@UseInterceptors(ClassSerializerInterceptor)
@ApiTags('Bulk Actions')
@Controller('/bulk-actions')
export class BulkImportController {
  constructor(private readonly service: BulkImportService) {}

  /**
   * Accepts a multipart/form-data upload (parsed by nestjs-form-data into
   * the DTO) and delegates to BulkImportService.import.
   *
   * @param dto uploaded file wrapper; size limits are presumably enforced
   *   by the DTO's validation rules — confirm in UploadImportFileDto
   * @param clientMetadata identifies the target database connection
   * @returns overview of the completed bulk action (counts, duration, status)
   */
  @Post('import')
  @ApiConsumes('multipart/form-data')
  // POST would default to 201; the import runs to completion and returns a
  // report, so 200 is used instead.
  @HttpCode(200)
  @FormDataRequest()
  @ApiEndpoint({
    description: 'Import data from file',
    responses: [
      {
        // NOTE(review): `Object` gives swagger no response schema — consider
        // a dedicated response model for IBulkActionOverview.
        type: Object,
      },
    ],
  })
  async import(
    @Body() dto: UploadImportFileDto,
    @ClientMetadataParam() clientMetadata: ClientMetadata,
  ): Promise<IBulkActionOverview> {
    return this.service.import(clientMetadata, dto);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
import { Test, TestingModule } from '@nestjs/testing';
import { BulkImportService } from 'src/modules/bulk-actions/bulk-import.service';
import { DatabaseConnectionService } from 'src/modules/database/database-connection.service';
import {
mockClientMetadata,
mockDatabaseConnectionService,
mockIORedisClient,
mockIORedisCluster, MockType
} from 'src/__mocks__';
import { MemoryStoredFile } from 'nestjs-form-data';
import { BulkActionSummary } from 'src/modules/bulk-actions/models/bulk-action-summary';
import { IBulkActionOverview } from 'src/modules/bulk-actions/interfaces/bulk-action-overview.interface';
import { BulkActionStatus, BulkActionType } from 'src/modules/bulk-actions/constants';
import { NotFoundException } from '@nestjs/common';
import { BulkActionsAnalyticsService } from 'src/modules/bulk-actions/bulk-actions-analytics.service';

// Builds an upload-file buffer containing `n` newline-separated entries,
// each the default-stringified tuple ['set', ['foo', 'bar']] (i.e. "set,foo,bar").
const generateNCommandsBuffer = (n: number) => Buffer.from(
  Array.from({ length: n }, () => ['set', ['foo', 'bar']]).join('\n'),
);
// Produces `n` identical [command, args] tuples, as passed to executeBatch.
const generateNBatchCommands = (n: number) => Array.from({ length: n }, () => ['set', ['foo', 'bar']]);
// Produces `n` successful ioredis pipeline replies ([error, result] pairs).
const generateNBatchCommandsResults = (n: number) => Array.from({ length: n }, () => [null, 'OK']);
// A batch of 100 commands and the matching all-successful pipeline replies.
const mockBatchCommands = generateNBatchCommands(100);
const mockBatchCommandsResult = generateNBatchCommandsResults(100);
// 99 successful replies followed by a single error reply.
const mockBatchCommandsResultWithErrors = [...(new Array(99)).fill(1).map(() => [null, 'OK']), ['ReplyError']];
// Summary for a fully successful batch of 100 commands.
const mockSummary: BulkActionSummary = Object.assign(new BulkActionSummary(), {
  processed: 100,
  succeed: 100,
  failed: 0,
  errors: [],
});

// Summary for a batch where exactly one command failed.
const mockSummaryWithErrors = Object.assign(new BulkActionSummary(), {
  processed: 100,
  succeed: 99,
  failed: 1,
  errors: [],
});

// Expected overview returned by BulkImportService.import on success
// (duration is matched loosely in the tests below).
const mockImportResult: IBulkActionOverview = {
  id: 'empty',
  databaseId: mockClientMetadata.databaseId,
  type: BulkActionType.Import,
  summary: mockSummary.getOverview(),
  progress: null,
  filter: null,
  status: BulkActionStatus.Completed,
  duration: 100,
};

// Minimal upload DTO: a one-command ("SET foo bar") file.
const mockUploadImportFileDto = {
  file: {
    originalname: 'filename',
    size: 1,
    buffer: Buffer.from('SET foo bar'),
  } as unknown as MemoryStoredFile,
};

describe('BulkImportService', () => {
  let service: BulkImportService;
  let databaseConnectionService: MockType<DatabaseConnectionService>;
  let analytics: MockType<BulkActionsAnalyticsService>;

  beforeEach(async () => {
    jest.clearAllMocks();

    const module: TestingModule = await Test.createTestingModule({
      providers: [
        BulkImportService,
        {
          provide: DatabaseConnectionService,
          useFactory: mockDatabaseConnectionService,
        },
        {
          provide: BulkActionsAnalyticsService,
          useFactory: () => ({
            sendActionStarted: jest.fn(),
            sendActionStopped: jest.fn(),
          }),
        },
      ],
    }).compile();

    service = module.get(BulkImportService);
    databaseConnectionService = module.get(DatabaseConnectionService);
    analytics = module.get(BulkActionsAnalyticsService);
  });

  describe('executeBatch', () => {
    it('should execute batch in pipeline for standalone', async () => {
      mockIORedisClient.exec.mockResolvedValueOnce(mockBatchCommandsResult);
      expect(await service['executeBatch'](mockIORedisClient, mockBatchCommands)).toEqual(mockSummary);
    });
    it('should execute batch in pipeline for standalone with errors', async () => {
      mockIORedisClient.exec.mockResolvedValueOnce(mockBatchCommandsResultWithErrors);
      expect(await service['executeBatch'](mockIORedisClient, mockBatchCommands)).toEqual(mockSummaryWithErrors);
    });
    it('should return all failed in case of global error', async () => {
      // When exec itself rejects, every command in the batch counts as failed.
      mockIORedisClient.exec.mockRejectedValueOnce(new Error());
      expect(await service['executeBatch'](mockIORedisClient, mockBatchCommands)).toEqual({
        ...mockSummary.getOverview(),
        succeed: 0,
        failed: mockSummary.getOverview().processed,
      });
    });
    it('should execute batch of commands without pipeline for cluster', async () => {
      // Cluster path calls each command individually; first call fails, rest succeed.
      mockIORedisCluster.call.mockRejectedValueOnce(new Error());
      mockIORedisCluster.call.mockResolvedValue('OK');
      expect(await service['executeBatch'](mockIORedisCluster, mockBatchCommands)).toEqual(mockSummaryWithErrors);
    });
  });

  describe('import', () => {
    let spy;

    beforeEach(() => {
      spy = jest.spyOn(service as any, 'executeBatch');
    });

    it('should import data', async () => {
      spy.mockResolvedValue(mockSummary);
      // NOTE: `jasmine.anything()` replaced with `expect.anything()` — the
      // jasmine global is not available under jest-circus (Jest >= 27).
      expect(await service.import(mockClientMetadata, mockUploadImportFileDto)).toEqual({
        ...mockImportResult,
        duration: expect.anything(),
      });
      expect(analytics.sendActionStopped).toHaveBeenCalledWith({
        ...mockImportResult,
        duration: expect.anything(),
      });
    });

    it('should import data (100K) from file in batches 10K each', async () => {
      // Each executeBatch call reports 10K processed; 100K input => 10 batches.
      spy.mockResolvedValue(Object.assign(new BulkActionSummary(), {
        processed: 10_000,
        succeed: 10_000,
        failed: 0,
      }));
      expect(await service.import(mockClientMetadata, {
        file: {
          ...mockUploadImportFileDto.file,
          buffer: generateNCommandsBuffer(100_000),
        } as unknown as MemoryStoredFile,
      })).toEqual({
        ...mockImportResult,
        summary: {
          processed: 100_000,
          succeed: 100_000,
          failed: 0,
          errors: [],
        },
        duration: expect.anything(),
      });
    });

    it('should import data (10K) from file in batches 10K each', async () => {
      spy.mockResolvedValue(Object.assign(new BulkActionSummary(), {
        processed: 10_000,
        succeed: 10_000,
        failed: 0,
      }));
      expect(await service.import(mockClientMetadata, {
        file: {
          ...mockUploadImportFileDto.file,
          buffer: generateNCommandsBuffer(10_000),
        } as unknown as MemoryStoredFile,
      })).toEqual({
        ...mockImportResult,
        summary: {
          processed: 10_000,
          succeed: 10_000,
          failed: 0,
          errors: [],
        },
        duration: expect.anything(),
      });
    });

    it('should not import any data due to parse error', async () => {
      // Unparseable lines are counted as failed by the service itself, so the
      // stubbed executeBatch result (all zeros) is never reflected in summary.
      spy.mockResolvedValue(Object.assign(new BulkActionSummary(), {
        processed: 0,
        succeed: 0,
        failed: 0,
      }));
      expect(await service.import(mockClientMetadata, {
        file: {
          ...mockUploadImportFileDto.file,
          buffer: Buffer.from('{"incorrectdata"}\n{"incorrectdata"}'),
        } as unknown as MemoryStoredFile,
      })).toEqual({
        ...mockImportResult,
        summary: {
          processed: 2,
          succeed: 0,
          failed: 2,
          errors: [],
        },
        duration: expect.anything(),
      });
    });

    it('should throw an error in case of global error', async () => {
      // Replaces the try/catch + `fail()` pattern: the `fail` global is a
      // jasmine2 leftover that is undefined under jest-circus.
      databaseConnectionService.createClient.mockRejectedValueOnce(new NotFoundException());

      await expect(service.import(mockClientMetadata, mockUploadImportFileDto))
        .rejects.toBeInstanceOf(NotFoundException);
    });
  });
});
Loading