diff --git a/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java b/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java index 8e94ad53ae..3d1ead4e7d 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java +++ b/src/main/java/com/lambdaworks/redis/cluster/RedisClusterClient.java @@ -791,9 +791,7 @@ protected void updatePartitionsInConnections() { } protected void initializePartitions() { - - Partitions loadedPartitions = loadPartitions(); - this.partitions = loadedPartitions; + this.partitions = loadPartitions(); } /** @@ -816,25 +814,32 @@ public Partitions getPartitions() { protected Partitions loadPartitions() { Iterable topologyRefreshSource = getTopologyRefreshSource(); - Map partitions = refresh.loadViews(topologyRefreshSource, useDynamicRefreshSources()); - if (partitions.isEmpty()) { - throw new RedisException("Cannot retrieve initial cluster partitions from initial URIs " + topologyRefreshSource); - } + String message = "Cannot retrieve initial cluster partitions from initial URIs " + topologyRefreshSource; + try { + Map partitions = refresh.loadViews(topologyRefreshSource, useDynamicRefreshSources()); + + if (partitions.isEmpty()) { + throw new RedisException(message); + } - Partitions loadedPartitions = determinePartitions(this.partitions, partitions); - RedisURI viewedBy = refresh.getViewedBy(partitions, loadedPartitions); + Partitions loadedPartitions = determinePartitions(this.partitions, partitions); + RedisURI viewedBy = refresh.getViewedBy(partitions, loadedPartitions); - for (RedisClusterNode partition : loadedPartitions) { - if (viewedBy != null) { - RedisURI uri = partition.getUri(); - RedisClusterURIUtil.applyUriConnectionSettings(viewedBy, uri); + for (RedisClusterNode partition : loadedPartitions) { + if (viewedBy != null) { + RedisURI uri = partition.getUri(); + RedisClusterURIUtil.applyUriConnectionSettings(viewedBy, uri); + } } - } - activateTopologyRefreshIfNeeded(); + activateTopologyRefreshIfNeeded(); - return loadedPartitions; + return loadedPartitions; + + } catch (RedisConnectionException e) { + throw new RedisException(message, e); + } } /** diff --git a/src/main/java/com/lambdaworks/redis/cluster/topology/AsyncConnections.java b/src/main/java/com/lambdaworks/redis/cluster/topology/AsyncConnections.java index eda0aacb0e..da15091ef1 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/topology/AsyncConnections.java +++ b/src/main/java/com/lambdaworks/redis/cluster/topology/AsyncConnections.java @@ -17,6 +17,7 @@ import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -46,7 +47,6 @@ public void addConnection(RedisURI redisURI, CompletableFuture connectedNodes() { @@ -59,6 +59,43 @@ public Set connectedNodes() { */ public Connections get(long timeout, TimeUnit timeUnit) throws InterruptedException { + Connections connections = new Connections(); + List exceptions = new CopyOnWriteArrayList<>(); + List> sync = new ArrayList<>(this.futures.size()); + + for (Map.Entry>> entry : this.futures.entrySet()) { + + CompletableFuture> future = entry.getValue(); + + sync.add(future.whenComplete((connection, throwable) -> { + + if (throwable != null) { + exceptions.add(throwable); + } else { + connections.addConnection(entry.getKey(), connection); + } + })); + } + + RefreshFutures.awaitAll(timeout, timeUnit, sync); + + if (connections.isEmpty() && !sync.isEmpty() && !exceptions.isEmpty()) { + + RedisConnectionException collector = new RedisConnectionException( + "Unable to establish a connection to Redis Cluster"); + exceptions.forEach(collector::addSuppressed); + + throw collector; + } + + return connections; + } + + /** + * @return the {@link Connections}. + */ + public Connections optionalGet(long timeout, TimeUnit timeUnit) throws InterruptedException { + Connections connections = new Connections(); List> sync = new ArrayList<>(this.futures.size()); diff --git a/src/main/java/com/lambdaworks/redis/cluster/topology/ClusterTopologyRefresh.java b/src/main/java/com/lambdaworks/redis/cluster/topology/ClusterTopologyRefresh.java index a0481961c0..0629106d68 100644 --- a/src/main/java/com/lambdaworks/redis/cluster/topology/ClusterTopologyRefresh.java +++ b/src/main/java/com/lambdaworks/redis/cluster/topology/ClusterTopologyRefresh.java @@ -81,7 +81,7 @@ public Map loadViews(Iterable seed, boolean disc Set discoveredNodes = difference(allKnownUris, toSet(seed)); if (!discoveredNodes.isEmpty()) { - Connections discoveredConnections = getConnections(discoveredNodes).get(commandTimeoutNs, + Connections discoveredConnections = getConnections(discoveredNodes).optionalGet(commandTimeoutNs, TimeUnit.NANOSECONDS); connections = connections.mergeWith(discoveredConnections); @@ -139,19 +139,18 @@ NodeTopologyViews getNodeSpecificViews(Requests requestedTopology, Requests requ .filter(ClusterTopologyRefresh::validNode) // .map(RedisClusterNodeSnapshot::new).collect(Collectors.toList()); - for (RedisClusterNodeSnapshot partition : nodeWithStats) { + nodeWithStats.stream() // + .filter(partition -> partition.is(RedisClusterNode.NodeFlag.MYSELF)) // + .forEach(partition -> { - if (partition.getFlags().contains(RedisClusterNode.NodeFlag.MYSELF)) { + if (partition.getUri() == null) { + partition.setUri(node); + } - if (partition.getUri() == null) { - partition.setUri(node); - } - - // record latency for later partition ordering - latencies.put(partition.getNodeId(), nodeTopologyView.getLatency()); - clientCountByNodeId.put(partition.getNodeId(), nodeTopologyView.getConnectedClients()); - } - } + // record latency for later partition ordering + latencies.put(partition.getNodeId(), nodeTopologyView.getLatency()); + clientCountByNodeId.put(partition.getNodeId(), nodeTopologyView.getConnectedClients()); + }); allNodes.addAll(nodeWithStats); @@ -211,28 +210,31 @@ private AsyncConnections getConnections(Iterable redisURIs) throws Int CompletableFuture> connectionFuture = nodeConnectionFactory .connectToNodeAsync(CODEC, socketAddress); - connectionFuture.thenAccept(connection -> connection.async().clientSetname("lettuce#ClusterTopologyRefresh")); - CompletableFuture> sync = new CompletableFuture<>(); connectionFuture.whenComplete((connection, throwable) -> { if (throwable != null) { - sync.completeExceptionally(new RedisConnectionException( - String.format("Unable to connect to %s", socketAddress), throwable)); + + String message = String.format("Unable to connect to %s", socketAddress); + if (throwable instanceof RedisConnectionException) { + if (logger.isDebugEnabled()) { + logger.debug(throwable.getMessage(), throwable); + } else { + logger.warn(throwable.getMessage()); + } + } else { + logger.warn(message, throwable); + } + + sync.completeExceptionally(new RedisConnectionException(message, throwable)); } else { + connection.async().clientSetname("lettuce#ClusterTopologyRefresh"); sync.complete(connection); } }); connections.addConnection(redisURI, sync); - } catch (RedisConnectionException e) { - - if (logger.isDebugEnabled()) { - logger.debug(e.getMessage(), e); - } else { - logger.warn(e.getMessage()); - } } catch (RuntimeException e) { logger.warn(String.format("Cannot connect to %s", redisURI), e); } diff --git a/src/test/java/com/lambdaworks/redis/cluster/ClusterPartiallyDownTest.java b/src/test/java/com/lambdaworks/redis/cluster/ClusterPartiallyDownTest.java index bfea690cea..1a98512dcf 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/ClusterPartiallyDownTest.java +++ b/src/test/java/com/lambdaworks/redis/cluster/ClusterPartiallyDownTest.java @@ -89,8 +89,7 @@ public void operateOnPartiallyDownCluster() throws Exception { connection.sync().get(key_10439); fail("Missing RedisException"); } catch (RedisConnectionException e) { - assertThat(e).hasRootCauseInstanceOf( - ConnectException.class); + assertThat(e).hasRootCauseInstanceOf(ConnectException.class); } connection.close(); @@ -106,7 +105,8 @@ public void seedNodesAreOffline() throws Exception { redisClusterClient.connect(); fail("Missing RedisException"); } catch (RedisException e) { - assertThat(e).hasNoCause(); + assertThat(e).hasCauseInstanceOf(RedisConnectionException.class); + assertThat(e.getCause()).hasMessage("Unable to establish a connection to Redis Cluster"); } } @@ -117,10 +117,8 @@ public void partitionNodesAreOffline() throws Exception { redisClusterClient = RedisClusterClient.create(clientResources, seed); Partitions partitions = new Partitions(); - partitions.addPartition( - new RedisClusterNode(URI_1, "a", true, null, 0, 0, 0, new ArrayList<>(), new HashSet<>())); - partitions.addPartition( - new RedisClusterNode(URI_2, "b", true, null, 0, 0, 0, new ArrayList<>(), new HashSet<>())); + partitions.addPartition(new RedisClusterNode(URI_1, "a", true, null, 0, 0, 0, new ArrayList<>(), new HashSet<>())); + partitions.addPartition(new RedisClusterNode(URI_2, "b", true, null, 0, 0, 0, new ArrayList<>(), new HashSet<>())); redisClusterClient.setPartitions(partitions); diff --git a/src/test/java/com/lambdaworks/redis/cluster/topology/ClusterTopologyRefreshTest.java b/src/test/java/com/lambdaworks/redis/cluster/topology/ClusterTopologyRefreshTest.java index bc15a9a2ec..d2addc8b78 100644 --- a/src/test/java/com/lambdaworks/redis/cluster/topology/ClusterTopologyRefreshTest.java +++ b/src/test/java/com/lambdaworks/redis/cluster/topology/ClusterTopologyRefreshTest.java @@ -17,6 +17,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.verify; @@ -29,6 +30,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import io.netty.util.concurrent.ImmediateEventExecutor; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -90,6 +92,7 @@ public class ClusterTopologyRefreshTest { public void before() throws Exception { when(clientResources.dnsResolver()).thenReturn(DnsResolvers.JVM_DEFAULT); + when(clientResources.eventExecutorGroup()).thenReturn(ImmediateEventExecutor.INSTANCE); when(connection1.async()).thenReturn(asyncCommands1); when(connection2.async()).thenReturn(asyncCommands2); @@ -220,6 +223,28 @@ public void shouldAttemptToConnectOnlyOnce() throws Exception { verify(nodeConnectionFactory).connectToNodeAsync(any(RedisCodec.class), eq(new InetSocketAddress("127.0.0.1", 7381))); } + @Test + public void shouldFailIfNoNodeConnects() throws Exception { + + List seed = Arrays.asList(RedisURI.create("127.0.0.1", 7380), RedisURI.create("127.0.0.1", 7381)); + + when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(new InetSocketAddress("127.0.0.1", 7380)))) + .thenReturn(completedWithException(new RedisException("connection failed"))); + when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(new InetSocketAddress("127.0.0.1", 7381)))) + .thenReturn(completedWithException(new RedisException("connection failed"))); + + try { + sut.loadViews(seed, true); + fail("Missing RedisConnectionException"); + } catch (Exception e) { + assertThat(e).hasNoCause().hasMessage("Unable to establish a connection to Redis Cluster"); + assertThat(e.getSuppressed()).hasSize(2); + } + + verify(nodeConnectionFactory).connectToNodeAsync(any(RedisCodec.class), eq(new InetSocketAddress("127.0.0.1", 7380))); + verify(nodeConnectionFactory).connectToNodeAsync(any(RedisCodec.class), eq(new InetSocketAddress("127.0.0.1", 7381))); + } + @Test public void shouldShouldDiscoverNodes() throws Exception { @@ -359,7 +384,6 @@ protected Requests createClusterNodesRequests(int duration, String nodes) { command.completedAtNs = duration; return requests; - } protected Requests createClientListRequests(int duration, String response) {