Skip to content

Commit

Permalink
Merge pull request ReactiveX#381 from jmhofer/mapWithIndex
Browse files Browse the repository at this point in the history
Implemented `mapWithIndex`
  • Loading branch information
benjchristensen committed Sep 14, 2013
2 parents 141bb6b + 8f10729 commit 00d7c3b
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 40 deletions.
18 changes: 18 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3090,11 +3090,29 @@ public Observable<T> where(Func1<? super T, Boolean> predicate) {
* a function to apply to each item emitted by the Observable
* @return an Observable that emits the items from the source Observable, transformed by the
* given function
* @see <a href="http://msdn.microsoft.com/en-us/library/hh244306%28v=vs.103%29.aspx">MSDN: Observable.Select</a>
*/
public <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return create(OperationMap.map(this, func));
}

/**
* Returns an Observable that applies the given function to each item emitted by an
* Observable and emits the result.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/map.png">
*
* @param func
* a function to apply to each item emitted by the Observable. The function takes the
* index of the emitted item as additional parameter.
* @return an Observable that emits the items from the source Observable, transformed by the
* given function
* @see <a href="http://msdn.microsoft.com/en-us/library/hh244311%28v=vs.103%29.aspx">MSDN: Observable.Select</a>
*/
public <R> Observable<R> mapWithIndex(Func2<? super T, Integer, ? extends R> func) {
return create(OperationMap.mapWithIndex(this, func));
}

/**
* Creates a new Observable by applying a function that you supply to each item emitted by
* the source Observable, where that function returns an Observable, and then merging those
Expand Down
166 changes: 126 additions & 40 deletions rxjava-core/src/main/java/rx/operators/OperationMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

Expand All @@ -33,6 +34,7 @@
import rx.Observer;
import rx.Subscription;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

/**
* Applies a function of your choosing to every item emitted by an Observable, and returns this
Expand All @@ -56,8 +58,42 @@ public final class OperationMap {
* the type of the output sequence.
* @return a sequence that is the result of applying the transformation function to each item in the input sequence.
*/
public static <T, R> OnSubscribeFunc<R> map(Observable<? extends T> sequence, Func1<? super T, ? extends R> func) {
return new MapObservable<T, R>(sequence, func);
public static <T, R> OnSubscribeFunc<R> map(final Observable<? extends T> sequence, final Func1<? super T, ? extends R> func) {
return new OnSubscribeFunc<R>() {
@Override
public Subscription onSubscribe(Observer<? super R> observer) {
return new MapObservable<T, R>(sequence, new Func2<T, Integer, R>() {
@Override
public R call(T value, @SuppressWarnings("unused") Integer unused) {
return func.call(value);
}
}).onSubscribe(observer);
}
};
}

/**
* Accepts a sequence and a transformation function. Returns a sequence that is the result of
* applying the transformation function to each item in the sequence.
*
* @param sequence
* the input sequence.
* @param func
* a function to apply to each item in the sequence. The function gets the index of the emitted item
* as additional parameter.
* @param <T>
* the type of the input sequence.
* @param <R>
* the type of the output sequence.
* @return a sequence that is the result of applying the transformation function to each item in the input sequence.
*/
public static <T, R> OnSubscribeFunc<R> mapWithIndex(final Observable<? extends T> sequence, final Func2<? super T, Integer, ? extends R> func) {
return new OnSubscribeFunc<R>() {
@Override
public Subscription onSubscribe(Observer<? super R> observer) {
return new MapObservable<T, R>(sequence, func).onSubscribe(observer);
}
};
}

/**
Expand Down Expand Up @@ -89,56 +125,50 @@ public static <T, R> OnSubscribeFunc<R> mapMany(Observable<? extends T> sequence
* the type of the output sequence.
*/
private static class MapObservable<T, R> implements OnSubscribeFunc<R> {
public MapObservable(Observable<? extends T> sequence, Func1<? super T, ? extends R> func) {
public MapObservable(Observable<? extends T> sequence, Func2<? super T, Integer, ? extends R> func) {
this.sequence = sequence;
this.func = func;
}

private Observable<? extends T> sequence;

private Func1<? super T, ? extends R> func;

public Subscription onSubscribe(Observer<? super R> observer) {
return sequence.subscribe(new MapObserver<T, R>(observer, func));
}
}

/**
* An observer that applies a transformation function to each item and forwards the result to an inner observer.
*
* @param <T>
* the type of the observer items.
* @param <R>
* the type of the inner observer items.
*/
private static class MapObserver<T, R> implements Observer<T> {
public MapObserver(Observer<? super R> observer, Func1<? super T, ? extends R> func) {
this.observer = observer;
this.func = func;
}

Observer<? super R> observer;

Func1<? super T, ? extends R> func;
private final Observable<? extends T> sequence;
private final Func2<? super T, Integer, ? extends R> func;
private int index;

public void onNext(T value) {
// let the exception be thrown if func fails as a SafeObserver wrapping this will handle it
observer.onNext(func.call(value));
}
@Override
public Subscription onSubscribe(final Observer<? super R> observer) {
return sequence.subscribe(new Observer<T>() {
@Override
public void onNext(T value) {
observer.onNext(func.call(value, index));
index++;
}

public void onError(Throwable ex) {
observer.onError(ex);
}
@Override
public void onError(Throwable ex) {
observer.onError(ex);
}

public void onCompleted() {
observer.onCompleted();
@Override
public void onCompleted() {
observer.onCompleted();
}
});
}
}

public static class UnitTest {
@Mock
Observer<String> stringObserver;

@Mock
Observer<String> stringObserver2;

final static Func2<String, Integer, String> APPEND_INDEX = new Func2<String, Integer, String>() {
@Override
public String call(String value, Integer index) {
return value + index;
}
};

@Before
public void before() {
MockitoAnnotations.initMocks(this);
Expand All @@ -164,9 +194,42 @@ public String call(Map<String, String> map) {
verify(stringObserver, times(1)).onNext("OneFirst");
verify(stringObserver, times(1)).onNext("TwoFirst");
verify(stringObserver, times(1)).onCompleted();
}

@Test
public void testMapWithIndex() {
Observable<String> w = Observable.from("a", "b", "c");
Observable<String> m = Observable.create(mapWithIndex(w, APPEND_INDEX));
m.subscribe(stringObserver);
InOrder inOrder = inOrder(stringObserver);
inOrder.verify(stringObserver, times(1)).onNext("a0");
inOrder.verify(stringObserver, times(1)).onNext("b1");
inOrder.verify(stringObserver, times(1)).onNext("c2");
inOrder.verify(stringObserver, times(1)).onCompleted();
verify(stringObserver, never()).onError(any(Throwable.class));
}

@Test
public void testMapWithIndexAndMultipleSubscribers() {
Observable<String> w = Observable.from("a", "b", "c");
Observable<String> m = Observable.create(mapWithIndex(w, APPEND_INDEX));
m.subscribe(stringObserver);
m.subscribe(stringObserver2);
InOrder inOrder = inOrder(stringObserver);
inOrder.verify(stringObserver, times(1)).onNext("a0");
inOrder.verify(stringObserver, times(1)).onNext("b1");
inOrder.verify(stringObserver, times(1)).onNext("c2");
inOrder.verify(stringObserver, times(1)).onCompleted();
verify(stringObserver, never()).onError(any(Throwable.class));

InOrder inOrder2 = inOrder(stringObserver2);
inOrder2.verify(stringObserver2, times(1)).onNext("a0");
inOrder2.verify(stringObserver2, times(1)).onNext("b1");
inOrder2.verify(stringObserver2, times(1)).onNext("c2");
inOrder2.verify(stringObserver2, times(1)).onCompleted();
verify(stringObserver2, never()).onError(any(Throwable.class));
}

@Test
public void testMapMany() {
/* simulate a top-level async call which returns IDs */
Expand Down Expand Up @@ -246,12 +309,34 @@ public String call(Map<String, String> map) {

}

@Test
public void testMapWithError() {
Observable<String> w = Observable.from("one", "fail", "two", "three", "fail");
Observable<String> m = Observable.create(map(w, new Func1<String, String>() {
@Override
public String call(String s) {
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
return s;
}
}));

m.subscribe(stringObserver);
verify(stringObserver, times(1)).onNext("one");
verify(stringObserver, never()).onNext("two");
verify(stringObserver, never()).onNext("three");
verify(stringObserver, never()).onCompleted();
verify(stringObserver, times(1)).onError(any(Throwable.class));
}

@Test
public void testMapWithSynchronousObservableContainingError() {
Observable<String> w = Observable.from("one", "fail", "two", "three", "fail");
final AtomicInteger c1 = new AtomicInteger();
final AtomicInteger c2 = new AtomicInteger();
Observable<String> m = Observable.create(map(w, new Func1<String, String>() {
@Override
public String call(String s) {
if ("fail".equals(s))
throw new RuntimeException("Forced Failure");
Expand All @@ -260,6 +345,7 @@ public String call(String s) {
return s;
}
})).map(new Func1<String, String>() {
@Override
public String call(String s) {
System.out.println("SecondMapper:" + s);
c2.incrementAndGet();
Expand All @@ -280,7 +366,7 @@ public String call(String s) {
assertEquals(1, c2.get());
}

private Map<String, String> getMap(String prefix) {
private static Map<String, String> getMap(String prefix) {
Map<String, String> m = new HashMap<String, String>();
m.put("firstName", prefix + "First");
m.put("lastName", prefix + "Last");
Expand Down

0 comments on commit 00d7c3b

Please sign in to comment.