Skip to content

Commit

Permalink
Add BatchPromise support
Browse files Browse the repository at this point in the history
  • Loading branch information
rmannibucau committed Jul 10, 2022
1 parent e91fcd1 commit ea35ebd
Show file tree
Hide file tree
Showing 6 changed files with 263 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,49 @@ TIP: some components have a static factory to make it more expressive, don't hes
Finally, the `RunConfiguration` enables to intercept any step of the `BatchChain` defined by the previous DSL.
Combined with `ExecutionTracer`, it will let you store any execution and its steps in a database for audit or monitoring/administration purposes.

=== Async result

It can be neat to pass a step and its result to next step without it being finished.

It is often the case for reactive bacthes (one "thread" starts to poll data, next step processes it etc.. but you want to keep the polling and processing split in terms of "step" and tracing).

To do that, you can return a `BatchPromise` which is just a holder of a value (reactive in our example) and a `CompletionStage` which notifies the batch runtime and tracer when the step is done:

[source,java]
----
from()
.map("step1", new CommentifiableFunction<Void, BatchPromise<String>>() {
@Override
public BatchPromise<String> apply(final Void o) {
final var reactiveComponent = runStep1(); // an Observable with RxJava for example
return BatchPromise.of(reactiveComponent::onItem, reactiveComponent.toCompletionStage());
}
})
.map("step2", new CommentifiableConsumer<BatchPromise<Void>>() {
@Override
public BatchPromise<Void> apply(final BatchPromise<Observable> in) {
final var promise = new CompletableFuture<Void>();
in.subscribe(
this::doStep2ItemProcessing,
error -> {
// log etc...
promise.completeExceptionally(error);
},
() -> promise.complete(null));
return BatchPromise.of(null, promise);
}
})
.run(tracingConfig);
----

So overall the step1 starts to read some data and emiiting it when step2 starts and subscribes to it.

When step1 is done it notifies the batch runtime it is ok and the tracer (or runtime) will stop the thread 1 monitoring.

Step2 being asynchronous too (due to its reactive nature, it also emits a `BatchPromise` leading to the same kind of behavior).

TIP: thanks to this trick, you can run a concurrent job with a flat chain since you can pass in the `BatchRuntime` a complex value which would represent each branch of your concurrent batch.

== Reusable Iterators

Reusable iterators are either provided through `FluentIterator` or extensions (in this case you must add a dependency to get it).
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 - Yupiik SAS - https://www.yupiik.com
* Copyright (c) 2021-2022 - Yupiik SAS - https://www.yupiik.com
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
Expand All @@ -15,14 +15,26 @@
*/
package io.yupiik.batch.runtime.batch.builder;

import io.yupiik.batch.runtime.batch.BatchPromise;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.logging.Logger;

import static java.util.Collections.reverse;
import static java.util.Optional.ofNullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.logging.Level.SEVERE;

/**
* Very trivial way to define common operations simply.
Expand Down Expand Up @@ -86,8 +98,18 @@ default void run(final RunConfiguration configuration) { // todo: shouldn't we f
e -> configuration.elementExecutionWrapper.apply(e) : e -> e;
final Runnable execution = () -> {
Result<?> result = null; // starting node generates a result without a previous one normally
for (final var it : chain) {
result = wrapper.apply(BatchChain.class.cast(it)).execute(configuration, Result.class.cast(result));
final var promises = new CopyOnWriteArrayList<CompletableFuture<?>>();
try {
for (final var it : chain) {
result = wrapper.apply(BatchChain.class.cast(it)).execute(configuration, Result.class.cast(result));
if (result.value() instanceof BatchPromise<?> promise) {
final var end = promise.end().toCompletableFuture();
promises.add(end);
end.whenComplete((ok, ko) -> promises.remove(end));
}
}
} finally {
await(configuration, promises);
}
};
if (configuration != null && configuration.executionWrapper != null) {
Expand Down Expand Up @@ -221,4 +243,28 @@ public String toComment() {
return commentifiable.toComment();
}
}

private void await(final RunConfiguration configuration, final List<CompletableFuture<?>> promises) {
if (configuration != null && configuration.maxBatchPromiseAwait == 0) {
return;
}
final var duration = configuration == null ? TimeUnit.MINUTES.toMillis(1) : configuration.maxBatchPromiseAwait;
for (final var stage : promises) {
if (stage.isDone() || stage.isCompletedExceptionally()) {
continue;
}
try {
if (duration < 0) {
stage.get();
} else {
stage.get(duration, MILLISECONDS);
}
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
} catch (final ExecutionException |
TimeoutException e) { // we can't do much anymore there, should have awaited before
Logger.getLogger(getClass().getName()).log(SEVERE, e, e::getMessage);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 - Yupiik SAS - https://www.yupiik.com
* Copyright (c) 2021-2022 - Yupiik SAS - https://www.yupiik.com
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
Expand All @@ -20,6 +20,18 @@
public class RunConfiguration { // don't use a record, we don't want to break batches cause we added a toggle/config
Function<Runnable, Runnable> executionWrapper;
Function<BatchChain<?, ?, ?>, Executable<?, ?>> elementExecutionWrapper;
long maxBatchPromiseAwait = -1;

/**
* How long batch promises can be awaited at shutdown if not already done.
*
* @param maxBatchPromiseAwait await duration in ms. A negative value means infinite.
* @return the run configuration (this).
*/
public RunConfiguration setMaxBatchPromiseAwait(final long maxBatchPromiseAwait) {
this.maxBatchPromiseAwait = maxBatchPromiseAwait;
return this;
}

public RunConfiguration setExecutionWrapper(final Function<Runnable, Runnable> executionWrapper) {
this.executionWrapper = executionWrapper;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2021-2022 - Yupiik SAS - https://www.yupiik.com
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.yupiik.batch.runtime.batch.internal;

import io.yupiik.batch.runtime.batch.BatchPromise;

import java.util.concurrent.CompletionStage;

public record BatchPromiseImpl<A>(A value, CompletionStage<Void> end) implements BatchPromise<A> {
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 - Yupiik SAS - https://www.yupiik.com
* Copyright (c) 2021-2022 - Yupiik SAS - https://www.yupiik.com
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
Expand All @@ -15,11 +15,13 @@
*/
package io.yupiik.batch.runtime.tracing;

import io.yupiik.batch.runtime.batch.BatchPromise;
import io.yupiik.batch.runtime.batch.builder.BatchChain;
import io.yupiik.batch.runtime.batch.builder.Executable;
import io.yupiik.batch.runtime.batch.builder.RunConfiguration;

import java.time.Clock;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.List;
import java.util.UUID;
Expand Down Expand Up @@ -53,29 +55,64 @@ public Executable.Result<?> traceStep(final RunConfiguration configuration,
final var start = clock.instant();
var status = Status.SUCCESS;
String comment = null;
boolean async = false;
try {
final var executed = batchChain.execute(configuration, Executable.Result.class.cast(previous));
if (BatchChain.Commentifiable.class.isInstance(batchChain)) {
final var hasValue = executed != null && executed.value() != null;
final var chainIsCommentiafiable = BatchChain.Commentifiable.class.isInstance(batchChain);
if (hasValue && chainIsCommentiafiable && !(executed.value() instanceof BatchPromise<?>)) {
comment = BatchChain.Commentifiable.class.cast(batchChain).toComment();
}
if (executed != null && executed.value() != null && BatchChain.Commentifiable.class.isInstance(executed.value())) {
comment = (comment == null || comment.isBlank() ? "" : (comment + '\n')) + BatchChain.Commentifiable.class.cast(executed.value()).toComment();
if (hasValue) {
if (executed.value() instanceof BatchPromise<?> promise) {
async = true;
promise.end().whenComplete((ok, ko) -> {
String asyncComment = null;
if (chainIsCommentiafiable) {
asyncComment = BatchChain.Commentifiable.class.cast(batchChain).toComment();
}

if (executed.value() instanceof BatchChain.Commentifiable c) {
asyncComment = (asyncComment == null || asyncComment.isBlank() ? "" : (asyncComment + '\n')) + c.toComment();
}
if (ko != null) {
endStep(batchChain, start, Status.FAILURE, getErrorMessage(asyncComment, ko));
} else {
endStep(batchChain, start, Status.SUCCESS, asyncComment);
}
});
} else if (executed.value() instanceof BatchChain.Commentifiable c) {
comment = (comment == null || comment.isBlank() ? "" : (comment + '\n')) + c.toComment();
}
}
return executed;
} catch (final Error | RuntimeException err) {
status = Status.FAILURE;
comment = err.getMessage();
comment = getErrorMessage(comment, err);
async = false;
throw err;
} finally {
final var end = clock.instant();
final var execution = new StepExecution(
UUID.randomUUID().toString(), batchChain.name(), status, comment,
LocalDateTime.ofInstant(start, clock.getZone()), LocalDateTime.ofInstant(end, clock.getZone()),
steps.isEmpty() ? null : steps.get(steps.size() - 1).id());
steps.add(execution);
if (!async) {
endStep(batchChain, start, status, comment);
}
}
}

protected String getErrorMessage(final String comment, final Throwable err) {
return (comment != null ? comment + '\n' : "") + err.getMessage();
}

protected void endStep(final BatchChain<?, ?, ?> batchChain,
final Instant start, final Status status,
final String comment) {
final var end = clock.instant();
final var execution = new StepExecution(
UUID.randomUUID().toString(), batchChain.name(), status, comment,
LocalDateTime.ofInstant(start, clock.getZone()), LocalDateTime.ofInstant(end, clock.getZone()),
steps.isEmpty() ? null : steps.get(steps.size() - 1).id());
steps.add(execution);
}

public Runnable traceExecution(final Runnable runnable) {
return () -> {
final var start = clock.instant();
Expand Down

0 comments on commit ea35ebd

Please sign in to comment.