From 33ea4d8cb18b163cacd0e15ede2d8082430d8717 Mon Sep 17 00:00:00 2001 From: Paolo Insogna Date: Mon, 2 Nov 2020 15:24:04 +0100 Subject: [PATCH 1/4] fix: Fixed autopipeline performances. --- .gitignore | 3 +- lib/autoPipelining.ts | 16 ++-------- lib/cluster/index.ts | 21 +++++++++++++ lib/pipeline.ts | 9 +++--- playground.js | 38 +++++++++++++++++++++++ test/functional/cluster/autopipelining.ts | 32 +++++++++++-------- 6 files changed, 86 insertions(+), 33 deletions(-) create mode 100644 playground.js diff --git a/.gitignore b/.gitignore index 1dfab893..c1561bc0 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ node_modules built .vscode -benchmarks/fixtures/*.txt \ No newline at end of file +benchmarks/fixtures/*.txt +.clinic \ No newline at end of file diff --git a/lib/autoPipelining.ts b/lib/autoPipelining.ts index fe1948d5..238fe114 100644 --- a/lib/autoPipelining.ts +++ b/lib/autoPipelining.ts @@ -18,19 +18,6 @@ export const notAllowedAutoPipelineCommands = [ "unpsubscribe", ]; -function findAutoPipeline( - client, - _commandName, - ...args: Array -): string { - if (!client.isCluster) { - return "main"; - } - - // We have slot information, we can improve routing by grouping slots served by the same subset of nodes - return client.slots[calculateSlot(args[0])].join(","); -} - function executeAutoPipeline(client, slotKey: string) { /* If a pipeline is already executing, keep queueing up commands @@ -116,7 +103,8 @@ export function executeWithAutoPipelining( }); } - const slotKey = findAutoPipeline(client, commandName, ...args); + // If we have slot information, we can improve routing by grouping slots served by the same subset of nodes + const slotKey = client.isCluster ? client.slots[calculateSlot(args[0])].join(",") : 'main'; if (!client._autoPipelines.has(slotKey)) { const pipeline = client.pipeline(); diff --git a/lib/cluster/index.ts b/lib/cluster/index.ts index 86bdae5f..6aa7a09f 100644 --- a/lib/cluster/index.ts +++ b/lib/cluster/index.ts @@ -69,6 +69,8 @@ class Cluster extends EventEmitter { private isRefreshing = false; public isCluster = true; private _autoPipelines: Map = new Map(); + private _groupsIds: {[key: string]: number} = {}; + private _groupsBySlot: number[] = Array(16384); private _runningAutoPipelines: Set = new Set(); private _readyDelayedCallbacks: CallbackFunction[] = []; public _addedScriptHashes: { [key: string]: any } = {}; @@ -627,6 +629,7 @@ class Cluster extends EventEmitter { } else { _this.slots[slot] = [key]; } + _this._groupsBySlot[slot] = _this._groupsIds[_this.slots[slot].join(';')]; _this.connectionPool.findOrCreate(_this.natMapper(key)); tryConnection(); debug("refreshing slot caches... (triggered by MOVED error)"); @@ -860,6 +863,24 @@ class Cluster extends EventEmitter { } } + // Assign to each node keys a numeric value to make autopipeline comparison faster. + this._groupsIds = {}; + let j = 0; + for (let i = 0; i < 16384; i++) { + const target = (this.slots[i] || []).join(';'); + + if (!target.length) { + this._groupsBySlot[i] = undefined; + continue; + } + + if (!this._groupsIds[target]) { + this._groupsIds[target] = ++j; + } + + this._groupsBySlot[i] = this._groupsIds[target]; + } + this.connectionPool.reset(nodes); callback(); }, this.options.slotsRefreshTimeout) diff --git a/lib/pipeline.ts b/lib/pipeline.ts index 11621ecd..40aaba62 100644 --- a/lib/pipeline.ts +++ b/lib/pipeline.ts @@ -15,12 +15,10 @@ import Commander from "./commander"; */ function generateMultiWithNodes(redis, keys) { const slot = calculateSlot(keys[0]); - const target = redis.slots[slot].join(","); - + const target = redis._groupsBySlot[slot]; + for (let i = 1; i < keys.length; i++) { - const currentTarget = redis.slots[calculateSlot(keys[i])].join(","); - - if (currentTarget !== target) { + if (redis._groupsBySlot[calculateSlot(keys[i])] !== target) { return -1; } } @@ -158,6 +156,7 @@ Pipeline.prototype.fillResult = function (value, position) { moved: function (slot, key) { _this.preferKey = key; _this.redis.slots[errv[1]] = [key]; + _this.redis._groupsBySlot[errv[1]] = _this.redis._groupsIds[_this.redis.slots[errv[1]].join(";")]; _this.redis.refreshSlotsCache(); _this.exec(); }, diff --git a/playground.js b/playground.js new file mode 100644 index 00000000..a0f1360c --- /dev/null +++ b/playground.js @@ -0,0 +1,38 @@ +const { readFileSync } = require('fs') +const { join } = require('path') +const Cluster = require('./built/cluster').default + +const numNodes = parseInt(process.env.NODES || '3', 10) +const iterations = parseInt(process.env.ITERATIONS || '10000', 10) +const batchSize = parseInt(process.env.BATCH_SIZE || '1000', 10) +const keys = readFileSync(join(__dirname, `benchmarks/fixtures/cluster-${numNodes}.txt`), 'utf-8').split('\n') +const configuration = Array.from(Array(numNodes), (_, i) => ({ + host: '127.0.0.1', + port: 30000 + i + 1 +})) + +const cluster = new Cluster(configuration, { enableAutoPipelining: true }) + +function command() { + const choice = Math.random() + + if (choice < 0.3) { + return 'ttl' + } else if (choice < 0.6) { + return 'exists' + } + + return 'get' +} + +async function main() { + for (let i = 0; i < 1500; i++) { + const index = Math.floor(Math.random() * keys.length) + + await Promise.all(Array.from(Array(batchSize)).map(() => cluster[command()](keys[index]))) + } +} + +main() + .then(console.log, console.error) + .finally(() => cluster.disconnect()) diff --git a/test/functional/cluster/autopipelining.ts b/test/functional/cluster/autopipelining.ts index 4039c9bf..02f933d6 100644 --- a/test/functional/cluster/autopipelining.ts +++ b/test/functional/cluster/autopipelining.ts @@ -11,6 +11,11 @@ use(require("chai-as-promised")); Instead foo1 and foo2 are usually served by different nodes in a 3-nodes cluster. */ describe("autoPipelining for cluster", function () { + function changeSlot(cluster, from, to) { + cluster.slots[from] = cluster.slots[to]; + cluster._groupsBySlot[from] = cluster._groupsBySlot[to]; + } + beforeEach(() => { const slotTable = [ [0, 5000, ["127.0.0.1", 30001]], @@ -402,11 +407,12 @@ describe("autoPipelining for cluster", function () { const promise4 = cluster.set("foo6", "bar"); // Override slots to induce a failure - const key1Slot = calculateKeySlot("foo1"); - const key2Slot = calculateKeySlot("foo2"); - const key5Slot = calculateKeySlot("foo5"); - cluster.slots[key1Slot] = cluster.slots[key2Slot]; - cluster.slots[key2Slot] = cluster.slots[key5Slot]; + const key1Slot = calculateKeySlot('foo1'); + const key2Slot = calculateKeySlot('foo2'); + const key5Slot = calculateKeySlot('foo5'); + + changeSlot(cluster, key1Slot, key2Slot); + changeSlot(cluster, key2Slot, key5Slot); await expect(promise1).to.eventually.be.rejectedWith( "All keys in the pipeline should belong to the same slots allocation group" @@ -492,11 +498,11 @@ describe("autoPipelining for cluster", function () { expect(cluster.autoPipelineQueueSize).to.eql(4); // Override slots to induce a failure - const key1Slot = calculateKeySlot("foo1"); - const key2Slot = calculateKeySlot("foo2"); - const key5Slot = calculateKeySlot("foo5"); - cluster.slots[key1Slot] = cluster.slots[key2Slot]; - cluster.slots[key2Slot] = cluster.slots[key5Slot]; + const key1Slot = calculateKeySlot('foo1'); + const key2Slot = calculateKeySlot('foo2'); + const key5Slot = calculateKeySlot('foo5'); + changeSlot(cluster, key1Slot, key2Slot); + changeSlot(cluster, key2Slot, key5Slot); }); }); @@ -541,9 +547,9 @@ describe("autoPipelining for cluster", function () { expect(cluster.autoPipelineQueueSize).to.eql(3); - const key1Slot = calculateKeySlot("foo1"); - const key2Slot = calculateKeySlot("foo2"); - cluster.slots[key1Slot] = cluster.slots[key2Slot]; + const key1Slot = calculateKeySlot('foo1'); + const key2Slot = calculateKeySlot('foo2'); + changeSlot(cluster, key1Slot, key2Slot); }); }); }); From 41099c004942d08a92dad3cd46ec16f88ed60cb4 Mon Sep 17 00:00:00 2001 From: Paolo Insogna Date: Mon, 9 Nov 2020 15:24:30 +0100 Subject: [PATCH 2/4] fix: Make sure only one script caches interval is active. [#1215] --- lib/cluster/index.ts | 3 +++ lib/redis/index.ts | 4 ++++ test/functional/cluster/connect.ts | 18 ++++++++++++++++++ test/functional/connection.ts | 17 +++++++++++++++++ 4 files changed, 42 insertions(+) diff --git a/lib/cluster/index.ts b/lib/cluster/index.ts index 6aa7a09f..993ac036 100644 --- a/lib/cluster/index.ts +++ b/lib/cluster/index.ts @@ -190,7 +190,10 @@ class Cluster extends EventEmitter { return; } + // Make sure only one timer is active at a time clearInterval(this._addedScriptHashesCleanInterval); + + // Start the script cache cleaning this._addedScriptHashesCleanInterval = setInterval(() => { this._addedScriptHashes = {}; }, this.options.maxScriptsCachingTime); diff --git a/lib/redis/index.ts b/lib/redis/index.ts index 4cfc36ab..e5bf5b53 100644 --- a/lib/redis/index.ts +++ b/lib/redis/index.ts @@ -304,7 +304,11 @@ Redis.prototype.connect = function (callback) { reject(new Error("Redis is already connecting/connected")); return; } + + // Make sure only one timer is active at a time clearInterval(this._addedScriptHashesCleanInterval); + + // Start the script cache cleaning this._addedScriptHashesCleanInterval = setInterval(() => { this._addedScriptHashes = {}; }, this.options.maxScriptsCachingTime); diff --git a/test/functional/cluster/connect.ts b/test/functional/cluster/connect.ts index f1e4ba33..4d51f918 100644 --- a/test/functional/cluster/connect.ts +++ b/test/functional/cluster/connect.ts @@ -438,4 +438,22 @@ describe("cluster:disconnect", function () { done(); }); }); + + it("should clear the added script hashes interval even when no connection succeeded", function (done) { + const cluster = new Cluster([{ host: "127.0.0.1", port: "0" }], { + enableReadyCheck: false, + }); + + let attempt = 0; + cluster.on("error", function () { + if(attempt < 5) { + attempt ++; + return + } + cluster.quit(); + + expect(cluster._addedScriptHashesCleanInterval).to.be.null; + done(); + }); + }); }); diff --git a/test/functional/connection.ts b/test/functional/connection.ts index ef4a3447..8ee58f13 100644 --- a/test/functional/connection.ts +++ b/test/functional/connection.ts @@ -549,4 +549,21 @@ describe("disconnection", function () { } }); }); + + it("should clear the added script hashes interval even when no connection succeeded", function (done) { + let attempt = 0; + const redis = new Redis(0, 'localhost'); + + redis.on("error", function () { + if(attempt < 5) { + attempt ++; + return + } + + redis.quit(); + + expect(redis._addedScriptHashesCleanInterval).to.be.null; + done(); + }); + }); }); From e5877212ce822c5f70c9c8e61126670df7cf68bc Mon Sep 17 00:00:00 2001 From: Paolo Insogna Date: Wed, 11 Nov 2020 13:35:11 +0100 Subject: [PATCH 3/4] feat: Minor perf improvement. --- lib/cluster/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/cluster/index.ts b/lib/cluster/index.ts index 993ac036..58b7c31e 100644 --- a/lib/cluster/index.ts +++ b/lib/cluster/index.ts @@ -867,7 +867,7 @@ class Cluster extends EventEmitter { } // Assign to each node keys a numeric value to make autopipeline comparison faster. - this._groupsIds = {}; + this._groupsIds = Object.create(null); let j = 0; for (let i = 0; i < 16384; i++) { const target = (this.slots[i] || []).join(';'); From 24c612d5b7e3a429da8a3e14d1e2406e19cd629d Mon Sep 17 00:00:00 2001 From: Paolo Insogna Date: Sat, 12 Jun 2021 18:57:52 +0200 Subject: [PATCH 4/4] fix: Removed useless files. --- .gitignore | 2 +- playground.js | 38 -------------------------------------- 2 files changed, 1 insertion(+), 39 deletions(-) delete mode 100644 playground.js diff --git a/.gitignore b/.gitignore index c1561bc0..101a6de3 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,4 @@ built .vscode benchmarks/fixtures/*.txt -.clinic \ No newline at end of file + diff --git a/playground.js b/playground.js deleted file mode 100644 index a0f1360c..00000000 --- a/playground.js +++ /dev/null @@ -1,38 +0,0 @@ -const { readFileSync } = require('fs') -const { join } = require('path') -const Cluster = require('./built/cluster').default - -const numNodes = parseInt(process.env.NODES || '3', 10) -const iterations = parseInt(process.env.ITERATIONS || '10000', 10) -const batchSize = parseInt(process.env.BATCH_SIZE || '1000', 10) -const keys = readFileSync(join(__dirname, `benchmarks/fixtures/cluster-${numNodes}.txt`), 'utf-8').split('\n') -const configuration = Array.from(Array(numNodes), (_, i) => ({ - host: '127.0.0.1', - port: 30000 + i + 1 -})) - -const cluster = new Cluster(configuration, { enableAutoPipelining: true }) - -function command() { - const choice = Math.random() - - if (choice < 0.3) { - return 'ttl' - } else if (choice < 0.6) { - return 'exists' - } - - return 'get' -} - -async function main() { - for (let i = 0; i < 1500; i++) { - const index = Math.floor(Math.random() * keys.length) - - await Promise.all(Array.from(Array(batchSize)).map(() => cluster[command()](keys[index]))) - } -} - -main() - .then(console.log, console.error) - .finally(() => cluster.disconnect())