diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index d5d8fb8297..f20a4ec5e6 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -38,6 +38,7 @@ import rx.operators.OperationDematerialize; import rx.operators.OperationDistinctUntilChanged; import rx.operators.OperationDistinct; +import rx.operators.OperationElementAt; import rx.operators.OperationFilter; import rx.operators.OperationFinally; import rx.operators.OperationFirstOrDefault; @@ -4186,5 +4187,45 @@ private boolean isInternalImplementation(Object o) { Package p = o.getClass().getPackage(); // it can be null return p != null && p.getName().startsWith("rx.operators"); } + + /** + * Returns the element at a specified index in a sequence. + * + * @param index + * The zero-based index of the element to retrieve. + * + * @return An observable sequence that produces the element at the specified + * position in the source sequence. + * + * @throws IndexOutOfBoundsException + * Index is greater than or equal to the number of elements in + * the source sequence. + * @throws IndexOutOfBoundsException + * Index is less than 0. + */ + public Observable elementAt(int index) { + return create(OperationElementAt.elementAt(this, index)); + } + /** + * Returns the element at a specified index in a sequence or the default + * value if the index is out of range. + * + * @param index + * The zero-based index of the element to retrieve. + * @param defaultValue + * The default value. + * + * @return An observable sequence that produces the element at the specified + * position in the source sequence, or the default value if the + * index is outside the bounds of the source sequence. + * + * @throws IndexOutOfBoundsException + * Index is less than 0. + */ + public Observable elementAtOrDefault(int index, T defaultValue) { + return create(OperationElementAt.elementAtOrDefault(this, index, + defaultValue)); + } + } diff --git a/rxjava-core/src/main/java/rx/operators/OperationElementAt.java b/rxjava-core/src/main/java/rx/operators/OperationElementAt.java new file mode 100644 index 0000000000..7585650e19 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationElementAt.java @@ -0,0 +1,238 @@ +package rx.operators; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.Iterator; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; + +/** + * Returns the element at a specified index in a sequence. + */ +public class OperationElementAt { + + /** + * Returns the element at a specified index in a sequence. + * + * @param source + * Observable sequence to return the element from. + * @param index + * The zero-based index of the element to retrieve. + * + * @return An observable sequence that produces the element at the specified + * position in the source sequence. + * + * @throws IndexOutOfBoundsException + * Index is greater than or equal to the number of elements in + * the source sequence. + * @throws IndexOutOfBoundsException + * Index is less than 0. + */ + public static OnSubscribeFunc elementAt( + Observable source, int index) { + return new ElementAt(source, index, null, false); + } + + /** + * Returns the element at a specified index in a sequence or the default + * value if the index is out of range. + * + * @param source + * Observable sequence to return the element from. + * @param index + * The zero-based index of the element to retrieve. + * @param defaultValue + * The default value. + * + * @return An observable sequence that produces the element at the specified + * position in the source sequence, or the default value if the + * index is outside the bounds of the source sequence. + * + * @throws IndexOutOfBoundsException + * Index is less than 0. + */ + public static OnSubscribeFunc elementAtOrDefault( + Observable source, int index, T defaultValue) { + return new ElementAt(source, index, defaultValue, true); + } + + private static class ElementAt implements OnSubscribeFunc { + + private final Observable source; + private final int index; + private final boolean hasDefault; + private final T defaultValue; + + private ElementAt(Observable source, int index, + T defaultValue, boolean hasDefault) { + this.source = source; + this.index = index; + this.defaultValue = defaultValue; + this.hasDefault = hasDefault; + } + + @Override + public Subscription onSubscribe(final Observer observer) { + final SafeObservableSubscription subscription = new SafeObservableSubscription(); + return subscription.wrap(source.subscribe(new Observer() { + + private AtomicInteger counter = new AtomicInteger(); + + @Override + public void onNext(T value) { + try { + int currentIndex = counter.getAndIncrement(); + if (currentIndex == index) { + observer.onNext(value); + observer.onCompleted(); + } else if (currentIndex > index) { + // this will work if the sequence is asynchronous, + // it will have no effect on a synchronous + // observable + subscription.unsubscribe(); + } + } catch (Throwable ex) { + observer.onError(ex); + // this will work if the sequence is asynchronous, it + // will have no effect on a synchronous observable + subscription.unsubscribe(); + } + + } + + @Override + public void onError(Throwable ex) { + observer.onError(ex); + } + + @Override + public void onCompleted() { + if (index < 0) { + observer.onError(new IndexOutOfBoundsException(index + + " is out of bounds")); + } else if (counter.get() <= index) { + if (hasDefault) { + observer.onNext(defaultValue); + observer.onCompleted(); + } else { + observer.onError(new IndexOutOfBoundsException( + index + " is out of bounds")); + } + } + } + })); + } + } + + public static class UnitTest { + + @Test + public void testElementAt() { + Observable w = Observable.from(1, 2); + Observable observable = Observable.create(elementAt(w, 1)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, never()).onNext(1); + verify(aObserver, times(1)).onNext(2); + verify(aObserver, never()).onError( + org.mockito.Matchers.any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testElementAtWithMinusIndex() { + Observable w = Observable.from(1, 2); + Observable observable = Observable + .create(elementAt(w, -1)); + + try { + Iterator iter = OperationToIterator + .toIterator(observable); + assertTrue(iter.hasNext()); + iter.next(); + fail("expect an IndexOutOfBoundsException when index is out of bounds"); + } catch (IndexOutOfBoundsException e) { + } + } + + @Test + public void testElementAtWithIndexOutOfBounds() + throws InterruptedException, ExecutionException { + Observable w = Observable.from(1, 2); + Observable observable = Observable.create(elementAt(w, 2)); + try { + Iterator iter = OperationToIterator + .toIterator(observable); + assertTrue(iter.hasNext()); + iter.next(); + fail("expect an IndexOutOfBoundsException when index is out of bounds"); + } catch (IndexOutOfBoundsException e) { + } + } + + @Test + public void testElementAtOrDefault() throws InterruptedException, + ExecutionException { + Observable w = Observable.from(1, 2); + Observable observable = Observable + .create(elementAtOrDefault(w, 1, 0)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, never()).onNext(1); + verify(aObserver, times(1)).onNext(2); + verify(aObserver, never()).onError( + org.mockito.Matchers.any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testElementAtOrDefaultWithIndexOutOfBounds() + throws InterruptedException, ExecutionException { + Observable w = Observable.from(1, 2); + Observable observable = Observable + .create(elementAtOrDefault(w, 2, 0)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, never()).onNext(1); + verify(aObserver, never()).onNext(2); + verify(aObserver, times(1)).onNext(0); + verify(aObserver, never()).onError( + org.mockito.Matchers.any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testElementAtOrDefaultWithMinusIndex() { + Observable w = Observable.from(1, 2); + Observable observable = Observable + .create(elementAtOrDefault(w, -1, 0)); + + try { + Iterator iter = OperationToIterator + .toIterator(observable); + assertTrue(iter.hasNext()); + iter.next(); + fail("expect an IndexOutOfBoundsException when index is out of bounds"); + } catch (IndexOutOfBoundsException e) { + } + } + } +}