Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,8 @@
<include>**/mcf/MultiCluster*.java</include>
<include>**/mcf/StatusTracker*.java</include>
<include>**/Health*.java</include>
<include>**/*IT.java</include>
<include>**/scenario/RestEndpointUtil.java</include>
<include>src/main/java/redis/clients/jedis/MultiClusterClientConfig.java</include>
<include>src/main/java/redis/clients/jedis/HostAndPort.java</include>
</includes>
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public void connect() throws JedisConnectionException {
if (!isConnected()) {
try {
socket = socketFactory.createSocket();
soTimeout = socket.getSoTimeout(); //?
soTimeout = socket.getSoTimeout(); // ?

outputStream = new RedisOutputStream(socket.getOutputStream());
inputStream = new RedisInputStream(socket.getInputStream());
Expand Down Expand Up @@ -326,6 +326,10 @@ public void disconnect() {
}

public void forceDisconnect() throws IOException {
// setBroken() must be called first here,
// otherwise a concurrent close attempt would call 'returnResource' (instead of
// 'returnBrokenResource'),
// assuming it's an open/healthy connection whereas this individual socket is already closed.
setBroken();
IOUtils.closeQuietly(socket);
}
Expand Down Expand Up @@ -476,7 +480,6 @@ public List<Object> getMany(final int count) {

/**
* Check if the client name libname, libver, characters are legal
*
* @param info the name
* @return Returns true if legal, false throws exception
* @throws JedisException if characters illegal
Expand Down
105 changes: 101 additions & 4 deletions src/main/java/redis/clients/jedis/MultiClusterClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
* This configuration enables seamless failover between multiple Redis clusters, databases, or
* endpoints by providing comprehensive settings for retry logic, circuit breaker behavior, health
* checks, and failback mechanisms. It is designed to work with
* {@link redis.clients.jedis.providers.MultiClusterPooledConnectionProvider} to provide high
* availability and disaster recovery capabilities.
* {@link redis.clients.jedis.mcf.MultiClusterPooledConnectionProvider} to provide high availability
* and disaster recovery capabilities.
* </p>
* <p>
* <strong>Key Features:</strong>
Expand Down Expand Up @@ -70,7 +70,7 @@
* The configuration leverages <a href="https://resilience4j.readme.io/docs">Resilience4j</a> for
* circuit breaker and retry implementations, providing battle-tested fault tolerance patterns.
* </p>
* @see redis.clients.jedis.providers.MultiClusterPooledConnectionProvider
* @see redis.clients.jedis.mcf.MultiClusterPooledConnectionProvider
* @see redis.clients.jedis.mcf.HealthCheckStrategy
* @see redis.clients.jedis.mcf.EchoStrategy
* @see redis.clients.jedis.mcf.LagAwareStrategy
Expand Down Expand Up @@ -161,6 +161,12 @@ public static interface StrategySupplier {
/** Default grace period in milliseconds to keep clusters disabled after they become unhealthy. */
private static final long GRACE_PERIOD_DEFAULT = 10000;

/** Default maximum number of failover attempts. */
private static final int MAX_NUM_FAILOVER_ATTEMPTS_DEFAULT = 10;

/** Default delay in milliseconds between failover attempts. */
private static final int DELAY_IN_BETWEEN_FAILOVER_ATTEMPTS_DEFAULT = 12000;

/** Array of cluster configurations defining the available Redis endpoints and their settings. */
private final ClusterConfig[] clusterConfigs;

Expand Down Expand Up @@ -485,6 +491,34 @@ public static interface StrategySupplier {
*/
private boolean fastFailover;

/**
* Maximum number of failover attempts.
* <p>
* This setting controls how many times the system will attempt to failover to a different cluster
* before giving up. For example, if set to 3, the system will make 1 initial attempt plus 2
* failover attempts for a total of 3 attempts.
* </p>
* <p>
* <strong>Default:</strong> {@value #MAX_NUM_FAILOVER_ATTEMPTS_DEFAULT}
* </p>
* @see #getMaxNumFailoverAttempts()
*/
private int maxNumFailoverAttempts;

/**
* Delay in milliseconds between failover attempts.
* <p>
* This setting controls how long the system will wait before attempting to failover to a
* different cluster. For example, if set to 1000, the system will wait 1 second before attempting
* to failover to a different cluster.
* </p>
* <p>
* <strong>Default:</strong> {@value #DELAY_IN_BETWEEN_FAILOVER_ATTEMPTS_DEFAULT} milliseconds
* </p>
* @see #getDelayInBetweenFailoverAttempts()
*/
private int delayInBetweenFailoverAttempts;

/**
* Constructs a new MultiClusterClientConfig with the specified cluster configurations.
* <p>
Expand Down Expand Up @@ -679,6 +713,25 @@ public long getGracePeriod() {
return gracePeriod;
}

/**
* Returns the maximum number of failover attempts.
* @return maximum number of failover attempts
* @see #maxNumFailoverAttempts
*/
public int getMaxNumFailoverAttempts() {
return maxNumFailoverAttempts;

}

/**
* Returns the delay in milliseconds between failover attempts.
* @return delay in milliseconds between failover attempts
* @see #delayInBetweenFailoverAttempts
*/
public int getDelayInBetweenFailoverAttempts() {
return delayInBetweenFailoverAttempts;
}

/**
* Returns whether connections are forcefully terminated during failover.
* @return true if fast failover is enabled, false for graceful failover
Expand Down Expand Up @@ -1090,6 +1143,12 @@ public static class Builder {
/** Whether to forcefully terminate connections during failover. */
private boolean fastFailover = false;

/** Maximum number of failover attempts. */
private int maxNumFailoverAttempts = MAX_NUM_FAILOVER_ATTEMPTS_DEFAULT;

/** Delay in milliseconds between failover attempts. */
private int delayInBetweenFailoverAttempts = DELAY_IN_BETWEEN_FAILOVER_ATTEMPTS_DEFAULT;

/**
* Constructs a new Builder with the specified cluster configurations.
* @param clusterConfigs array of cluster configurations defining available Redis endpoints
Expand Down Expand Up @@ -1460,7 +1519,7 @@ public Builder retryOnFailover(boolean retryOnFailover) {
* <ul>
* <li>Health checks must be enabled on cluster configurations</li>
* <li>Grace period must elapse after cluster becomes unhealthy</li>
* <li>Higher-priority cluster must pass consecutive health checks</li>
* <li>Higher-priority cluster must pass health checks</li>
* </ul>
* @param supported true to enable automatic failback, false for manual failback only
* @return this builder instance for method chaining
Expand Down Expand Up @@ -1539,6 +1598,42 @@ public Builder fastFailover(boolean fastFailover) {
return this;
}

/**
* Sets the maximum number of failover attempts.
* <p>
* This setting controls how many times the system will attempt to failover to a different
* cluster before giving up. For example, if set to 3, the system will make 1 initial attempt
* plus 2 failover attempts for a total of 3 attempts.
* </p>
* <p>
* <strong>Default:</strong> {@value #MAX_NUM_FAILOVER_ATTEMPTS_DEFAULT}
* </p>
* @param maxNumFailoverAttempts maximum number of failover attempts
* @return this builder instance for method chaining
*/
public Builder maxNumFailoverAttempts(int maxNumFailoverAttempts) {
this.maxNumFailoverAttempts = maxNumFailoverAttempts;
return this;
}

/**
* Sets the delay in milliseconds between failover attempts.
* <p>
* This setting controls how long the system will wait before attempting to failover to a
* different cluster. For example, if set to 1000, the system will wait 1 second before
* attempting to failover to a different cluster.
* </p>
* <p>
* <strong>Default:</strong> {@value #DELAY_IN_BETWEEN_FAILOVER_ATTEMPTS_DEFAULT} milliseconds
* </p>
* @param delayInBetweenFailoverAttempts delay in milliseconds between failover attempts
* @return this builder instance for method chaining
*/
public Builder delayInBetweenFailoverAttempts(int delayInBetweenFailoverAttempts) {
this.delayInBetweenFailoverAttempts = delayInBetweenFailoverAttempts;
return this;
}

/**
* Builds and returns a new MultiClusterClientConfig instance with all configured settings.
* <p>
Expand Down Expand Up @@ -1576,6 +1671,8 @@ public MultiClusterClientConfig build() {
config.failbackCheckInterval = this.failbackCheckInterval;
config.gracePeriod = this.gracePeriod;
config.fastFailover = this.fastFailover;
config.maxNumFailoverAttempts = this.maxNumFailoverAttempts;
config.delayInBetweenFailoverAttempts = this.delayInBetweenFailoverAttempts;

return config;
}
Expand Down
14 changes: 12 additions & 2 deletions src/main/java/redis/clients/jedis/SslOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ public SSLContext createSslContext() throws IOException, GeneralSecurityExceptio

if (keystoreResource != null) {

KeyStore keyStore = KeyStore.getInstance(keyStoreType);
KeyStore keyStore = KeyStore.getInstance(keyStoreType==null ? KeyStore.getDefaultType() : keyStoreType);
try (InputStream keystoreStream = keystoreResource.get()) {
keyStore.load(keystoreStream, keystorePassword);
}
Expand All @@ -355,7 +355,8 @@ public SSLContext createSslContext() throws IOException, GeneralSecurityExceptio

if (trustManagers == null && truststoreResource != null) {

KeyStore trustStore = KeyStore.getInstance(trustStoreType);

KeyStore trustStore = KeyStore.getInstance(trustStoreType == null ? KeyStore.getDefaultType() : trustStoreType);
try (InputStream truststoreStream = truststoreResource.get()) {
trustStore.load(truststoreStream, truststorePassword);
}
Expand All @@ -379,6 +380,15 @@ public SSLParameters getSslParameters() {
return sslParameters;
}


/**
* Configured ssl verify mode.
* @return {@link SslVerifyMode}
*/
public SslVerifyMode getSslVerifyMode() {
return sslVerifyMode;
}

private static char[] getPassword(char[] chars) {
return chars != null ? Arrays.copyOf(chars, chars.length) : null;
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import redis.clients.jedis.json.JsonObjectMapper;
import redis.clients.jedis.mcf.CircuitBreakerCommandExecutor;
import redis.clients.jedis.mcf.MultiClusterPipeline;
import redis.clients.jedis.mcf.MultiClusterPooledConnectionProvider;
import redis.clients.jedis.mcf.MultiClusterTransaction;
import redis.clients.jedis.params.*;
import redis.clients.jedis.providers.*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
import redis.clients.jedis.CommandObject;
import redis.clients.jedis.Connection;
import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.executors.CommandExecutor;
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider;
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider.Cluster;
import redis.clients.jedis.mcf.MultiClusterPooledConnectionProvider.Cluster;

/**
* @author Allen Terleto (aterleto)
Expand Down Expand Up @@ -38,7 +38,7 @@ public <T> T executeCommand(CommandObject<T> commandObject) {
supplier.withCircuitBreaker(cluster.getCircuitBreaker());
supplier.withRetry(cluster.getRetry());
supplier.withFallback(provider.getFallbackExceptionList(),
e -> this.handleClusterFailover(commandObject, cluster.getCircuitBreaker()));
e -> this.handleClusterFailover(commandObject, cluster));

return supplier.decorate().get();
}
Expand All @@ -47,7 +47,14 @@ public <T> T executeCommand(CommandObject<T> commandObject) {
* Functional interface wrapped in retry and circuit breaker logic to handle happy path scenarios
*/
private <T> T handleExecuteCommand(CommandObject<T> commandObject, Cluster cluster) {
try (Connection connection = cluster.getConnection()) {
Connection connection;
try {
connection = cluster.getConnection();
} catch (JedisConnectionException e) {
provider.assertOperability();
throw e;
}
try {
return connection.executeCommand(commandObject);
} catch (Exception e) {
if (cluster.retryOnFailover() && !isActiveCluster(cluster)
Expand All @@ -57,6 +64,8 @@ && isCircuitBreakerTrackedException(e, cluster.getCircuitBreaker())) {
}

throw e;
} finally {
connection.close();
}
}

Expand All @@ -73,10 +82,9 @@ private boolean isActiveCluster(Cluster cluster) {
* Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker
* failure scenarios
*/
private <T> T handleClusterFailover(CommandObject<T> commandObject,
CircuitBreaker circuitBreaker) {
private <T> T handleClusterFailover(CommandObject<T> commandObject, Cluster cluster) {

clusterFailover(circuitBreaker);
clusterFailover(cluster);

// Recursive call to the initiating method so the operation can be retried on the next cluster
// connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
import java.util.concurrent.locks.ReentrantLock;
import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider;
import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider.Cluster;
import redis.clients.jedis.mcf.MultiClusterPooledConnectionProvider.Cluster;
import redis.clients.jedis.util.IOUtils;

/**
Expand Down Expand Up @@ -38,9 +37,10 @@ public void close() {
* Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker
* failure scenarios
*/
protected void clusterFailover(CircuitBreaker circuitBreaker) {
protected void clusterFailover(Cluster cluster) {
lock.lock();

CircuitBreaker circuitBreaker = cluster.getCircuitBreaker();
try {
// Check state to handle race conditions since iterateActiveCluster() is
// non-idempotent
Expand All @@ -52,34 +52,28 @@ protected void clusterFailover(CircuitBreaker circuitBreaker) {

Cluster activeCluster = provider.getCluster();
// This should be possible only if active cluster is switched from by other reasons than
// circuit
// breaker, just before circuit breaker triggers
if (activeCluster.getCircuitBreaker() != circuitBreaker) {
// circuit breaker, just before circuit breaker triggers
if (activeCluster != cluster) {
return;
}

activeCluster.setGracePeriod();
cluster.setGracePeriod();
circuitBreaker.transitionToForcedOpenState();

// Iterating the active cluster will allow subsequent calls to the executeCommand() to use
// the next
// cluster's connection pool - according to the configuration's prioritization/order/weight
// int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex1();
provider.iterateActiveCluster(SwitchReason.CIRCUIT_BREAKER);
provider.switchToHealthyCluster(SwitchReason.CIRCUIT_BREAKER, cluster);
}
// this check relies on the fact that many failover attempts can hit with the same CB,
// only the first one will trigger a failover, and make the CB FORCED_OPEN.
// when the rest reaches here, the active cluster is already the next one, and should be
// different than
// active CB. If its the same one and there are no more clusters to failover to, then throw an
// exception
else if (circuitBreaker == provider.getCluster().getCircuitBreaker()
&& !provider.canIterateOnceMore()) {
throw new JedisConnectionException(
"Cluster/database endpoint could not failover since the MultiClusterClientConfig was not "
+ "provided with an additional cluster/database endpoint according to its prioritized sequence. "
+ "If applicable, consider failing back OR restarting with an available cluster/database endpoint");
}
else if (cluster == provider.getCluster()) {
provider.switchToHealthyCluster(SwitchReason.CIRCUIT_BREAKER, cluster);
}
// Ignore exceptions since we are already in a failure state
} finally {
lock.unlock();
Expand Down
Loading
Loading