Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cluster): lazyConnect with pipeline #1408

Merged
merged 3 commits into from
Oct 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/autoPipelining.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as PromiseContainer from "./promiseContainer";
import { flatten, isArguments } from "./utils/lodash";
import { flatten, isArguments, noop } from "./utils/lodash";
import * as calculateSlot from "cluster-key-slot";
import asCallback from "standard-as-callback";

Expand Down Expand Up @@ -110,6 +110,7 @@ export function executeWithAutoPipelining(

// On cluster mode let's wait for slots to be available
if (client.isCluster && !client.slots.length) {
if (client.status === "wait") client.connect().catch(noop);
return new CustomPromise(function (resolve, reject) {
client.delayUntilReady((err) => {
if (err) {
Expand Down
7 changes: 5 additions & 2 deletions lib/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import * as pMap from "p-map";
import * as PromiseContainer from "./promiseContainer";
import { CallbackFunction } from "./types";
import Commander from "./commander";
import { noop } from "./utils";

/*
This function derives from the cluster-key-slot implementation.
Expand All @@ -16,7 +17,7 @@ import Commander from "./commander";
function generateMultiWithNodes(redis, keys) {
const slot = calculateSlot(keys[0]);
const target = redis._groupsBySlot[slot];

for (let i = 1; i < keys.length; i++) {
if (redis._groupsBySlot[calculateSlot(keys[i])] !== target) {
return -1;
Expand Down Expand Up @@ -156,7 +157,8 @@ Pipeline.prototype.fillResult = function (value, position) {
moved: function (slot, key) {
_this.preferKey = key;
_this.redis.slots[errv[1]] = [key];
_this.redis._groupsBySlot[errv[1]] = _this.redis._groupsIds[_this.redis.slots[errv[1]].join(";")];
_this.redis._groupsBySlot[errv[1]] =
_this.redis._groupsIds[_this.redis.slots[errv[1]].join(";")];
_this.redis.refreshSlotsCache();
_this.exec();
},
Expand Down Expand Up @@ -241,6 +243,7 @@ Pipeline.prototype.execBuffer = deprecate(function () {
Pipeline.prototype.exec = function (callback: CallbackFunction) {
// Wait for the cluster to be connected, since we need nodes information before continuing
if (this.isCluster && !this.redis.slots.length) {
if (this.redis.status === "wait") this.redis.connect().catch(noop);
this.redis.delayUntilReady((err) => {
if (err) {
callback(err);
Expand Down
3 changes: 2 additions & 1 deletion lib/transaction.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { wrapMultiResult } from "./utils";
import { wrapMultiResult, noop } from "./utils";
import asCallback from "standard-as-callback";
import Pipeline from "./pipeline";
import { CallbackFunction } from "./types";
Expand Down Expand Up @@ -30,6 +30,7 @@ export function addTransactionSupport(redis) {
pipeline.exec = function (callback: CallbackFunction) {
// Wait for the cluster to be connected, since we need nodes information before continuing
if (this.isCluster && !this.redis.slots.length) {
if (this.redis.status === "wait") this.redis.connect().catch(noop);
return asCallback(
new Promise((resolve, reject) => {
this.redis.delayUntilReady((err) => {
Expand Down
22 changes: 22 additions & 0 deletions test/functional/cluster/autopipelining.ts
Original file line number Diff line number Diff line change
Expand Up @@ -594,4 +594,26 @@ describe("autoPipelining for cluster", function () {
changeSlot(cluster, key1Slot, key2Slot);
});
});

it("should support lazyConnect", async () => {
const cluster = new Cluster(hosts, {
enableAutoPipelining: true,
lazyConnect: true,
});

await cluster.set("foo1", "bar1");
await cluster.set("foo5", "bar5");

expect(
await Promise.all([
cluster.get("foo1"),
cluster.get("foo5"),
cluster.get("foo1"),
cluster.get("foo5"),
cluster.get("foo1"),
])
).to.eql(["bar1", "bar5", "bar1", "bar5", "bar1"]);

cluster.disconnect();
});
});
23 changes: 23 additions & 0 deletions test/functional/lazy_connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import Redis from "../../lib/redis";
import { expect } from "chai";
import * as sinon from "sinon";
import { Cluster } from "../../lib";
import Pipeline from "../../lib/pipeline";

describe("lazy connect", function () {
it("should not call `connect` when init", function () {
Expand Down Expand Up @@ -51,6 +52,28 @@ describe("lazy connect", function () {
stub.restore();
});

it("should call connect when pipeline exec", (done) => {
const stub = sinon.stub(Cluster.prototype, "connect").callsFake(() => {
stub.restore();
done();
});
const cluster = new Cluster([], { lazyConnect: true });
const pipline = new Pipeline(cluster);
pipline.get("fool1").exec(() => {});
});

it("should call connect when transction exec", (done) => {
const stub = sinon.stub(Cluster.prototype, "connect").callsFake(() => {
stub.restore();
done();
});
const cluster = new Cluster([], { lazyConnect: true });
cluster
.multi()
.get("fool1")
.exec(() => {});
});

it('should quit before "close" being emited', function (done) {
const stub = sinon
.stub(Cluster.prototype, "connect")
Expand Down