Skip to content

Commit

Permalink
Fixed out of connections problem on high load during RemoteExecutorSe…
Browse files Browse the repository at this point in the history
…rvice/ExecutorService usage. #1087
  • Loading branch information
Nikita committed Oct 20, 2017
1 parent dcbfe3b commit 8ffedea
Show file tree
Hide file tree
Showing 22 changed files with 497 additions and 327 deletions.
281 changes: 186 additions & 95 deletions redisson/src/main/java/org/redisson/BaseRemoteService.java

Large diffs are not rendered by default.

20 changes: 14 additions & 6 deletions redisson/src/main/java/org/redisson/Redisson.java
Expand Up @@ -79,6 +79,7 @@
import org.redisson.liveobject.provider.ResolverProvider;
import org.redisson.misc.RedissonObjectFactory;
import org.redisson.pubsub.SemaphorePubSub;
import org.redisson.remote.ResponseEntry;

import io.netty.util.internal.PlatformDependent;

Expand Down Expand Up @@ -107,6 +108,7 @@ public class Redisson implements RedissonClient {
protected final SemaphorePubSub semaphorePubSub = new SemaphorePubSub();

protected final UUID id = UUID.randomUUID();
protected final ConcurrentMap<String, ResponseEntry> responses = PlatformDependent.newConcurrentHashMap();

protected Redisson(Config config) {
this.config = config;
Expand Down Expand Up @@ -373,7 +375,7 @@ public RScript getScript() {

@Override
public RScheduledExecutorService getExecutorService(String name) {
return new RedissonExecutorService(connectionManager.getCodec(), connectionManager.getCommandExecutor(), this, name, queueTransferService);
return new RedissonExecutorService(connectionManager.getCodec(), connectionManager.getCommandExecutor(), this, name, queueTransferService, responses, id.toString());
}

@Override
Expand All @@ -384,27 +386,33 @@ public RScheduledExecutorService getExecutorService(Codec codec, String name) {

@Override
public RScheduledExecutorService getExecutorService(String name, Codec codec) {
return new RedissonExecutorService(codec, connectionManager.getCommandExecutor(), this, name, queueTransferService);
return new RedissonExecutorService(codec, connectionManager.getCommandExecutor(), this, name, queueTransferService, responses, id.toString());
}

@Override
public RRemoteService getRemoteService() {
return new RedissonRemoteService(this, connectionManager.getCommandExecutor());
return getRemoteService("redisson_rs", connectionManager.getCodec());
}

@Override
public RRemoteService getRemoteService(String name) {
return new RedissonRemoteService(this, name, connectionManager.getCommandExecutor());
return getRemoteService(name, connectionManager.getCodec());
}

@Override
public RRemoteService getRemoteService(Codec codec) {
return new RedissonRemoteService(codec, this, connectionManager.getCommandExecutor());
return getRemoteService("redisson_rs", codec);
}

@Override
public RRemoteService getRemoteService(String name, Codec codec) {
return new RedissonRemoteService(codec, this, name, connectionManager.getCommandExecutor());
String executorId;
if (codec == connectionManager.getCodec()) {
executorId = id.toString();
} else {
executorId = id + ":" + name;
}
return new RedissonRemoteService(codec, this, name, connectionManager.getCommandExecutor(), executorId, responses);
}

@Override
Expand Down
47 changes: 29 additions & 18 deletions redisson/src/main/java/org/redisson/RedissonExecutorService.java
Expand Up @@ -28,6 +28,7 @@
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -68,6 +69,7 @@
import org.redisson.misc.PromiseDelegator;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
import org.redisson.remote.ResponseEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -121,15 +123,24 @@ public class RedissonExecutorService implements RScheduledExecutorService {
private final String name;
private final String requestQueueName;
private final QueueTransferService queueTransferService;
private final String executorId;
private final ConcurrentMap<String, ResponseEntry> responses;

public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson, String name, QueueTransferService queueTransferService) {
public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Redisson redisson, String name, QueueTransferService queueTransferService, ConcurrentMap<String, ResponseEntry> responses, String redissonId) {
super();
this.codec = codec;
this.commandExecutor = commandExecutor;
this.connectionManager = commandExecutor.getConnectionManager();
this.name = name;
this.redisson = redisson;
this.queueTransferService = queueTransferService;
this.responses = responses;

if (codec == connectionManager.getCodec()) {
this.executorId = redissonId;
} else {
this.executorId = redissonId + ":" + RemoteExecutorServiceAsync.class.getName() + ":" + name;
}

requestQueueName = "{" + name + ":"+ RemoteExecutorService.class.getName() + "}";
String objectName = requestQueueName;
Expand All @@ -149,15 +160,15 @@ public RedissonExecutorService(Codec codec, CommandExecutor commandExecutor, Red
remoteService = redisson.getRemoteService(name, codec);
workersTopic = redisson.getTopic(workersChannelName);

TasksService executorRemoteService = new TasksService(codec, redisson, name, commandExecutor);
TasksService executorRemoteService = new TasksService(codec, redisson, name, commandExecutor, executorId, responses);
executorRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
executorRemoteService.setTasksCounterName(tasksCounterName);
executorRemoteService.setStatusName(statusName);
executorRemoteService.setTasksName(tasksName);
asyncService = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().expectResultWithin(1, TimeUnit.DAYS));
asyncServiceWithoutResult = executorRemoteService.get(RemoteExecutorServiceAsync.class, RemoteInvocationOptions.defaults().noAck().noResult());

scheduledRemoteService = new ScheduledTasksService(codec, redisson, name, commandExecutor);
scheduledRemoteService = new ScheduledTasksService(codec, redisson, name, commandExecutor, executorId, responses);
scheduledRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
scheduledRemoteService.setTasksCounterName(tasksCounterName);
scheduledRemoteService.setStatusName(statusName);
Expand Down Expand Up @@ -226,7 +237,7 @@ protected RFuture<Long> pushTaskAsync() {
queueTransferService.schedule(getName(), task);

TasksRunnerService service =
new TasksRunnerService(commandExecutor, redisson, codec, requestQueueName);
new TasksRunnerService(commandExecutor, redisson, codec, requestQueueName, responses);
service.setStatusName(statusName);
service.setTasksCounterName(tasksCounterName);
service.setTasksName(tasksName);
Expand All @@ -249,7 +260,7 @@ public void execute(Runnable task) {
check(task);
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncServiceWithoutResult.executeRunnable(task.getClass().getName(), classBody, state);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncServiceWithoutResult.executeRunnable(task.getClass().getName(), classBody, state, null);
execute(promise);
}

Expand All @@ -265,7 +276,7 @@ public void execute(Runnable ...tasks) {
check(task);
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
asyncServiceWithoutResult.executeRunnable(task.getClass().getName(), classBody, state);
asyncServiceWithoutResult.executeRunnable(task.getClass().getName(), classBody, state, null);
}

List<Boolean> result = (List<Boolean>) executorRemoteService.executeAdd();
Expand All @@ -275,7 +286,7 @@ public void execute(Runnable ...tasks) {
}

private TasksBatchService createBatchService() {
TasksBatchService executorRemoteService = new TasksBatchService(codec, redisson, name, commandExecutor);
TasksBatchService executorRemoteService = new TasksBatchService(codec, redisson, name, commandExecutor, executorId, responses);
executorRemoteService.setTerminationTopicName(terminationTopic.getChannelNames().get(0));
executorRemoteService.setTasksCounterName(tasksCounterName);
executorRemoteService.setStatusName(statusName);
Expand Down Expand Up @@ -435,7 +446,7 @@ public <T> RExecutorFuture<T> submitAsync(Callable<T> task) {
check(task);
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<T> result = (RemotePromise<T>) asyncService.executeCallable(task.getClass().getName(), classBody, state);
RemotePromise<T> result = (RemotePromise<T>) asyncService.executeCallable(task.getClass().getName(), classBody, state, null);
addListener(result);
return new RedissonExecutorFuture<T>(result, result.getRequestId());
}
Expand All @@ -453,7 +464,7 @@ public RExecutorBatchFuture submit(Callable<?> ...tasks) {
check(task);
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<?> promise = (RemotePromise<?>)asyncService.executeCallable(task.getClass().getName(), classBody, state);
RemotePromise<?> promise = (RemotePromise<?>)asyncService.executeCallable(task.getClass().getName(), classBody, state, null);
RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise, promise.getRequestId());
result.add(executorFuture);
}
Expand All @@ -479,7 +490,7 @@ public RExecutorBatchFuture submitAsync(Callable<?> ...tasks) {
check(task);
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<?> promise = (RemotePromise<?>)asyncService.executeCallable(task.getClass().getName(), classBody, state);
RemotePromise<?> promise = (RemotePromise<?>)asyncService.executeCallable(task.getClass().getName(), classBody, state, null);
RedissonExecutorFuture<?> executorFuture = new RedissonExecutorFuture(promise, promise.getRequestId());
result.add(executorFuture);
}
Expand Down Expand Up @@ -581,7 +592,7 @@ public RExecutorBatchFuture submit(Runnable ...tasks) {
check(task);
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncService.executeRunnable(task.getClass().getName(), classBody, state);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncService.executeRunnable(task.getClass().getName(), classBody, state, null);
RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise, promise.getRequestId());
result.add(executorFuture);
}
Expand All @@ -607,7 +618,7 @@ public RExecutorBatchFuture submitAsync(Runnable ...tasks) {
check(task);
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncService.executeRunnable(task.getClass().getName(), classBody, state);
RemotePromise<Void> promise = (RemotePromise<Void>)asyncService.executeRunnable(task.getClass().getName(), classBody, state, null);
RedissonExecutorFuture<Void> executorFuture = new RedissonExecutorFuture<Void>(promise, promise.getRequestId());
result.add(executorFuture);
}
Expand Down Expand Up @@ -651,7 +662,7 @@ public RExecutorFuture<?> submitAsync(Runnable task) {
check(task);
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
RemotePromise<Void> result = (RemotePromise<Void>) asyncService.executeRunnable(task.getClass().getName(), classBody, state);
RemotePromise<Void> result = (RemotePromise<Void>) asyncService.executeRunnable(task.getClass().getName(), classBody, state, null);
addListener(result);
return new RedissonExecutorFuture<Void>(result, result.getRequestId());
}
Expand All @@ -669,7 +680,7 @@ public RScheduledFuture<?> scheduleAsync(Runnable task, long delay, TimeUnit uni
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
long startTime = System.currentTimeMillis() + unit.toMillis(delay);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledService.scheduleRunnable(task.getClass().getName(), classBody, state, startTime);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledService.scheduleRunnable(task.getClass().getName(), classBody, state, startTime, null);
addListener(result);
return new RedissonScheduledFuture<Void>(result, startTime);
}
Expand All @@ -687,7 +698,7 @@ public <V> RScheduledFuture<V> scheduleAsync(Callable<V> task, long delay, TimeU
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
long startTime = System.currentTimeMillis() + unit.toMillis(delay);
RemotePromise<V> result = (RemotePromise<V>) asyncScheduledService.scheduleCallable(task.getClass().getName(), classBody, state, startTime);
RemotePromise<V> result = (RemotePromise<V>) asyncScheduledService.scheduleCallable(task.getClass().getName(), classBody, state, startTime, null);
addListener(result);
return new RedissonScheduledFuture<V>(result, startTime);
}
Expand All @@ -705,7 +716,7 @@ public RScheduledFuture<?> scheduleAtFixedRateAsync(Runnable task, long initialD
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.scheduleAtFixedRate(task.getClass().getName(), classBody, state, startTime, unit.toMillis(period));
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.scheduleAtFixedRate(task.getClass().getName(), classBody, state, startTime, unit.toMillis(period), executorId, null);
addListener(result);
return new RedissonScheduledFuture<Void>(result, startTime);
}
Expand All @@ -724,7 +735,7 @@ public RScheduledFuture<?> scheduleAsync(Runnable task, CronSchedule cronSchedul
byte[] state = encode(task);
final Date startDate = cronSchedule.getExpression().getNextValidTimeAfter(new Date());
long startTime = startDate.getTime();
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.schedule(task.getClass().getName(), classBody, state, startTime, cronSchedule.getExpression().getCronExpression());
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.schedule(task.getClass().getName(), classBody, state, startTime, cronSchedule.getExpression().getCronExpression(), executorId, null);
addListener(result);
return new RedissonScheduledFuture<Void>(result, startTime) {
public long getDelay(TimeUnit unit) {
Expand All @@ -746,7 +757,7 @@ public RScheduledFuture<?> scheduleWithFixedDelayAsync(Runnable task, long initi
byte[] classBody = getClassBody(task);
byte[] state = encode(task);
long startTime = System.currentTimeMillis() + unit.toMillis(initialDelay);
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.scheduleWithFixedDelay(task.getClass().getName(), classBody, state, startTime, unit.toMillis(delay));
RemotePromise<Void> result = (RemotePromise<Void>) asyncScheduledServiceAtFixed.scheduleWithFixedDelay(task.getClass().getName(), classBody, state, startTime, unit.toMillis(delay), executorId, null);
addListener(result);
return new RedissonScheduledFuture<Void>(result, startTime);
}
Expand Down

0 comments on commit 8ffedea

Please sign in to comment.