Skip to content

Commit

Permalink
Merge pull request ReactiveX#146 from benjchristensen/issue-87
Browse files Browse the repository at this point in the history
Merge of Pull ReactiveX#125 for Issue ReactiveX#87 Operator TakeWhile
  • Loading branch information
benjchristensen committed Feb 14, 2013
2 parents 80ee58a + a366ae9 commit d791e87
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@
(import rx.Observable))

;; still need to get this wired up in build.gradle to run as tests
; (-> (rx.Observable/toObservable [\"one\" \"two\" \"three\"]) (.take 2) (.subscribe (fn [arg] (println arg))))
; (-> (rx.Observable/toObservable ["one" "two" "three"]) (.take 2) (.subscribe (fn [arg] (println arg))))

; (-> (rx.Observable/toObservable [1 2 3]) (.takeWhile (fn [x i] (< x 2))) (.subscribe (fn [arg] (println arg))))
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,22 @@ def class ObservableTests {
verify(a, times(0)).received(3);
}

@Test
public void testTakeWhileViaGroovy() {
Observable.takeWhile(Observable.toObservable(1, 2, 3), { x -> x < 3}).subscribe({ result -> a.received(result)});
verify(a, times(1)).received(1);
verify(a, times(1)).received(2);
verify(a, times(0)).received(3);
}

@Test
public void testTakeWhileWithIndexViaGroovy() {
Observable.takeWhileWithIndex(Observable.toObservable(1, 2, 3), { x, i -> i < 2}).subscribe({ result -> a.received(result)});
verify(a, times(1)).received(1);
verify(a, times(1)).received(2);
verify(a, times(0)).received(3);
}

@Test
public void testToSortedList() {
new TestFactory().getNumbers().toSortedList().subscribe({ result -> a.received(result)});
Expand Down
92 changes: 92 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1370,6 +1370,57 @@ public static <T> Observable<T> takeLast(final Observable<T> items, final int co
return _create(OperationTakeLast.takeLast(items, count));
}

/**
* Returns a specified number of contiguous values from the start of an observable sequence.
*
* @param items
* @param predicate a function to test each source element for a condition
* @return
*/
public static <T> Observable<T> takeWhile(final Observable<T> items, Func1<T, Boolean> predicate) {
return create(OperationTake.takeWhile(items, predicate));
}

/**
* Returns a specified number of contiguous values from the start of an observable sequence.
*
* @param items
* @param predicate a function to test each source element for a condition
* @return
*/
public static <T> Observable<T> takeWhile(final Observable<T> items, Object predicate) {
final FuncN _f = Functions.from(predicate);

return takeWhile(items, new Func1<T, Boolean>() {
@Override
public Boolean call(T t) {
return (Boolean) _f.call(t);
}
});
}

/**
* Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values.
*
* @param items
* @param predicate a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false.
* @return
*/
public static <T> Observable<T> takeWhileWithIndex(final Observable<T> items, Func2<T, Integer, Boolean> predicate) {
return create(OperationTake.takeWhileWithIndex(items, predicate));
}

public static <T> Observable<T> takeWhileWithIndex(final Observable<T> items, Object predicate) {
final FuncN _f = Functions.from(predicate);

return create(OperationTake.takeWhileWithIndex(items, new Func2<T, Integer, Boolean>() {
@Override
public Boolean call(T t, Integer integer) {
return (Boolean) _f.call(t, integer);
}
}));
}

/**
* Returns an Observable that emits a single item, a list composed of all the items emitted by
* the source Observable.
Expand Down Expand Up @@ -2301,6 +2352,47 @@ public Observable<T> take(final int num) {
return take(this, num);
}


/**
* Returns an Observable that items emitted by the source Observable as long as a specified condition is true.
*
* @param predicate a function to test each source element for a condition
* @return
*/
public Observable<T> takeWhile(final Func1<T, Boolean> predicate) {
return takeWhile(this, predicate);
}

/**
* Returns a specified number of contiguous values from the start of an observable sequence.
*
* @param predicate a function to test each source element for a condition
* @return
*/
public Observable<T> takeWhile(final Object predicate) {
return takeWhile(this, predicate);
}

/**
* Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values.
*
* @param predicate a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false.
* @return
*/
public Observable<T> takeWhileWithIndex(final Func2<T, Integer, Boolean> predicate) {
return takeWhileWithIndex(this, predicate);
}

/**
* Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values.
*
* @param predicate a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false.
* @return
*/
public Observable<T> takeWhileWithIndex(final Object predicate) {
return takeWhileWithIndex(this, predicate);
}

/**
* Returns an Observable that emits the last <code>count</code> items emitted by the source
* Observable.
Expand Down
120 changes: 101 additions & 19 deletions rxjava-core/src/main/java/rx/operators/OperationTake.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2013 Netflix, Inc.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -28,33 +28,75 @@
import rx.Subscription;
import rx.util.AtomicObservableSubscription;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

/**
* Returns a specified number of contiguous values from the start of an observable sequence.
*
* @param <T>
*/
public final class OperationTake {

/**
* Returns a specified number of contiguous values from the start of an observable sequence.
*
*
* @param items
* @param num
* @return
*/
public static <T> Func1<Observer<T>, Subscription> take(final Observable<T> items, final int num) {
// wrap in a Watchbable so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take<T> rather than 1 handing both, which is not thread-safe.
return takeWhileWithIndex(items, OperationTake.<T>numPredicate(num));
}

/**
* Returns a specified number of contiguous values from the start of an observable sequence.
*
* @param items
* @param predicate a function to test each source element for a condition
* @return
*/
public static <T> Func1<Observer<T>, Subscription> takeWhile(final Observable<T> items, final Func1<T, Boolean> predicate) {
return takeWhileWithIndex(items, OperationTake.<T>skipIndex(predicate));
}

/**
* Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values.
*
* @param items
* @param predicate a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false.
* @return
*/
public static <T> Func1<Observer<T>, Subscription> takeWhileWithIndex(final Observable<T> items, final Func2<T, Integer, Boolean> predicate) {
// wrap in a Func so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take<T> rather than 1 handing both, which is not thread-safe.
return new Func1<Observer<T>, Subscription>() {

@Override
public Subscription call(Observer<T> observer) {
return new Take<T>(items, num).call(observer);
return new TakeWhile<T>(items, predicate).call(observer);
}

};
}

private static <T> Func2<T, Integer, Boolean> numPredicate(final int num) {
return new Func2<T, Integer, Boolean>() {

@Override
public Boolean call(T input, Integer index) {
return index < num;
}

};
}

private static <T> Func2<T, Integer, Boolean> skipIndex(final Func1<T, Boolean> underlying) {
return new Func2<T, Integer, Boolean>() {
@Override
public Boolean call(T input, Integer index) {
return underlying.call(input);
}
};
}


/**
* This class is NOT thread-safe if invoked and referenced multiple times. In other words, don't subscribe to it multiple times from different threads.
* <p>
Expand All @@ -63,29 +105,27 @@ public Subscription call(Observer<T> observer) {
* This should all be fine as long as it's kept as a private class and a new instance created from static factory method above.
* <p>
* Note how the take() factory method above protects us from a single instance being exposed with the Observable wrapper handling the subscribe flow.
*
*
* @param <T>
*/
private static class Take<T> implements Func1<Observer<T>, Subscription> {
private final int num;
private static class TakeWhile<T> implements Func1<Observer<T>, Subscription> {
private final AtomicInteger counter = new AtomicInteger();
private final Observable<T> items;
private final Func2<T, Integer, Boolean> predicate;
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();

Take(final Observable<T> items, final int num) {
this.num = num;
private TakeWhile(Observable<T> items, Func2<T, Integer, Boolean> predicate) {
this.items = items;
this.predicate = predicate;
}


@Override
public Subscription call(Observer<T> observer) {
return subscription.wrap(items.subscribe(new ItemObserver(observer)));
}

/**
* Used to subscribe to the 'items' Observable sequence and forward to the actualObserver up to 'num' count.
*/
private class ItemObserver implements Observer<T> {

private AtomicInteger counter = new AtomicInteger();
private final Observer<T> observer;

public ItemObserver(Observer<T> observer) {
Expand All @@ -104,7 +144,7 @@ public void onError(Exception e) {

@Override
public void onNext(T args) {
if (counter.getAndIncrement() < num) {
if (predicate.call(args, counter.getAndIncrement())) {
observer.onNext(args);
} else {
// this will work if the sequence is asynchronous, it will have no effect on a synchronous observable
Expand All @@ -118,6 +158,48 @@ public void onNext(T args) {

public static class UnitTest {



@Test
public void testTakeWhile1() {
Observable<Integer> w = Observable.toObservable(1, 2, 3);
Observable<Integer> take = Observable.create(takeWhile(w, new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer input) {
return input < 3;
}
}));

@SuppressWarnings("unchecked")
Observer<Integer> aObserver = mock(Observer.class);
take.subscribe(aObserver);
verify(aObserver, times(1)).onNext(1);
verify(aObserver, times(1)).onNext(2);
verify(aObserver, never()).onNext(3);
verify(aObserver, never()).onError(any(Exception.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testTakeWhile2() {
Observable<String> w = Observable.toObservable("one", "two", "three");
Observable<String> take = Observable.create(takeWhileWithIndex(w, new Func2<String, Integer, Boolean>() {
@Override
public Boolean call(String input, Integer index) {
return index < 2;
}
}));

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
take.subscribe(aObserver);
verify(aObserver, times(1)).onNext("one");
verify(aObserver, times(1)).onNext("two");
verify(aObserver, never()).onNext("three");
verify(aObserver, never()).onError(any(Exception.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testTake1() {
Observable<String> w = Observable.toObservable("one", "two", "three");
Expand Down
3 changes: 1 addition & 2 deletions rxjava-core/src/main/java/rx/util/functions/Functions.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* <p>
* Language support is provided via implementations of {@link FunctionLanguageAdaptor}.
* <p>
* This class will dynamically look for known language adaptors on the classpath at startup or new ones can be registered using {@link #registerLanguageAdaptor(Class, FunctionLanguageAdaptor)}.
* This class will dynamically look for known language adaptors on the classpath at startup or new ones can be registered using {@link #registerLanguageAdaptor(Class[], FunctionLanguageAdaptor)}.
*/
public class Functions {

Expand Down Expand Up @@ -81,7 +81,6 @@ public static Collection<FunctionLanguageAdaptor> getRegisteredLanguageAdaptors(
* Utility method for determining the type of closure/function and executing it.
*
* @param function
* @param args
*/
@SuppressWarnings({ "rawtypes" })
public static FuncN from(final Object function) {
Expand Down

0 comments on commit d791e87

Please sign in to comment.