Skip to content

Commit

Permalink
Fix scheduled executor leak
Browse files Browse the repository at this point in the history
The workQueue inside LoggingScheduledExecutor contains wrapped
futures of scheduled tasks. This is because we decorate tasks in
LoggingScheduledExecutor.decorateTask.
When we were cancelling the task, we delegated cancellation to the
delegate which tries to remove itself from the workQueue. The
workQueue though doesn't contain the delegate but rather contains the
decorated delegate. This causes the remove to fail without any
message and causes the leak.
To fix this, after we call cancel() on the delegate, we manually
remove the decorator from the workQueue.

Fixes : hazelcast#11221
  • Loading branch information
Matko Medenjak committed Aug 28, 2017
1 parent 53f1d38 commit 24a06fa
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 4 deletions.
Expand Up @@ -66,12 +66,12 @@ public LoggingScheduledExecutor(ILogger logger, int corePoolSize, ThreadFactory

@Override
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
return new LoggingDelegatingFuture<V>(runnable, task);
return new LoggingDelegatingFuture<V>(runnable, task, this);
}

@Override
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
return new LoggingDelegatingFuture<V>(callable, task);
return new LoggingDelegatingFuture<V>(callable, task, this);
}

@Override
Expand Down Expand Up @@ -117,10 +117,12 @@ static class LoggingDelegatingFuture<V> implements RunnableScheduledFuture<V> {

private final Object task;
private final RunnableScheduledFuture<V> delegate;
private final LoggingScheduledExecutor executor;

LoggingDelegatingFuture(Object task, RunnableScheduledFuture<V> delegate) {
LoggingDelegatingFuture(Object task, RunnableScheduledFuture<V> delegate, LoggingScheduledExecutor executor) {
this.task = task;
this.delegate = delegate;
this.executor = executor;
}

@Override
Expand Down Expand Up @@ -162,7 +164,11 @@ public int hashCode() {

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return delegate.cancel(mayInterruptIfRunning);
boolean cancelled = delegate.cancel(mayInterruptIfRunning);
if (cancelled) {
executor.remove(this);
}
return cancelled;
}

@Override
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
Expand All @@ -41,6 +42,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;

Expand Down Expand Up @@ -72,6 +74,27 @@ public void tearDown() throws Exception {
}
}

@Test
public void no_remaining_task_after_cancel() throws Exception {
executor = new LoggingScheduledExecutor(logger, 1, factory);

for (int i = 0; i < 10; i++) {
Future<Integer> future = executor.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
TimeUnit.HOURS.sleep(1);
return null;
}
});

future.cancel(true);
}

BlockingQueue<Runnable> workQueue = ((LoggingScheduledExecutor) executor).getQueue();

assertEquals(0, workQueue.size());
}

@Test
public void testConstructor_withRejectedExecutionHandler() {
RejectedExecutionHandler handler = new RejectedExecutionHandler() {
Expand Down

0 comments on commit 24a06fa

Please sign in to comment.