Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita Koksharov committed Jan 20, 2022
1 parent cdb1db8 commit f31d4bb
Showing 1 changed file with 24 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,31 @@ private void subscribe(Codec codec, ChannelName channelName, MasterSlaveEntry en
}
freePubSubLock.release();

addListeners(channelName, promise, type, lock, freeEntry, listeners);
CompletableFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, freeEntry, listeners);

ChannelFuture future;
if (PubSubType.PSUBSCRIBE == type) {
freeEntry.psubscribe(codec, channelName);
future = freeEntry.psubscribe(codec, channelName);
} else {
freeEntry.subscribe(codec, channelName);
future = freeEntry.subscribe(codec, channelName);
}

future.addListener((ChannelFutureListener) future1 -> {
if (!future1.isSuccess()) {
if (!promise.isDone()) {
subscribeFuture.cancel(false);
}
return;
}

connectionManager.newTimeout(timeout -> {
if (subscribeFuture.completeExceptionally(new RedisTimeoutException(
"Subscription timeout after " + config.getTimeout() + "ms. " +
"Check network and/or increase 'timeout' parameter."))) {
unsubscribe(channelName, type);
}
}, config.getTimeout(), TimeUnit.MILLISECONDS);
});
});
}

Expand All @@ -302,7 +321,7 @@ private MasterSlaveEntry getEntry(ChannelName channelName) {
return connectionManager.getEntry(slot);
}

private void addListeners(ChannelName channelName, CompletableFuture<PubSubConnectionEntry> promise,
private CompletableFuture<Void> addListeners(ChannelName channelName, CompletableFuture<PubSubConnectionEntry> promise,
PubSubType type, AsyncSemaphore lock, PubSubConnectionEntry connEntry,
RedisPubSubListener<?>... listeners) {
for (RedisPubSubListener<?> listener : listeners) {
Expand Down Expand Up @@ -334,14 +353,7 @@ private void addListeners(ChannelName channelName, CompletableFuture<PubSubConne
lock.release();
}
});

connectionManager.newTimeout(timeout -> {
if (subscribeFuture.completeExceptionally(new RedisTimeoutException(
"Subscription timeout after " + config.getTimeout() + "ms. " +
"Check network and/or increase 'timeout' parameter."))) {
unsubscribe(channelName, type);
}
}, config.getTimeout(), TimeUnit.MILLISECONDS);
return subscribeFuture;
}

private CompletableFuture<RedisPubSubConnection> nextPubSubConnection(MasterSlaveEntry entry, ChannelName channelName) {
Expand Down

0 comments on commit f31d4bb

Please sign in to comment.