From dcd7085f18e453cc2624b673f8da1c9fb56c9fe0 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Wed, 15 Feb 2023 10:04:53 +0100 Subject: [PATCH] Apply best-effort unknown-target host MOVED redirect handling #2290 We attempt resolving a host name from the MOVED redirect using Partitions. First, we try to handle the redirect by port lookup. If we cannot find a host with such a port, then we attempt to use the hostname from the node that would serve the slot. This isn't ideal as we do not track the logical hostname for connections. Additionally, our reconnect/address supplier mechanism does not expose the socket address for the connection. --- .../ClusterDistributionChannelWriter.java | 35 +++++++++++++++---- ...terDistributionChannelWriterUnitTests.java | 29 +++++++++++++-- 2 files changed, 56 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java b/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java index 03ad4fa465..22142db25c 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java @@ -33,10 +33,12 @@ import io.lettuce.core.RedisChannelHandler; import io.lettuce.core.RedisChannelWriter; import io.lettuce.core.RedisException; +import io.lettuce.core.RedisURI; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.cluster.event.AskRedirectionEvent; import io.lettuce.core.cluster.event.MovedRedirectionEvent; import io.lettuce.core.cluster.models.partitions.Partitions; +import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import io.lettuce.core.codec.StringCodec; import io.lettuce.core.event.Event; import io.lettuce.core.internal.Futures; @@ -110,7 +112,6 @@ private RedisCommand doWrite(RedisCommand command) { ClusterCommand clusterCommand = (ClusterCommand) command; if (clusterCommand.isMoved() || clusterCommand.isAsk()) { - HostAndPort target; boolean asking; ByteBuffer firstEncodedKey = clusterCommand.getArgs().getFirstEncodedKey(); @@ -125,7 +126,7 @@ private RedisCommand doWrite(RedisCommand command) { if (clusterCommand.isMoved()) { - target = getMoveTarget(clusterCommand.getError()); + target = getMoveTarget(partitions, clusterCommand.getError()); clusterEventListener.onMovedRedirection(); asking = false; @@ -329,8 +330,8 @@ private static RedisChannelWriter getWriterToUse(RedisChannelWriter writer) { /** * Optimization: Determine command intents and optimize for bulk execution preferring one node. *

- * If there is only one connectionIntent, then we take the connectionIntent derived from the commands. If there is more than one connectionIntent, then - * use {@link ConnectionIntent#WRITE}. + * If there is only one connectionIntent, then we take the connectionIntent derived from the commands. If there is more than + * one connectionIntent, then use {@link ConnectionIntent#WRITE}. * * @param commands {@link Collection} of {@link RedisCommand commands}. * @return the connectionIntent. @@ -368,7 +369,7 @@ private static ConnectionIntent getIntent(ProtocolKeyword type) { return ReadOnlyCommands.isReadOnlyCommand(type) ? ConnectionIntent.READ : ConnectionIntent.WRITE; } - static HostAndPort getMoveTarget(String errorMessage) { + static HostAndPort getMoveTarget(Partitions partitions, String errorMessage) { LettuceAssert.notEmpty(errorMessage, "ErrorMessage must not be empty"); LettuceAssert.isTrue(errorMessage.startsWith(CommandKeyword.MOVED.name()), @@ -376,8 +377,30 @@ static HostAndPort getMoveTarget(String errorMessage) { String[] movedMessageParts = errorMessage.split(" "); LettuceAssert.isTrue(movedMessageParts.length >= 3, "ErrorMessage must consist of 3 tokens (" + errorMessage + ")"); + String redirectTarget = movedMessageParts[2]; - return HostAndPort.parseCompat(movedMessageParts[2]); + if (redirectTarget.startsWith(":")) { + + // unknown redirection hostname. We attempt discovering the hostname from Partitions + + int redirectPort = Integer.parseInt(redirectTarget.substring(1)); + for (RedisClusterNode partition : partitions) { + + RedisURI uri = partition.getUri(); + if (uri.getPort() == redirectPort) { + return HostAndPort.of(uri.getHost(), redirectPort); + } + } + + int slot = Integer.parseInt(movedMessageParts[1]); + RedisClusterNode partition = partitions.getPartitionBySlot(slot); + if (partition != null) { + RedisURI uri = partition.getUri(); + return HostAndPort.of(uri.getHost(), redirectPort); + } + } + + return HostAndPort.parseCompat(redirectTarget); } static HostAndPort getAskTarget(String errorMessage) { diff --git a/src/test/java/io/lettuce/core/cluster/ClusterDistributionChannelWriterUnitTests.java b/src/test/java/io/lettuce/core/cluster/ClusterDistributionChannelWriterUnitTests.java index 1d0cf45d76..eeb670924e 100644 --- a/src/test/java/io/lettuce/core/cluster/ClusterDistributionChannelWriterUnitTests.java +++ b/src/test/java/io/lettuce/core/cluster/ClusterDistributionChannelWriterUnitTests.java @@ -23,6 +23,9 @@ import java.util.Collections; import java.util.concurrent.CompletableFuture; +import io.lettuce.core.RedisURI; +import io.lettuce.core.cluster.models.partitions.Partitions; +import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -116,16 +119,38 @@ void shouldParseIPv6AskTargetCorrectly() { @Test void shouldParseMovedTargetCorrectly() { - HostAndPort moveTarget = ClusterDistributionChannelWriter.getMoveTarget("MOVED 1234-2020 127.0.0.1:6381"); + HostAndPort moveTarget = ClusterDistributionChannelWriter.getMoveTarget(new Partitions(), "MOVED 1234-2020 127.0.0.1:6381"); assertThat(moveTarget.getHostText()).isEqualTo("127.0.0.1"); assertThat(moveTarget.getPort()).isEqualTo(6381); } + @Test + void shouldParseMovedTargetWithoutHostnameCorrectly() { + + Partitions partitions = new Partitions(); + partitions.add(new RedisClusterNode(RedisURI.create("redis://1.2.3.4:6381"), "foo", false,null,0,0,0,Collections.emptyList(), Collections.emptySet())); + HostAndPort moveTarget = ClusterDistributionChannelWriter.getMoveTarget(partitions, "MOVED 1234 :6381"); + + assertThat(moveTarget.getHostText()).isEqualTo("1.2.3.4"); + assertThat(moveTarget.getPort()).isEqualTo(6381); + } + + @Test + void shouldParseMovedTargetWithoutHostnameUsingSlotFallbackCorrectly() { + + Partitions partitions = new Partitions(); + partitions.add(new RedisClusterNode(RedisURI.create("redis://1.2.3.4:5678"), "foo", false,null,0,0,0, Collections.singletonList(1234), Collections.emptySet())); + HostAndPort moveTarget = ClusterDistributionChannelWriter.getMoveTarget(partitions, "MOVED 1234 :6381"); + + assertThat(moveTarget.getHostText()).isEqualTo("1.2.3.4"); + assertThat(moveTarget.getPort()).isEqualTo(6381); + } + @Test void shouldParseIPv6MovedTargetCorrectly() { - HostAndPort moveTarget = ClusterDistributionChannelWriter.getMoveTarget("MOVED 1234-2020 1:2:3:4::6:6381"); + HostAndPort moveTarget = ClusterDistributionChannelWriter.getMoveTarget(new Partitions(), "MOVED 1234-2020 1:2:3:4::6:6381"); assertThat(moveTarget.getHostText()).isEqualTo("1:2:3:4::6"); assertThat(moveTarget.getPort()).isEqualTo(6381);