Skip to content

Commit

Permalink
Stop connection fetching before sync/exec (#3756)
Browse files Browse the repository at this point in the history
in multi cluster failover mode
  • Loading branch information
sazzad16 committed Mar 6, 2024
1 parent 64b5aac commit eff8468
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 13 deletions.
4 changes: 4 additions & 0 deletions src/main/java/redis/clients/jedis/AbstractTransaction.java
Expand Up @@ -9,6 +9,10 @@ protected AbstractTransaction() {
super(new CommandObjects());
}

protected AbstractTransaction(CommandObjects commandObjects) {
super(commandObjects);
}

public abstract void multi();

/**
Expand Down
Expand Up @@ -40,8 +40,7 @@ public final class MultiClusterClientConfig {
private static final float CIRCUIT_BREAKER_SLOW_CALL_RATE_THRESHOLD_DEFAULT = 100.0f; // measured as percentage
private static final List<Class> CIRCUIT_BREAKER_INCLUDED_EXCEPTIONS_DEFAULT = Arrays.asList(JedisConnectionException.class);

private static final List<Class<? extends Throwable>> FALLBACK_EXCEPTIONS_DEFAULT =
Arrays.asList(CallNotPermittedException.class, JedisConnectionException.class);
private static final List<Class<? extends Throwable>> FALLBACK_EXCEPTIONS_DEFAULT = Arrays.asList(CallNotPermittedException.class);

private final ClusterConfig[] clusterConfigs;

Expand Down
4 changes: 4 additions & 0 deletions src/main/java/redis/clients/jedis/TransactionBase.java
Expand Up @@ -9,4 +9,8 @@ public abstract class TransactionBase extends AbstractTransaction {
protected TransactionBase() {
super();
}

protected TransactionBase(CommandObjects commandObjects) {
super(commandObjects);
}
}
4 changes: 2 additions & 2 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Expand Up @@ -4839,7 +4839,7 @@ public PipelineBase pipelined() {
if (provider == null) {
throw new IllegalStateException("It is not allowed to create Pipeline from this " + getClass());
} else if (provider instanceof MultiClusterPooledConnectionProvider) {
return new MultiClusterPipeline((MultiClusterPooledConnectionProvider) provider);
return new MultiClusterPipeline((MultiClusterPooledConnectionProvider) provider, commandObjects);
} else {
return new Pipeline(provider.getConnection(), true);
}
Expand All @@ -4849,7 +4849,7 @@ public AbstractTransaction multi() {
if (provider == null) {
throw new IllegalStateException("It is not allowed to create Pipeline from this " + getClass());
} else if (provider instanceof MultiClusterPooledConnectionProvider) {
return new MultiClusterTransaction((MultiClusterPooledConnectionProvider) provider);
return new MultiClusterTransaction((MultiClusterPooledConnectionProvider) provider, true, commandObjects);
} else {
return new Transaction(provider.getConnection(), true, true);
}
Expand Down
Expand Up @@ -20,6 +20,7 @@ public class MultiClusterPipeline extends PipelineBase implements Closeable {
private final CircuitBreakerFailoverConnectionProvider failoverProvider;
private final Queue<KeyValue<CommandArguments, Response<?>>> commands = new LinkedList<>();

@Deprecated
public MultiClusterPipeline(MultiClusterPooledConnectionProvider pooledProvider) {
super(new CommandObjects());

Expand All @@ -31,6 +32,11 @@ public MultiClusterPipeline(MultiClusterPooledConnectionProvider pooledProvider)
}
}

public MultiClusterPipeline(MultiClusterPooledConnectionProvider pooledProvider, CommandObjects commandObjects) {
super(commandObjects);
this.failoverProvider = new CircuitBreakerFailoverConnectionProvider(pooledProvider);
}

@Override
protected final <T> Response<T> appendCommand(CommandObject<T> commandObject) {
CommandArguments args = commandObject.getArguments();
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/redis/clients/jedis/mcf/MultiClusterTransaction.java
Expand Up @@ -38,6 +38,7 @@ public class MultiClusterTransaction extends TransactionBase {
* called with this object.
* @param provider
*/
@Deprecated
public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider) {
this(provider, true);
}
Expand All @@ -49,6 +50,7 @@ public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider) {
* @param provider
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
*/
@Deprecated
public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider, boolean doMulti) {
this.failoverProvider = new CircuitBreakerFailoverConnectionProvider(provider);

Expand All @@ -60,6 +62,21 @@ public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider, bo
if (doMulti) multi();
}

/**
* A user wanting to WATCH/UNWATCH keys followed by a call to MULTI ({@link #multi()}) it should
* be {@code doMulti=false}.
*
* @param provider
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
* @param commandObjects command objects
*/
public MultiClusterTransaction(MultiClusterPooledConnectionProvider provider, boolean doMulti, CommandObjects commandObjects) {
super(commandObjects);
this.failoverProvider = new CircuitBreakerFailoverConnectionProvider(provider);

if (doMulti) multi();
}

@Override
public final void multi() {
appendCommand(new CommandObject<>(new CommandArguments(MULTI), NO_OP_BUILDER));
Expand Down
32 changes: 23 additions & 9 deletions src/test/java/redis/clients/jedis/misc/AutomaticFailoverTest.java
Expand Up @@ -26,6 +26,7 @@
import redis.clients.jedis.MultiClusterClientConfig;
import redis.clients.jedis.UnifiedJedis;
import redis.clients.jedis.exceptions.JedisAccessControlException;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider;
import redis.clients.jedis.util.IOUtils;

Expand Down Expand Up @@ -68,7 +69,7 @@ public void pipelineWithSwitch() {
AbstractPipeline pipe = client.pipelined();
pipe.set("pstr", "foobar");
pipe.hset("phash", "foo", "bar");
//provider.incrementActiveMultiClusterIndex();
provider.incrementActiveMultiClusterIndex();
pipe.sync();
}

Expand All @@ -85,7 +86,7 @@ public void transactionWithSwitch() {
AbstractTransaction tx = client.multi();
tx.set("tstr", "foobar");
tx.hset("thash", "foo", "bar");
//provider.incrementActiveMultiClusterIndex();
provider.incrementActiveMultiClusterIndex();
assertEquals(Arrays.asList("OK", Long.valueOf(1L)), tx.exec());
}

Expand All @@ -109,9 +110,19 @@ public void commandFailover() {

UnifiedJedis jedis = new UnifiedJedis(cacheProvider);

assertFalse(failoverReporter.failedOver);
log.info("Starting calls to Redis");
String key = "hash-" + System.nanoTime();
log.info("Starting calls to Redis");
assertFalse(failoverReporter.failedOver);
for (int attempt = 0; attempt < 10; attempt++) {
try {
jedis.hset(key, "f1", "v1");
} catch (JedisConnectionException jce) {
//
}
assertFalse(failoverReporter.failedOver);
}

// should failover now
jedis.hset(key, "f1", "v1");
assertTrue(failoverReporter.failedOver);

Expand All @@ -129,19 +140,22 @@ public void pipelineFailover() {
MultiClusterClientConfig.Builder builder = new MultiClusterClientConfig.Builder(
getClusterConfigs(clientConfig, hostPort_1, hostPort_2))
.circuitBreakerSlidingWindowMinCalls(slidingWindowMinCalls)
.circuitBreakerSlidingWindowSize(slidingWindowSize);
.circuitBreakerSlidingWindowSize(slidingWindowSize)
.fallbackExceptionList(Arrays.asList(JedisConnectionException.class));

RedisFailoverReporter failoverReporter = new RedisFailoverReporter();
MultiClusterPooledConnectionProvider cacheProvider = new MultiClusterPooledConnectionProvider(builder.build());
cacheProvider.setClusterFailoverPostProcessor(failoverReporter);

UnifiedJedis jedis = new UnifiedJedis(cacheProvider);

assertFalse(failoverReporter.failedOver);
String key = "hash-" + System.nanoTime();
log.info("Starting calls to Redis");
assertFalse(failoverReporter.failedOver);
AbstractPipeline pipe = jedis.pipelined();
String key = "hash-" + System.nanoTime();
assertFalse(failoverReporter.failedOver);
pipe.hset(key, "f1", "v1");
assertFalse(failoverReporter.failedOver);
pipe.sync();
assertTrue(failoverReporter.failedOver);

Expand All @@ -168,9 +182,9 @@ public void failoverFromAuthError() {

UnifiedJedis jedis = new UnifiedJedis(cacheProvider);

assertFalse(failoverReporter.failedOver);
log.info("Starting calls to Redis");
String key = "hash-" + System.nanoTime();
log.info("Starting calls to Redis");
assertFalse(failoverReporter.failedOver);
jedis.hset(key, "f1", "v1");
assertTrue(failoverReporter.failedOver);

Expand Down

0 comments on commit eff8468

Please sign in to comment.