Skip to content

Commit

Permalink
Implemented the 'elementAt' and 'elementAtOrDefault' operators. see R…
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Sep 18, 2013
1 parent 00d7c3b commit 5df8e3c
Show file tree
Hide file tree
Showing 2 changed files with 279 additions and 0 deletions.
41 changes: 41 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import rx.operators.OperationDematerialize;
import rx.operators.OperationDistinctUntilChanged;
import rx.operators.OperationDistinct;
import rx.operators.OperationElementAt;
import rx.operators.OperationFilter;
import rx.operators.OperationFinally;
import rx.operators.OperationFirstOrDefault;
Expand Down Expand Up @@ -4186,5 +4187,45 @@ private boolean isInternalImplementation(Object o) {
Package p = o.getClass().getPackage(); // it can be null
return p != null && p.getName().startsWith("rx.operators");
}

/**
* Returns the element at a specified index in a sequence.
*
* @param index
* The zero-based index of the element to retrieve.
*
* @return An observable sequence that produces the element at the specified
* position in the source sequence.
*
* @throws IndexOutOfBoundsException
* Index is greater than or equal to the number of elements in
* the source sequence.
* @throws IndexOutOfBoundsException
* Index is less than 0.
*/
public Observable<T> elementAt(int index) {
return create(OperationElementAt.elementAt(this, index));
}

/**
* Returns the element at a specified index in a sequence or the default
* value if the index is out of range.
*
* @param index
* The zero-based index of the element to retrieve.
* @param defaultValue
* The default value.
*
* @return An observable sequence that produces the element at the specified
* position in the source sequence, or the default value if the
* index is outside the bounds of the source sequence.
*
* @throws IndexOutOfBoundsException
* Index is less than 0.
*/
public Observable<T> elementAtOrDefault(int index, T defaultValue) {
return create(OperationElementAt.elementAtOrDefault(this, index,
defaultValue));
}

}
238 changes: 238 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationElementAt.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
package rx.operators;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;

/**
* Returns the element at a specified index in a sequence.
*/
public class OperationElementAt {

/**
* Returns the element at a specified index in a sequence.
*
* @param source
* Observable sequence to return the element from.
* @param index
* The zero-based index of the element to retrieve.
*
* @return An observable sequence that produces the element at the specified
* position in the source sequence.
*
* @throws IndexOutOfBoundsException
* Index is greater than or equal to the number of elements in
* the source sequence.
* @throws IndexOutOfBoundsException
* Index is less than 0.
*/
public static <T> OnSubscribeFunc<T> elementAt(
Observable<? extends T> source, int index) {
return new ElementAt<T>(source, index, null, false);
}

/**
* Returns the element at a specified index in a sequence or the default
* value if the index is out of range.
*
* @param source
* Observable sequence to return the element from.
* @param index
* The zero-based index of the element to retrieve.
* @param defaultValue
* The default value.
*
* @return An observable sequence that produces the element at the specified
* position in the source sequence, or the default value if the
* index is outside the bounds of the source sequence.
*
* @throws IndexOutOfBoundsException
* Index is less than 0.
*/
public static <T> OnSubscribeFunc<T> elementAtOrDefault(
Observable<? extends T> source, int index, T defaultValue) {
return new ElementAt<T>(source, index, defaultValue, true);
}

private static class ElementAt<T> implements OnSubscribeFunc<T> {

private final Observable<? extends T> source;
private final int index;
private final boolean hasDefault;
private final T defaultValue;

private ElementAt(Observable<? extends T> source, int index,
T defaultValue, boolean hasDefault) {
this.source = source;
this.index = index;
this.defaultValue = defaultValue;
this.hasDefault = hasDefault;
}

@Override
public Subscription onSubscribe(final Observer<? super T> observer) {
final SafeObservableSubscription subscription = new SafeObservableSubscription();
return subscription.wrap(source.subscribe(new Observer<T>() {

private AtomicInteger counter = new AtomicInteger();

@Override
public void onNext(T value) {
try {
int currentIndex = counter.getAndIncrement();
if (currentIndex == index) {
observer.onNext(value);
observer.onCompleted();
} else if (currentIndex > index) {
// this will work if the sequence is asynchronous,
// it will have no effect on a synchronous
// observable
subscription.unsubscribe();
}
} catch (Throwable ex) {
observer.onError(ex);
// this will work if the sequence is asynchronous, it
// will have no effect on a synchronous observable
subscription.unsubscribe();
}

}

@Override
public void onError(Throwable ex) {
observer.onError(ex);
}

@Override
public void onCompleted() {
if (index < 0) {
observer.onError(new IndexOutOfBoundsException(index
+ " is out of bounds"));
} else if (counter.get() <= index) {
if (hasDefault) {
observer.onNext(defaultValue);
observer.onCompleted();
} else {
observer.onError(new IndexOutOfBoundsException(
index + " is out of bounds"));
}
}
}
}));
}
}

public static class UnitTest {

@Test
public void testElementAt() {
Observable<Integer> w = Observable.from(1, 2);
Observable<Integer> observable = Observable.create(elementAt(w, 1));

@SuppressWarnings("unchecked")
Observer<Integer> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, never()).onNext(1);
verify(aObserver, times(1)).onNext(2);
verify(aObserver, never()).onError(
org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testElementAtWithMinusIndex() {
Observable<Integer> w = Observable.from(1, 2);
Observable<Integer> observable = Observable
.create(elementAt(w, -1));

try {
Iterator<Integer> iter = OperationToIterator
.toIterator(observable);
assertTrue(iter.hasNext());
iter.next();
fail("expect an IndexOutOfBoundsException when index is out of bounds");
} catch (IndexOutOfBoundsException e) {
}
}

@Test
public void testElementAtWithIndexOutOfBounds()
throws InterruptedException, ExecutionException {
Observable<Integer> w = Observable.from(1, 2);
Observable<Integer> observable = Observable.create(elementAt(w, 2));
try {
Iterator<Integer> iter = OperationToIterator
.toIterator(observable);
assertTrue(iter.hasNext());
iter.next();
fail("expect an IndexOutOfBoundsException when index is out of bounds");
} catch (IndexOutOfBoundsException e) {
}
}

@Test
public void testElementAtOrDefault() throws InterruptedException,
ExecutionException {
Observable<Integer> w = Observable.from(1, 2);
Observable<Integer> observable = Observable
.create(elementAtOrDefault(w, 1, 0));

@SuppressWarnings("unchecked")
Observer<Integer> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, never()).onNext(1);
verify(aObserver, times(1)).onNext(2);
verify(aObserver, never()).onError(
org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testElementAtOrDefaultWithIndexOutOfBounds()
throws InterruptedException, ExecutionException {
Observable<Integer> w = Observable.from(1, 2);
Observable<Integer> observable = Observable
.create(elementAtOrDefault(w, 2, 0));

@SuppressWarnings("unchecked")
Observer<Integer> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, never()).onNext(1);
verify(aObserver, never()).onNext(2);
verify(aObserver, times(1)).onNext(0);
verify(aObserver, never()).onError(
org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testElementAtOrDefaultWithMinusIndex() {
Observable<Integer> w = Observable.from(1, 2);
Observable<Integer> observable = Observable
.create(elementAtOrDefault(w, -1, 0));

try {
Iterator<Integer> iter = OperationToIterator
.toIterator(observable);
assertTrue(iter.hasNext());
iter.next();
fail("expect an IndexOutOfBoundsException when index is out of bounds");
} catch (IndexOutOfBoundsException e) {
}
}
}
}

0 comments on commit 5df8e3c

Please sign in to comment.