Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Aug 11, 2016
1 parent 984fda8 commit 2874e75
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ private Promise<V> wrapTakeFuture(final Future<V> takeFuture) {
final Promise<V> result = new PromiseDelegator<V>(commandExecutor.getConnectionManager().<V>newPromise()) {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
super.cancel(mayInterruptIfRunning);
return takeFuture.cancel(mayInterruptIfRunning);
};
};
Expand All @@ -145,14 +146,14 @@ public boolean cancel(boolean mayInterruptIfRunning) {
@Override
public void operationComplete(Future<V> future) throws Exception {
if (!future.isSuccess()) {
result.setFailure(future.cause());
result.tryFailure(future.cause());
return;
}

createSemaphore(null).releaseAsync().addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
result.setSuccess(takeFuture.getNow());
result.trySuccess(takeFuture.getNow());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public void operationComplete(Future<RemoteServiceCancelRequest> future) throws
private <T> void invokeMethod(final Class<T> remoteInterface,
final RBlockingQueue<RemoteServiceRequest> requestQueue, final RemoteServiceRequest request,
RemoteServiceMethod method, String responseName, final ExecutorService executor,
Future<RemoteServiceCancelRequest> cancelRequestFuture, AtomicReference<RRemoteServiceResponse> responseHolder) {
Future<RemoteServiceCancelRequest> cancelRequestFuture, final AtomicReference<RRemoteServiceResponse> responseHolder) {
try {
Object result = method.getMethod().invoke(method.getBean(), request.getArgs());

Expand Down Expand Up @@ -454,7 +454,7 @@ public void operationComplete(Future<Boolean> future) throws Exception {
}

if (optionsCopy.isAckExpected()) {
RBlockingQueue<RemoteServiceAck> responseQueue = redisson.getBlockingQueue(responseName, getCodec());
final RBlockingQueue<RemoteServiceAck> responseQueue = redisson.getBlockingQueue(responseName, getCodec());
Future<RemoteServiceAck> ackFuture = responseQueue.pollAsync(optionsCopy.getAckTimeoutInMillis(), TimeUnit.MILLISECONDS);
ackFuture.addListener(new FutureListener<RemoteServiceAck>() {
@Override
Expand Down

0 comments on commit 2874e75

Please sign in to comment.