Skip to content

Commit

Permalink
feat: run flow with Mono or Flux
Browse files Browse the repository at this point in the history
  • Loading branch information
juliengalet committed Nov 18, 2021
1 parent 0cd0fde commit 27502dc
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 5 deletions.
57 changes: 57 additions & 0 deletions src/main/java/fr/jtools/reactorflow/flow/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import fr.jtools.reactorflow.report.Status;
import fr.jtools.reactorflow.utils.ConsoleStyle;
import fr.jtools.reactorflow.utils.PrettyPrint;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.concurrent.Queues;

import java.math.BigDecimal;
import java.math.RoundingMode;
Expand Down Expand Up @@ -257,6 +259,61 @@ public final Mono<GlobalReport<T>> run(T initialContext) {
.map(report -> GlobalReport.create(report.getContext(), this));
}

/**
* Run the {@link Flow} and all its children with an initial {@link T} context.
*
* @param initialContextMono The initial context
* @return A {@link Mono} containing the resulting {@link GlobalReport}
*/
public final Mono<GlobalReport<T>> run(Mono<T> initialContextMono) {
return initialContextMono
.flatMap(initialContext -> this.execute(initialContext, Metadata.empty()))
.map(report -> GlobalReport.create(report.getContext(), this));
}

/**
* Run a {@link Flow} and all its children with an initial {@link T} context, from a {@link Flux}.
*
* @param initialContextFlux The initial contexts
* @return A {@link Flux} containing the resulting {@link GlobalReport}
*/
public final Flux<GlobalReport<T>> run(Flux<T> initialContextFlux) {
return this.run(initialContextFlux, Queues.SMALL_BUFFER_SIZE);
}

/**
* Run a {@link Flow} and all its children with an initial {@link T} context, from a {@link Flux}, sequentially.
*
* @param initialContextFlux The initial contexts
* @return A {@link Flux} containing the resulting {@link GlobalReport}
*/
public final Flux<GlobalReport<T>> runSequential(Flux<T> initialContextFlux) {
return this.run(initialContextFlux, 1);
}

/**
* Run a {@link Flow} and all its children with an initial {@link T} context, from a {@link Flux},
* with a specified concurrency.
* Warning: if the {@link Flux} provided emits an error, it will break the reactor chain.
*
* @param initialContextFlux The initial contexts
* @param concurrency The concurrency
* @return A {@link Flux} containing the resulting {@link GlobalReport}
*/
public final Flux<GlobalReport<T>> run(Flux<T> initialContextFlux, int concurrency) {
return initialContextFlux
.flatMap(
initialContext -> {
Flow<T> clonedFlow = this.cloneFlow(String.format("%s (%s)", this.getName(), initialContext.hashCode()));

return clonedFlow
.execute(initialContext, Metadata.empty())
.map(report -> GlobalReport.create(report.getContext(), clonedFlow));
},
concurrency
);
}

/**
* Get a {@link String}, representing the actual {@link Flow}, without its children.
*
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/fr/jtools/reactorflow/flow/Step.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@
*
* @param <T> Context type
*/
interface Step<T extends FlowContext> extends StepWithMetadata<T, Object> {
public interface Step<T extends FlowContext> extends StepWithMetadata<T, Object> {

}
4 changes: 1 addition & 3 deletions src/test/java/fr/jtools/reactorflow/CombinedFlowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ final void complexCase() {

StepVerifier
.create(complexCase.run(FlowContext.create()))
.assertNext(assertAndLog(globalReport -> {
assertThat(globalReport.getStatus()).isEqualTo(Status.WARNING);
}))
.assertNext(assertAndLog(globalReport -> assertThat(globalReport.getStatus()).isEqualTo(Status.WARNING)))
.verifyComplete();
}
}
101 changes: 100 additions & 1 deletion src/test/java/fr/jtools/reactorflow/flow/StepFlowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import fr.jtools.reactorflow.report.Status;
import fr.jtools.reactorflow.testutils.CustomContext;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

Expand Down Expand Up @@ -118,7 +119,7 @@ final void givenClonedSuccessLambdaExecution_stepFlow_shouldSuccess() {
}

@Test
final void givenSuccessClassExecution_stepFlow_shouldSuccess() {
final void givenSuccessClassicExecution_stepFlow_shouldSuccess() {
StepFlow<FlowContext, Object> stepFlow = StepFlowBuilder
.defaultBuilder()
.named("Test")
Expand All @@ -138,6 +139,104 @@ final void givenSuccessClassExecution_stepFlow_shouldSuccess() {
.verifyComplete();
}


@Test
final void givenSuccessWithMonoContext_stepFlow_shouldSuccess() {
StepFlow<FlowContext, Object> stepFlow = StepFlowBuilder
.defaultBuilder()
.named("Test")
.execution(new TestWork())
.build();

StepVerifier
.create(stepFlow.run(Mono.just(FlowContext.create())))
.assertNext(globalReport -> {
assertThat(globalReport.getStatus()).isEqualTo(Status.SUCCESS);
assertThat(globalReport.getContext().get("Test")).isEqualTo("Test");
assertThat(globalReport.getName()).isEqualTo("Test");
assertThat(globalReport.getAllRecoveredErrors()).isEmpty();
assertThat(globalReport.getAllErrors()).isEmpty();
assertThat(globalReport.getAllWarnings()).isEmpty();
})
.verifyComplete();
}

@Test
final void givenSuccessWithSequentialFluxContext_stepFlow_shouldSuccess() {
StepFlow<FlowContext, Object> stepFlow = StepFlowBuilder
.defaultBuilder()
.named("Test")
.execution(new TestWork())
.build();

StepVerifier
.create(stepFlow
.runSequential(Flux.just(
FlowContext.createFrom(Map.of("Init1", "Init1")),
FlowContext.createFrom(Map.of("Init2", "Init2"))
))
)
.assertNext(globalReport -> {
assertThat(globalReport.getStatus()).isEqualTo(Status.SUCCESS);
assertThat(globalReport.getContext().get("Test")).isEqualTo("Test");
assertThat(globalReport.getContext().get("Init2")).isNull();
assertThat(globalReport.getContext().get("Init1")).isEqualTo("Init1");
assertThat(globalReport.getName()).startsWith("Test");
assertThat(globalReport.getAllRecoveredErrors()).isEmpty();
assertThat(globalReport.getAllErrors()).isEmpty();
assertThat(globalReport.getAllWarnings()).isEmpty();
})
.assertNext(globalReport -> {
assertThat(globalReport.getStatus()).isEqualTo(Status.SUCCESS);
assertThat(globalReport.getContext().get("Test")).isEqualTo("Test");
assertThat(globalReport.getContext().get("Init1")).isNull();
assertThat(globalReport.getContext().get("Init2")).isEqualTo("Init2");
assertThat(globalReport.getName()).startsWith("Test");
assertThat(globalReport.getAllRecoveredErrors()).isEmpty();
assertThat(globalReport.getAllErrors()).isEmpty();
assertThat(globalReport.getAllWarnings()).isEmpty();
})
.verifyComplete();
}

@Test
final void givenSuccessWithFluxContext_stepFlow_shouldSuccess() {
StepFlow<FlowContext, Object> stepFlow = StepFlowBuilder
.defaultBuilder()
.named("Test")
.execution(new TestWork())
.build();

StepVerifier
.create(stepFlow
.run(Flux.just(
FlowContext.createFrom(Map.of("Init1", "Init1")),
FlowContext.createFrom(Map.of("Init2", "Init2"))
))
)
.assertNext(globalReport -> {
assertThat(globalReport.getStatus()).isEqualTo(Status.SUCCESS);
assertThat(globalReport.getContext().get("Test")).isEqualTo("Test");
assertThat(globalReport.getContext().get("Init2")).isNull();
assertThat(globalReport.getContext().get("Init1")).isEqualTo("Init1");
assertThat(globalReport.getName()).startsWith("Test");
assertThat(globalReport.getAllRecoveredErrors()).isEmpty();
assertThat(globalReport.getAllErrors()).isEmpty();
assertThat(globalReport.getAllWarnings()).isEmpty();
})
.assertNext(globalReport -> {
assertThat(globalReport.getStatus()).isEqualTo(Status.SUCCESS);
assertThat(globalReport.getContext().get("Test")).isEqualTo("Test");
assertThat(globalReport.getContext().get("Init1")).isNull();
assertThat(globalReport.getContext().get("Init2")).isEqualTo("Init2");
assertThat(globalReport.getName()).startsWith("Test");
assertThat(globalReport.getAllRecoveredErrors()).isEmpty();
assertThat(globalReport.getAllErrors()).isEmpty();
assertThat(globalReport.getAllWarnings()).isEmpty();
})
.verifyComplete();
}

@Test
final void givenSuccessLambdaExecutionAndCustomMeta_stepFlow_shouldSuccess() {
StepFlow<FlowContext, String> stepFlow = StepFlowBuilder
Expand Down

0 comments on commit 27502dc

Please sign in to comment.