Skip to content

Commit

Permalink
Propagate cause if cluster topology refresh can't connect to any node #…
Browse files Browse the repository at this point in the history
…427

RedisClusterClient.loadPartitions now propagates the cause if lettuce can't connect to any of the topology refresh sources (initial seed nodes). Cause propagation covers I/O errors and authentication issues. Command execution issues are just logged.
  • Loading branch information
mp911de committed Dec 18, 2016
1 parent 1324730 commit 642b403
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 48 deletions.
37 changes: 21 additions & 16 deletions src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -791,9 +791,7 @@ protected void updatePartitionsInConnections() {
}

protected void initializePartitions() {

Partitions loadedPartitions = loadPartitions();
this.partitions = loadedPartitions;
this.partitions = loadPartitions();
}

/**
Expand All @@ -816,25 +814,32 @@ public Partitions getPartitions() {
protected Partitions loadPartitions() {

Iterable<RedisURI> topologyRefreshSource = getTopologyRefreshSource();
Map<RedisURI, Partitions> partitions = refresh.loadViews(topologyRefreshSource, useDynamicRefreshSources());

if (partitions.isEmpty()) {
throw new RedisException("Cannot retrieve initial cluster partitions from initial URIs " + topologyRefreshSource);
}
String message = "Cannot retrieve initial cluster partitions from initial URIs " + topologyRefreshSource;
try {
Map<RedisURI, Partitions> partitions = refresh.loadViews(topologyRefreshSource, useDynamicRefreshSources());

if (partitions.isEmpty()) {
throw new RedisException(message);
}

Partitions loadedPartitions = determinePartitions(this.partitions, partitions);
RedisURI viewedBy = refresh.getViewedBy(partitions, loadedPartitions);
Partitions loadedPartitions = determinePartitions(this.partitions, partitions);
RedisURI viewedBy = refresh.getViewedBy(partitions, loadedPartitions);

for (RedisClusterNode partition : loadedPartitions) {
if (viewedBy != null) {
RedisURI uri = partition.getUri();
RedisClusterURIUtil.applyUriConnectionSettings(viewedBy, uri);
for (RedisClusterNode partition : loadedPartitions) {
if (viewedBy != null) {
RedisURI uri = partition.getUri();
RedisClusterURIUtil.applyUriConnectionSettings(viewedBy, uri);
}
}
}

activateTopologyRefreshIfNeeded();
activateTopologyRefreshIfNeeded();

return loadedPartitions;
return loadedPartitions;

} catch (RedisConnectionException e) {
throw new RedisException(message, e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -46,7 +47,6 @@ public void addConnection(RedisURI redisURI, CompletableFuture<StatefulRedisConn
}

/**
*
* @return a set of {@link RedisURI} for which {@link Connections} has a connection.
*/
public Set<RedisURI> connectedNodes() {
Expand All @@ -59,6 +59,43 @@ public Set<RedisURI> connectedNodes() {
*/
public Connections get(long timeout, TimeUnit timeUnit) throws InterruptedException {

Connections connections = new Connections();
List<Throwable> exceptions = new CopyOnWriteArrayList<>();
List<Future<?>> sync = new ArrayList<>(this.futures.size());

for (Map.Entry<RedisURI, CompletableFuture<StatefulRedisConnection<String, String>>> entry : this.futures.entrySet()) {

CompletableFuture<StatefulRedisConnection<String, String>> future = entry.getValue();

sync.add(future.whenComplete((connection, throwable) -> {

if (throwable != null) {
exceptions.add(throwable);
} else {
connections.addConnection(entry.getKey(), connection);
}
}));
}

RefreshFutures.awaitAll(timeout, timeUnit, sync);

if (connections.isEmpty() && !sync.isEmpty() && !exceptions.isEmpty()) {

RedisConnectionException collector = new RedisConnectionException(
"Unable to establish a connection to Redis Cluster");
exceptions.forEach(collector::addSuppressed);

throw collector;
}

return connections;
}

/**
* @return the {@link Connections}.
*/
public Connections optionalGet(long timeout, TimeUnit timeUnit) throws InterruptedException {

Connections connections = new Connections();
List<Future<?>> sync = new ArrayList<>(this.futures.size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public Map<RedisURI, Partitions> loadViews(Iterable<RedisURI> seed, boolean disc
Set<RedisURI> discoveredNodes = difference(allKnownUris, toSet(seed));

if (!discoveredNodes.isEmpty()) {
Connections discoveredConnections = getConnections(discoveredNodes).get(commandTimeoutNs,
Connections discoveredConnections = getConnections(discoveredNodes).optionalGet(commandTimeoutNs,
TimeUnit.NANOSECONDS);
connections = connections.mergeWith(discoveredConnections);

Expand Down Expand Up @@ -139,19 +139,18 @@ NodeTopologyViews getNodeSpecificViews(Requests requestedTopology, Requests requ
.filter(ClusterTopologyRefresh::validNode) //
.map(RedisClusterNodeSnapshot::new).collect(Collectors.toList());

for (RedisClusterNodeSnapshot partition : nodeWithStats) {
nodeWithStats.stream() //
.filter(partition -> partition.is(RedisClusterNode.NodeFlag.MYSELF)) //
.forEach(partition -> {

if (partition.getFlags().contains(RedisClusterNode.NodeFlag.MYSELF)) {
if (partition.getUri() == null) {
partition.setUri(node);
}

if (partition.getUri() == null) {
partition.setUri(node);
}

// record latency for later partition ordering
latencies.put(partition.getNodeId(), nodeTopologyView.getLatency());
clientCountByNodeId.put(partition.getNodeId(), nodeTopologyView.getConnectedClients());
}
}
// record latency for later partition ordering
latencies.put(partition.getNodeId(), nodeTopologyView.getLatency());
clientCountByNodeId.put(partition.getNodeId(), nodeTopologyView.getConnectedClients());
});

allNodes.addAll(nodeWithStats);

Expand Down Expand Up @@ -211,28 +210,31 @@ private AsyncConnections getConnections(Iterable<RedisURI> redisURIs) throws Int
CompletableFuture<StatefulRedisConnection<String, String>> connectionFuture = nodeConnectionFactory
.connectToNodeAsync(CODEC, socketAddress);

connectionFuture.thenAccept(connection -> connection.async().clientSetname("lettuce#ClusterTopologyRefresh"));

CompletableFuture<StatefulRedisConnection<String, String>> sync = new CompletableFuture<>();

connectionFuture.whenComplete((connection, throwable) -> {

if (throwable != null) {
sync.completeExceptionally(new RedisConnectionException(
String.format("Unable to connect to %s", socketAddress), throwable));

String message = String.format("Unable to connect to %s", socketAddress);
if (throwable instanceof RedisConnectionException) {
if (logger.isDebugEnabled()) {
logger.debug(throwable.getMessage(), throwable);
} else {
logger.warn(throwable.getMessage());
}
} else {
logger.warn(message, throwable);
}

sync.completeExceptionally(new RedisConnectionException(message, throwable));
} else {
connection.async().clientSetname("lettuce#ClusterTopologyRefresh");
sync.complete(connection);
}
});

connections.addConnection(redisURI, sync);
} catch (RedisConnectionException e) {

if (logger.isDebugEnabled()) {
logger.debug(e.getMessage(), e);
} else {
logger.warn(e.getMessage());
}
} catch (RuntimeException e) {
logger.warn(String.format("Cannot connect to %s", redisURI), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ public void operateOnPartiallyDownCluster() throws Exception {
connection.sync().get(key_10439);
fail("Missing RedisException");
} catch (RedisConnectionException e) {
assertThat(e).hasRootCauseInstanceOf(
ConnectException.class);
assertThat(e).hasRootCauseInstanceOf(ConnectException.class);
}

connection.close();
Expand All @@ -106,7 +105,8 @@ public void seedNodesAreOffline() throws Exception {
redisClusterClient.connect();
fail("Missing RedisException");
} catch (RedisException e) {
assertThat(e).hasNoCause();
assertThat(e).hasCauseInstanceOf(RedisConnectionException.class);
assertThat(e.getCause()).hasMessage("Unable to establish a connection to Redis Cluster");
}
}

Expand All @@ -117,10 +117,8 @@ public void partitionNodesAreOffline() throws Exception {
redisClusterClient = RedisClusterClient.create(clientResources, seed);

Partitions partitions = new Partitions();
partitions.addPartition(
new RedisClusterNode(URI_1, "a", true, null, 0, 0, 0, new ArrayList<>(), new HashSet<>()));
partitions.addPartition(
new RedisClusterNode(URI_2, "b", true, null, 0, 0, 0, new ArrayList<>(), new HashSet<>()));
partitions.addPartition(new RedisClusterNode(URI_1, "a", true, null, 0, 0, 0, new ArrayList<>(), new HashSet<>()));
partitions.addPartition(new RedisClusterNode(URI_2, "b", true, null, 0, 0, 0, new ArrayList<>(), new HashSet<>()));

redisClusterClient.setPartitions(partitions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.verify;
Expand All @@ -29,6 +30,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import io.netty.util.concurrent.ImmediateEventExecutor;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -90,6 +92,7 @@ public class ClusterTopologyRefreshTest {
public void before() throws Exception {

when(clientResources.dnsResolver()).thenReturn(DnsResolvers.JVM_DEFAULT);
when(clientResources.eventExecutorGroup()).thenReturn(ImmediateEventExecutor.INSTANCE);
when(connection1.async()).thenReturn(asyncCommands1);
when(connection2.async()).thenReturn(asyncCommands2);

Expand Down Expand Up @@ -220,6 +223,28 @@ public void shouldAttemptToConnectOnlyOnce() throws Exception {
verify(nodeConnectionFactory).connectToNodeAsync(any(RedisCodec.class), eq(new InetSocketAddress("127.0.0.1", 7381)));
}

@Test
public void shouldFailIfNoNodeConnects() throws Exception {

List<RedisURI> seed = Arrays.asList(RedisURI.create("127.0.0.1", 7380), RedisURI.create("127.0.0.1", 7381));

when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(new InetSocketAddress("127.0.0.1", 7380))))
.thenReturn(completedWithException(new RedisException("connection failed")));
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(new InetSocketAddress("127.0.0.1", 7381))))
.thenReturn(completedWithException(new RedisException("connection failed")));

try {
sut.loadViews(seed, true);
fail("Missing RedisConnectionException");
} catch (Exception e) {
assertThat(e).hasNoCause().hasMessage("Unable to establish a connection to Redis Cluster");
assertThat(e.getSuppressed()).hasSize(2);
}

verify(nodeConnectionFactory).connectToNodeAsync(any(RedisCodec.class), eq(new InetSocketAddress("127.0.0.1", 7380)));
verify(nodeConnectionFactory).connectToNodeAsync(any(RedisCodec.class), eq(new InetSocketAddress("127.0.0.1", 7381)));
}

@Test
public void shouldShouldDiscoverNodes() throws Exception {

Expand Down Expand Up @@ -359,7 +384,6 @@ protected Requests createClusterNodesRequests(int duration, String nodes) {
command.completedAtNs = duration;

return requests;

}

protected Requests createClientListRequests(int duration, String response) {
Expand Down

0 comments on commit 642b403

Please sign in to comment.