Skip to content

Commit

Permalink
Merge pull request ReactiveX#197 from thegeez/take-while
Browse files Browse the repository at this point in the history
TakeWhile observables do not properly complete
  • Loading branch information
benjchristensen committed Mar 26, 2013
2 parents 178e90a + 933f527 commit 7aee2bd
Showing 1 changed file with 34 additions and 2 deletions.
36 changes: 34 additions & 2 deletions rxjava-core/src/main/java/rx/operators/OperationTake.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import rx.util.AtomicObservableSubscription;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

import rx.subjects.Subject;
/**
* Returns a specified number of contiguous values from the start of an observable sequence.
*/
Expand Down Expand Up @@ -147,6 +147,7 @@ public void onNext(T args) {
if (predicate.call(args, counter.getAndIncrement())) {
observer.onNext(args);
} else {
observer.onCompleted();
// this will work if the sequence is asynchronous, it will have no effect on a synchronous observable
subscription.unsubscribe();
}
Expand Down Expand Up @@ -178,6 +179,37 @@ public Boolean call(Integer input) {
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testTakeWhileOnSubject1() {
Subject<Integer> s = Subject.create();
Observable<Integer> w = (Observable<Integer>)s;
Observable<Integer> take = Observable.create(takeWhile(w, new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer input) {
return input < 3;
}
}));

@SuppressWarnings("unchecked")
Observer<Integer> aObserver = mock(Observer.class);
take.subscribe(aObserver);

s.onNext(1);
s.onNext(2);
s.onNext(3);
s.onNext(4);
s.onNext(5);
s.onCompleted();

verify(aObserver, times(1)).onNext(1);
verify(aObserver, times(1)).onNext(2);
verify(aObserver, never()).onNext(3);
verify(aObserver, never()).onNext(4);
verify(aObserver, never()).onNext(5);
verify(aObserver, never()).onError(any(Exception.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testTakeWhile2() {
Observable<String> w = Observable.toObservable("one", "two", "three");
Expand Down Expand Up @@ -293,4 +325,4 @@ public void run() {
}
}

}
}

0 comments on commit 7aee2bd

Please sign in to comment.