Skip to content

Commit

Permalink
Fixed combineLatest(2), added missing javadoc elements to many operators
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Mar 9, 2016
1 parent dc4b6f2 commit 60d0942
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 2 deletions.
29 changes: 27 additions & 2 deletions src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,17 @@ public static <T, V> Flux<V> combineLatest(Function<Object[], V> combinator, int
*
* @return a {@link Flux} based on the produced value
*/
public static <T1, T2, V> Flux<V> combineLatest(Publisher<? extends T1> source1,
public static <T1, T2, V> Flux<V> combineLatest(Publisher<? extends T1> source1,
Publisher<? extends T2> source2,
BiFunction<? super T1, ? super T2, ? extends V> combinator) {
return new FluxWithLatestFrom<>(source1, source2, combinator);
return combineLatest(new Function<Object[], V>() {
@Override
@SuppressWarnings("unchecked")
public V apply(Object[] tuple) {
return combinator.apply((T1)tuple[0], (T2)tuple[1]);
}
}, source1, source2);
// return new FluxWithLatestFrom<>(source1, source2, combinator);
}

/**
Expand Down Expand Up @@ -278,6 +285,7 @@ public static <T1, T2, T3, T4, V> Flux<V> combineLatest(Publisher<? extends T1>
* @param source2 The second upstream {@link Publisher} to subscribe to.
* @param source3 The third upstream {@link Publisher} to subscribe to.
* @param source4 The fourth upstream {@link Publisher} to subscribe to.
* @param source5 The fifth upstream {@link Publisher} to subscribe to.
* @param combinator The aggregate function that will receive a unique value from each upstream and return the value
* to signal downstream
* @param <T1> type of the value from source1
Expand Down Expand Up @@ -343,6 +351,7 @@ public static <T1, T2, T3, T4, T5, T6, V> Flux<V> combineLatest(Publisher<? exte
* @param sources The list of upstream {@link Publisher} to subscribe to.
* @param combinator The aggregate function that will receive a unique value from each upstream and return the value
* to signal downstream
* @param <T> The common base type of the source sequences
* @param <V> The produced output after transformation by the given combinator
*
* @return a {@link Flux} based on the produced value , 2.5
Expand All @@ -364,6 +373,7 @@ public static <T, V> Flux<V> combineLatest(Iterable<? extends Publisher<? extend
* @param prefetch demand produced to each combined source {@link Publisher}
* @param combinator The aggregate function that will receive a unique value from each upstream and return the value
* to signal downstream
* @param <T> The common base type of the source sequences
* @param <V> The produced output after transformation by the given combinator
*
* @return a {@link Flux} based on the produced value , 2.5
Expand Down Expand Up @@ -562,7 +572,9 @@ public static <T> Flux<T> error(Throwable error) {
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/projectreactor.io/master/src/main/static/assets/img/marble/errorrequest.png" alt="">
*
* @param throwable the error to signal to each {@link Subscriber}
* @param whenRequested if true, will onError on the first request instead of subscribe().
* @param <O> the output type
*
* @return a new failed {@link Flux}
*/
Expand Down Expand Up @@ -982,6 +994,8 @@ public static <T> Flux<T> never() {
* <img class="marble" src="https://raw.githubusercontent.com/reactor/projectreactor.io/master/src/main/static/assets/img/marble/onerrorresumewith.png" alt="">
* <p>
* @param <T> the {@link Subscriber} type target
* @param source the source sequence
* @param fallback the function called with the Throwable signal the source sequence produced that should return a fallback sequence
*
* @return a resilient {@link Flux}
*/
Expand Down Expand Up @@ -1236,6 +1250,7 @@ public static <T1, T2, T3, T4> Flux<Tuple4<T1, T2, T3, T4>> zip(Publisher<? exte
* @param source2 The second upstream {@link Publisher} to subscribe to.
* @param source3 The third upstream {@link Publisher} to subscribe to.
* @param source4 The fourth upstream {@link Publisher} to subscribe to.
* @param source5 The fifth upstream {@link Publisher} to subscribe to.
* @param <T1> type of the value from source1
* @param <T2> type of the value from source2
* @param <T3> type of the value from source3
Expand Down Expand Up @@ -1363,6 +1378,7 @@ public static <O> Flux<O> zip(Iterable<? extends Publisher<?>> sources,
* @param combinator The aggregate function that will receive a unique value from each upstream and return the
* value to signal downstream
* @param sources the {@link Publisher} array to iterate on {@link Publisher#subscribe(Subscriber)}
* @param <I> the type of the input sources
* @param <O> the combined produced type
*
* @return a zipped {@link Flux}
Expand All @@ -1384,6 +1400,7 @@ public static <I, O> Flux<O> zip(
* value to signal downstream
* @param prefetch individual source request size
* @param sources the {@link Publisher} array to iterate on {@link Publisher#subscribe(Subscriber)}
* @param <I> the type of the input sources
* @param <O> the combined produced type
*
* @return a zipped {@link Flux}
Expand Down Expand Up @@ -1413,6 +1430,7 @@ public static <I, O> Flux<O> zip(final Function<? super Object[], ? extends O> c
* @param sources The publisher of upstream {@link Publisher} to subscribe to.
* @param combinator The aggregate function that will receive a unique value from each upstream and return the value
* to signal downstream
* @param <TUPLE> the raw tuple type
* @param <V> The produced output after transformation by the given combinator
*
* @return a {@link Flux} based on the produced value
Expand Down Expand Up @@ -1446,6 +1464,7 @@ protected Flux() {
* @param transformer the {@link Function} to immediately map this {@link Flux} into a target {@link Publisher}
* instance.
* @param <P> the returned {@link Publisher} sequence type
* @param <V> the item type in the returned {@link Publisher}
*
* @return a new {@link Flux}
*/
Expand All @@ -1461,6 +1480,7 @@ public final <V, P extends Publisher<V>> P as(Function<? super Flux<T>, P> trans
* {@code flux.as(Observable.class).subscribe() }
*
* @param <E> the returned component type
* @param clazz the the class to convert to
*
* @return an eventually converted
* @throws a {@link UnsupportedOperationException} if conversion fails
Expand Down Expand Up @@ -1652,6 +1672,8 @@ public final Flux<List<T>> buffer(Publisher<?> other) {
* @param bucketOpening a {@link Publisher} to subscribe to for creating new receiving bucket signals.
* @param closeSelector a {@link Publisher} factory provided the opening signal and returning a {@link Publisher} to
* subscribe to for emitting relative bucket.
* @param <U> the element type of the bucket-opening sequence
* @param <V> the element type of the bucket-closing sequence
*
* @return a microbatched {@link Flux} of {@link List} delimited by an opening {@link Publisher} and a relative
* closing {@link Publisher}
Expand Down Expand Up @@ -1860,6 +1882,7 @@ public final Flux<T> cache(int history) {
* <img class="marble" src="https://raw.githubusercontent.com/reactor/projectreactor.io/master/src/main/static/assets/img/marble/cast.png" alt="">
*
* @param <E> the {@link Flux} output type
* @param stream the target class to cast to
*
* @return a casted {@link Flux}
*/
Expand All @@ -1877,6 +1900,8 @@ public final <E> Flux<E> cast(Class<E> stream) {
*
* @param <E> the {@link Flux} collected container type
* @param containerSupplier the supplier of the container instance for each Subscriber
* @param collector the consumer of both the container instance and the current value
*
* @return a Mono sequence of the collected value on complete
*
Expand Down
1 change: 1 addition & 0 deletions src/main/java/reactor/core/tuple/Tuple.java
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8> Function<Object[], Tuple8<T1, T2,
* @param <T5> The type of the fifth value.
* @param <T6> The type of the sixth value.
* @param <T7> The type of the seventh value.
* @param <T8> The type of the eighth value.
* @param <R> The type of the return value.
*
* @return The unchecked conversion function to {@link Tuple8}.
Expand Down

0 comments on commit 60d0942

Please sign in to comment.