Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions flows/src/main/java/com/softwaremill/jox/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,28 @@ public List<T> runToList() throws Exception {
return result;
}

/**
* Accumulates all elements emitted by this flow into a set. Blocks until the flow completes.
*/
public Set<T> runToSet() throws Exception {
Set<T> result = new HashSet<>();
runForeach(result::add);
return result;
}

/**
* Accumulates all elements emitted by this flow into a map, using the provided key and value
* extractors. Blocks until the flow completes.
*
* <p>If duplicate keys are encountered, the last value wins (same as {@link Map#put}).
*/
public <K, V> Map<K, V> runToMap(Function<T, K> keyExtractor, Function<T, V> valueExtractor)
throws Exception {
Map<K, V> result = new HashMap<>();
runForeach(t -> result.put(keyExtractor.apply(t), valueExtractor.apply(t)));
return result;
}

/**
* The flow is run in the background, and each emitted element is sent to a newly created
* channel, which is then returned as the result of this method.
Expand Down Expand Up @@ -442,6 +464,63 @@ public <S, U> Flow<U> mapStatefulConcat(
return mapStatefulConcat(initializeState, f, _ -> Optional.empty());
}

/**
* Maps each element using a resource that is created when the flow starts and closed when the
* flow completes (either successfully or with an error).
*
* <p>The {@code close} function may return an optional final element to emit after the flow
* completes successfully. If the flow fails, any value returned by {@code close} is discarded.
* If {@code close} throws and the flow also failed, the close exception is added as suppressed.
*
* @param create creates the resource (called once when the flow starts)
* @param close closes the resource, optionally returning a final element
* @param f maps each element using the resource
*/
public <R, U> Flow<U> mapWithResource(
Callable<R> create,
ThrowingFunction<R, Optional<U>> close,
ThrowingBiFunction<R, T, U> f) {
return usingEmit(
emit -> {
R resource = create.call();
Throwable[] error = {null};
try {
last.run(t -> emit.apply(f.apply(resource, t)));
} catch (Throwable e) {
error[0] = e;
throw e;
} finally {
try {
Optional<U> finalElement = close.apply(resource);
if (error[0] == null && finalElement.isPresent()) {
emit.apply(finalElement.get());
}
} catch (Throwable e) {
if (error[0] != null) error[0].addSuppressed(e);
else throw e;
}
}
});
}

/**
* A variant of {@link #mapWithResource} for {@link AutoCloseable} resources. The resource is
* closed using {@link AutoCloseable#close()} and no final element is emitted.
*
* @param create creates the {@link AutoCloseable} resource
* @param f maps each element using the resource
*/
public <R extends AutoCloseable, U> Flow<U> mapWithCloseableResource(
Callable<R> create, ThrowingBiFunction<R, T, U> f) {
return mapWithResource(
create,
r -> {
r.close();
return Optional.empty();
},
f);
}

/**
* Emits only those elements emitted by this flow, for which `filteringPredicate` returns
* `true`.
Expand Down Expand Up @@ -1206,6 +1285,48 @@ public <U extends T> Flow<U> recover(ThrowingFunction<Throwable, Optional<U>> pf
}));
}

/**
* Recovers from upstream errors by applying {@code f} to produce a replacement value. A variant
* of {@link #recover} that always handles the exception.
*/
public <U extends T> Flow<U> onErrorRecover(ThrowingFunction<Throwable, U> f) {
return recover(e -> Optional.of(f.apply(e)));
}

/**
* Completes the flow when any {@link Exception} is thrown by the upstream, discarding the
* exception. Downstream failures are not caught.
*/
public Flow<T> onErrorComplete() {
return onErrorComplete(e -> e instanceof Exception);
}

/**
* Completes the flow when an upstream error matches the provided predicate, discarding the
* exception. Downstream failures are not caught.
*/
public Flow<T> onErrorComplete(Predicate<Throwable> shouldComplete) {
return usingEmit(
emit -> {
Throwable[] downstreamFailure = {null};
FlowEmit<T> guardedEmit =
t -> {
try {
emit.apply(t);
} catch (Throwable e) {
downstreamFailure[0] = e;
throw e;
}
};
try {
last.run(guardedEmit);
} catch (Throwable e) {
if (downstreamFailure[0] == e || !shouldComplete.test(e)) throw e;
// else: upstream exception matching predicate, complete silently
}
});
}

/**
* Intersperses elements emitted by this flow with `inject` elements. The `inject` element is
* emitted between each pair of elements.
Expand Down
Loading