From 2b3b733a8968726e065717b1a1054cd1b2b3e853 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sat, 12 Oct 2013 15:49:21 +0800 Subject: [PATCH] Fixed issue #428 --- .../java/rx/operators/ChunkedOperation.java | 5 ++- .../java/rx/operators/OperationBuffer.java | 38 +++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java b/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java index 5cd0845ea56..d68501d1221 100644 --- a/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java +++ b/rxjava-core/src/main/java/rx/operators/ChunkedOperation.java @@ -187,8 +187,11 @@ public void emitChunk(Chunk chunk) { return; } - subscription.unsubscribe(); + // Fixed issue 428. + // As unsubscribe will cancel the Future, and the currrent thread's interrupt status + // will be set. So we need to emit the chunk before unsubscribe. super.emitChunk(chunk); + subscription.unsubscribe(); createChunk(); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java index 1cc559fb57a..898260d0efa 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java +++ b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java @@ -15,8 +15,11 @@ */ package rx.operators; +import static org.junit.Assert.assertFalse; + import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.junit.Before; @@ -37,6 +40,7 @@ import rx.util.Opening; import rx.util.Openings; import rx.util.functions.Action0; +import rx.util.functions.Action1; import rx.util.functions.Func0; import rx.util.functions.Func1; @@ -631,6 +635,40 @@ public Subscription onSubscribe(Observer observer) { inOrder.verify(observer, Mockito.times(1)).onCompleted(); } + @Test + public void testLongTimeAction() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + LongTimeAction action = new LongTimeAction(latch); + Observable.from(1).buffer(10, TimeUnit.MILLISECONDS, 10) + .subscribe(action); + latch.await(); + assertFalse(action.fail); + } + + private static class LongTimeAction implements Action1> { + + CountDownLatch latch; + boolean fail = false; + + public LongTimeAction(CountDownLatch latch) { + this.latch = latch; + } + + @Override + public void call(List t1) { + try { + if (fail) { + return; + } + Thread.sleep(200); + } catch (InterruptedException e) { + fail = true; + } finally { + latch.countDown(); + } + } + } + private List list(String... args) { List list = new ArrayList(); for (String arg : args) {