Skip to content

Commit

Permalink
fix #342 Fix fusion request handling for SubscribeOnValue and add THR…
Browse files Browse the repository at this point in the history
…EAD_BARRIER to flatMap/mergeSequential
  • Loading branch information
Stephane Maldini committed Jan 9, 2017
1 parent 4bd312f commit 74abd03
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/main/java/reactor/core/publisher/FluxFlatMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,7 @@ public void onSubscribe(Subscription s) {
if (s instanceof Fuseable.QueueSubscription) {
@SuppressWarnings("unchecked") Fuseable.QueueSubscription<R> f =
(Fuseable.QueueSubscription<R>) s;
int m = f.requestFusion(Fuseable.ANY);
int m = f.requestFusion(Fuseable.ANY | Fuseable.THREAD_BARRIER);
if (m == Fuseable.SYNC) {
sourceMode = Fuseable.SYNC;
queue = f;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ public void onSubscribe(Subscription s) {
@SuppressWarnings("unchecked")
QueueSubscription<R> qs = (QueueSubscription<R>) s;

int m = qs.requestFusion(Fuseable.ANY);
int m = qs.requestFusion(Fuseable.ANY | Fuseable.THREAD_BARRIER);
if (m == Fuseable.SYNC) {
fusionMode = m;
queue = qs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public int requestFusion(int requestedMode) {
fusionState = NO_VALUE;
return Fuseable.ASYNC;
}
return Fuseable.ASYNC;
return Fuseable.NONE;
}

@Override
Expand Down Expand Up @@ -298,10 +298,7 @@ public Object downstream() {

@Override
public int requestFusion(int requestedMode) {
if ((requestedMode & Fuseable.ASYNC) != 0) {
return Fuseable.ASYNC;
}
return Fuseable.ASYNC;
return requestedMode & Fuseable.ASYNC;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,32 @@
package reactor.core.publisher;

import org.junit.Test;
import reactor.core.Exceptions;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

public class FluxSubscribeOnValueTest {

@Test
public void normal() {
public void testSubscribeOnValueFusion() {

StepVerifier.create(Flux.range(1, 100)
.flatMap(f -> Flux.just(f)
.subscribeOn(Schedulers.parallel())
.log()
.map(this::slow)))
.expectNextCount(100)
.verifyComplete();

}

int slow(int slow){
try {
Thread.sleep(10);
return slow;
}
catch (InterruptedException e) {
throw Exceptions.bubble(e);
}
}
}

0 comments on commit 74abd03

Please sign in to comment.