diff --git a/package-lock.json b/package-lock.json index e04d703dc..9c16dcdae 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "ravendb", - "version": "4.1.3", + "version": "4.1.5", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -144,13 +144,33 @@ "to-fast-properties": "^2.0.0" } }, + "@sinonjs/commons": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/@sinonjs/commons/-/commons-1.3.0.tgz", + "integrity": "sha512-j4ZwhaHmwsCb4DlDOIWnI5YyKDNMoNThsmwEpfHx6a1EpsGZ9qYLxP++LMlmBRjtGptGHFsGItJ768snllFWpA==", + "dev": true, + "requires": { + "type-detect": "4.0.8" + } + }, "@sinonjs/formatio": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/@sinonjs/formatio/-/formatio-2.0.0.tgz", - "integrity": "sha512-ls6CAMA6/5gG+O/IdsBcblvnd8qcO/l1TYoNeAzp3wcISOxlPXQEus0mLcdwazEkWjaBdaJ3TaxmNgCLWwvWzg==", + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/@sinonjs/formatio/-/formatio-3.1.0.tgz", + "integrity": "sha512-ZAR2bPHOl4Xg6eklUGpsdiIJ4+J1SNag1DHHrG/73Uz/nVwXqjgUtRPLoS+aVyieN9cSbc0E4LsU984tWcDyNg==", "dev": true, "requires": { - "samsam": "1.3.0" + "@sinonjs/samsam": "^2 || ^3" + } + }, + "@sinonjs/samsam": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/@sinonjs/samsam/-/samsam-3.1.0.tgz", + "integrity": "sha512-IXio+GWY+Q8XUjHUOgK7wx8fpvr7IFffgyXb1bnJFfX3001KmHt35Zq4tp7MXZyjJPCLPuadesDYNk41LYtVjw==", + "dev": true, + "requires": { + "@sinonjs/commons": "^1.0.2", + "array-from": "^2.1.1", + "lodash.get": "^4.4.2" } }, "@types/bluebird": { @@ -281,6 +301,12 @@ "sprintf-js": "~1.0.2" } }, + "array-from": { + "version": "2.1.1", + "resolved": "https://registry.npmjs.org/array-from/-/array-from-2.1.1.tgz", + "integrity": "sha1-z+nYwmYoudxa7MYqn12PHzUsEZU=", + "dev": true + }, "asn1": { "version": "0.2.4", "resolved": "https://registry.npmjs.org/asn1/-/asn1-0.2.4.tgz", @@ -921,9 +947,9 @@ } }, "just-extend": { - "version": "1.1.27", - "resolved": "https://registry.npmjs.org/just-extend/-/just-extend-1.1.27.tgz", - "integrity": "sha512-mJVp13Ix6gFo3SBAy9U/kL+oeZqzlYYYLQBwXVBlVzIsZwBqGREnOro24oC/8s8aox+rJhtZ2DiQof++IrkA+g==", + "version": "4.0.2", + "resolved": "https://registry.npmjs.org/just-extend/-/just-extend-4.0.2.tgz", + "integrity": "sha512-FrLwOgm+iXrPV+5zDU6Jqu4gCRXbWEQg2O3SKONsWE4w7AXFRkryS53bpWdaL9cNol+AmR3AEYz6kn+o0fCPnw==", "dev": true }, "lodash": { @@ -945,9 +971,9 @@ "dev": true }, "lolex": { - "version": "2.7.1", - "resolved": "https://registry.npmjs.org/lolex/-/lolex-2.7.1.tgz", - "integrity": "sha512-Oo2Si3RMKV3+lV5MsSWplDQFoTClz/24S0MMHYcgGWWmFXr6TMlqcqk/l1GtH+d5wLBwNRiqGnwDRMirtFalJw==", + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/lolex/-/lolex-3.1.0.tgz", + "integrity": "sha512-zFo5MgCJ0rZ7gQg69S4pqBsLURbFw11X68C18OcJjJQbqaXm2NoTrGl1IMM3TIz0/BnN1tIs2tzmmqvCsOMMjw==", "dev": true }, "lower-case": { @@ -1099,16 +1125,24 @@ "dev": true }, "nise": { - "version": "1.4.2", - "resolved": "https://registry.npmjs.org/nise/-/nise-1.4.2.tgz", - "integrity": "sha512-BxH/DxoQYYdhKgVAfqVy4pzXRZELHOIewzoesxpjYvpU+7YOalQhGNPf7wAx8pLrTNPrHRDlLOkAl8UI0ZpXjw==", + "version": "1.4.8", + "resolved": "https://registry.npmjs.org/nise/-/nise-1.4.8.tgz", + "integrity": "sha512-kGASVhuL4tlAV0tvA34yJYZIVihrUt/5bDwpp4tTluigxUr2bBlJeDXmivb6NuEdFkqvdv/Ybb9dm16PSKUhtw==", "dev": true, "requires": { - "@sinonjs/formatio": "^2.0.0", - "just-extend": "^1.1.27", + "@sinonjs/formatio": "^3.1.0", + "just-extend": "^4.0.2", "lolex": "^2.3.2", "path-to-regexp": "^1.7.0", "text-encoding": "^0.6.4" + }, + "dependencies": { + "lolex": { + "version": "2.7.5", + "resolved": "https://registry.npmjs.org/lolex/-/lolex-2.7.5.tgz", + "integrity": "sha512-l9x0+1offnKKIzYVjyXU2SiwhXDLekRzKyhnbyldPHvC7BvLPVpdNUNR2KeMAiCN2D/kLNttZgQD5WjSxuBx3Q==", + "dev": true + } } }, "no-case": { @@ -2447,12 +2481,6 @@ "resolved": "https://registry.npmjs.org/safer-buffer/-/safer-buffer-2.1.2.tgz", "integrity": "sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg==" }, - "samsam": { - "version": "1.3.0", - "resolved": "https://registry.npmjs.org/samsam/-/samsam-1.3.0.tgz", - "integrity": "sha512-1HwIYD/8UlOtFS3QO3w7ey+SdSDFE4HRNLZoZRYVQefrOY3l17epswImeB1ijgJFQJodIaHcwkp3r/myBjFVbg==", - "dev": true - }, "semaphore": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/semaphore/-/semaphore-1.1.0.tgz", @@ -2474,30 +2502,30 @@ } }, "sinon": { - "version": "4.5.0", - "resolved": "https://registry.npmjs.org/sinon/-/sinon-4.5.0.tgz", - "integrity": "sha512-trdx+mB0VBBgoYucy6a9L7/jfQOmvGeaKZT4OOJ+lPAtI8623xyGr8wLiE4eojzBS8G9yXbhx42GHUOVLr4X2w==", + "version": "7.2.3", + "resolved": "https://registry.npmjs.org/sinon/-/sinon-7.2.3.tgz", + "integrity": "sha512-i6j7sqcLEqTYqUcMV327waI745VASvYuSuQMCjbAwlpAeuCgKZ3LtrjDxAbu+GjNQR0FEDpywtwGCIh8GicNyg==", "dev": true, "requires": { - "@sinonjs/formatio": "^2.0.0", - "diff": "^3.1.0", - "lodash.get": "^4.4.2", - "lolex": "^2.2.0", - "nise": "^1.2.0", - "supports-color": "^5.1.0", - "type-detect": "^4.0.5" + "@sinonjs/commons": "^1.3.0", + "@sinonjs/formatio": "^3.1.0", + "@sinonjs/samsam": "^3.0.2", + "diff": "^3.5.0", + "lolex": "^3.0.0", + "nise": "^1.4.8", + "supports-color": "^5.5.0" }, "dependencies": { - "has-flag": { - "version": "3.0.0", - "resolved": "https://registry.npmjs.org/has-flag/-/has-flag-3.0.0.tgz", - "integrity": "sha1-tdRU3CGZriJWmfNGfloH87lVuv0=", + "diff": { + "version": "3.5.0", + "resolved": "https://registry.npmjs.org/diff/-/diff-3.5.0.tgz", + "integrity": "sha512-A46qtFgd+g7pDZinpnwiRJtxbC1hpgf0uzP3iG89scHk0AUC7A1TGxf5OiiOUv/JMZR8GOt8hL900hV0bOy5xA==", "dev": true }, "supports-color": { - "version": "5.4.0", - "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-5.4.0.tgz", - "integrity": "sha512-zjaXglF5nnWpsq470jSv6P9DwPvgLkuapYmfDm3JWOm0vkNTVF2tI4UrN2r6jH1qM/uc/WtxYY1hYoA2dOKj5w==", + "version": "5.5.0", + "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-5.5.0.tgz", + "integrity": "sha512-QjVjwdXIt408MIiAqCX4oUKsgU2EqAGzs2Ppkm4aQYbjm+ZEWEcW4SfFNTr4uMNZma0ey4f5lgLrkB0aX0QMow==", "dev": true, "requires": { "has-flag": "^3.0.0" diff --git a/package.json b/package.json index d88fbd171..349d5e1c1 100644 --- a/package.json +++ b/package.json @@ -61,7 +61,7 @@ "lodash.orderby": "^4.6.0", "mocha": "^5.2.0", "nyc": "^13.1.0", - "sinon": "^4.0.1", + "sinon": "^7.2.3", "source-map-support": "^0.5.9", "tslint": "^5.12.0", "tslint-eslint-rules": "^5.4.0", @@ -79,9 +79,9 @@ "change-case": "^3.0.2", "deprecate": "^1.1.0", "md5-hex": "^2.0.0", + "moment": "^2.23.0", "object.entries": "^1.0.4", "object.values": "^1.0.4", - "moment": "^2.23.0", "pluralize": "^4.0.0", "qs": "^6.6.0", "readable-stream": "^3.1.0", diff --git a/src/Documents/Changes/DatabaseChanges.ts b/src/Documents/Changes/DatabaseChanges.ts index f00ae240c..2082e277c 100644 --- a/src/Documents/Changes/DatabaseChanges.ts +++ b/src/Documents/Changes/DatabaseChanges.ts @@ -66,23 +66,23 @@ export class DatabaseChanges implements IDatabaseChanges { return new WebSocket(url, options); } - private _onConnectionStatusChanged() { + private async _onConnectionStatusChanged() { const acquiredSemContext = acquireSemaphore(this._semaphore); - BluebirdPromise.resolve(acquiredSemContext.promise) - .then(() => { - if (this.connected) { - this._tcs.resolve(this); - return; - } + try { + await acquiredSemContext.promise; + + if (this.connected) { + this._tcs.resolve(this); + return; + } - if (this._tcs.promise.isFulfilled()) { - this._tcs = PromiseUtil.defer(); - } - }) - .finally(() => { - acquiredSemContext.dispose(); - }); + if (this._tcs.promise.isFulfilled()) { + this._tcs = PromiseUtil.defer(); + } + } finally { + acquiredSemContext.dispose(); + } } public get connected() { @@ -201,7 +201,9 @@ export class DatabaseChanges implements IDatabaseChanges { } public dispose(): void { - Array.from(this._confirmations.values()).forEach(confirmation => confirmation.reject()); + for (const confirmation of this._confirmations.values()) { + confirmation.reject(); + } this._isCancelled = true; if (this._client) { @@ -258,37 +260,42 @@ export class DatabaseChanges implements IDatabaseChanges { } private _send(command: string, value: string, values: string[]): Promise { - return new Promise(((resolve, reject) => { + return new Promise((async (resolve, reject) => { let currentCommandId: number; - const acquiredSemContext = acquireSemaphore(this._semaphore); + const acquiredSemContext = acquireSemaphore(this._semaphore, { + timeout: 15000, + contextName: "DatabaseChanges._send()" + }); - BluebirdPromise.resolve(acquiredSemContext.promise) - .then(() => { - currentCommandId = ++this._commandId; + try { + await acquiredSemContext.promise; - const payload = { - CommandId: currentCommandId, - Command: command, - Param: value - }; + currentCommandId = ++this._commandId; - if (values && values.length) { - payload["Params"] = values; - } + const payload = { + CommandId: currentCommandId, + Command: command, + Param: value + }; - this._confirmations.set(currentCommandId, { resolve, reject }); - const payloadAsString = JSON.stringify(payload, null, 0); + if (values && values.length) { + payload["Params"] = values; + } - this._client.send(payloadAsString); - }) - .catch((err) => { - if (!this._isCancelled) { - throw err; - } - }) - .timeout(15000) - .finally(() => acquiredSemContext.dispose()); + this._confirmations.set(currentCommandId, { resolve, reject }); + const payloadAsString = JSON.stringify(payload, null, 0); + + this._client.send(payloadAsString); + } catch (err) { + if (!this._isCancelled) { + throw err; + } + } finally { + if (acquiredSemContext) { + acquiredSemContext.dispose(); + } + } })); } @@ -336,7 +343,10 @@ export class DatabaseChanges implements IDatabaseChanges { setTimeout(() => this._doWorkInternal(url), 1000); } - Array.from(this._confirmations.values()).forEach(v => v.reject()); + for (const confirm of this._confirmations.values()) { + confirm.reject(); + } + this._confirmations.clear(); }); @@ -365,8 +375,13 @@ export class DatabaseChanges implements IDatabaseChanges { const payloadParsed = JSON.parse(data) as any[]; try { - for (const message of (Array.isArray(payloadParsed) ? payloadParsed : [payloadParsed])) { + const messages = Array.isArray(payloadParsed) ? payloadParsed : [payloadParsed]; + for (const message of messages) { const type = message.Type; + if (!type) { + continue; + } + switch (type) { case "Error": const exceptionAsString = message.Exception; @@ -419,9 +434,9 @@ export class DatabaseChanges implements IDatabaseChanges { this._emitter.emit("error", e); - Array.from(this._counters.values()).forEach(state => { + for (const state of this._counters.values()) { state.error(e); - }); + } } public forAllCounters(): IChangesObservable { diff --git a/src/Documents/Identity/HiloIdGenerator.ts b/src/Documents/Identity/HiloIdGenerator.ts index cabb77889..bb7402a3f 100644 --- a/src/Documents/Identity/HiloIdGenerator.ts +++ b/src/Documents/Identity/HiloIdGenerator.ts @@ -3,7 +3,7 @@ import * as semaphore from "semaphore"; import { IDocumentStore } from "../../Documents/IDocumentStore"; import { DateUtil } from "../../Utility/DateUtil"; -import { acquireSemaphore, AcquiredSemaphoreContext } from "../../Utility/SemaphoreUtil"; +import { acquireSemaphore, SemaphoreAcquisitionContext } from "../../Utility/SemaphoreUtil"; import { StringUtil } from "../../Utility/StringUtil"; import { HiloReturnCommand } from "./Commands/HiloReturnCommand"; import { NextHiloCommand, HiLoResult } from "./Commands/NextHiloCommand"; @@ -54,7 +54,7 @@ export class HiloIdGenerator { return id; } - let acquiredSemContext: AcquiredSemaphoreContext; + let acquiredSemContext: SemaphoreAcquisitionContext; try { //local range is exhausted , need to get a new range acquiredSemContext = acquireSemaphore(this._generatorLock, { diff --git a/src/Http/RequestExecutor.ts b/src/Http/RequestExecutor.ts index c810b1130..165ea53a7 100644 --- a/src/Http/RequestExecutor.ts +++ b/src/Http/RequestExecutor.ts @@ -2,7 +2,7 @@ import * as os from "os"; import * as BluebirdPromise from "bluebird"; import * as semaphore from "semaphore"; import * as stream from "readable-stream"; -import { acquireSemaphore, AcquiredSemaphoreContext } from "../Utility/SemaphoreUtil"; +import { acquireSemaphore, SemaphoreAcquisitionContext } from "../Utility/SemaphoreUtil"; import { getLogger, ILogger } from "../Utility/LogUtil"; import { Timer } from "../Primitives/Timer"; import { ServerNode } from "./ServerNode"; @@ -35,6 +35,7 @@ import { JsonSerializer } from "../Mapping/Json/Serializer"; import { validateUri } from "../Utility/UriUtil"; import * as StreamUtil from "../Utility/StreamUtil"; import { closeHttpResponse } from "../Utility/HttpUtil"; +import { PromiseStatusTracker } from "../Utility/PromiseUtil"; const DEFAULT_REQUEST_OPTIONS = {}; @@ -145,7 +146,21 @@ export class RequestExecutor implements IDisposable { protected _disposed: boolean; - protected _firstTopologyUpdatePromise: BluebirdPromise; + private _firstTopologyUpdatePromiseInternal; + + protected get _firstTopologyUpdatePromise(): Promise { + return this._firstTopologyUpdatePromiseInternal; + } + + protected set _firstTopologyUpdatePromise(value: Promise) { + this._firstTopologyUpdatePromiseInternal = value; + + if (value) { + this._firstTopologyUpdateStatus = PromiseStatusTracker.track(value); + } + } + + protected _firstTopologyUpdateStatus: PromiseStatusTracker; protected _lastKnownUrls: string[]; @@ -258,8 +273,10 @@ export class RequestExecutor implements IDisposable { const { authOptions, documentConventions } = opts || {} as IRequestExecutorOptions; const executor = new RequestExecutor(database, authOptions, documentConventions); executor._firstTopologyUpdatePromise = executor._firstTopologyUpdate(initialUrls); + // this is just to get rid of unhandled rejection, we're handling it later on executor._firstTopologyUpdatePromise.catch(TypeUtil.NOOP); + return executor; } @@ -299,10 +316,9 @@ export class RequestExecutor implements IDisposable { private async _ensureNodeSelector(): Promise { if (this._firstTopologyUpdatePromise - && (!this._firstTopologyUpdatePromise.isFulfilled() - || this._firstTopologyUpdatePromise.isRejected())) { - - await Promise.resolve(this._firstTopologyUpdatePromise); + && (!this._firstTopologyUpdateStatus.isFullfilled() + || this._firstTopologyUpdateStatus.isRejected())) { + await this._firstTopologyUpdatePromise; } if (!this._nodeSelector) { @@ -362,7 +378,7 @@ export class RequestExecutor implements IDisposable { return; } - let semAcquiredContext: AcquiredSemaphoreContext; + let semAcquiredContext: SemaphoreAcquisitionContext; try { semAcquiredContext = acquireSemaphore(this._updateClientConfigurationSemaphore); @@ -507,83 +523,76 @@ export class RequestExecutor implements IDisposable { }); } - protected _firstTopologyUpdate(inputUrls: string[]): BluebirdPromise { + protected async _firstTopologyUpdate(inputUrls: string[]): Promise { const initialUrls: string[] = RequestExecutor._validateUrls(inputUrls, this._authOptions); const topologyUpdateErrors: Array<{ url: string, error: Error | string }> = []; - const tryUpdateTopology = (url: string, database: string): PromiseLike => { + const tryUpdateTopology = async (url: string, database: string): Promise => { const serverNode = new ServerNode({ url, database }); - return BluebirdPromise.resolve() - .then(() => this.updateTopology(serverNode, TypeUtil.MAX_INT32)) - .then(() => { - this._initializeUpdateTopologyTimer(); - this._topologyTakenFromNode = serverNode; - return true; - }) - .catch(error => { - if ((error.name as RavenErrorType) === "DatabaseDoesNotExistException") { - this._lastKnownUrls = initialUrls; - throw error; - } + try { + await this.updateTopology(serverNode, TypeUtil.MAX_INT32); + this._initializeUpdateTopologyTimer(); + this._topologyTakenFromNode = serverNode; + return true; + } catch (error) { + if ((error.name as RavenErrorType) === "DatabaseDoesNotExistException") { + this._lastKnownUrls = initialUrls; + throw error; + } - if (initialUrls.length === 0) { - this._lastKnownUrls = initialUrls; - throwError("InvalidOperationException", - `Cannot get topology from server: ${url}.`, error); - } + if (initialUrls.length === 0) { + this._lastKnownUrls = initialUrls; + throwError("InvalidOperationException", + `Cannot get topology from server: ${url}.`, error); + } - topologyUpdateErrors.push({ url, error }); - return false; - }); + topologyUpdateErrors.push({ url, error }); + return false; + } }; - const tryUpdateTopologyOnAllNodes = () => { - return initialUrls.reduce((reduceResult, nextUrl) => { - return reduceResult - .then(breakLoop => { - if (!breakLoop) { - return tryUpdateTopology(nextUrl, this._databaseName); - } + const tryUpdateTopologyOnAllNodes = async () => { + for (const url of initialUrls) { + if (await tryUpdateTopology(url, this._databaseName)) { + return; + } + } - return true; - }); - }, BluebirdPromise.resolve(false) as PromiseLike); + return false; }; - return BluebirdPromise.resolve() - .then(() => tryUpdateTopologyOnAllNodes()) - .then(() => { - const topology = new Topology(); - topology.etag = this._topologyEtag; - - let topologyNodes = this.getTopologyNodes(); - if (!topologyNodes) { - topologyNodes = initialUrls.map(url => { - const serverNode = new ServerNode({ - url, database: this._databaseName - }); - serverNode.clusterTag = "!"; - return serverNode; - }); - } + await tryUpdateTopologyOnAllNodes(); + const topology = new Topology(); + topology.etag = this._topologyEtag; - topology.nodes = topologyNodes; + let topologyNodes = this.getTopologyNodes(); + if (!topologyNodes) { + topologyNodes = initialUrls.map(url => { + const serverNode = new ServerNode({ + url, + database: this._databaseName + }); + serverNode.clusterTag = "!"; + return serverNode; + }); + } - this._nodeSelector = new NodeSelector(topology); + topology.nodes = topologyNodes; - if (initialUrls && initialUrls.length > 0) { - this._initializeUpdateTopologyTimer(); - return; - } + this._nodeSelector = new NodeSelector(topology); - this._lastKnownUrls = initialUrls; - const details: string = topologyUpdateErrors - .map(x => `${x.url} -> ${x.error && (x.error as Error).stack ? (x.error as Error).stack : x.error}`) - .join(", "); + if (initialUrls && initialUrls.length > 0) { + this._initializeUpdateTopologyTimer(); + return; + } - this._throwExceptions(details); - }); + this._lastKnownUrls = initialUrls; + const details: string = topologyUpdateErrors + .map(x => `${x.url} -> ${x.error && (x.error as Error).stack ? (x.error as Error).stack : x.error}`) + .join(", "); + + this._throwExceptions(details); } protected _throwExceptions(details: string): void { @@ -633,7 +642,8 @@ export class RequestExecutor implements IDisposable { this._log.info(`Execute command ${command.constructor.name}`); const topologyUpdate = this._firstTopologyUpdatePromise; - if ((topologyUpdate && topologyUpdate.isResolved()) || this._disableTopologyUpdates) { + const topologyUpdateStatus = this._firstTopologyUpdateStatus; + if ((topologyUpdate && topologyUpdateStatus.isResolved()) || this._disableTopologyUpdates) { const currentIndexAndNode: CurrentIndexAndNode = this.chooseNodeForRequest(command, sessionInfo); return this._executeOnSpecificNode(command, sessionInfo, { chosenNode: currentIndexAndNode.currentNode, @@ -645,43 +655,39 @@ export class RequestExecutor implements IDisposable { } } - private _unlikelyExecute( + private async _unlikelyExecute( command: RavenCommand, - topologyUpdate: BluebirdPromise, + topologyUpdate: Promise, sessionInfo: SessionInfo): Promise { - const result = BluebirdPromise.resolve() - .then(() => { - if (!this._firstTopologyUpdatePromise) { - if (!this._lastKnownUrls) { - throwError("InvalidOperationException", - "No known topology and no previously known one, cannot proceed, likely a bug"); - } - - topologyUpdate = this._firstTopologyUpdate(this._lastKnownUrls); + try { + if (!this._firstTopologyUpdatePromise) { + if (!this._lastKnownUrls) { + throwError("InvalidOperationException", + "No known topology and no previously known one, cannot proceed, likely a bug"); } - return topologyUpdate; - }) - .catch(reason => { - if (this._firstTopologyUpdatePromise === topologyUpdate) { - this._firstTopologyUpdatePromise = null; // next request will raise it - } + topologyUpdate = this._firstTopologyUpdate(this._lastKnownUrls); + } - this._log.warn(reason, "Error doing topology update."); + await topologyUpdate; - throw reason; - }) - .then(() => { - const currentIndexAndNode: CurrentIndexAndNode = this.chooseNodeForRequest(command, sessionInfo); - return this._executeOnSpecificNode(command, sessionInfo, { - chosenNode: currentIndexAndNode.currentNode, - nodeIndex: currentIndexAndNode.currentIndex, - shouldRetry: true - }); - }); + } catch (reason) { + if (this._firstTopologyUpdatePromise === topologyUpdate) { + this._firstTopologyUpdatePromise = null; // next request will raise it + } - return Promise.resolve(result); + this._log.warn(reason, "Error doing topology update."); + + throw reason; + } + + const currentIndexAndNode: CurrentIndexAndNode = this.chooseNodeForRequest(command, sessionInfo); + return this._executeOnSpecificNode(command, sessionInfo, { + chosenNode: currentIndexAndNode.currentNode, + nodeIndex: currentIndexAndNode.currentIndex, + shouldRetry: true + }); } private _getFromCache( diff --git a/src/Utility/PromiseUtil.ts b/src/Utility/PromiseUtil.ts index aa59c9082..7efc6b1c0 100644 --- a/src/Utility/PromiseUtil.ts +++ b/src/Utility/PromiseUtil.ts @@ -1,6 +1,5 @@ import * as BluebirdPromise from "bluebird"; import { ErrorFirstCallback } from "./../Types/Callbacks"; -import { VError } from "verror"; import { getError } from "../Exceptions"; export interface IDefer { @@ -64,3 +63,87 @@ export async function timeout(ms: number) { setTimeout(() => reject(getError("TimeoutException", `Timeout after ${ms} ms.`)), ms)); } + +export class AsyncTimeout { + public get promise() { + return this._promise; + } + + public get timedOut() { + return this._timedOut; + } + + private _timedOut: boolean = false; + + private _timer: NodeJS.Timer; + + private _promise: Promise; + + private _op: string; + + private _resolve: () => void; + + private _reject: (err: Error) => void; + + public constructor(ms: number, op?: string) { + this._op = op; + this._promise = new Promise((resolve, reject) => { + this._resolve = resolve; + this._reject = reject; + }); + + this._timer = setTimeout(() => { + this._timedOut = true; + this._reject(this._getTimeoutError(ms)); + }, ms); + } + + private _getTimeoutError(ms) { + const opText = this._op ? `Operation '${this._op}'` : `Operation`; + const timeoutError = getError("TimeoutError", `${opText} timed out after ${ms} ms.`); + return timeoutError; + } + + public cancel() { + if (this._timer) { + clearTimeout(this._timer); + } + + this._resolve(); + } +} + +export type PromiseStatus = "PENDING" | "RESOLVED" | "REJECTED"; +export class PromiseStatusTracker { + private _status: PromiseStatus; + private _promise: Promise; + + public constructor(promise: Promise) { + if (!promise) { + throw new Error("Promise to track cannot be null."); + } + + this._status = "PENDING"; + this._promise = promise; + + this._promise + .then(() => this._status = "RESOLVED") + .catch(() => this._status = "REJECTED"); + } + + public static track(promise: Promise): PromiseStatusTracker { + return new PromiseStatusTracker(promise); + } + + public isFullfilled() { + return this._status === "REJECTED" || this._status === "RESOLVED"; + } + + public isResolved() { + return this._status === "RESOLVED"; + } + + public isRejected() { + return this._status === "REJECTED"; + } +} diff --git a/src/Utility/SemaphoreUtil.ts b/src/Utility/SemaphoreUtil.ts index c39160968..754b026e0 100644 --- a/src/Utility/SemaphoreUtil.ts +++ b/src/Utility/SemaphoreUtil.ts @@ -1,46 +1,106 @@ import * as semaphore from "semaphore"; -import * as BluebirdPromise from "bluebird"; import { IDisposable } from "../Types/Contracts"; +import { AsyncTimeout } from "./PromiseUtil"; +import { getError } from "../Exceptions"; export interface AcquireSemaphoreOptions { timeout?: number; contextName?: string; } -export interface AcquiredSemaphoreContext extends IDisposable { +export interface SemaphoreAcquisitionContext extends IDisposable { promise: Promise; } -export function acquireSemaphore( - sem: semaphore.Semaphore, semOpts?: AcquireSemaphoreOptions) - : AcquiredSemaphoreContext { - - const contextName = semOpts ? semOpts.contextName : ""; - - const acquiredObj = { - acquired: false - }; - const acquiredSemaphorePromise = - new BluebirdPromise.Promise(resolve => { - sem.take(() => { - acquiredObj.acquired = true; - resolve(); +class SemaphoreAcquisition implements SemaphoreAcquisitionContext { + + private _acquired: boolean; + private _disposed: boolean = false; + private _timeout?: AsyncTimeout; + + private _sem: semaphore.Semaphore; + private _promise: Promise; + + public get promise() { + return this._promise; + } + + private _isTimedOut() { + return this._timeout && this._timeout.timedOut; + } + + public constructor(sem: semaphore.Semaphore, semOpts?: AcquireSemaphoreOptions) { + const contextName = semOpts ? semOpts.contextName : ""; + + if (semOpts && semOpts.timeout) { + const timedOutOpName = contextName ? `WAIT_FOR_SEM_${contextName}` : null; + this._timeout = new AsyncTimeout(semOpts.timeout, timedOutOpName); + } + + this._acquired = false; + this._sem = sem; + + this._initialize(); + } + + private _initialize() { + const sem = this._sem; + const semAcquired = + new Promise((resolve, reject) => { + sem.take(() => { + + if (this._disposed || this._isTimedOut()) { + // when we finally got here after timeout or disposal + // need to release it anyway + sem.leave(); + reject(getError( + "InvalidOperationException", + "Semaphore acquire timed out or was disposed.")); + return; + } + + this._acquired = true; + resolve(); + }); }); - }); - let p = acquiredSemaphorePromise; - if (semOpts && semOpts.timeout) { - p = p.timeout(semOpts.timeout) as BluebirdPromise; + let resultPromise = semAcquired; + + if (this._timeout) { + resultPromise = Promise.race([ + semAcquired, + this._timeout.promise + ]) + .then(() => this._timeout.cancel()); + } + + this._promise = resultPromise; } - const releaseFunc = () => { - acquiredSemaphorePromise - .then(() => sem.leave()); - }; + public dispose() { + if (this._disposed) { + return; + } + + this._disposed = true; + + if (this._timeout) { + this._timeout.cancel(); + } + + if (!this._acquired) { + return; + } + + this._sem.leave(); - const result = Promise.resolve(p); - return { - dispose: releaseFunc, - promise: result - }; + this._acquired = false; + } + +} + +export function acquireSemaphore( + sem: semaphore.Semaphore, semOpts?: AcquireSemaphoreOptions) + : SemaphoreAcquisitionContext { + return new SemaphoreAcquisition(sem, semOpts); } diff --git a/test/Executor/RequestExecutorTests.ts b/test/Executor/RequestExecutorTests.ts index 2ee6f278a..3f9b7f1ae 100644 --- a/test/Executor/RequestExecutorTests.ts +++ b/test/Executor/RequestExecutorTests.ts @@ -139,7 +139,7 @@ describe("Request executor", function () { try { executor = RequestExecutor.create([ "http://no_such_host:8080", - "http://another_offlilne:8080", + "http://another_offline:8080", url ], dbName, diff --git a/test/Ported/Server/Documents/Notifications/ChangesTest.ts b/test/Ported/Server/Documents/Notifications/ChangesTest.ts index 8f9549b31..15ebcd44b 100644 --- a/test/Ported/Server/Documents/Notifications/ChangesTest.ts +++ b/test/Ported/Server/Documents/Notifications/ChangesTest.ts @@ -1,3 +1,4 @@ + import * as assert from "assert"; import { testContext, disposeTestDocumentStore } from "../../../../Utils/TestUtil"; diff --git a/test/Utilities/SemaphoreUtilTests.ts b/test/Utilities/SemaphoreUtilTests.ts new file mode 100644 index 000000000..16dc8c713 --- /dev/null +++ b/test/Utilities/SemaphoreUtilTests.ts @@ -0,0 +1,88 @@ +import * as mocha from "mocha"; +import * as assert from "assert"; +import { User, Company, Order } from "../Assets/Entities"; +import { testContext, disposeTestDocumentStore } from "../Utils/TestUtil"; +import * as SemaphoreUtil from "../../src/Utility/SemaphoreUtil"; +import { + RavenErrorType, + IDocumentStore, +} from "../../src"; +import * as semaphore from "semaphore"; +import { delay } from "bluebird"; + +describe("SemaphoreUtil", function () { + + let sem: semaphore.Semaphore; + + beforeEach(function () { + sem = semaphore(); + }); + + it("should be able to acquire and release semaphore ", async () => { + const semContext = SemaphoreUtil.acquireSemaphore(sem); + await semContext.promise; + semContext.dispose(); + }); + + it("can timeout and try again", async () => { + assert.ok(sem.available(1)); + + const semContextLocked = SemaphoreUtil.acquireSemaphore(sem, { + contextName: "LOCK" + }); + + await semContextLocked.promise; + assert.ok(!sem.available(1)); + + const semContextTimingOut = SemaphoreUtil.acquireSemaphore(sem, { + timeout: 100, + contextName: "SHOULD_TIMEOUT" + }); + + assert.ok(!sem.available(1)); + + try { + await semContextTimingOut.promise; + } catch (err) { + assert.strictEqual(err.name, "TimeoutError"); + assert.ok(!sem.available(1)); + + const secondSemAcqAttempt = SemaphoreUtil.acquireSemaphore(sem, { + timeout: 1000, + contextName: "SHOULD_NOT_TIMEOUT" + }); + + assert.ok(!sem.available(1)); + semContextLocked.dispose(); + + await secondSemAcqAttempt.promise; + + secondSemAcqAttempt.dispose(); + + return; + } + + assert.fail("it should have timed out."); + }); + + it("should be able to acquire and release semaphore with multiple clients", async () => { + const semContext = SemaphoreUtil.acquireSemaphore(sem, { contextName: "1" }); + const semContext2 = SemaphoreUtil.acquireSemaphore(sem, { contextName: "2" }); + const semContext3 = SemaphoreUtil.acquireSemaphore(sem, { contextName: "3" }); + + async function semTryf(semContext, delayMs) { + try { + await semContext.promise; + await delay(delayMs); + } finally { + semContext.dispose(); + } + } + + await Promise.all([ + semTryf(semContext, 10), + semTryf(semContext2, 15), + semTryf(semContext3, 5), + ]); + }); +}); diff --git a/test/Utils/TestUtil.ts b/test/Utils/TestUtil.ts index c17113a98..40df1e33c 100644 --- a/test/Utils/TestUtil.ts +++ b/test/Utils/TestUtil.ts @@ -1,3 +1,7 @@ +// tslint:disable-next-line:no-var-requires +// const why = require("why-is-node-running"); +// setTimeout(why, 30000); + import { RequestExecutor } from "../../src/Http/RequestExecutor"; import * as fs from "fs"; import * as path from "path";