Skip to content

Commit

Permalink
Apply best-effort unknown-target host MOVED redirect handling #2290
Browse files Browse the repository at this point in the history
We attempt resolving a host name from the MOVED redirect using Partitions. First, we try to handle the redirect by port lookup. If we cannot find a host with such a port, then we attempt to use the hostname from the node that would serve the slot.

This isn't ideal as we do not track the logical hostname for connections. Additionally, our reconnect/address supplier mechanism does not expose the socket address for the connection.
  • Loading branch information
mp911de committed Feb 15, 2023
1 parent 4391235 commit dcd7085
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@
import io.lettuce.core.RedisChannelHandler;
import io.lettuce.core.RedisChannelWriter;
import io.lettuce.core.RedisException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.cluster.event.AskRedirectionEvent;
import io.lettuce.core.cluster.event.MovedRedirectionEvent;
import io.lettuce.core.cluster.models.partitions.Partitions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.event.Event;
import io.lettuce.core.internal.Futures;
Expand Down Expand Up @@ -110,7 +112,6 @@ private <K, V, T> RedisCommand<K, V, T> doWrite(RedisCommand<K, V, T> command) {
ClusterCommand<K, V, T> clusterCommand = (ClusterCommand<K, V, T>) command;
if (clusterCommand.isMoved() || clusterCommand.isAsk()) {


HostAndPort target;
boolean asking;
ByteBuffer firstEncodedKey = clusterCommand.getArgs().getFirstEncodedKey();
Expand All @@ -125,7 +126,7 @@ private <K, V, T> RedisCommand<K, V, T> doWrite(RedisCommand<K, V, T> command) {

if (clusterCommand.isMoved()) {

target = getMoveTarget(clusterCommand.getError());
target = getMoveTarget(partitions, clusterCommand.getError());
clusterEventListener.onMovedRedirection();
asking = false;

Expand Down Expand Up @@ -329,8 +330,8 @@ private static RedisChannelWriter getWriterToUse(RedisChannelWriter writer) {
/**
* Optimization: Determine command intents and optimize for bulk execution preferring one node.
* <p>
* If there is only one connectionIntent, then we take the connectionIntent derived from the commands. If there is more than one connectionIntent, then
* use {@link ConnectionIntent#WRITE}.
* If there is only one connectionIntent, then we take the connectionIntent derived from the commands. If there is more than
* one connectionIntent, then use {@link ConnectionIntent#WRITE}.
*
* @param commands {@link Collection} of {@link RedisCommand commands}.
* @return the connectionIntent.
Expand Down Expand Up @@ -368,16 +369,38 @@ private static ConnectionIntent getIntent(ProtocolKeyword type) {
return ReadOnlyCommands.isReadOnlyCommand(type) ? ConnectionIntent.READ : ConnectionIntent.WRITE;
}

static HostAndPort getMoveTarget(String errorMessage) {
static HostAndPort getMoveTarget(Partitions partitions, String errorMessage) {

LettuceAssert.notEmpty(errorMessage, "ErrorMessage must not be empty");
LettuceAssert.isTrue(errorMessage.startsWith(CommandKeyword.MOVED.name()),
"ErrorMessage must start with " + CommandKeyword.MOVED);

String[] movedMessageParts = errorMessage.split(" ");
LettuceAssert.isTrue(movedMessageParts.length >= 3, "ErrorMessage must consist of 3 tokens (" + errorMessage + ")");
String redirectTarget = movedMessageParts[2];

return HostAndPort.parseCompat(movedMessageParts[2]);
if (redirectTarget.startsWith(":")) {

// unknown redirection hostname. We attempt discovering the hostname from Partitions

int redirectPort = Integer.parseInt(redirectTarget.substring(1));
for (RedisClusterNode partition : partitions) {

RedisURI uri = partition.getUri();
if (uri.getPort() == redirectPort) {
return HostAndPort.of(uri.getHost(), redirectPort);
}
}

int slot = Integer.parseInt(movedMessageParts[1]);
RedisClusterNode partition = partitions.getPartitionBySlot(slot);
if (partition != null) {
RedisURI uri = partition.getUri();
return HostAndPort.of(uri.getHost(), redirectPort);
}
}

return HostAndPort.parseCompat(redirectTarget);
}

static HostAndPort getAskTarget(String errorMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import io.lettuce.core.RedisURI;
import io.lettuce.core.cluster.models.partitions.Partitions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -116,16 +119,38 @@ void shouldParseIPv6AskTargetCorrectly() {
@Test
void shouldParseMovedTargetCorrectly() {

HostAndPort moveTarget = ClusterDistributionChannelWriter.getMoveTarget("MOVED 1234-2020 127.0.0.1:6381");
HostAndPort moveTarget = ClusterDistributionChannelWriter.getMoveTarget(new Partitions(), "MOVED 1234-2020 127.0.0.1:6381");

assertThat(moveTarget.getHostText()).isEqualTo("127.0.0.1");
assertThat(moveTarget.getPort()).isEqualTo(6381);
}

@Test
void shouldParseMovedTargetWithoutHostnameCorrectly() {

Partitions partitions = new Partitions();
partitions.add(new RedisClusterNode(RedisURI.create("redis://1.2.3.4:6381"), "foo", false,null,0,0,0,Collections.emptyList(), Collections.emptySet()));
HostAndPort moveTarget = ClusterDistributionChannelWriter.getMoveTarget(partitions, "MOVED 1234 :6381");

assertThat(moveTarget.getHostText()).isEqualTo("1.2.3.4");
assertThat(moveTarget.getPort()).isEqualTo(6381);
}

@Test
void shouldParseMovedTargetWithoutHostnameUsingSlotFallbackCorrectly() {

Partitions partitions = new Partitions();
partitions.add(new RedisClusterNode(RedisURI.create("redis://1.2.3.4:5678"), "foo", false,null,0,0,0, Collections.singletonList(1234), Collections.emptySet()));
HostAndPort moveTarget = ClusterDistributionChannelWriter.getMoveTarget(partitions, "MOVED 1234 :6381");

assertThat(moveTarget.getHostText()).isEqualTo("1.2.3.4");
assertThat(moveTarget.getPort()).isEqualTo(6381);
}

@Test
void shouldParseIPv6MovedTargetCorrectly() {

HostAndPort moveTarget = ClusterDistributionChannelWriter.getMoveTarget("MOVED 1234-2020 1:2:3:4::6:6381");
HostAndPort moveTarget = ClusterDistributionChannelWriter.getMoveTarget(new Partitions(), "MOVED 1234-2020 1:2:3:4::6:6381");

assertThat(moveTarget.getHostText()).isEqualTo("1:2:3:4::6");
assertThat(moveTarget.getPort()).isEqualTo(6381);
Expand Down

0 comments on commit dcd7085

Please sign in to comment.