Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions redisinsight/api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
"analytics-node": "^4.0.1",
"axios": "^0.25.0",
"body-parser": "^1.19.0",
"busboy": "^1.6.0",
"class-transformer": "^0.2.3",
"class-validator": "^0.14.0",
"connect-timeout": "^1.9.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import {
Body,
ClassSerializerInterceptor,
Controller, HttpCode, Post,
Controller, HttpCode, Post, Req,
UseInterceptors, UsePipes, ValidationPipe,
} from '@nestjs/common';
import * as Busboy from 'busboy';
import { Readable } from 'stream';
import { Request } from 'express';
import {
ApiConsumes, ApiTags,
} from '@nestjs/swagger';
import { ApiEndpoint } from 'src/decorators/api-endpoint.decorator';
import { FormDataRequest } from 'nestjs-form-data';
import { BulkImportService } from 'src/modules/bulk-actions/bulk-import.service';
import { UploadImportFileDto } from 'src/modules/bulk-actions/dto/upload-import-file.dto';
import { ClientMetadataParam } from 'src/common/decorators';
import { ClientMetadata } from 'src/common/models';
import { IBulkActionOverview } from 'src/modules/bulk-actions/interfaces/bulk-action-overview.interface';
Expand All @@ -26,7 +27,6 @@ export class BulkImportController {
@Post('import')
@ApiConsumes('multipart/form-data')
@HttpCode(200)
@FormDataRequest()
@ApiEndpoint({
description: 'Import data from file',
responses: [
Expand All @@ -36,10 +36,21 @@ export class BulkImportController {
],
})
async import(
@Body() dto: UploadImportFileDto,
@Req() req: Request,
@ClientMetadataParam() clientMetadata: ClientMetadata,
): Promise<IBulkActionOverview> {
return this.service.import(clientMetadata, dto);
return new Promise((res, rej) => {
const busboy = Busboy({ headers: req.headers });

busboy.on(
'file',
(_fieldName: string, fileStream: Readable) => {
this.service.import(clientMetadata, fileStream).then(res).catch(rej);
},
);

req.pipe(busboy);
});
}

@Post('import/tutorial-data')
Expand Down
108 changes: 38 additions & 70 deletions redisinsight/api/src/modules/bulk-actions/bulk-import.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import {
mockIORedisClient,
mockIORedisCluster, MockType,
} from 'src/__mocks__';
import { MemoryStoredFile } from 'nestjs-form-data';
import { BulkActionSummary } from 'src/modules/bulk-actions/models/bulk-action-summary';
import { IBulkActionOverview } from 'src/modules/bulk-actions/interfaces/bulk-action-overview.interface';
import { BulkActionStatus, BulkActionType } from 'src/modules/bulk-actions/constants';
Expand All @@ -17,6 +16,7 @@ import * as fs from 'fs-extra';
import config from 'src/utils/config';
import { join } from 'path';
import { wrapHttpError } from 'src/common/utils';
import { Readable } from 'stream';

const PATH_CONFIG = config.get('dir_path');

Expand Down Expand Up @@ -71,13 +71,7 @@ const mockEmptyImportResult: IBulkActionOverview = {
duration: 0,
};

const mockUploadImportFileDto = {
file: {
originalname: 'filename',
size: 1,
buffer: Buffer.from('SET foo bar'),
} as unknown as MemoryStoredFile,
};
const mockReadableStream = Readable.from(Buffer.from('SET foo bar'));

const mockUploadImportFileByPathDto = {
path: '/some/path',
Expand Down Expand Up @@ -152,7 +146,7 @@ describe('BulkImportService', () => {

it('should import data', async () => {
spy.mockResolvedValue(mockSummary);
expect(await service.import(mockClientMetadata, mockUploadImportFileDto)).toEqual({
expect(await service.import(mockClientMetadata, mockReadableStream)).toEqual({
...mockImportResult,
duration: jasmine.anything(),
});
Expand All @@ -168,21 +162,17 @@ describe('BulkImportService', () => {
succeed: 10_000,
failed: 0,
}));
expect(await service.import(mockClientMetadata, {
file: {
...mockUploadImportFileDto.file,
buffer: generateNCommandsBuffer(100_000),
} as unknown as MemoryStoredFile,
})).toEqual({
...mockImportResult,
summary: {
processed: 100_000,
succeed: 100_000,
failed: 0,
errors: [],
},
duration: jasmine.anything(),
});
expect(await service.import(mockClientMetadata, Readable.from(generateNCommandsBuffer(100_000))))
.toEqual({
...mockImportResult,
summary: {
processed: 100_000,
succeed: 100_000,
failed: 0,
errors: [],
},
duration: jasmine.anything(),
});
});

it('should import data (10K) from file in batches 10K each', async () => {
Expand All @@ -191,21 +181,17 @@ describe('BulkImportService', () => {
succeed: 10_000,
failed: 0,
}));
expect(await service.import(mockClientMetadata, {
file: {
...mockUploadImportFileDto.file,
buffer: generateNCommandsBuffer(10_000),
} as unknown as MemoryStoredFile,
})).toEqual({
...mockImportResult,
summary: {
processed: 10_000,
succeed: 10_000,
failed: 0,
errors: [],
},
duration: jasmine.anything(),
});
expect(await service.import(mockClientMetadata, Readable.from(generateNCommandsBuffer(10_000))))
.toEqual({
...mockImportResult,
summary: {
processed: 10_000,
succeed: 10_000,
failed: 0,
errors: [],
},
duration: jasmine.anything(),
});
});

it('should not import any data due to parse error', async () => {
Expand All @@ -214,12 +200,10 @@ describe('BulkImportService', () => {
succeed: 0,
failed: 0,
}));
expect(await service.import(mockClientMetadata, {
file: {
...mockUploadImportFileDto.file,
buffer: Buffer.from('{"incorrectdata"}\n{"incorrectdata"}'),
} as unknown as MemoryStoredFile,
})).toEqual({
expect(await service.import(
mockClientMetadata,
Readable.from(Buffer.from('{"incorrectdata"}\n{"incorrectdata"}')),
)).toEqual({
...mockImportResult,
summary: {
processed: 2,
Expand All @@ -233,21 +217,19 @@ describe('BulkImportService', () => {
});

it('should ignore blank lines', async () => {
await service.import(mockClientMetadata, {
file: {
...mockUploadImportFileDto.file,
buffer: Buffer.from('\n SET foo bar \n \n SET foo bar \n '),
} as unknown as MemoryStoredFile,
})
expect(spy).toBeCalledWith(mockIORedisClient, [['set', ['foo', 'bar']], ['set', ['foo', 'bar']]])
await service.import(
mockClientMetadata,
Readable.from(Buffer.from('\n SET foo bar \n \n SET foo bar \n ')),
);
expect(spy).toBeCalledWith(mockIORedisClient, [['set', ['foo', 'bar']], ['set', ['foo', 'bar']]]);
expect(mockIORedisClient.disconnect).toHaveBeenCalled();
});

it('should throw an error in case of global error', async () => {
try {
databaseConnectionService.createClient.mockRejectedValueOnce(new NotFoundException());

await service.import(mockClientMetadata, mockUploadImportFileDto);
await service.import(mockClientMetadata, mockReadableStream);

fail();
} catch (e) {
Expand Down Expand Up @@ -275,15 +257,15 @@ describe('BulkImportService', () => {

await service.uploadFromTutorial(mockClientMetadata, mockUploadImportFileByPathDto);

expect(mockedFs.readFile).toHaveBeenCalledWith(join(PATH_CONFIG.homedir, mockUploadImportFileByPathDto.path));
expect(mockedFs.createReadStream).toHaveBeenCalledWith(join(PATH_CONFIG.homedir, mockUploadImportFileByPathDto.path));
});

it('should import file by path with static', async () => {
mockedFs.pathExists.mockImplementationOnce(async () => true);

await service.uploadFromTutorial(mockClientMetadata, { path: '/static/guides/_data.file' });

expect(mockedFs.readFile).toHaveBeenCalledWith(join(PATH_CONFIG.homedir, '/guides/_data.file'));
expect(mockedFs.createReadStream).toHaveBeenCalledWith(join(PATH_CONFIG.homedir, '/guides/_data.file'));
});

it('should normalize path before importing and not search for file outside home folder', async () => {
Expand All @@ -293,7 +275,7 @@ describe('BulkImportService', () => {
path: '/../../../danger',
});

expect(mockedFs.readFile).toHaveBeenCalledWith(join(PATH_CONFIG.homedir, 'danger'));
expect(mockedFs.createReadStream).toHaveBeenCalledWith(join(PATH_CONFIG.homedir, 'danger'));
});

it('should normalize path before importing and throw an error when search for file outside home folder (relative)', async () => {
Expand Down Expand Up @@ -324,19 +306,5 @@ describe('BulkImportService', () => {
expect(e.message).toEqual('Data file was not found');
}
});

it('should throw BadRequest when file size is greater then 100MB', async () => {
mockedFs.pathExists.mockImplementationOnce(async () => true);
mockedFs.stat.mockImplementationOnce(async () => ({ size: 100 * 1024 * 1024 + 1 } as fs.Stats));

try {
await service.uploadFromTutorial(mockClientMetadata, mockUploadImportFileByPathDto);

fail();
} catch (e) {
expect(e).toBeInstanceOf(BadRequestException);
expect(e.message).toEqual('Maximum file size is 100MB');
}
});
});
});
52 changes: 20 additions & 32 deletions redisinsight/api/src/modules/bulk-actions/bulk-import.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { BadRequestException, Injectable, Logger } from '@nestjs/common';
import { Readable } from 'stream';
import * as readline from 'readline';
import { wrapHttpError } from 'src/common/utils';
import { UploadImportFileDto } from 'src/modules/bulk-actions/dto/upload-import-file.dto';
import { DatabaseConnectionService } from 'src/modules/database/database-connection.service';
import { ClientMetadata } from 'src/common/models';
import { splitCliCommandLine } from 'src/utils/cli-helper';
Expand All @@ -14,7 +13,6 @@ import { BulkActionStatus, BulkActionType } from 'src/modules/bulk-actions/const
import { BulkActionsAnalyticsService } from 'src/modules/bulk-actions/bulk-actions-analytics.service';
import { UploadImportFileByPathDto } from 'src/modules/bulk-actions/dto/upload-import-file-by-path.dto';
import config from 'src/utils/config';
import { MemoryStoredFile } from 'nestjs-form-data';

const BATCH_LIMIT = 10_000;
const PATH_CONFIG = config.get('dir_path');
Expand Down Expand Up @@ -63,9 +61,9 @@ export class BulkImportService {

/**
* @param clientMetadata
* @param dto
* @param fileStream
*/
public async import(clientMetadata: ClientMetadata, dto: UploadImportFileDto): Promise<IBulkActionOverview> {
public async import(clientMetadata: ClientMetadata, fileStream: Readable): Promise<IBulkActionOverview> {
const startTime = Date.now();
const result: IBulkActionOverview = {
id: 'empty',
Expand All @@ -92,18 +90,20 @@ export class BulkImportService {
try {
client = await this.databaseConnectionService.createClient(clientMetadata);

const stream = Readable.from(dto.file.buffer);
let batch = [];

const batchResults: Promise<BulkActionSummary>[] = [];
const batchResults: BulkActionSummary[] = [];

try {
const rl = readline.createInterface({
input: fileStream,
});

await new Promise((res) => {
const rl = readline.createInterface(stream);
rl.on('line', (line) => {
for await (const line of rl) {
try {
const [command, ...args] = splitCliCommandLine((line.trim()));
if (batch.length >= BATCH_LIMIT) {
batchResults.push(this.executeBatch(client, batch));
batchResults.push(await this.executeBatch(client, batch));
batch = [];
}
if (command) {
Expand All @@ -112,20 +112,16 @@ export class BulkImportService {
} catch (e) {
parseErrors += 1;
}
});
rl.on('error', (error) => {
result.summary.errors.push(error);
result.status = BulkActionStatus.Failed;
this.analyticsService.sendActionFailed(result, error);
res(null);
});
rl.on('close', () => {
batchResults.push(this.executeBatch(client, batch));
res(null);
});
});
}
} catch (e) {
result.summary.errors.push(e);
result.status = BulkActionStatus.Failed;
this.analyticsService.sendActionFailed(result, e);
}

batchResults.push(await this.executeBatch(client, batch));

(await Promise.all(batchResults)).forEach((batchResult) => {
batchResults.forEach((batchResult) => {
result.summary.processed += batchResult.getOverview().processed;
result.summary.succeed += batchResult.getOverview().succeed;
result.summary.failed += batchResult.getOverview().failed;
Expand Down Expand Up @@ -176,15 +172,7 @@ export class BulkImportService {
throw new BadRequestException('Data file was not found');
}

if ((await fs.stat(path))?.size > 100 * 1024 * 1024) {
throw new BadRequestException('Maximum file size is 100MB');
}

const buffer = await fs.readFile(path);

return this.import(clientMetadata, {
file: { buffer } as MemoryStoredFile,
});
return this.import(clientMetadata, fs.createReadStream(path));
} catch (e) {
this.logger.error('Unable to process an import file path from tutorial', e);
throw wrapHttpError(e);
Expand Down

This file was deleted.