Skip to content

Commit

Permalink
Merge 465e5ba into d2f6712
Browse files Browse the repository at this point in the history
  • Loading branch information
chenshi5012 committed Nov 1, 2023
2 parents d2f6712 + 465e5ba commit 33e2028
Show file tree
Hide file tree
Showing 7 changed files with 449 additions and 235 deletions.
173 changes: 48 additions & 125 deletions src/main/java/redis/clients/jedis/JedisSentinelPool.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package redis.clients.jedis;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
Expand All @@ -14,6 +12,9 @@

import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.sentinel.listener.SentinelActiveDetectListener;
import redis.clients.jedis.sentinel.listener.SentinelListener;
import redis.clients.jedis.sentinel.listener.SentinelSubscribeListener;
import redis.clients.jedis.util.Pool;

public class JedisSentinelPool extends Pool<Jedis> {
Expand All @@ -24,7 +25,7 @@ public class JedisSentinelPool extends Pool<Jedis> {

private final JedisClientConfig sentinelClientConfig;

protected final Collection<MasterListener> masterListeners = new ArrayList<>();
private final Collection<SentinelListener> masterListeners = new ArrayList<>();

private volatile HostAndPort currentHostMaster;

Expand Down Expand Up @@ -181,6 +182,7 @@ public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,

HostAndPort master = initSentinels(sentinels, masterName);
initMaster(master);
initMasterListeners(sentinels, masterName);
}

public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,
Expand All @@ -193,6 +195,47 @@ public JedisSentinelPool(String masterName, Set<HostAndPort> sentinels,

HostAndPort master = initSentinels(sentinels, masterName);
initMaster(master);
initMasterListeners(sentinels, masterName, poolConfig);
}

private void initMasterListeners(Set<HostAndPort> sentinels, String masterName) {
initMasterListeners(sentinels, masterName, null);
}

private void initMasterListeners(Set<HostAndPort> sentinels, String masterName,
GenericObjectPoolConfig<Jedis> poolConfig) {

LOG.info("Starting Sentinel listeners for {}...", masterName);
SentinelPoolConfig jedisSentinelPoolConfig = null;
if (poolConfig instanceof SentinelPoolConfig) {
jedisSentinelPoolConfig = ((SentinelPoolConfig) poolConfig);
} else {
jedisSentinelPoolConfig = new SentinelPoolConfig();
}

for (HostAndPort sentinel : sentinels) {
if (jedisSentinelPoolConfig.isEnableActiveDetectListener()) {
masterListeners.add(
new SentinelActiveDetectListener(currentHostMaster, sentinel, sentinelClientConfig,
masterName, jedisSentinelPoolConfig.getActiveDetectIntervalTimeMillis()) {
@Override
public void onChange(HostAndPort hostAndPort) {
initMaster(hostAndPort);
}
});
}

if (jedisSentinelPoolConfig.isEnableDefaultSubscribeListener()) {
masterListeners.add(new SentinelSubscribeListener(masterName, sentinel,
sentinelClientConfig, jedisSentinelPoolConfig.getSubscribeRetryWaitTimeMillis()) {
@Override
public void onChange(HostAndPort hostAndPort) {
initMaster(hostAndPort);
}
});
}
}
masterListeners.forEach(SentinelListener::start);
}

private static Set<HostAndPort> parseHostAndPorts(Set<String> strings) {
Expand All @@ -201,10 +244,7 @@ private static Set<HostAndPort> parseHostAndPorts(Set<String> strings) {

@Override
public void destroy() {
for (MasterListener m : masterListeners) {
m.shutdown();
}

masterListeners.forEach(SentinelListener::shutdown);
super.destroy();
}

Expand Down Expand Up @@ -271,16 +311,7 @@ private HostAndPort initSentinels(Set<HostAndPort> sentinels, final String maste
}
}

LOG.info("Redis master running at {}, starting Sentinel listeners...", master);

for (HostAndPort sentinel : sentinels) {

MasterListener masterListener = new MasterListener(masterName, sentinel.getHost(), sentinel.getPort());
// whether MasterListener threads are alive or not, process can be stopped
masterListener.setDaemon(true);
masterListeners.add(masterListener);
masterListener.start();
}
LOG.info("Redis master running at {}", master);

return master;
}
Expand Down Expand Up @@ -324,112 +355,4 @@ public void returnResource(final Jedis resource) {
}
}

protected class MasterListener extends Thread {

protected String masterName;
protected String host;
protected int port;
protected long subscribeRetryWaitTimeMillis = 5000;
protected volatile Jedis j;
protected AtomicBoolean running = new AtomicBoolean(false);

protected MasterListener() {
}

public MasterListener(String masterName, String host, int port) {
super(String.format("MasterListener-%s-[%s:%d]", masterName, host, port));
this.masterName = masterName;
this.host = host;
this.port = port;
}

public MasterListener(String masterName, String host, int port,
long subscribeRetryWaitTimeMillis) {
this(masterName, host, port);
this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
}

@Override
public void run() {

running.set(true);

while (running.get()) {

try {
// double check that it is not being shutdown
if (!running.get()) {
break;
}

final HostAndPort hostPort = new HostAndPort(host, port);
j = new Jedis(hostPort, sentinelClientConfig);

// code for active refresh
List<String> masterAddr = j.sentinelGetMasterAddrByName(masterName);
if (masterAddr == null || masterAddr.size() != 2) {
LOG.warn("Can not get master addr, master name: {}. Sentinel: {}.", masterName,
hostPort);
} else {
initMaster(toHostAndPort(masterAddr));
}

j.subscribe(new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
LOG.debug("Sentinel {} published: {}.", hostPort, message);

String[] switchMasterMsg = message.split(" ");

if (switchMasterMsg.length > 3) {

if (masterName.equals(switchMasterMsg[0])) {
initMaster(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4])));
} else {
LOG.debug(
"Ignoring message on +switch-master for master name {}, our master name is {}",
switchMasterMsg[0], masterName);
}

} else {
LOG.error("Invalid message received on Sentinel {} on channel +switch-master: {}",
hostPort, message);
}
}
}, "+switch-master");

} catch (JedisException e) {

if (running.get()) {
LOG.error("Lost connection to Sentinel at {}:{}. Sleeping 5000ms and retrying.", host,
port, e);
try {
Thread.sleep(subscribeRetryWaitTimeMillis);
} catch (InterruptedException e1) {
LOG.error("Sleep interrupted: ", e1);
}
} else {
LOG.debug("Unsubscribing from Sentinel at {}:{}", host, port);
}
} finally {
if (j != null) {
j.close();
}
}
}
}

public void shutdown() {
try {
LOG.debug("Shutting down listener on {}:{}", host, port);
running.set(false);
// This isn't good, the Jedis object is not thread safe
if (j != null) {
j.close();
}
} catch (RuntimeException e) {
LOG.error("Caught exception while shutting down: ", e);
}
}
}
}
44 changes: 44 additions & 0 deletions src/main/java/redis/clients/jedis/SentinelPoolConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package redis.clients.jedis;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

public class SentinelPoolConfig extends GenericObjectPoolConfig {

private boolean enableActiveDetectListener = false;
private long activeDetectIntervalTimeMillis = 5 * 1000;

private boolean enableDefaultSubscribeListener = true;
private long subscribeRetryWaitTimeMillis = 5 * 1000;

public boolean isEnableActiveDetectListener() {
return enableActiveDetectListener;
}

public void setEnableActiveDetectListener(boolean enableActiveDetectListener) {
this.enableActiveDetectListener = enableActiveDetectListener;
}

public long getActiveDetectIntervalTimeMillis() {
return activeDetectIntervalTimeMillis;
}

public void setActiveDetectIntervalTimeMillis(long activeDetectIntervalTimeMillis) {
this.activeDetectIntervalTimeMillis = activeDetectIntervalTimeMillis;
}

public boolean isEnableDefaultSubscribeListener() {
return enableDefaultSubscribeListener;
}

public void setEnableDefaultSubscribeListener(boolean enableDefaultSubscribeListener) {
this.enableDefaultSubscribeListener = enableDefaultSubscribeListener;
}

public long getSubscribeRetryWaitTimeMillis() {
return subscribeRetryWaitTimeMillis;
}

public void setSubscribeRetryWaitTimeMillis(long subscribeRetryWaitTimeMillis) {
this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
}
}

0 comments on commit 33e2028

Please sign in to comment.