Skip to content

Commit

Permalink
Scheduled tasks memory consumption reduced. #1158
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Dec 7, 2017
1 parent 6d39a91 commit a5c26a1
Show file tree
Hide file tree
Showing 11 changed files with 310 additions and 210 deletions.
136 changes: 82 additions & 54 deletions redisson/src/main/java/org/redisson/BaseRemoteService.java

Large diffs are not rendered by default.

Expand Up @@ -69,6 +69,7 @@
import org.redisson.misc.PromiseDelegator;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -221,16 +222,15 @@ protected RFuture<Long> pushTaskAsync() {
"local expiredTaskIds = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
+ "if #expiredTaskIds > 0 then "
+ "redis.call('zrem', KEYS[2], unpack(expiredTaskIds));"
+ "local expiredTasks = redis.call('hmget', KEYS[3], unpack(expiredTaskIds));"
+ "redis.call('rpush', KEYS[1], unpack(expiredTasks));"
+ "redis.call('rpush', KEYS[1], unpack(expiredTaskIds));"
+ "end; "
// get startTime from scheduler queue head task
+ "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
+ "if v[1] ~= nil then "
+ "return v[2]; "
+ "end "
+ "return nil;",
Arrays.<Object>asList(requestQueueName, schedulerQueueName, tasksName),
Arrays.<Object>asList(requestQueueName, schedulerQueueName),
System.currentTimeMillis(), 100);
}
};
Expand Down Expand Up @@ -769,7 +769,7 @@ public boolean cancelScheduledTask(String taskId) {

@Override
public boolean cancelTask(String taskId) {
RFuture<Boolean> scheduledFuture = scheduledRemoteService.cancelExecutionAsync(taskId);
RFuture<Boolean> scheduledFuture = scheduledRemoteService.cancelExecutionAsync(new RequestId(taskId));
return commandExecutor.get(scheduledFuture);
}

Expand Down
145 changes: 84 additions & 61 deletions redisson/src/main/java/org/redisson/RedissonRemoteService.java
Expand Up @@ -33,7 +33,9 @@
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.codec.CompositeCodec;
import org.redisson.command.CommandExecutor;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
Expand All @@ -45,6 +47,7 @@
import org.redisson.remote.RemoteServiceMethod;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.RemoteServiceResponse;
import org.redisson.remote.RequestId;
import org.redisson.remote.ResponseEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -63,7 +66,7 @@ public class RedissonRemoteService extends BaseRemoteService implements RRemoteS
private static final Logger log = LoggerFactory.getLogger(RedissonRemoteService.class);

private final Map<RemoteServiceKey, RemoteServiceMethod> beans = PlatformDependent.newConcurrentHashMap();
private final Map<Class<?>, Set<RFuture<RemoteServiceRequest>>> futures = PlatformDependent.newConcurrentHashMap();
private final Map<Class<?>, Set<RFuture<String>>> futures = PlatformDependent.newConcurrentHashMap();

public RedissonRemoteService(Codec codec, RedissonClient redisson, String name, CommandExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
super(codec, redisson, name, commandExecutor, executorId, responses);
Expand All @@ -81,19 +84,19 @@ public <T> void deregister(Class<T> remoteInterface) {
beans.remove(key);
}

Set<RFuture<RemoteServiceRequest>> removedFutures = futures.remove(remoteInterface);
Set<RFuture<String>> removedFutures = futures.remove(remoteInterface);
if (removedFutures == null) {
return;
}

for (RFuture<RemoteServiceRequest> future : removedFutures) {
for (RFuture<String> future : removedFutures) {
future.cancel(false);
}
}

@Override
public int getFreeWorkers(Class<?> remoteInterface) {
Set<RFuture<RemoteServiceRequest>> futuresSet = futures.get(remoteInterface);
Set<RFuture<String>> futuresSet = futures.get(remoteInterface);
return futuresSet.size();
}

Expand All @@ -115,28 +118,28 @@ public <T> void register(Class<T> remoteInterface, T object, int workers, Execut
}
}

Set<RFuture<RemoteServiceRequest>> values = Collections.newSetFromMap(PlatformDependent.<RFuture<RemoteServiceRequest>, Boolean>newConcurrentHashMap());
Set<RFuture<String>> values = Collections.newSetFromMap(PlatformDependent.<RFuture<String>, Boolean>newConcurrentHashMap());
futures.put(remoteInterface, values);

String requestQueueName = getRequestQueueName(remoteInterface);
RBlockingQueue<RemoteServiceRequest> requestQueue = redisson.getBlockingQueue(requestQueueName, codec);
RBlockingQueue<String> requestQueue = redisson.getBlockingQueue(requestQueueName, StringCodec.INSTANCE);
for (int i = 0; i < workers; i++) {
subscribe(remoteInterface, requestQueue, executor);
}
}

private <T> void subscribe(final Class<T> remoteInterface, final RBlockingQueue<RemoteServiceRequest> requestQueue,
private <T> void subscribe(final Class<T> remoteInterface, final RBlockingQueue<String> requestQueue,
final ExecutorService executor) {
Set<RFuture<RemoteServiceRequest>> futuresSet = futures.get(remoteInterface);
Set<RFuture<String>> futuresSet = futures.get(remoteInterface);
if (futuresSet == null) {
return;
}
final RFuture<RemoteServiceRequest> take = requestQueue.takeAsync();
final RFuture<String> take = requestQueue.takeAsync();
futuresSet.add(take);
take.addListener(new FutureListener<RemoteServiceRequest>() {
take.addListener(new FutureListener<String>() {
@Override
public void operationComplete(Future<RemoteServiceRequest> future) throws Exception {
Set<RFuture<RemoteServiceRequest>> futuresSet = futures.get(remoteInterface);
public void operationComplete(Future<String> future) throws Exception {
Set<RFuture<String>> futuresSet = futures.get(remoteInterface);
if (futuresSet == null) {
return;
}
Expand All @@ -156,63 +159,83 @@ public void operationComplete(Future<RemoteServiceRequest> future) throws Except
// https://github.com/mrniko/redisson/issues/493
// subscribe(remoteInterface, requestQueue);

final RemoteServiceRequest request = future.getNow();
// check the ack only if expected
if (request.getOptions().isAckExpected() && System.currentTimeMillis() - request.getDate() > request
.getOptions().getAckTimeoutInMillis()) {
log.debug("request: {} has been skipped due to ackTimeout");
// re-subscribe after a skipped ackTimeout
subscribe(remoteInterface, requestQueue, executor);
return;
}
final String requestId = future.getNow();
RMap<String, RemoteServiceRequest> tasks = redisson.getMap(requestQueue.getName() + ":tasks", new CompositeCodec(StringCodec.INSTANCE, codec, codec));
RFuture<RemoteServiceRequest> taskFuture = tasks.getAsync(requestId);
taskFuture.addListener(new FutureListener<RemoteServiceRequest>() {

@Override
public void operationComplete(Future<RemoteServiceRequest> future) throws Exception {
if (!future.isSuccess()) {
if (future.cause() instanceof RedissonShutdownException) {
return;
}
log.error("Can't process the remote service request with id " + requestId, future.cause());
// re-subscribe after a failed takeAsync
subscribe(remoteInterface, requestQueue, executor);
return;
}

final RemoteServiceRequest request = future.getNow();
// check the ack only if expected
if (request.getOptions().isAckExpected() && System.currentTimeMillis() - request.getDate() > request
.getOptions().getAckTimeoutInMillis()) {
log.debug("request: {} has been skipped due to ackTimeout");
// re-subscribe after a skipped ackTimeout
subscribe(remoteInterface, requestQueue, executor);
return;
}

final String responseName = getResponseQueueName(request.getExecutorId());
final String responseName = getResponseQueueName(request.getExecutorId());

// send the ack only if expected
if (request.getOptions().isAckExpected()) {
String ackName = getAckName(request.getId());
RFuture<Boolean> ackClientsFuture = commandExecutor.evalWriteAsync(responseName,
LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "redis.call('pexpire', KEYS[1], ARGV[2]);"
+ "redis.call('rpush', KEYS[2], ARGV[1]);"
// + "redis.call('pexpire', KEYS[2], ARGV[2]);"
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(ackName, responseName),
encode(new RemoteServiceAck(request.getId())), request.getOptions().getAckTimeoutInMillis());
// send the ack only if expected
if (request.getOptions().isAckExpected()) {
String ackName = getAckName(request.getId());
RFuture<Boolean> ackClientsFuture = commandExecutor.evalWriteAsync(responseName,
LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if redis.call('setnx', KEYS[1], 1) == 1 then "
+ "redis.call('pexpire', KEYS[1], ARGV[2]);"
+ "redis.call('rpush', KEYS[2], ARGV[1]);"
// + "redis.call('pexpire', KEYS[2], ARGV[2]);"
+ "return 1;"
+ "end;"
+ "return 0;",
Arrays.<Object>asList(ackName, responseName),
encode(new RemoteServiceAck(request.getId())), request.getOptions().getAckTimeoutInMillis());

ackClientsFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
if (future.cause() instanceof RedissonShutdownException) {
return;
}
log.error("Can't send ack for request: " + request, future.cause());
// re-subscribe after a failed send (ack)
subscribe(remoteInterface, requestQueue, executor);
return;
}
ackClientsFuture.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
if (future.cause() instanceof RedissonShutdownException) {
return;
}
log.error("Can't send ack for request: " + request, future.cause());
// re-subscribe after a failed send (ack)
subscribe(remoteInterface, requestQueue, executor);
return;
}

if (!future.getNow()) {
subscribe(remoteInterface, requestQueue, executor);
return;
}
if (!future.getNow()) {
subscribe(remoteInterface, requestQueue, executor);
return;
}

executeMethod(remoteInterface, requestQueue, executor, request);
}
});
} else {
executeMethod(remoteInterface, requestQueue, executor, request);
}
executeMethod(remoteInterface, requestQueue, executor, request);
}
});
} else {
executeMethod(remoteInterface, requestQueue, executor, request);
}
}
});

}

});
}

private <T> void executeMethod(final Class<T> remoteInterface, final RBlockingQueue<RemoteServiceRequest> requestQueue,
private <T> void executeMethod(final Class<T> remoteInterface, final RBlockingQueue<String> requestQueue,
final ExecutorService executor, final RemoteServiceRequest request) {
final RemoteServiceMethod method = beans.get(new RemoteServiceKey(remoteInterface, request.getMethodName(), request.getSignatures()));
final String responseName = getResponseQueueName(request.getExecutorId());
Expand All @@ -221,7 +244,7 @@ private <T> void executeMethod(final Class<T> remoteInterface, final RBlockingQu
final AtomicReference<RRemoteServiceResponse> responseHolder = new AtomicReference<RRemoteServiceResponse>();

final RPromise<RemoteServiceCancelRequest> cancelRequestFuture = new RedissonPromise<RemoteServiceCancelRequest>();
scheduleCheck(cancelRequestMapName, request.getId(), cancelRequestFuture);
scheduleCheck(cancelRequestMapName, new RequestId(request.getId()), cancelRequestFuture);

final java.util.concurrent.Future<?> submitFuture = executor.submit(new Runnable() {
@Override
Expand Down Expand Up @@ -257,7 +280,7 @@ public void operationComplete(Future<RemoteServiceCancelRequest> future) throws
}

private <T> void invokeMethod(final Class<T> remoteInterface,
final RBlockingQueue<RemoteServiceRequest> requestQueue, final RemoteServiceRequest request,
final RBlockingQueue<String> requestQueue, final RemoteServiceRequest request,
RemoteServiceMethod method, String responseName, final ExecutorService executor,
RFuture<RemoteServiceCancelRequest> cancelRequestFuture, final AtomicReference<RRemoteServiceResponse> responseHolder) {
try {
Expand Down
Expand Up @@ -18,6 +18,7 @@
import org.redisson.api.RExecutorFuture;
import org.redisson.misc.PromiseDelegator;
import org.redisson.misc.RPromise;
import org.redisson.remote.RequestId;

/**
*
Expand All @@ -27,16 +28,16 @@
*/
public class RedissonExecutorFuture<V> extends PromiseDelegator<V> implements RExecutorFuture<V> {

private final String taskId;
private final RequestId taskId;

public RedissonExecutorFuture(RPromise<V> promise, String taskId) {
public RedissonExecutorFuture(RPromise<V> promise, RequestId taskId) {
super(promise);
this.taskId = taskId;
}

@Override
public String getTaskId() {
return taskId;
return taskId.toString();
}

}
Expand Up @@ -20,6 +20,7 @@

import org.redisson.api.RScheduledFuture;
import org.redisson.misc.PromiseDelegator;
import org.redisson.remote.RequestId;

/**
*
Expand All @@ -30,7 +31,7 @@
public class RedissonScheduledFuture<V> extends PromiseDelegator<V> implements RScheduledFuture<V> {

private final long scheduledExecutionTime;
private final String taskId;
private final RequestId taskId;

public RedissonScheduledFuture(RemotePromise<V> promise, long scheduledExecutionTime) {
super(promise);
Expand Down Expand Up @@ -62,7 +63,7 @@ public long getDelay(TimeUnit unit) {

@Override
public String getTaskId() {
return taskId;
return taskId.toString();
}

}
13 changes: 10 additions & 3 deletions redisson/src/main/java/org/redisson/executor/RemotePromise.java
Expand Up @@ -17,6 +17,7 @@

import org.redisson.api.RFuture;
import org.redisson.misc.RedissonPromise;
import org.redisson.remote.RequestId;

/**
*
Expand All @@ -25,15 +26,21 @@
*/
public class RemotePromise<T> extends RedissonPromise<T> {

private String requestId;
private final Object param;
private final RequestId requestId;
private RFuture<Boolean> addFuture;

public RemotePromise(String requestId) {
public RemotePromise(RequestId requestId, Object param) {
super();
this.requestId = requestId;
this.param = param;
}

public String getRequestId() {
public <P> P getParam() {
return (P) param;
}

public RequestId getRequestId() {
return requestId;
}

Expand Down

0 comments on commit a5c26a1

Please sign in to comment.