Skip to content

Commit

Permalink
Connect asynchronously in cluster topology refresh #424
Browse files Browse the repository at this point in the history
Switch from blocking to asynchronous connection initialization when connecting Redis Cluster nodes for topology refresh. Asynchronous connection allows connecting concurrently.
  • Loading branch information
mp911de committed Dec 18, 2016
1 parent a4655b9 commit a2316f8
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,11 @@ public void run() {
logger.debug("ClusterTopologyRefreshTask requesting partitions from {}",
redisClusterClient.getTopologyRefreshSource());
}
redisClusterClient.reloadPartitions();
try {
redisClusterClient.reloadPartitions();
} catch (Exception e) {
logger.warn("Cannot refresh Redis Cluster topology", e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.lambdaworks.redis.cluster.topology;

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import com.lambdaworks.redis.RedisConnectionException;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.api.StatefulRedisConnection;

/**
* @author Mark Paluch
*/
class AsyncConnections {

private final Map<RedisURI, CompletableFuture<StatefulRedisConnection<String, String>>> futures = new TreeMap<>(
TopologyComparators.RedisURIComparator.INSTANCE);

public AsyncConnections() {
}

/**
* Add a connection for a {@link RedisURI}
*
* @param redisURI
* @param connection
*/
public void addConnection(RedisURI redisURI, CompletableFuture<StatefulRedisConnection<String, String>> connection) {
futures.put(redisURI, connection);
}

/**
*
* @return a set of {@link RedisURI} for which {@link Connections} has a connection.
*/
public Set<RedisURI> connectedNodes() {
return futures.keySet();
}

/**
* @return the {@link Connections}.
* @throws RedisConnectionException if no connection could be established.
*/
public Connections get(long timeout, TimeUnit timeUnit) throws InterruptedException {

Connections connections = new Connections();
List<Future<?>> sync = new ArrayList<>(this.futures.size());

for (Map.Entry<RedisURI, CompletableFuture<StatefulRedisConnection<String, String>>> entry : this.futures.entrySet()) {

CompletableFuture<StatefulRedisConnection<String, String>> future = entry.getValue();

sync.add(future.thenAccept((connection) -> {
connections.addConnection(entry.getKey(), connection);
}));
}

RefreshFutures.awaitAll(timeout, timeUnit, sync);

return connections;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.net.SocketAddress;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -64,21 +65,24 @@ public ClusterTopologyRefresh(NodeConnectionFactory nodeConnectionFactory, Clien
*/
public Map<RedisURI, Partitions> loadViews(Iterable<RedisURI> seed, boolean discovery) {

Connections connections = getConnections(seed);
Requests requestedTopology = connections.requestTopology();
Requests requestedClients = connections.requestClients();

long commandTimeoutNs = getCommandTimeoutNs(seed);

Connections connections = null;
try {
connections = getConnections(seed).get(commandTimeoutNs, TimeUnit.NANOSECONDS);

Requests requestedTopology = connections.requestTopology();
Requests requestedClients = connections.requestClients();

NodeTopologyViews nodeSpecificViews = getNodeSpecificViews(requestedTopology, requestedClients, commandTimeoutNs);

if (discovery) {
Set<RedisURI> allKnownUris = nodeSpecificViews.getClusterNodes();
Set<RedisURI> discoveredNodes = difference(allKnownUris, toSet(seed));

if (!discoveredNodes.isEmpty()) {
Connections discoveredConnections = getConnections(discoveredNodes);
Connections discoveredConnections = getConnections(discoveredNodes).get(commandTimeoutNs,
TimeUnit.NANOSECONDS);
connections = connections.mergeWith(discoveredConnections);

requestedTopology = requestedTopology.mergeWith(discoveredConnections.requestTopology());
Expand All @@ -97,7 +101,9 @@ public Map<RedisURI, Partitions> loadViews(Iterable<RedisURI> seed, boolean disc
Thread.currentThread().interrupt();
throw new RedisCommandInterruptedException(e);
} finally {
connections.close();
if (connections != null) {
connections.close();
}
}
}

Expand Down Expand Up @@ -190,9 +196,9 @@ private static boolean validNode(RedisClusterNode redisClusterNode) {
/*
* Open connections where an address can be resolved.
*/
private Connections getConnections(Iterable<RedisURI> redisURIs) {
private AsyncConnections getConnections(Iterable<RedisURI> redisURIs) throws InterruptedException {

Connections connections = new Connections();
AsyncConnections connections = new AsyncConnections();

for (RedisURI redisURI : redisURIs) {
if (redisURI.getHost() == null || connections.connectedNodes().contains(redisURI)) {
Expand All @@ -201,10 +207,25 @@ private Connections getConnections(Iterable<RedisURI> redisURIs) {

try {
SocketAddress socketAddress = SocketAddressResolver.resolve(redisURI, clientResources.dnsResolver());
StatefulRedisConnection<String, String> connection = nodeConnectionFactory.connectToNode(CODEC, socketAddress);
connection.async().clientSetname("lettuce#ClusterTopologyRefresh");

connections.addConnection(redisURI, connection);
CompletableFuture<StatefulRedisConnection<String, String>> connectionFuture = nodeConnectionFactory
.connectToNodeAsync(CODEC, socketAddress);

connectionFuture.thenAccept(connection -> connection.async().clientSetname("lettuce#ClusterTopologyRefresh"));

CompletableFuture<StatefulRedisConnection<String, String>> sync = new CompletableFuture<>();

connectionFuture.whenComplete((connection, throwable) -> {

if (throwable != null) {
sync.completeExceptionally(new RedisConnectionException(
String.format("Unable to connect to %s", socketAddress), throwable));
} else {
sync.complete(connection);
}
});

connections.addConnection(redisURI, sync);
} catch (RedisConnectionException e) {

if (logger.isDebugEnabled()) {
Expand All @@ -216,6 +237,7 @@ private Connections getConnections(Iterable<RedisURI> redisURIs) {
logger.warn(String.format("Cannot connect to %s", redisURI), e);
}
}

return connections;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.lambdaworks.redis.cluster.topology;

import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import com.lambdaworks.redis.RedisURI;
Expand Down Expand Up @@ -49,7 +48,18 @@ private Connections(Map<RedisURI, StatefulRedisConnection<String, String>> conne
* @param connection
*/
public void addConnection(RedisURI redisURI, StatefulRedisConnection<String, String> connection) {
connections.put(redisURI, connection);
synchronized (connections) {
connections.put(redisURI, connection);
}
}

/**
* @return {@literal true} if no connections present.
*/
public boolean isEmpty() {
synchronized (connections) {
return connections.isEmpty();
}
}

/*
Expand Down Expand Up @@ -107,14 +117,6 @@ public void close() {
}
}

/**
*
* @return a set of {@link RedisURI} for which {@link Connections} has a connection.
*/
public Set<RedisURI> connectedNodes() {
return connections.keySet();
}

public Connections mergeWith(Connections discoveredConnections) {

Map<RedisURI, StatefulRedisConnection<String, String>> result = new TreeMap<>(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.lambdaworks.redis.cluster.topology;

import java.util.Collection;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
* @author Mark Paluch
*/
class RefreshFutures {

static long awaitAll(long timeout, TimeUnit timeUnit, Collection<? extends Future<?>> futures) throws InterruptedException {

long waitTime = 0;

for (Future<?> future : futures) {

long timeoutLeft = timeUnit.toNanos(timeout) - waitTime;

if (timeoutLeft <= 0) {
break;
}

long startWait = System.nanoTime();

try {
future.get(timeoutLeft, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw e;
} catch (Exception e) {
continue;
} finally {
waitTime += System.nanoTime() - startWait;
}

}
return waitTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;

import com.lambdaworks.redis.RedisFuture;
import com.lambdaworks.redis.RedisURI;

/**
Expand All @@ -43,29 +42,7 @@ void addRequest(RedisURI redisURI, TimedAsyncCommand<String, String, String> com
}

long await(long timeout, TimeUnit timeUnit) throws InterruptedException {

long waitTime = 0;

for (Map.Entry<RedisURI, ? extends RedisFuture<?>> entry : rawViews.entrySet()) {
long timeoutLeft = timeUnit.toNanos(timeout) - waitTime;

if (timeoutLeft <= 0) {
break;
}

long startWait = System.nanoTime();
RedisFuture<?> future = entry.getValue();

try {
if (!future.await(timeoutLeft, TimeUnit.NANOSECONDS)) {
break;
}
} finally {
waitTime += System.nanoTime() - startWait;
}

}
return waitTime;
return RefreshFutures.awaitAll(timeout, timeUnit, rawViews.values());
}

Set<RedisURI> nodes() {
Expand Down
Loading

0 comments on commit a2316f8

Please sign in to comment.