Skip to content

Commit

Permalink
PubSub connection re-subscription doesn't work in some cases of when …
Browse files Browse the repository at this point in the history
…there are only one slave available. #663
  • Loading branch information
Nikita committed Oct 14, 2016
1 parent e5b996e commit 1173c95
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 67 deletions.
Expand Up @@ -109,9 +109,9 @@ public interface ConnectionManager {

Codec unsubscribe(String channelName, AsyncSemaphore lock);

Codec unsubscribe(String channelName);
RFuture<Codec> unsubscribe(String channelName, boolean temporaryDown);

Codec punsubscribe(String channelName);
RFuture<Codec> punsubscribe(String channelName, boolean temporaryDown);

Codec punsubscribe(String channelName, AsyncSemaphore lock);

Expand Down
Expand Up @@ -53,7 +53,6 @@
import org.redisson.misc.InfinitySemaphoreLatch;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.misc.RedissonThreadFactory;
import org.redisson.pubsub.AsyncSemaphore;
import org.redisson.pubsub.TransferListener;
import org.slf4j.Logger;
Expand Down Expand Up @@ -540,16 +539,32 @@ public boolean onStatus(PubSubType type, String channel) {
}

@Override
public Codec unsubscribe(String channelName) {
public RFuture<Codec> unsubscribe(final String channelName, boolean temporaryDown) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
return null;
}
freePubSubConnections.remove(entry);

Codec entryCodec = entry.getConnection().getChannels().get(channelName);
final Codec entryCodec = entry.getConnection().getChannels().get(channelName);
if (temporaryDown) {
final RPromise<Codec> result = newPromise();
entry.unsubscribe(channelName, new BaseRedisPubSubListener() {

@Override
public boolean onStatus(PubSubType type, String channel) {
if (type == PubSubType.UNSUBSCRIBE && channel.equals(channelName)) {
result.trySuccess(entryCodec);
return true;
}
return false;
}

});
return result;
}
entry.unsubscribe(channelName, null);

return entryCodec;
return newSucceededFuture(entryCodec);
}

public Codec punsubscribe(final String channelName, final AsyncSemaphore lock) {
Expand Down Expand Up @@ -583,16 +598,32 @@ public boolean onStatus(PubSubType type, String channel) {


@Override
public Codec punsubscribe(final String channelName) {
public RFuture<Codec> punsubscribe(final String channelName, boolean temporaryDown) {
final PubSubConnectionEntry entry = name2PubSubConnection.remove(channelName);
if (entry == null) {
return null;
}
freePubSubConnections.remove(entry);

Codec entryCodec = entry.getConnection().getPatternChannels().get(channelName);
final Codec entryCodec = entry.getConnection().getChannels().get(channelName);
if (temporaryDown) {
final RPromise<Codec> result = newPromise();
entry.punsubscribe(channelName, new BaseRedisPubSubListener() {

@Override
public boolean onStatus(PubSubType type, String channel) {
if (type == PubSubType.PUNSUBSCRIBE && channel.equals(channelName)) {
result.trySuccess(entryCodec);
return true;
}
return false;
}

});
return result;
}
entry.punsubscribe(channelName, null);

return entryCodec;
return newSucceededFuture(entryCodec);
}

@Override
Expand Down
119 changes: 74 additions & 45 deletions redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java
Expand Up @@ -108,7 +108,7 @@ private boolean slaveDown(ClientConnectionsEntry entry, FreezeReason freezeReaso
return false;
}

return slaveDown(e);
return slaveDown(e, freezeReason == FreezeReason.SYSTEM);
}

public boolean slaveDown(String host, int port, FreezeReason freezeReason) {
Expand All @@ -117,10 +117,10 @@ public boolean slaveDown(String host, int port, FreezeReason freezeReason) {
return false;
}

return slaveDown(entry);
return slaveDown(entry, freezeReason == FreezeReason.SYSTEM);
}

private boolean slaveDown(ClientConnectionsEntry entry) {
private boolean slaveDown(ClientConnectionsEntry entry, boolean temporaryDown) {
// add master as slave if no more slaves available
if (config.getReadMode() == ReadMode.SLAVE && slaveBalancer.getAvailableClients() == 0) {
InetSocketAddress addr = masterEntry.getClient().getAddr();
Expand Down Expand Up @@ -154,76 +154,100 @@ public void operationComplete(ChannelFuture future) throws Exception {
}

for (RedisPubSubConnection connection : entry.getAllSubscribeConnections()) {
reattachPubSub(connection);
reattachPubSub(connection, temporaryDown);
}
entry.getAllSubscribeConnections().clear();

return true;
}

private void reattachPubSub(RedisPubSubConnection redisPubSubConnection) {
private void reattachPubSub(RedisPubSubConnection redisPubSubConnection, boolean temporaryDown) {
for (String channelName : redisPubSubConnection.getChannels().keySet()) {
PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName);
Collection<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(channelName);
reattachPubSubListeners(channelName, listeners);
reattachPubSubListeners(channelName, listeners, temporaryDown);
}

for (String channelName : redisPubSubConnection.getPatternChannels().keySet()) {
PubSubConnectionEntry pubSubEntry = connectionManager.getPubSubEntry(channelName);
Collection<RedisPubSubListener<?>> listeners = pubSubEntry.getListeners(channelName);
reattachPatternPubSubListeners(channelName, listeners);
reattachPatternPubSubListeners(channelName, listeners, temporaryDown);
}
}

private void reattachPubSubListeners(final String channelName, final Collection<RedisPubSubListener<?>> listeners) {
Codec subscribeCodec = connectionManager.unsubscribe(channelName);
private void reattachPubSubListeners(final String channelName, final Collection<RedisPubSubListener<?>> listeners, boolean temporaryDown) {
RFuture<Codec> subscribeCodec = connectionManager.unsubscribe(channelName, temporaryDown);
if (listeners.isEmpty()) {
return;
}

subscribeCodec.addListener(new FutureListener<Codec>() {
@Override
public void operationComplete(Future<Codec> future) throws Exception {
Codec subscribeCodec = future.get();
subscribe(channelName, listeners, subscribeCodec);
}

});
}

private void subscribe(final String channelName, final Collection<RedisPubSubListener<?>> listeners,
final Codec subscribeCodec) {
RFuture<PubSubConnectionEntry> subscribeFuture = connectionManager.subscribe(subscribeCodec, channelName, null);
subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>() {

@Override
public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception {
if (!future.isSuccess()) {
log.error("Can't resubscribe topic channel: " + channelName);
subscribe(channelName, listeners, subscribeCodec);
return;
}
PubSubConnectionEntry newEntry = future.getNow();
for (RedisPubSubListener<?> redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel", channelName);
log.debug("resubscribed listeners of '{}' channel to {}", channelName, newEntry.getConnection().getRedisClient());
}
});
}

private void reattachPatternPubSubListeners(final String channelName,
final Collection<RedisPubSubListener<?>> listeners) {
Codec subscribeCodec = connectionManager.punsubscribe(channelName);
if (!listeners.isEmpty()) {
RFuture<PubSubConnectionEntry> future = connectionManager.psubscribe(channelName, subscribeCodec, null);
future.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception {
if (!future.isSuccess()) {
log.error("Can't resubscribe topic channel: " + channelName);
return;
}

PubSubConnectionEntry newEntry = future.getNow();
for (RedisPubSubListener<?> redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel-pattern", channelName);
}
});
private void reattachPatternPubSubListeners(final String channelName, final Collection<RedisPubSubListener<?>> listeners, boolean temporaryDown) {
RFuture<Codec> subscribeCodec = connectionManager.punsubscribe(channelName, temporaryDown);
if (listeners.isEmpty()) {
return;
}

subscribeCodec.addListener(new FutureListener<Codec>() {
@Override
public void operationComplete(Future<Codec> future) throws Exception {
Codec subscribeCodec = future.get();
psubscribe(channelName, listeners, subscribeCodec);
}
});
}

private void psubscribe(final String channelName, final Collection<RedisPubSubListener<?>> listeners,
final Codec subscribeCodec) {
RFuture<PubSubConnectionEntry> subscribeFuture = connectionManager.psubscribe(channelName, subscribeCodec, null);
subscribeFuture.addListener(new FutureListener<PubSubConnectionEntry>() {
@Override
public void operationComplete(Future<PubSubConnectionEntry> future)
throws Exception {
if (!future.isSuccess()) {
psubscribe(channelName, listeners, subscribeCodec);
return;
}

PubSubConnectionEntry newEntry = future.getNow();
for (RedisPubSubListener<?> redisPubSubListener : listeners) {
newEntry.addListener(channelName, redisPubSubListener);
}
log.debug("resubscribed listeners for '{}' channel-pattern", channelName);
}
});
}

private void reattachBlockingQueue(RedisConnection connection) {
final CommandData<?, ?> commandData = connection.getCurrentCommand();

Expand Down Expand Up @@ -272,7 +296,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
public RFuture<Void> addSlave(String host, int port) {
return addSlave(host, port, true, NodeType.SLAVE);
}

private RFuture<Void> addSlave(String host, int port, boolean freezed, NodeType mode) {
RedisClient client = connectionManager.createClient(NodeType.SLAVE, host, port);
ClientConnectionsEntry entry = new ClientConnectionsEntry(client,
Expand Down Expand Up @@ -316,18 +340,23 @@ public boolean slaveUp(String host, int port, FreezeReason freezeReason) {
* @param host of Redis
* @param port of Redis
*/
public void changeMaster(String host, int port) {
ClientConnectionsEntry oldMaster = masterEntry;
setupMasterEntry(host, port);
writeConnectionHolder.remove(oldMaster);
slaveDown(oldMaster, FreezeReason.MANAGER);

// more than one slave available, so master can be removed from slaves
if (config.getReadMode() == ReadMode.SLAVE
&& slaveBalancer.getAvailableClients() > 1) {
slaveDown(host, port, FreezeReason.SYSTEM);
}
connectionManager.shutdownAsync(oldMaster.getClient());
public void changeMaster(final String host, final int port) {
final ClientConnectionsEntry oldMaster = masterEntry;
RFuture<Void> future = setupMasterEntry(host, port);
future.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
writeConnectionHolder.remove(oldMaster);
slaveDown(oldMaster, FreezeReason.MANAGER);

// more than one slave available, so master can be removed from slaves
if (config.getReadMode() == ReadMode.SLAVE
&& slaveBalancer.getAvailableClients() > 1) {
slaveDown(host, port, FreezeReason.SYSTEM);
}
connectionManager.shutdownAsync(oldMaster.getClient());
}
});
}

public boolean isFreezed() {
Expand Down
Expand Up @@ -34,8 +34,6 @@

public class PubSubConnectionEntry {

public enum Status {ACTIVE, INACTIVE}

private final AtomicInteger subscribedChannelsAmount;
private final RedisPubSubConnection conn;

Expand Down
Expand Up @@ -218,8 +218,9 @@ protected void onSlaveAdded(URI addr, String msg) {
final String slaveAddr = ip + ":" + port;

// to avoid addition twice
if (slaves.putIfAbsent(slaveAddr, true) == null && config.getReadMode() != ReadMode.MASTER) {
RFuture<Void> future = getEntry(singleSlotRange.getStartSlot()).addSlave(ip, Integer.valueOf(port));
if (slaves.putIfAbsent(slaveAddr, true) == null) {
final MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
RFuture<Void> future = entry.addSlave(ip, Integer.valueOf(port));
future.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
Expand All @@ -229,7 +230,7 @@ public void operationComplete(Future<Void> future) throws Exception {
return;
}

if (getEntry(singleSlotRange.getStartSlot()).slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) {
if (entry.slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) {
String slaveAddr = ip + ":" + port;
log.info("slave: {} added", slaveAddr);
}
Expand Down Expand Up @@ -266,12 +267,14 @@ private void onNodeDown(URI sentinelAddr, String msg) {
String ip = parts[2];
String port = parts[3];

MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
if (entry.getFreezeReason() != FreezeReason.MANAGER) {
entry.freeze();
String addr = ip + ":" + port;
log.warn("master: {} has down", addr);
}
// should be resolved by master switch event
//
// MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
// if (entry.getFreezeReason() != FreezeReason.MANAGER) {
// entry.freeze();
// String addr = ip + ":" + port;
// log.warn("master: {} has down", addr);
// }
}
} else {
log.warn("onSlaveDown. Invalid message: {} from Sentinel {}:{}", msg, sentinelAddr.getHost(), sentinelAddr.getPort());
Expand Down

0 comments on commit 1173c95

Please sign in to comment.