diff --git a/lib/autoPipelining.ts b/lib/autoPipelining.ts index 9aec27f5..7ae0e7d3 100644 --- a/lib/autoPipelining.ts +++ b/lib/autoPipelining.ts @@ -1,5 +1,5 @@ import * as PromiseContainer from "./promiseContainer"; -import { flatten, isArguments } from "./utils/lodash"; +import { flatten, isArguments, noop } from "./utils/lodash"; import * as calculateSlot from "cluster-key-slot"; import asCallback from "standard-as-callback"; @@ -120,6 +120,7 @@ export function executeWithAutoPipelining( // On cluster mode let's wait for slots to be available if (client.isCluster && !client.slots.length) { + if (client.status === "wait") client.connect().catch(noop); return new CustomPromise(function (resolve, reject) { client.delayUntilReady((err) => { if (err) { diff --git a/lib/pipeline.ts b/lib/pipeline.ts index 40aaba62..d2390792 100644 --- a/lib/pipeline.ts +++ b/lib/pipeline.ts @@ -7,6 +7,7 @@ import * as pMap from "p-map"; import * as PromiseContainer from "./promiseContainer"; import { CallbackFunction } from "./types"; import Commander from "./commander"; +import { noop } from "./utils"; /* This function derives from the cluster-key-slot implementation. @@ -16,7 +17,7 @@ import Commander from "./commander"; function generateMultiWithNodes(redis, keys) { const slot = calculateSlot(keys[0]); const target = redis._groupsBySlot[slot]; - + for (let i = 1; i < keys.length; i++) { if (redis._groupsBySlot[calculateSlot(keys[i])] !== target) { return -1; @@ -156,7 +157,8 @@ Pipeline.prototype.fillResult = function (value, position) { moved: function (slot, key) { _this.preferKey = key; _this.redis.slots[errv[1]] = [key]; - _this.redis._groupsBySlot[errv[1]] = _this.redis._groupsIds[_this.redis.slots[errv[1]].join(";")]; + _this.redis._groupsBySlot[errv[1]] = + _this.redis._groupsIds[_this.redis.slots[errv[1]].join(";")]; _this.redis.refreshSlotsCache(); _this.exec(); }, @@ -241,6 +243,7 @@ Pipeline.prototype.execBuffer = deprecate(function () { Pipeline.prototype.exec = function (callback: CallbackFunction) { // Wait for the cluster to be connected, since we need nodes information before continuing if (this.isCluster && !this.redis.slots.length) { + if (this.redis.status === "wait") this.redis.connect().catch(noop); this.redis.delayUntilReady((err) => { if (err) { callback(err); diff --git a/lib/transaction.ts b/lib/transaction.ts index a0c11901..7e39d4a1 100644 --- a/lib/transaction.ts +++ b/lib/transaction.ts @@ -1,4 +1,4 @@ -import { wrapMultiResult } from "./utils"; +import { wrapMultiResult, noop } from "./utils"; import asCallback from "standard-as-callback"; import Pipeline from "./pipeline"; import { CallbackFunction } from "./types"; @@ -30,6 +30,7 @@ export function addTransactionSupport(redis) { pipeline.exec = function (callback: CallbackFunction) { // Wait for the cluster to be connected, since we need nodes information before continuing if (this.isCluster && !this.redis.slots.length) { + if (this.redis.status === "wait") this.redis.connect().catch(noop); return asCallback( new Promise((resolve, reject) => { this.redis.delayUntilReady((err) => { diff --git a/test/functional/cluster/autopipelining.ts b/test/functional/cluster/autopipelining.ts index 82ac77ad..be3a8a33 100644 --- a/test/functional/cluster/autopipelining.ts +++ b/test/functional/cluster/autopipelining.ts @@ -594,4 +594,26 @@ describe("autoPipelining for cluster", function () { changeSlot(cluster, key1Slot, key2Slot); }); }); + + it("should support lazyConnect", async () => { + const cluster = new Cluster(hosts, { + enableAutoPipelining: true, + lazyConnect: true, + }); + + await cluster.set("foo1", "bar1"); + await cluster.set("foo5", "bar5"); + + expect( + await Promise.all([ + cluster.get("foo1"), + cluster.get("foo5"), + cluster.get("foo1"), + cluster.get("foo5"), + cluster.get("foo1"), + ]) + ).to.eql(["bar1", "bar5", "bar1", "bar5", "bar1"]); + + cluster.disconnect(); + }); }); diff --git a/test/functional/lazy_connect.ts b/test/functional/lazy_connect.ts index b31aae66..ed5a542a 100644 --- a/test/functional/lazy_connect.ts +++ b/test/functional/lazy_connect.ts @@ -2,6 +2,7 @@ import Redis from "../../lib/redis"; import { expect } from "chai"; import * as sinon from "sinon"; import { Cluster } from "../../lib"; +import Pipeline from "../../lib/pipeline"; describe("lazy connect", function () { it("should not call `connect` when init", function () { @@ -51,6 +52,28 @@ describe("lazy connect", function () { stub.restore(); }); + it("should call connect when pipeline exec", (done) => { + const stub = sinon.stub(Cluster.prototype, "connect").callsFake(() => { + stub.restore(); + done(); + }); + const cluster = new Cluster([], { lazyConnect: true }); + const pipline = new Pipeline(cluster); + pipline.get("fool1").exec(() => {}); + }); + + it("should call connect when transction exec", (done) => { + const stub = sinon.stub(Cluster.prototype, "connect").callsFake(() => { + stub.restore(); + done(); + }); + const cluster = new Cluster([], { lazyConnect: true }); + cluster + .multi() + .get("fool1") + .exec(() => {}); + }); + it('should quit before "close" being emited', function (done) { const stub = sinon .stub(Cluster.prototype, "connect")