diff --git a/perf/bulkInsert.js b/perf/bulkInsert.js
new file mode 100644
index 000000000..57a6ea7d3
--- /dev/null
+++ b/perf/bulkInsert.js
@@ -0,0 +1,129 @@
+const {
+    DocumentStore,
+    DeleteDatabasesOperation,
+    CreateDatabaseOperation,
+    RequestExecutor
+} = require("../dist");
+const {
+    bench,
+    settings
+} = require("./common");
+
+const nodbStore = new DocumentStore(settings.urls);
+nodbStore.initialize();
+
+// RequestExecutor.requestPostProcessor = (req) => {
+//     req.proxy = "http://127.0.0.1:8888";
+// };
+
+const store = new DocumentStore(settings.urls, settings.database);
+store.initialize();
+
+class Order {
+    constructor(opts) {
+        if (opts) {
+            Object.assign(this, opts);
+        }
+    }
+}
+
+let data;
+
+(async function main() {
+
+    const getBenchOpts = (n) => ({
+        async before() {
+            const dataGen = getData();
+            data = Array.from({ length: n }, () => new Order(dataGen.next().value));
+            try {
+                await nodbStore.maintenance.server.send(new DeleteDatabasesOperation({
+                    databaseNames: [settings.database],
+                    hardDelete: true
+                }));
+            } finally {
+                await nodbStore.maintenance.server.send(new CreateDatabaseOperation({
+                    databaseName: settings.database
+                }));
+            }
+        },
+        async after() {
+            await nodbStore.maintenance.server.send(new DeleteDatabasesOperation({
+                databaseNames: [settings.database],
+                hardDelete: true
+            }));
+        }
+    });
+
+    try {
+        const name = "bulk-insert-2018-10-18-pipeline";
+        await bench(name, 10, bulkInsertPipeline, getBenchOpts(1000));
+        await bench(name, 50, bulkInsertPipeline, getBenchOpts(1000));
+        await bench(name, 100, bulkInsertPipeline, getBenchOpts(1000));
+    } finally {
+        store.dispose();
+        nodbStore.dispose();
+    }
+
+}());
+
+function randomDate() {
+    return new Date(2018, Math.floor(Math.random() * 11), Math.floor(Math.random() * 25));
+}
+
+function randomInt(max = 100) {
+    return Math.floor(Math.random() * max);
+}
+
+function* getData() {
+    let i = 1;
+    while (true) {
+        i++;
+        yield new Order({
+            "Id": "orders/" + i,
+            "Name": "Order #" + i,
+            "Company": "companies/58-A",
+            "Employee": "employees/2-A",
+            "Freight": randomInt(),
+            "Lines": [{
+                "Discount": 0,
+                "PricePerUnit": randomInt(),
+                "Product": "products/11-A",
+                "ProductName": "Queso Cabrales",
+                "Quantity": 10
+            },
+            {
+                "Discount": 0,
+                "PricePerUnit": 4.5,
+                "Product": "products/24-A",
+                "ProductName": "Guaraná Fantástica",
+                "Quantity": randomInt()
+            }
+            ],
+            "OrderedAt": randomDate(),
+            "RequireAt": randomDate(),
+            "ShipTo": {
+                "City": "México D.F.",
+                "Country": "Mexico",
+                "Line1": "Calle Dr. Jorge Cash 321",
+                "Line2": null,
+                "Location": {
+                    "Latitude": Math.random() * 100,
+                    "Longitude": Math.random() * 100
+                },
+                "PostalCode": "05033",
+                "Region": null
+            },
+            "ShipVia": "shippers/2-A",
+            "ShippedAt": null
+        });
+    }
+}
+
+async function bulkInsertPipeline() {
+    const bulk = store.bulkInsert();
+    for (const item of data) {
+        await bulk.store(item);
+    }
+
+    await bulk.finish();
+}
\ No newline at end of file
diff --git a/perf/common.js b/perf/common.js
index 4ed0ac8d7..c4ddd79bb 100644
--- a/perf/common.js
+++ b/perf/common.js
@@ -1,8 +1,38 @@
-import { DocumentStore } from "../src";
-
 // tslint:disable-next-line:no-var-requires
 const settings = require("./settings.json");
 
-export function getStore() {
-    return new DocumentStore(settings.urls, settings.database);
-}
\ No newline at end of file
+async function bench(name, attempts, run, opts) {
+    const benchName = `${name} x${attempts}`;
+
+    if (opts && opts.before) {
+        try {
+            await opts.before();
+        } catch (err) {
+            console.log("BENCH BEFORE", err);
+        }
+    }
+
+    console.time(benchName);
+    console.profile(benchName);
+    try {
+        for (let n = 0; n < attempts; n++) {
+            await run();
+        }
+    } catch (err) {
+        console.log("BENCH ERROR", err);
+    } finally {
+        // end the profile even when a run throws
+        console.profileEnd(benchName);
+    }
+    console.timeEnd(benchName);
+
+    if (opts && opts.after) {
+        try {
+            await opts.after();
+        } catch (err) {
+            console.log("BENCH AFTER", err);
+        }
+    }
+}
+
+module.exports = { settings, bench };
\ No newline at end of file
diff --git a/perf/loadPipeline.js b/perf/loadPipeline.js
index f6fedbd9d..391f63ade 100644
--- a/perf/loadPipeline.js
+++ b/perf/loadPipeline.js
@@ -12,35 +12,19 @@
 const streamValues = require("stream-json/streamers/StreamValues")
 const StreamUtil = require("../dist/Utility/StreamUtil");
 const stream = require("readable-stream");
 const Asm = require('stream-json/Assembler');
+const { bench } = require("./common");
 
 const store = new DocumentStore("http://localhost:8080", "Perf");
 store.initialize();
 
-async function bench(name, attempts, run) {
-    const benchName = `${name} x${ attempts }`;
-    console.time(benchName);
-    for (let n = 0; n < attempts; n++) {
-        await run();
-    }
-    console.timeEnd(benchName);
-}
-
 (async function main() {
     {
-        const name = "4.0.4-load-full-pipeline";
+        const name = "load-full-pipeline";
         await bench(name, 10, loadPipeline);
         await bench(name, 50, loadPipeline);
         await bench(name, 100, loadPipeline);
     }
 
-    // {
-    //     const name = "stream-json-with-proper-casing";
-    //     // enhancedStreamJson();
-    //     await bench(name, 10, enhancedStreamJson);
-    //     await bench(name, 50, enhancedStreamJson);
-    //     await bench(name, 100, enhancedStreamJson);
-    // }
-
     store.dispose();
 }());
 
@@ -68,8 +52,6 @@ async function rawStreamJson() {
     await donePromise;
 }
 
-
-
 async function enhancedStreamJson() {
     const dataStream = fs.createReadStream("./data/load_data.json");
     const streams = [
@@ -96,5 +78,4 @@
     });
     await StreamUtil.pipelineAsync(streams);
     const result = await donePromise;
-    // console.log(JSON.stringify(result, null, 2));
 }
\ No newline at end of file
diff --git a/perf/results/bulk-insert.md b/perf/results/bulk-insert.md
new file mode 100644
index 000000000..55c9091b5
--- /dev/null
+++ b/perf/results/bulk-insert.md
@@ -0,0 +1,40 @@
+# BULK INSERT
+
+## Current 4.0.3 - 2018-10-16
+
+```
+bulk-insert-2018-10-16-pipeline x10: 21635.223ms
+```
+
+## 4.0.4 optimizations - 2018-10-18
+
+Removed the redundant buffering logic and its per-chunk Buffer.concat() calls; documents are now written straight to the request body stream (sketched below).
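+
+For reference, a minimal sketch of the difference (illustrative only, not the exact previous code):
+
+```js
+const stream = require("stream");
+
+// Before: a Transform accumulated chunks, re-copying the whole
+// buffer on every incoming chunk via Buffer.concat().
+let buffer = Buffer.from([]);
+const buffering = new stream.Transform({
+    transform(chunk, enc, callback) {
+        buffer = Buffer.concat([buffer, chunk]); // copies all bytes buffered so far
+        if (buffer.length > 1024 * 1024) {
+            this.push(buffer, enc);
+            buffer = Buffer.from([]);
+        }
+        callback();
+    }
+});
+
+// After: chunks flow straight into the request body - no intermediate copies.
+const requestBodyStream = new stream.PassThrough();
+```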
+
+```
+bulk-insert-2018-10-16-pipeline x10: 2490.231ms
+bulk-insert-2018-10-16-pipeline x50: 8280.333ms
+bulk-insert-2018-10-16-pipeline x100: 15802.916ms
+```
\ No newline at end of file
diff --git a/src/Documents/BulkInsertOperation.ts b/src/Documents/BulkInsertOperation.ts
index 2da29e5a3..2962c99be 100644
--- a/src/Documents/BulkInsertOperation.ts
+++ b/src/Documents/BulkInsertOperation.ts
@@ -25,22 +25,34 @@
     private readonly _generateEntityIdOnTheClient: GenerateEntityIdOnTheClient;
     private readonly _requestExecutor: RequestExecutor;
 
-    private _bulkInsertExecuteTask: Promise<void>;
-    private _pipelinePromise: Promise<void>;
+    private _bulkInsertExecuteTask: Promise<any>;
 
     private _completedWithError = false;
-    private _completedWithPipelineError = false;
 
     private _first: boolean = true;
    private _operationId = -1;
 
     private _useCompression: boolean = false;
 
+    private _bulkInsertAborted: Promise<void>;
+    private _abortReject: Function;
+    private _aborted: boolean;
+    private _currentWriter: stream.Readable;
+    private _requestBodyStream: stream.PassThrough;
+    private _pipelineFinished: Promise<void>;
+
+    private static readonly _maxSizeInBuffer = 1024 * 1024;
+
     public constructor(database: string, store: IDocumentStore) {
         this._conventions = store.conventions;
         this._requestExecutor = store.getRequestExecutor(database);
         this._generateEntityIdOnTheClient = new GenerateEntityIdOnTheClient(this._requestExecutor.conventions,
             entity => this._requestExecutor.conventions.generateDocumentId(database, entity));
+        this._bulkInsertAborted = new Promise((_, reject) => this._abortReject = reject);
+
+        this._bulkInsertAborted.catch(() => {
+            // ignore the rejection here; it's awaited elsewhere
+        });
     }
 
     get useCompression(): boolean {
@@ -130,7 +142,7 @@
     private async _store(
         entity: object,
         { id, getId, metadata }: { id: string, getId: boolean, metadata: IMetadataDictionary }) {
-
+        id = getId ? await this._getId(entity) : id;
 
         BulkInsertOperation._verifyValidId(id);
 
@@ -139,18 +151,10 @@
             await this._ensureStream();
         }
 
-        if (this._completedWithError) {
+        if (this._completedWithError || this._aborted) {
             await this._checkIfBulkInsertWasAborted();
         }
 
-        if (this._completedWithPipelineError) {
-            try {
-                await this._pipelinePromise;
-            } catch (error) {
-                throwError("BulkInsertStreamError", "Failed to execute bulk insert", error);
-            }
-        }
-
         if (!metadata) {
             metadata = createMetadataDictionary({
                 raw: {}
@@ -172,16 +176,12 @@
             }
         }
 
-        if (!this._first) {
+        if (this._first) {
+            this._first = false;
+        } else {
             this._currentWriter.push(",");
         }
 
-        this._first = false;
-
-        this._currentWriter.push("{'Id':'");
-        this._currentWriter.push(id);
-        this._currentWriter.push("','Type':'PUT','Document':");
-
         const documentInfo = new DocumentInfo();
         documentInfo.metadataInstance = metadata;
         let json = EntityToJson.convertEntityToJson(entity, this._conventions, documentInfo);
@@ -191,9 +191,7 @@
         }
 
         const jsonString = JsonSerializer.getDefault().serialize(json);
-        this._currentWriter.push(jsonString);
-        this._currentWriter.push("}");
-
+        this._currentWriter.push(`{"Id":"${id}","Type":"PUT","Document":${jsonString}}`);
     }
 
     private async _checkIfBulkInsertWasAborted() {
@@ -202,14 +200,16 @@
                 await this._bulkInsertExecuteTask;
             } catch (error) {
                 await this._throwBulkInsertAborted(error);
+            } finally {
+                this._currentWriter.emit("end");
             }
         }
 
-        if (this._completedWithPipelineError) {
+        if (this._aborted) {
             try {
-                await this._pipelinePromise;
-            } catch (error) {
-                throwError("BulkInsertStreamError", "Failed to execute bulk insert.", error);
+                await this._bulkInsertAborted;
+            } finally {
+                this._currentWriter.emit("end");
             }
         }
     }
@@ -227,6 +227,9 @@
     private async _getExceptionFromOperation(): Promise<Error> {
         const stateRequest = new GetOperationStateCommand(this._conventions, this._operationId);
         await this._requestExecutor.execute(stateRequest);
+        if (!stateRequest.result) {
+            return null;
+        }
 
         const result = stateRequest.result["result"];
@@ -237,87 +240,59 @@
         return getError("BulkInsertAbortedException", result.error);
     }
 
-    private _currentWriter: stream.Readable;
-    private _requestBodyStream: stream.Readable;
-    private static readonly _maxSizeInBuffer = 1024 * 1024;
-
     private async _ensureStream() {
         try {
-            this._currentWriter = new stream.PassThrough();
-            const streams: stream.Stream[] = [this._currentWriter];
-
-            this._requestBodyStream = this._getBufferingWriteable();
-            streams.push(this._requestBodyStream);
-
+            this._currentWriter = new stream.PassThrough();
+            this._requestBodyStream = new stream.PassThrough();
             const bulkCommand = new BulkInsertCommand(this._operationId, this._requestBodyStream, this._useCompression);
-            this._bulkInsertExecuteTask = this._requestExecutor.execute(bulkCommand);
-
-            this._bulkInsertExecuteTask.catch(() => {
-                this._completedWithError = true;
-            });
+            const bulkCommandPromise = this._requestExecutor.execute(bulkCommand);
 
+            this._pipelineFinished = StreamUtil.pipelineAsync(this._currentWriter, this._requestBodyStream);
             this._currentWriter.push("[");
 
-            this._pipelinePromise = StreamUtil.pipelineAsync(...streams);
+            this._bulkInsertExecuteTask = Promise.all([
+                bulkCommandPromise,
+                this._pipelineFinished
+            ]);
 
-            this._pipelinePromise.catch(err => {
-                this._completedWithPipelineError = true;
-            });
+            this._bulkInsertExecuteTask
+                .catch(() => this._completedWithError = true);
+
         } catch (e) {
             throwError("RavenException", "Unable to open bulk insert stream.", e);
         }
     }
 
-    private _getBufferingWriteable(): stream.Transform {
-        let buffer = Buffer.from([]);
-        return new stream.Transform({
-            transform(chunk, enc, callback) {
-                buffer = Buffer.concat([buffer, chunk]);
-
-                if (buffer.length > BulkInsertOperation._maxSizeInBuffer) {
-                    this.push(buffer, enc);
-                    buffer = Buffer.from([]);
-                }
-
-                callback();
-            },
-            flush(callback) {
-                if (buffer.length) {
-                    // push remaining part
-                    this.push(buffer);
-                }
-                this.push(null);
-
-                callback();
-            }
-        });
-    }
-
     public async abort(): Promise<void>;
     public async abort(callback: AbstractCallback<void>): Promise<void>;
     public async abort(callback?: AbstractCallback<void>): Promise<void> {
         const abortPromise = this._abortAsync();
         passResultToCallback(abortPromise, callback);
-        return abortPromise;
+        return await abortPromise;
     }
 
     private async _abortAsync(): Promise<void> {
-        if (this._operationId === -1) {
-            return; // nothing was done, nothing to kill
-        }
+        this._aborted = true;
 
-        await this._waitForId();
+        if (this._operationId !== -1) {
+            await this._waitForId();
 
-        try {
-            await this._requestExecutor.execute(new KillOperationCommand(this._operationId));
-        } catch (err) {
-            throwError("BulkInsertAbortedException",
-                "Unable to kill ths bulk insert operation, because it was not found on the server.", err);
+            try {
+                await this._requestExecutor.execute(new KillOperationCommand(this._operationId));
+            } catch (err) {
+                const bulkInsertError = getError("BulkInsertAbortedException",
+                    "Unable to kill bulk insert operation, because it was not found on the server.", err);
+                this._abortReject(bulkInsertError);
+                return;
+            }
         }
+
+        this._abortReject(getError(
+            "BulkInsertAbortedException", "Bulk insert was aborted by the user."));
     }
 
     public async finish(): Promise<void>;
@@ -339,14 +313,17 @@
             return;
         }
 
-        if (this._bulkInsertExecuteTask !== null) {
+        if (this._completedWithError || this._aborted) {
             await this._checkIfBulkInsertWasAborted();
         }
 
-        return Promise.all([this._bulkInsertExecuteTask, this._pipelinePromise])
+        return Promise.race(
+            [
+                this._bulkInsertExecuteTask || Promise.resolve(),
+                this._bulkInsertAborted || Promise.resolve()
+            ])
             // tslint:disable-next-line:no-empty
-            .then(() => {
-            });
+            .then(() => {});
     }
 
     private readonly _conventions: DocumentConventions;
@@ -383,10 +360,9 @@ export class BulkInsertCommand extends RavenCommand<void> {
 
     public createRequest(node: ServerNode): HttpRequestParameters {
         const uri = node.url + "/databases/" + node.database + "/bulk_insert?id=" + this._id;
-
         const headers = this._headers().typeAppJson().build();
-
-        return { //TODO: useCompression ? new GzipCompressingEntity(_stream) : _stream);
+        // TODO: useCompression ? new GzipCompressingEntity(_stream) : _stream);
+        return {
             method: "POST",
             uri,
             body: this._stream,
diff --git a/src/Http/RavenCommand.ts b/src/Http/RavenCommand.ts
index 014cfd6d2..be0d91086 100644
--- a/src/Http/RavenCommand.ts
+++ b/src/Http/RavenCommand.ts
@@ -8,13 +8,10 @@
 import { getLogger } from "../Utility/LogUtil";
 import { throwError } from "../Exceptions";
 import { IRavenObject } from "../Types/IRavenObject";
 import { getEtagHeader, HeadersBuilder, closeHttpResponse } from "../Utility/HttpUtil";
-import { Mapping } from "../Mapping";
-import { TypesAwareObjectMapper, TypeInfo } from "../Mapping/ObjectMapper";
+import { TypeInfo } from "../Mapping/ObjectMapper";
 import { ObjectTypeDescriptor } from "..";
 import { JsonSerializer } from "../Mapping/Json/Serializer";
 import { RavenCommandResponsePipeline } from "./RavenCommandResponsePipeline";
-import { pick } from "stream-json/filters/Pick";
-import { ignore } from "stream-json/filters/Ignore";
 import { DocumentConventions } from "../Documents/Conventions/DocumentConventions";
 
 const log = getLogger({ module: "RavenCommand" });
diff --git a/src/Mapping/Json/Streams/TransformJsonKeysProfiles.ts b/src/Mapping/Json/Streams/TransformJsonKeysProfiles.ts
index d43fe47b1..af0336b0b 100644
--- a/src/Mapping/Json/Streams/TransformJsonKeysProfiles.ts
+++ b/src/Mapping/Json/Streams/TransformJsonKeysProfiles.ts
@@ -318,4 +318,4 @@
     }
 
     return null;
-}
\ No newline at end of file
+}
diff --git a/test/Ported/BulkInsert/BulkInsertsTest.ts b/test/Ported/BulkInsert/BulkInsertsTest.ts
index b411e147b..5ba048e6b 100644
--- a/test/Ported/BulkInsert/BulkInsertsTest.ts
+++ b/test/Ported/BulkInsert/BulkInsertsTest.ts
@@ -6,6 +6,7 @@
 } from "../../../src";
 import { createMetadataDictionary } from "../../../src/Mapping/MetadataAsDictionary";
 import { CONSTANTS } from "../../../src/Constants";
+import * as BluebirdPromise from "bluebird";
 import { DateUtil } from "../../../src/Utility/DateUtil";
 
 describe("bulk insert", function () {
@@ -57,12 +58,16 @@
         assert.strictEqual(doc3.name, "Mega John");
         assert.strictEqual(doc4.name, "Mega Jane");
 
+            const docsInsertedCount = await session
+                .query({ collection: "fooBars" })
+                .count();
+            assert.strictEqual(docsInsertedCount, 4);
         } finally {
             session.dispose();
         }
     });
 
-    it("can be killed to early", async () => {
+    it("can be killed early before making connection", async () => {
         try {
             const bulkInsert = store.bulkInsert();
             await bulkInsert.store(new FooBar());
             await bulkInsert.abort();
             await bulkInsert.store(new FooBar());
 
@@ -71,7 +76,48 @@
             assert.fail("Should have thrown.");
         } catch (error) {
-            assert.strictEqual(error.name, "BulkInsertAbortedException");
+            assert.strictEqual(error.name, "BulkInsertAbortedException", error.message);
+            const bulkInsertCanceled = /TaskCanceledException/i.test(error.message);
+            const bulkInsertNotRegisteredYet =
+                /Unable to kill bulk insert operation, because it was not found on the server./i.test(error.message);
+            const bulkInsertSuccessfullyKilled =
+                /Bulk insert was aborted by the user./i.test(error.message);
+
+            // this one's racy, so any one of these is acceptable
+            assert.ok(
+                bulkInsertCanceled
+                || bulkInsertNotRegisteredYet
+                || bulkInsertSuccessfullyKilled,
+                "Unexpected error message: " + error.message);
        }
    });
+
+    it("can be aborted after a while", async () => {
+        try {
+            const bulkInsert = store.bulkInsert();
+            await bulkInsert.store(new FooBar());
+            await bulkInsert.store(new FooBar());
+            await bulkInsert.store(new FooBar());
+            await bulkInsert.store(new FooBar());
+            await BluebirdPromise.delay(500);
+            await bulkInsert.abort();
+            await bulkInsert.store(new FooBar());
+
+            assert.fail("Should have thrown.");
+        } catch (error) {
+            assert.strictEqual(error.name, "BulkInsertAbortedException", error.message);
+            const bulkInsertCanceled = /TaskCanceledException/i.test(error.message);
+            const bulkInsertNotRegisteredYet =
+                /Unable to kill bulk insert operation, because it was not found on the server./i.test(error.message);
+            const bulkInsertSuccessfullyKilled =
+                /Bulk insert was aborted by the user./i.test(error.message);
+
+            // this one's racy, so any one of these is acceptable
+            assert.ok(
+                bulkInsertCanceled
+                || bulkInsertNotRegisteredYet
+                || bulkInsertSuccessfullyKilled,
+                "Unexpected error message: " + error.message);
+        }
+    });
diff --git a/test/Ported/Subscriptions/SecuredSubscriptionsBasicTest.ts b/test/Ported/Subscriptions/SecuredSubscriptionsBasicTest.ts
index 2578fd246..73198406e 100644
--- a/test/Ported/Subscriptions/SecuredSubscriptionsBasicTest.ts
+++ b/test/Ported/Subscriptions/SecuredSubscriptionsBasicTest.ts
@@ -21,7 +21,6 @@
 
     afterEach(async () => await disposeTestDocumentStore(store));
 
-
     it("should stream all documents after subscription creation", async function () {
         store.initialize();
         {
diff --git a/test/Ported/Subscriptions/SubscriptionsBasicTest.ts b/test/Ported/Subscriptions/SubscriptionsBasicTest.ts
index e02f7ca8a..e0e3cd767 100644
--- a/test/Ported/Subscriptions/SubscriptionsBasicTest.ts
+++ b/test/Ported/Subscriptions/SubscriptionsBasicTest.ts
@@ -28,7 +28,6 @@
 
     afterEach(async () => await disposeTestDocumentStore(store));
 
-
     it("can delete subscription", async function() {
         const id1 = await store.subscriptions.create(User);
         const id2 = await store.subscriptions.create(User);
@@ -59,7 +58,7 @@
             subscription.on("error", err => {
                 resolve(err);
             });
-        })
+        });
 
         assert.strictEqual(
             error.name,
             "SubscriptionDoesNotExistException",
             "Expected another error but got:" + error.stack);