Skip to content

Commit

Permalink
Reattempt initially failed Sentinel connections in Master/Slave API #306
Browse files Browse the repository at this point in the history


Lettuce now retries to connect to Redis Sentinel nodes that were unavailable initially. The Master/Slave connection tracks established and missing connections. It retries only missing connections and does not obtain Sentinel endpoint details from pub/sub messages. Connection retries are signalled by Sentinel events.
  • Loading branch information
mp911de committed Oct 25, 2016
1 parent 790f506 commit 7a279a6
Show file tree
Hide file tree
Showing 2 changed files with 320 additions and 85 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package com.lambdaworks.redis.masterslave;

import java.io.Closeable;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Supplier;

import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisConnectionException;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.api.StatefulConnection;
import com.lambdaworks.redis.codec.Utf8StringCodec;
import com.lambdaworks.redis.codec.StringCodec;
import com.lambdaworks.redis.internal.LettuceLists;
import com.lambdaworks.redis.protocol.LettuceCharsets;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import com.lambdaworks.redis.pubsub.StatefulRedisPubSubConnection;

Expand All @@ -19,51 +24,91 @@
import io.netty.util.internal.logging.InternalLoggerFactory;

/**
* Sentinel Pub/Sub listener-enabled topology refresh.
* Sentinel Pub/Sub listener-enabled topology refresh. This refresh triggers topology updates if Redis topology changes
* (monitored master/slaves) or the Sentinel availability changes.
*
* @author Mark Paluch
* @since 4.2
*/
class SentinelTopologyRefresh implements Closeable {

private static final InternalLogger LOG = InternalLoggerFactory.getInstance(SentinelTopologyRefresh.class);
private static final StringCodec CODEC = new StringCodec(LettuceCharsets.ASCII);
private static final Set<String> PROCESSING_CHANNELS = new HashSet<>(
Arrays.asList("failover-end", "failover-end-for-timeout"));

private final List<StatefulRedisPubSubConnection<String, String>> pubSubConnections = new ArrayList<>();
private final Map<RedisURI, StatefulRedisPubSubConnection<String, String>> pubSubConnections = new ConcurrentHashMap<>();
private final RedisClient redisClient;
private final String masterId;
private final List<RedisURI> sentinels;
private final AtomicReference<Timeout> timeoutRef = new AtomicReference<Timeout>();
private final Set<String> PROCESSING_CHANNELS = new HashSet<>(Arrays.asList("failover-end", "failover-end-for-timeout"));
private final List<Runnable> refreshRunnables = new CopyOnWriteArrayList<>();
private final RedisPubSubAdapter<String, String> adapter = new RedisPubSubAdapter<String, String>() {

private int timeout = 5;
private TimeUnit timeUnit = TimeUnit.SECONDS;
@Override
public void message(String pattern, String channel, String message) {
SentinelTopologyRefresh.this.processMessage(pattern, channel, message);
}
};

private final PubSubMessageActionScheduler topologyRefresh;
private final PubSubMessageActionScheduler sentinelReconnect;

private RedisPubSubAdapter<String, String> adapter;
private volatile boolean closed = false;

SentinelTopologyRefresh(RedisClient redisClient, String masterId, List<RedisURI> sentinels) {

this.redisClient = redisClient;
this.masterId = masterId;
this.sentinels = sentinels;
this.sentinels = LettuceLists.newList(sentinels);
this.topologyRefresh = new PubSubMessageActionScheduler(redisClient.getResources().eventExecutorGroup(),
new TopologyRefreshMessagePredicate(masterId));
this.sentinelReconnect = new PubSubMessageActionScheduler(redisClient.getResources().eventExecutorGroup(),
new SentinelReconnectMessagePredicate());

}

@Override
public void close() throws IOException {
public void close() {

pubSubConnections.forEach(c -> c.removeListener(adapter));
pubSubConnections.forEach(StatefulConnection::close);
closed = true;

HashMap<RedisURI, StatefulRedisPubSubConnection<String, String>> connections = new HashMap<>(pubSubConnections);
connections.forEach((k, c) -> {
c.removeListener(adapter);
c.close();
pubSubConnections.remove(k);
});
}

void bind(Runnable runnable) {

Utf8StringCodec codec = new Utf8StringCodec();
refreshRunnables.add(runnable);

initializeSentinels();
}

private void initializeSentinels() {

if (closed) {
return;
}

AtomicReference<RedisConnectionException> ref = new AtomicReference<>();

sentinels.forEach(redisURI -> {

if (closed) {
return;
}

StatefulRedisPubSubConnection<String, String> pubSubConnection = null;
try {
StatefulRedisPubSubConnection<String, String> pubSubConnection = redisClient.connectPubSub(codec, redisURI);
pubSubConnections.add(pubSubConnection);
if (!pubSubConnections.containsKey(redisURI)) {

pubSubConnection = redisClient.connectPubSub(CODEC, redisURI);
pubSubConnections.put(redisURI, pubSubConnection);

pubSubConnection.addListener(adapter);
pubSubConnection.async().psubscribe("*");
}
} catch (RedisConnectionException e) {
if (ref.get() == null) {
ref.set(e);
Expand All @@ -77,84 +122,169 @@ void bind(Runnable runnable) {
throw ref.get();
}

adapter = new RedisPubSubAdapter<String, String>() {

@Override
public void message(String pattern, String channel, String message) {
if (closed) {
close();
}
}

if (processingAllowed(channel, message)) {
private void processMessage(String pattern, String channel, String message) {

LOG.debug("Received topology changed signal from Redis Sentinel, scheduling topology update");
topologyRefresh.processMessage(channel, message, () -> {
LOG.debug("Received topology changed signal from Redis Sentinel, scheduling topology update");
return () -> refreshRunnables.forEach(Runnable::run);
});

Timeout timeout = timeoutRef.get();
if (timeout == null) {
getEventExecutor().submit(runnable);
} else {
getEventExecutor().schedule(runnable, timeout.remaining(), TimeUnit.MILLISECONDS);
}
}
}
};
sentinelReconnect.processMessage(channel, message, () -> {

pubSubConnections.forEach(c -> {
LOG.debug("Received sentinel state changed signal from Redis Sentinel, scheduling sentinel reconnect attempts");

c.addListener(adapter);
c.async().psubscribe("*");
return this::initializeSentinels;
});

}

private boolean processingAllowed(String channel, String message) {
private static class PubSubMessageActionScheduler {

if (getEventExecutor().isShuttingDown()) {
return false;
private final TimedSemaphore timedSemaphore = new TimedSemaphore();
private final EventExecutorGroup eventExecutors;
private final MessagePredicate filter;

PubSubMessageActionScheduler(EventExecutorGroup eventExecutors, MessagePredicate filter) {
this.eventExecutors = eventExecutors;
this.filter = filter;
}

if (!messageMatches(channel, message)) {
return false;
void processMessage(String channel, String message, Supplier<Runnable> runnableSupplier) {

if (!processingAllowed(channel, message)) {
return;
}

timedSemaphore.onEvent(timeout -> {

Runnable runnable = runnableSupplier.get();

if (timeout == null) {
eventExecutors.submit(runnable);
} else {
eventExecutors.schedule(runnable, timeout.remaining(), TimeUnit.MILLISECONDS);
}

});
}

Timeout existingTimeout = timeoutRef.get();
private boolean processingAllowed(String channel, String message) {

if (eventExecutors.isShuttingDown()) {
return false;
}

if (existingTimeout != null) {
if (!existingTimeout.isExpired()) {
if (!filter.test(channel, message)) {
return false;
}

return true;
}
}

/**
* Lock-free semaphore that limits calls by using a {@link Timeout}. This class is thread-safe and
* {@link #onEvent(Consumer)} may be called by multiple threads concurrently. It's guaranteed the first caller for an
* expired {@link Timeout} will be called.
*/
private static class TimedSemaphore {

private final AtomicReference<Timeout> timeoutRef = new AtomicReference<>();

Timeout timeout = new Timeout(this.timeout, this.timeUnit);
return timeoutRef.compareAndSet(existingTimeout, timeout);
private final int timeout = 5;
private final TimeUnit timeUnit = TimeUnit.SECONDS;

/**
* Rate-limited method that notifies the given {@link Consumer} once the current {@link Timeout} is expired.
*
* @param timeoutConsumer callback.
*/
protected void onEvent(Consumer<Timeout> timeoutConsumer) {

Timeout existingTimeout = timeoutRef.get();

if (existingTimeout != null) {
if (!existingTimeout.isExpired()) {
return;
}
}

Timeout timeout = new Timeout(this.timeout, this.timeUnit);
boolean state = timeoutRef.compareAndSet(existingTimeout, timeout);

if (state) {
timeoutConsumer.accept(timeout);
}
}
}

protected EventExecutorGroup getEventExecutor() {
return redisClient.getResources().eventExecutorGroup();
static interface MessagePredicate extends BiPredicate<String, String> {

@Override
boolean test(String message, String channel);
}

private boolean messageMatches(String channel, String message) {
/**
* {@link MessagePredicate} to check whether the channel and message contain topology changes related to the monitored
* master.
*/
private static class TopologyRefreshMessagePredicate implements MessagePredicate {

// trailing spaces after the master name are not bugs
if (channel.equals("+elected-leader")) {
if (message.startsWith(String.format("master %s ", masterId))) {
return true;
}
private final String masterId;

TopologyRefreshMessagePredicate(String masterId) {
this.masterId = masterId;
}

if (channel.equals("+switch-master")) {
if (message.startsWith(String.format("%s ", masterId))) {
return true;
@Override
public boolean test(String channel, String message) {

// trailing spaces after the master name are not bugs
if (channel.equals("+elected-leader")) {
if (message.startsWith(String.format("master %s ", masterId))) {
return true;
}
}

if (channel.equals("+switch-master")) {
if (message.startsWith(String.format("%s ", masterId))) {
return true;
}
}

if (channel.equals("fix-slave-config")) {
if (message.contains(String.format("@ %s ", masterId))) {
return true;
}
}

return PROCESSING_CHANNELS.contains(channel);
}
}

/**
* {@link MessagePredicate} to check whether the channel and message contain Sentinel availability changes or a Sentinel was
* added.
*/
private static class SentinelReconnectMessagePredicate implements MessagePredicate {

if (channel.equals("fix-slave-config")) {
if (message.contains(String.format("@ %s ", masterId))) {
@Override
public boolean test(String message, String channel) {
if (channel.equals("+sentinel")) {
return true;
}
}

if (PROCESSING_CHANNELS.contains(channel)) {
return true;
}
if (channel.equals("-odown") || channel.equals("-sdown")) {
if (message.startsWith("sentinel ")) {
return true;
}
}

return false;
return false;
}
}
}

0 comments on commit 7a279a6

Please sign in to comment.