Skip to content

Commit

Permalink
Merge pull request ReactiveX#430 from zsxwing/issue-428
Browse files Browse the repository at this point in the history
Fixed issue ReactiveX#428
  • Loading branch information
benjchristensen committed Oct 16, 2013
2 parents be97188 + 2b3b733 commit 229914e
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 1 deletion.
5 changes: 4 additions & 1 deletion rxjava-core/src/main/java/rx/operators/ChunkedOperation.java
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,11 @@ public void emitChunk(Chunk<T, C> chunk) {
return;
}

subscription.unsubscribe();
// Fixed issue 428.
// As unsubscribe will cancel the Future, and the currrent thread's interrupt status
// will be set. So we need to emit the chunk before unsubscribe.
super.emitChunk(chunk);
subscription.unsubscribe();
createChunk();
}

Expand Down
38 changes: 38 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
*/
package rx.operators;

import static org.junit.Assert.assertFalse;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.Before;
Expand All @@ -37,6 +40,7 @@
import rx.util.Opening;
import rx.util.Openings;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func0;
import rx.util.functions.Func1;

Expand Down Expand Up @@ -631,6 +635,40 @@ public Subscription onSubscribe(Observer<? super Closing> observer) {
inOrder.verify(observer, Mockito.times(1)).onCompleted();
}

@Test
public void testLongTimeAction() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
LongTimeAction action = new LongTimeAction(latch);
Observable.from(1).buffer(10, TimeUnit.MILLISECONDS, 10)
.subscribe(action);
latch.await();
assertFalse(action.fail);
}

private static class LongTimeAction implements Action1<List<Integer>> {

CountDownLatch latch;
boolean fail = false;

public LongTimeAction(CountDownLatch latch) {
this.latch = latch;
}

@Override
public void call(List<Integer> t1) {
try {
if (fail) {
return;
}
Thread.sleep(200);
} catch (InterruptedException e) {
fail = true;
} finally {
latch.countDown();
}
}
}

private List<String> list(String... args) {
List<String> list = new ArrayList<String>();
for (String arg : args) {
Expand Down

0 comments on commit 229914e

Please sign in to comment.