Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cluster): autopipeline when prefix is used #1335

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion lib/autoPipelining.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ function findAutoPipeline(
}

// We have slot information, we can improve routing by grouping slots served by the same subset of nodes
return client.slots[calculateSlot(args[0])].join(",");
const prefixedKey = client.options.keyPrefix
Copy link
Collaborator

@TysonAndre TysonAndre Apr 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm working on the same codebase as the PR author using ioredis, so I decided to take a look. I'm only a bit familiar with ioredis

This approach seems reasonable.

Looking at lib/commander.ts

  • When there is NO pipelining, new Command is used to transform args (e.g. add key prefixes) in its constructor
  • When there is pipelining, args are passed without any prefixing/transformation to the autoPipelining.ts - i.e. transforming here should be appropriate
    // No auto pipeline, use regular command sending
    if (!shouldUseAutoPipelining(this, commandName)) {
      return this.sendCommand(
        new Command(commandName, args, options, callback)
      );
    }

    // Create a new pipeline and make sure it's scheduled
    return executeWithAutoPipelining(this, commandName, args, callback);

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at lib/pipeline.ts for the "All the keys in a pipeline command should belong to the same slot" I don't see any issues - that checks if everything would go to the same shard using the prefixed keys (from Command.getKeys, Command.iterateKeys)

  • e.g. if a pipeline only had one key, it would use the computed keys this._iterateKeys((key) => options.keyPrefix + key); to determine the correct slots, and there would only be an issue if there were multiple keys and those would get sent to different slots if the prefix wasn't added.

I'd guess the new test case would be less prone to similar coincidences in the future if the unit test was called with more distinct keys

? `${client.options.keyPrefix}${args[0]}`
: args[0];
return client.slots[calculateSlot(prefixedKey)].join(",");
}

function executeAutoPipeline(client, slotKey: string) {
Expand Down
25 changes: 25 additions & 0 deletions test/functional/cluster/autopipelining.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ describe("autoPipelining for cluster", function () {
if (argv[0] === "get" && argv[1] === "foo6") {
return "bar6";
}

if (argv[0] === "get" && argv[1] === "baz:foo10") {
return "bar10";
}
});

new MockServer(30002, function (argv) {
Expand Down Expand Up @@ -63,6 +67,10 @@ describe("autoPipelining for cluster", function () {
return "bar5";
}

if (argv[0] === "get" && argv[1] === "baz:foo1") {
return "bar1";
}

if (argv[0] === "evalsha") {
return argv.slice(argv.length - 4);
}
Expand Down Expand Up @@ -172,6 +180,23 @@ describe("autoPipelining for cluster", function () {
cluster.disconnect();
});

it("should support building pipelines when a prefix is used", async () => {
const cluster = new Cluster(hosts, {
enableAutoPipelining: true,
keyPrefix: "baz:",
});
await new Promise((resolve) => cluster.once("connect", resolve));

await cluster.set("foo1", "bar1");
await cluster.set("foo10", "bar10");

expect(
await Promise.all([cluster.get("foo1"), cluster.get("foo10")])
).to.eql(["bar1", "bar10"]);

cluster.disconnect();
});

it("should support commands queued after a pipeline is already queued for execution", (done) => {
const cluster = new Cluster(hosts, { enableAutoPipelining: true });

Expand Down