From 2161a29721597e3e242c5e4199616367cfdfeb4c Mon Sep 17 00:00:00 2001 From: jseagull Date: Fri, 6 Aug 2021 09:50:41 +0800 Subject: [PATCH 1/3] fix(cluster): lazyConnect with pipeline --- lib/autoPipelining.ts | 1 + lib/pipeline.ts | 6 ++++-- lib/transaction.ts | 1 + 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/autoPipelining.ts b/lib/autoPipelining.ts index 45df0fe2..4ace497d 100644 --- a/lib/autoPipelining.ts +++ b/lib/autoPipelining.ts @@ -110,6 +110,7 @@ export function executeWithAutoPipelining( // On cluster mode let's wait for slots to be available if (client.isCluster && !client.slots.length) { + if (client.status === "wait") client.connect(); return new CustomPromise(function (resolve, reject) { client.delayUntilReady((err) => { if (err) { diff --git a/lib/pipeline.ts b/lib/pipeline.ts index 40aaba62..f47b6eeb 100644 --- a/lib/pipeline.ts +++ b/lib/pipeline.ts @@ -16,7 +16,7 @@ import Commander from "./commander"; function generateMultiWithNodes(redis, keys) { const slot = calculateSlot(keys[0]); const target = redis._groupsBySlot[slot]; - + for (let i = 1; i < keys.length; i++) { if (redis._groupsBySlot[calculateSlot(keys[i])] !== target) { return -1; @@ -156,7 +156,8 @@ Pipeline.prototype.fillResult = function (value, position) { moved: function (slot, key) { _this.preferKey = key; _this.redis.slots[errv[1]] = [key]; - _this.redis._groupsBySlot[errv[1]] = _this.redis._groupsIds[_this.redis.slots[errv[1]].join(";")]; + _this.redis._groupsBySlot[errv[1]] = + _this.redis._groupsIds[_this.redis.slots[errv[1]].join(";")]; _this.redis.refreshSlotsCache(); _this.exec(); }, @@ -241,6 +242,7 @@ Pipeline.prototype.execBuffer = deprecate(function () { Pipeline.prototype.exec = function (callback: CallbackFunction) { // Wait for the cluster to be connected, since we need nodes information before continuing if (this.isCluster && !this.redis.slots.length) { + if (this.redis.status === "wait") this.redis.connect(); this.redis.delayUntilReady((err) => { if (err) { callback(err); diff --git a/lib/transaction.ts b/lib/transaction.ts index a0c11901..22df4fbe 100644 --- a/lib/transaction.ts +++ b/lib/transaction.ts @@ -30,6 +30,7 @@ export function addTransactionSupport(redis) { pipeline.exec = function (callback: CallbackFunction) { // Wait for the cluster to be connected, since we need nodes information before continuing if (this.isCluster && !this.redis.slots.length) { + if (this.redis.status === "wait") this.redis.connect(); return asCallback( new Promise((resolve, reject) => { this.redis.delayUntilReady((err) => { From 283f742d72a33ab37ac9c5b9b3a4d1508ed12855 Mon Sep 17 00:00:00 2001 From: jseagull Date: Wed, 22 Sep 2021 09:32:09 +0800 Subject: [PATCH 2/3] add test for cluster lazyConnect --- test/functional/cluster/autopipelining.ts | 22 ++++++++++++++++++++++ test/functional/lazy_connect.ts | 23 +++++++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/test/functional/cluster/autopipelining.ts b/test/functional/cluster/autopipelining.ts index 82ac77ad..be3a8a33 100644 --- a/test/functional/cluster/autopipelining.ts +++ b/test/functional/cluster/autopipelining.ts @@ -594,4 +594,26 @@ describe("autoPipelining for cluster", function () { changeSlot(cluster, key1Slot, key2Slot); }); }); + + it("should support lazyConnect", async () => { + const cluster = new Cluster(hosts, { + enableAutoPipelining: true, + lazyConnect: true, + }); + + await cluster.set("foo1", "bar1"); + await cluster.set("foo5", "bar5"); + + expect( + await Promise.all([ + cluster.get("foo1"), + cluster.get("foo5"), + cluster.get("foo1"), + cluster.get("foo5"), + cluster.get("foo1"), + ]) + ).to.eql(["bar1", "bar5", "bar1", "bar5", "bar1"]); + + cluster.disconnect(); + }); }); diff --git a/test/functional/lazy_connect.ts b/test/functional/lazy_connect.ts index b31aae66..ed5a542a 100644 --- a/test/functional/lazy_connect.ts +++ b/test/functional/lazy_connect.ts @@ -2,6 +2,7 @@ import Redis from "../../lib/redis"; import { expect } from "chai"; import * as sinon from "sinon"; import { Cluster } from "../../lib"; +import Pipeline from "../../lib/pipeline"; describe("lazy connect", function () { it("should not call `connect` when init", function () { @@ -51,6 +52,28 @@ describe("lazy connect", function () { stub.restore(); }); + it("should call connect when pipeline exec", (done) => { + const stub = sinon.stub(Cluster.prototype, "connect").callsFake(() => { + stub.restore(); + done(); + }); + const cluster = new Cluster([], { lazyConnect: true }); + const pipline = new Pipeline(cluster); + pipline.get("fool1").exec(() => {}); + }); + + it("should call connect when transction exec", (done) => { + const stub = sinon.stub(Cluster.prototype, "connect").callsFake(() => { + stub.restore(); + done(); + }); + const cluster = new Cluster([], { lazyConnect: true }); + cluster + .multi() + .get("fool1") + .exec(() => {}); + }); + it('should quit before "close" being emited', function (done) { const stub = sinon .stub(Cluster.prototype, "connect") From 5397ff180b52e0f8cf09e9e8028ca08489fba3a5 Mon Sep 17 00:00:00 2001 From: jseagull Date: Fri, 24 Sep 2021 12:19:30 +0800 Subject: [PATCH 3/3] catch connect error --- lib/autoPipelining.ts | 4 ++-- lib/pipeline.ts | 3 ++- lib/transaction.ts | 4 ++-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/autoPipelining.ts b/lib/autoPipelining.ts index 4ace497d..7a12bb0c 100644 --- a/lib/autoPipelining.ts +++ b/lib/autoPipelining.ts @@ -1,5 +1,5 @@ import * as PromiseContainer from "./promiseContainer"; -import { flatten, isArguments } from "./utils/lodash"; +import { flatten, isArguments, noop } from "./utils/lodash"; import * as calculateSlot from "cluster-key-slot"; import asCallback from "standard-as-callback"; @@ -110,7 +110,7 @@ export function executeWithAutoPipelining( // On cluster mode let's wait for slots to be available if (client.isCluster && !client.slots.length) { - if (client.status === "wait") client.connect(); + if (client.status === "wait") client.connect().catch(noop); return new CustomPromise(function (resolve, reject) { client.delayUntilReady((err) => { if (err) { diff --git a/lib/pipeline.ts b/lib/pipeline.ts index f47b6eeb..d2390792 100644 --- a/lib/pipeline.ts +++ b/lib/pipeline.ts @@ -7,6 +7,7 @@ import * as pMap from "p-map"; import * as PromiseContainer from "./promiseContainer"; import { CallbackFunction } from "./types"; import Commander from "./commander"; +import { noop } from "./utils"; /* This function derives from the cluster-key-slot implementation. @@ -242,7 +243,7 @@ Pipeline.prototype.execBuffer = deprecate(function () { Pipeline.prototype.exec = function (callback: CallbackFunction) { // Wait for the cluster to be connected, since we need nodes information before continuing if (this.isCluster && !this.redis.slots.length) { - if (this.redis.status === "wait") this.redis.connect(); + if (this.redis.status === "wait") this.redis.connect().catch(noop); this.redis.delayUntilReady((err) => { if (err) { callback(err); diff --git a/lib/transaction.ts b/lib/transaction.ts index 22df4fbe..7e39d4a1 100644 --- a/lib/transaction.ts +++ b/lib/transaction.ts @@ -1,4 +1,4 @@ -import { wrapMultiResult } from "./utils"; +import { wrapMultiResult, noop } from "./utils"; import asCallback from "standard-as-callback"; import Pipeline from "./pipeline"; import { CallbackFunction } from "./types"; @@ -30,7 +30,7 @@ export function addTransactionSupport(redis) { pipeline.exec = function (callback: CallbackFunction) { // Wait for the cluster to be connected, since we need nodes information before continuing if (this.isCluster && !this.redis.slots.length) { - if (this.redis.status === "wait") this.redis.connect(); + if (this.redis.status === "wait") this.redis.connect().catch(noop); return asCallback( new Promise((resolve, reject) => { this.redis.delayUntilReady((err) => {