Skip to content

Commit

Permalink
fix #1142 Dangling thread when calling WorkQueueProcessor.forceShutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed Mar 29, 2018
1 parent 63debe7 commit c431c8b
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,6 @@ boolean isRunning() {
processor.ringBuffer.getAsLong() > sequence.getAsLong());
}


/**
* It is ok to have another thread rerun this method after a halt().
*/
Expand All @@ -474,12 +473,17 @@ public void run() {
if(!running.get()){
return;
}
if(processor.terminated == 1 && processor.ringBuffer.getAsLong() == -1L) {
if (processor.error != null) {
subscriber.onError(processor.error);
if(processor.terminated == SHUTDOWN) {
if (processor.ringBuffer.getAsLong() == -1L) {
if (processor.error != null) {
subscriber.onError(processor.error);
return;
}
subscriber.onComplete();
return;
}
subscriber.onComplete();
}
else if (processor.terminated == FORCED_SHUTDOWN) {
return;
}
}
Expand All @@ -499,7 +503,7 @@ public void run() {
do {
nextSequence = processor.workSequence.getAsLong() + 1L;
while ((!unbounded && pendingRequest.getAsLong() == 0L)) {
if (!isRunning()) {
if (!isRunning() || processor.isTerminated()) {
WaitStrategy.alert();
}
LockSupport.parkNanos(1L);
Expand Down Expand Up @@ -536,6 +540,9 @@ public void run() {

}
catch (InterruptedException | RuntimeException ce) {
if (ce instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
if (Exceptions.isCancel(ce)){
reschedule(event);
break;
Expand All @@ -548,7 +555,7 @@ public void run() {
if (!running.get()) {
break;
}
if(processor.terminated == 1) {
if(processor.terminated == SHUTDOWN) {
if (processor.error != null) {
processedSequence = true;
subscriber.onError(processor.error);
Expand All @@ -560,6 +567,9 @@ public void run() {
break;
}
}
else if (processor.terminated == FORCED_SHUTDOWN) {
break;
}
//processedSequence = true;
//continue event-loop

Expand Down Expand Up @@ -639,6 +649,7 @@ boolean replay(final boolean unbounded) {
}
catch (InterruptedException e) {
running.set(false);
Thread.currentThread().interrupt();
return true;
}
finally {
Expand All @@ -664,7 +675,7 @@ boolean reschedule(@Nullable Slot<T> event) {
void readNextEvent(final boolean unbounded) {
//pause until request
while ((!unbounded && getAndSub(pendingRequest, 1L) == 0L)) {
if (!isRunning()) {
if (!isRunning() || processor.isTerminated()) {
WaitStrategy.alert();
}
//Todo Use WaitStrategy?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
Expand All @@ -46,6 +47,7 @@
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.test.subscriber.AssertSubscriber;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
Expand Down Expand Up @@ -1528,4 +1530,29 @@ public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
}

@Test
public void testForceShutdownAfterShutdown() throws InterruptedException {
WorkQueueProcessor<String> processor = WorkQueueProcessor.<String>builder()
.name("processor").bufferSize(4)
.waitStrategy(WaitStrategy.phasedOffLiteLock(200, 100, TimeUnit.MILLISECONDS)) //eliminate the waitstrategy diff
.build();
Publisher<String> publisher = Flux.fromArray(new String[] { "1", "2", "3", "4", "5" });
publisher.subscribe(processor);

AssertSubscriber<String> subscriber = AssertSubscriber.create(0);
processor.subscribe(subscriber);

subscriber.request(1);

Thread.sleep(250);

processor.shutdown();

assertFalse(processor.awaitAndShutdown(Duration.ofMillis(400)));

processor.forceShutdown();

assertTrue(processor.awaitAndShutdown(Duration.ofMillis(400)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ public class TopicProcessorVerification extends AbstractProcessorVerification {

@Override
public Processor<Long, Long> createIdentityProcessor(int bufferSize) {
return TopicProcessor.<Long>builder().name("rb-async")
.bufferSize(bufferSize)
.build();
return TopicProcessor.<Long>builder()
.name("topicProcessorVerification")
.bufferSize(bufferSize)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ public class WorkQueueProcessorVerification extends AbstractProcessorVerificatio

@Override
public Processor<Long, Long> createIdentityProcessor(int bufferSize) {
return WorkQueueProcessor.<Long>builder().name("rb-work").bufferSize(bufferSize).build();
return WorkQueueProcessor.<Long>builder()
.name("workQueueProcessorVerification")
.bufferSize(bufferSize)
.build();
}

@Override
public void required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo()
throws Throwable {
throw new SkipException("WorkQueueProcessor cannot do that given its " +
"distributing nature");
throw new SkipException("WorkQueueProcessor cannot do that given its distributing nature");
}
}

0 comments on commit c431c8b

Please sign in to comment.