Skip to content

Commit

Permalink
Ability to cancel task in running state. #208
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Aug 11, 2016
1 parent a2393b0 commit f8d9d51
Show file tree
Hide file tree
Showing 21 changed files with 453 additions and 206 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -106,7 +106,7 @@ public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Red
remoteService.setTasksCounterName(tasksCounter.getName()); remoteService.setTasksCounterName(tasksCounter.getName());
remoteService.setStatusName(status.getName()); remoteService.setStatusName(status.getName());


asyncService = remoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck()); asyncService = remoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(Integer.MAX_VALUE * 2));
asyncServiceWithoutResult = remoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult()); asyncServiceWithoutResult = remoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());
} }


Expand All @@ -116,7 +116,7 @@ public void registerWorkers(int executors) {
} }


@Override @Override
public void registerWorkers(int executors, Executor executor) { public void registerWorkers(int executors, ExecutorService executor) {
RemoteExecutorServiceImpl service = new RemoteExecutorServiceImpl(commandExecutor, redisson, codec, requestQueueName); RemoteExecutorServiceImpl service = new RemoteExecutorServiceImpl(commandExecutor, redisson, codec, requestQueueName);
service.setStatusName(status.getName()); service.setStatusName(status.getName());
service.setTasksCounterName(tasksCounter.getName()); service.setTasksCounterName(tasksCounter.getName());
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;


import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandExecutor; import org.redisson.command.CommandExecutor;
import org.redisson.pubsub.SemaphorePubSub; import org.redisson.pubsub.SemaphorePubSub;


import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;


/**
*
* @author Nikita Koksharov
*
*/
public class RedissonQueueSemaphore extends RedissonSemaphore { public class RedissonQueueSemaphore extends RedissonSemaphore {


private String queueName; private String queueName;
Expand Down
424 changes: 254 additions & 170 deletions redisson/src/main/java/org/redisson/RedissonRemoteService.java

Large diffs are not rendered by default.

Original file line number Original file line Diff line number Diff line change
Expand Up @@ -51,6 +51,6 @@ public interface RExecutorService extends ExecutorService {
* *
* @param workers - workers amount * @param workers - workers amount
*/ */
void registerWorkers(int workers, Executor executor); void registerWorkers(int workers, ExecutorService executor);


} }
3 changes: 2 additions & 1 deletion redisson/src/main/java/org/redisson/api/RRemoteService.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.redisson.api; package org.redisson.api;


import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


/** /**
Expand Down Expand Up @@ -83,7 +84,7 @@ public interface RRemoteService {
* @param object * @param object
* @param workersAmount * @param workersAmount
*/ */
<T> void register(Class<T> remoteInterface, T object, int workersAmount, Executor executor); <T> void register(Class<T> remoteInterface, T object, int workersAmount, ExecutorService executor);


/** /**
* Get remote service object for remote invocations. * Get remote service object for remote invocations.
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public RemoteInvocationOptions(RemoteInvocationOptions copy) {
public static RemoteInvocationOptions defaults() { public static RemoteInvocationOptions defaults() {
return new RemoteInvocationOptions() return new RemoteInvocationOptions()
.expectAckWithin(1, TimeUnit.SECONDS) .expectAckWithin(1, TimeUnit.SECONDS)
.expectResultWithin(20, TimeUnit.SECONDS); .expectResultWithin(30, TimeUnit.SECONDS);
} }


public Long getAckTimeoutInMillis() { public Long getAckTimeoutInMillis() {
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ public static <C extends RedisConnection> C getFrom(Channel channel) {
return (C) channel.attr(RedisConnection.CONNECTION).get(); return (C) channel.attr(RedisConnection.CONNECTION).get();
} }


public void removeCurrentCommand() {
channel.attr(CommandsQueue.CURRENT_COMMAND).remove();
}

public CommandData getCurrentCommand() { public CommandData getCurrentCommand() {
QueueCommand command = channel.attr(CommandsQueue.CURRENT_COMMAND).get(); QueueCommand command = channel.attr(CommandsQueue.CURRENT_COMMAND).get();
if (command instanceof CommandData) { if (command instanceof CommandData) {
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
*/ */
package org.redisson.client; package org.redisson.client;


/**
*
* @author Nikita Koksharov
*
*/
public class RedisException extends RuntimeException { public class RedisException extends RuntimeException {


private static final long serialVersionUID = 3389820652701696154L; private static final long serialVersionUID = 3389820652701696154L;
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ private void refresh(RedisConnection connection, Channel channel) {


private void reattachBlockingQueue(RedisConnection connection, final CommandData<?, ?> commandData) { private void reattachBlockingQueue(RedisConnection connection, final CommandData<?, ?> commandData) {
if (commandData == null if (commandData == null
|| !commandData.isBlockingCommand()) { || !commandData.isBlockingCommand()
|| commandData.getPromise().isDone()) {
return; return;
} }


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -89,16 +89,26 @@ public void operationComplete(Future<V> future) throws Exception {
l.countDown(); l.countDown();
} }
}); });
try {
l.await(); boolean interrupted = false;
} catch (InterruptedException e) { while (!future.isDone()) {
try {
l.await();
} catch (InterruptedException e) {
interrupted = true;
}
}

if (interrupted) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }

// commented out due to blocking issues up to 200 ms per minute for each thread // commented out due to blocking issues up to 200 ms per minute for each thread
// future.awaitUninterruptibly(); // future.awaitUninterruptibly();
if (future.isSuccess()) { if (future.isSuccess()) {
return future.getNow(); return future.getNow();
} }

throw convertException(future); throw convertException(future);
} }


Expand Down Expand Up @@ -509,9 +519,9 @@ private <V, R> void checkWriteFuture(final AsyncDetails<V, R> details, final Red


details.getTimeout().cancel(); details.getTimeout().cancel();


int timeoutTime = connectionManager.getConfig().getTimeout(); long timeoutTime = connectionManager.getConfig().getTimeout();
if (QueueCommand.TIMEOUTLESS_COMMANDS.contains(details.getCommand().getName())) { if (QueueCommand.TIMEOUTLESS_COMMANDS.contains(details.getCommand().getName())) {
Integer popTimeout = Integer.valueOf(details.getParams()[details.getParams().length - 1].toString()); Long popTimeout = Long.valueOf(details.getParams()[details.getParams().length - 1].toString());
handleBlockingOperations(details, connection, popTimeout); handleBlockingOperations(details, connection, popTimeout);
if (popTimeout == 0) { if (popTimeout == 0) {
return; return;
Expand All @@ -521,7 +531,7 @@ private <V, R> void checkWriteFuture(final AsyncDetails<V, R> details, final Red
timeoutTime += 1000; timeoutTime += 1000;
} }


final int timeoutAmount = timeoutTime; final long timeoutAmount = timeoutTime;
TimerTask timeoutTask = new TimerTask() { TimerTask timeoutTask = new TimerTask() {
@Override @Override
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
Expand All @@ -535,7 +545,7 @@ public void run(Timeout timeout) throws Exception {
details.setTimeout(timeout); details.setTimeout(timeout);
} }


private <R, V> void handleBlockingOperations(final AsyncDetails<V, R> details, final RedisConnection connection, Integer popTimeout) { private <R, V> void handleBlockingOperations(final AsyncDetails<V, R> details, final RedisConnection connection, Long popTimeout) {
final FutureListener<Boolean> listener = new FutureListener<Boolean>() { final FutureListener<Boolean> listener = new FutureListener<Boolean>() {
@Override @Override
public void operationComplete(Future<Boolean> future) throws Exception { public void operationComplete(Future<Boolean> future) throws Exception {
Expand All @@ -551,7 +561,7 @@ public void operationComplete(Future<Boolean> future) throws Exception {
scheduledFuture = connectionManager.getGroup().schedule(new Runnable() { scheduledFuture = connectionManager.getGroup().schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
// there is no re-connection was made // re-connection wasn't made
// and connection is still active // and connection is still active
if (orignalChannel == connection.getChannel() if (orignalChannel == connection.getChannel()
&& connection.isActive()) { && connection.isActive()) {
Expand Down Expand Up @@ -590,17 +600,6 @@ public void operationComplete(Future<R> future) throws Exception {
} }
}); });


details.getAttemptPromise().addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
if (future.isCancelled()) {
// command should be removed due to
// ConnectionWatchdog blockingQueue reconnection logic
connection.removeCurrentCommand();
}
}
});

synchronized (listener) { synchronized (listener) {
if (!details.getMainPromise().isDone()) { if (!details.getMainPromise().isDone()) {
connectionManager.getShutdownPromise().addListener(listener); connectionManager.getShutdownPromise().addListener(listener);
Expand Down Expand Up @@ -717,7 +716,7 @@ private <R, V> void checkAttemptFuture(final NodeSource source, final AsyncDetai
} }
((RedisClientResult)res).setRedisClient(addr); ((RedisClientResult)res).setRedisClient(addr);
} }
details.getMainPromise().setSuccess(res); details.getMainPromise().trySuccess(res);
} else { } else {
details.getMainPromise().tryFailure(future.cause()); details.getMainPromise().tryFailure(future.cause());
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -40,5 +40,9 @@ public void setAddFuture(Future<Boolean> addFuture) {
public Future<Boolean> getAddFuture() { public Future<Boolean> getAddFuture() {
return addFuture; return addFuture;
} }

public void doCancel() {
super.cancel(true);
}


} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@


import java.io.Serializable; import java.io.Serializable;


/**
*
* @author Nikita Koksharov
*
*/
public interface RRemoteServiceResponse extends Serializable { public interface RRemoteServiceResponse extends Serializable {


} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@
*/ */
public class RemoteServiceAck implements RRemoteServiceResponse, Serializable { public class RemoteServiceAck implements RRemoteServiceResponse, Serializable {


private static final long serialVersionUID = -6332680404562746984L;

} }
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.remote;

import java.io.Serializable;

/**
*
* @author Nikita Koksharov
*
*/
public class RemoteServiceCancelRequest implements Serializable {

private static final long serialVersionUID = -4800574267648904260L;

private boolean mayInterruptIfRunning;

public RemoteServiceCancelRequest() {
}

public RemoteServiceCancelRequest(boolean mayInterruptIfRunning) {
super();
this.mayInterruptIfRunning = mayInterruptIfRunning;
}

public boolean isMayInterruptIfRunning() {
return mayInterruptIfRunning;
}

}
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Copyright 2016 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.remote;

import java.io.Serializable;

/**
*
* @author Nikita Koksharov
*
*/
public class RemoteServiceCancelResponse implements RRemoteServiceResponse, Serializable {

private static final long serialVersionUID = -4356901222132702182L;

}
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
*/ */
package org.redisson.remote; package org.redisson.remote;


/**
*
* @author Nikita Koksharov
*
*/
public class RemoteServiceKey { public class RemoteServiceKey {


private final Class<?> serviceInterface; private final Class<?> serviceInterface;
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@


import java.lang.reflect.Method; import java.lang.reflect.Method;


/**
*
* @author Nikita Koksharov
*
*/
public class RemoteServiceMethod { public class RemoteServiceMethod {


private final Object bean; private final Object bean;
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@


import org.redisson.api.RemoteInvocationOptions; import org.redisson.api.RemoteInvocationOptions;


/**
*
* @author Nikita Koksharov
*
*/
public class RemoteServiceRequest implements Serializable { public class RemoteServiceRequest implements Serializable {


private static final long serialVersionUID = -1711385312384040075L;

private String requestId; private String requestId;
private String methodName; private String methodName;
private Object[] args; private Object[] args;
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -17,8 +17,15 @@


import java.io.Serializable; import java.io.Serializable;


/**
*
* @author Nikita Koksharov
*
*/
public class RemoteServiceResponse implements RRemoteServiceResponse, Serializable { public class RemoteServiceResponse implements RRemoteServiceResponse, Serializable {


private static final long serialVersionUID = -1958922748139674253L;

private Object result; private Object result;
private Throwable error; private Throwable error;


Expand Down
Loading

0 comments on commit f8d9d51

Please sign in to comment.