Skip to content

Commit

Permalink
RemoteSerivce shutdown process optimization. #446
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Mar 22, 2016
1 parent 7884c07 commit 5e2cbe6
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 27 deletions.
5 changes: 1 addition & 4 deletions src/main/java/org/redisson/Redisson.java
Expand Up @@ -87,7 +87,6 @@ public class Redisson implements RedissonClient {
private final CommandExecutor commandExecutor;
private final ConnectionManager connectionManager;
private final Config config;
private final RedissonRemoteService remoteService;

private final UUID id = UUID.randomUUID();

Expand Down Expand Up @@ -115,7 +114,6 @@ public class Redisson implements RedissonClient {
}
commandExecutor = new CommandSyncService(connectionManager);
evictionScheduler = new EvictionScheduler(commandExecutor);
remoteService = new RedissonRemoteService(this);
}

private void validate(SingleServerConfig config) {
Expand Down Expand Up @@ -372,7 +370,7 @@ public RScript getScript() {
}

public RRemoteService getRemoteSerivce() {
return remoteService;
return new RedissonRemoteService(this);
}

@Override
Expand Down Expand Up @@ -507,7 +505,6 @@ public RBatch createBatch() {

@Override
public void shutdown() {
remoteService.shutdown();
connectionManager.shutdown();
}

Expand Down
11 changes: 0 additions & 11 deletions src/main/java/org/redisson/RedissonRemoteService.java
Expand Up @@ -19,8 +19,6 @@
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -43,7 +41,6 @@ public class RedissonRemoteService implements RRemoteService {
private static final Logger log = LoggerFactory.getLogger(RedissonRemoteService.class);

private final Map<RemoteServiceKey, RemoteServiceMethod> beans = PlatformDependent.newConcurrentHashMap();
private final Queue<Future<RemoteServiceRequest>> futures = new ConcurrentLinkedQueue<Future<RemoteServiceRequest>>();

private final Redisson redisson;

Expand Down Expand Up @@ -78,7 +75,6 @@ public <T> void register(Class<T> remoteInterface, T object, int executorsAmount

private <T> void subscribe(final Class<T> remoteInterface, final RBlockingQueue<RemoteServiceRequest> requestQueue) {
Future<RemoteServiceRequest> take = requestQueue.takeAsync();
futures.add(take);
take.addListener(new FutureListener<RemoteServiceRequest>() {
@Override
public void operationComplete(Future<RemoteServiceRequest> future) throws Exception {
Expand All @@ -104,7 +100,6 @@ public void operationComplete(Future<RemoteServiceRequest> future) throws Except
log.error("None of clients has not received a response for: {}", request);
}

futures.remove(future);
subscribe(remoteInterface, requestQueue);
}
});
Expand Down Expand Up @@ -161,10 +156,4 @@ private String generateRequestId() {
return ByteBufUtil.hexDump(id);
}

public void shutdown() {
for (Future<RemoteServiceRequest> future : futures) {
future.cancel(true);
}
}

}
36 changes: 24 additions & 12 deletions src/main/java/org/redisson/command/CommandAsyncService.java
Expand Up @@ -462,18 +462,7 @@ private <V, R> void checkWriteFuture(final AsyncDetails<V, R> details, final Red
int timeoutTime = connectionManager.getConfig().getTimeout();
if (skipTimeout.contains(details.getCommand().getName())) {
Integer popTimeout = Integer.valueOf(details.getParams()[details.getParams().length - 1].toString());
details.getMainPromise().addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
if (!future.isCancelled()) {
return;
}
// cancel handling for commands from skipTimeout collection
if (details.getAttemptPromise().cancel(true)) {
connection.forceReconnectAsync();
}
}
});
handleBlockingOperations(details, connection);
if (popTimeout == 0) {
return;
}
Expand All @@ -494,6 +483,29 @@ public void run(Timeout timeout) throws Exception {
details.setTimeout(timeout);
}

private <R, V> void handleBlockingOperations(final AsyncDetails<V, R> details, final RedisConnection connection) {
final FutureListener<Boolean> listener = new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
details.getMainPromise().cancel(true);
}
};
details.getMainPromise().addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
if (!future.isCancelled()) {
connectionManager.getShutdownPromise().removeListener(listener);
return;
}
// cancel handling for commands from skipTimeout collection
if (details.getAttemptPromise().cancel(true)) {
connection.forceReconnectAsync();
}
}
});
connectionManager.getShutdownPromise().addListener(listener);
}

private <R, V> void checkConnectionFuture(final NodeSource source,
final AsyncDetails<V, R> details) {
if (details.getAttemptPromise().isDone() || details.getMainPromise().isCancelled() || details.getConnectionFuture().isCancelled()) {
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/redisson/connection/ConnectionManager.java
Expand Up @@ -107,5 +107,7 @@ public interface ConnectionManager {
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);

InfinitySemaphoreLatch getShutdownLatch();

Future<Boolean> getShutdownPromise();

}
Expand Up @@ -122,6 +122,8 @@ public boolean cancel() {

protected final Map<ClusterSlotRange, MasterSlaveEntry> entries = PlatformDependent.newConcurrentHashMap();

private final Promise<Boolean> shutdownPromise;

private final InfinitySemaphoreLatch shutdownLatch = new InfinitySemaphoreLatch();

private final Set<RedisClientEntry> clients = Collections.newSetFromMap(PlatformDependent.<RedisClientEntry, Boolean>newConcurrentHashMap());
Expand Down Expand Up @@ -156,6 +158,7 @@ public MasterSlaveConnectionManager(Config cfg) {
this.socketChannelClass = NioSocketChannel.class;
}
this.codec = cfg.getCodec();
this.shutdownPromise = group.next().newPromise();
this.isClusterMode = cfg.isClusterConfig();
}

Expand Down Expand Up @@ -674,6 +677,7 @@ public void releaseRead(NodeSource source, RedisConnection connection) {

@Override
public void shutdown() {
shutdownPromise.trySuccess(true);
shutdownLatch.closeAndAwaitUninterruptibly();
for (MasterSlaveEntry entry : entries.values()) {
entry.shutdown();
Expand Down Expand Up @@ -731,6 +735,11 @@ public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
public InfinitySemaphoreLatch getShutdownLatch() {
return shutdownLatch;
}

@Override
public Future<Boolean> getShutdownPromise() {
return shutdownPromise;
}

@Override
public ConnectionEventsHub getConnectionEventsHub() {
Expand Down

0 comments on commit 5e2cbe6

Please sign in to comment.