Skip to content

Commit

Permalink
https://github.com/reactor/reactor-core/issues/1734
Browse files Browse the repository at this point in the history
  • Loading branch information
elrodro83 committed Sep 13, 2022
1 parent 11fb9bd commit 21dc440
Showing 1 changed file with 17 additions and 7 deletions.
Expand Up @@ -7,14 +7,15 @@

package org.mule.runtime.core.internal.routing.forkjoin;

import static java.time.Duration.ofMillis;
import static java.util.Optional.empty;
import static java.util.stream.Collectors.toList;

import static org.mule.runtime.core.api.event.CoreEvent.builder;
import static org.mule.runtime.core.internal.exception.ErrorHandlerContextManager.ERROR_HANDLER_CONTEXT;
import static org.mule.runtime.core.internal.routing.ForkJoinStrategy.RoutingPair.of;
import static org.mule.runtime.core.privileged.processor.MessageProcessors.processWithChildContextDontComplete;

import static java.lang.Long.MAX_VALUE;
import static java.util.Optional.empty;
import static java.util.stream.Collectors.toList;

import static reactor.core.Exceptions.propagate;
import static reactor.core.publisher.Flux.from;
import static reactor.core.publisher.Mono.defer;
Expand Down Expand Up @@ -43,6 +44,7 @@
import org.mule.runtime.core.privileged.routing.CompositeRoutingException;
import org.mule.runtime.core.privileged.routing.RoutingResult;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -88,13 +90,21 @@ public AbstractForkJoinStrategyFactory(boolean mergeVariables) {
public ForkJoinStrategy createForkJoinStrategy(ProcessingStrategy processingStrategy, int maxConcurrency, boolean delayErrors,
long timeout, Scheduler timeoutScheduler, ErrorType timeoutErrorType,
boolean isDetailedLogEnabled) {
Duration timeoutDuration;
if (timeout == Long.MAX_VALUE) {
timeoutDuration = Duration.ofNanos(MAX_VALUE);
} else {
timeoutDuration = Duration.ofMillis(timeout);
}

reactor.core.scheduler.Scheduler reactorTimeoutScheduler = Schedulers.fromExecutorService(timeoutScheduler);
return (original, routingPairs) -> {
final AtomicInteger count = new AtomicInteger();
final CoreEvent.Builder resultBuilder = builder(original);
return from(routingPairs)
.map(addSequence(count))
.flatMapSequential(processRoutePair(processingStrategy, maxConcurrency, delayErrors, timeout, reactorTimeoutScheduler,
.flatMapSequential(processRoutePair(processingStrategy, maxConcurrency, delayErrors, timeoutDuration,
reactorTimeoutScheduler,
timeoutErrorType),
maxConcurrency)
.reduce(new Pair<List<Pair<CoreEvent, EventProcessingException>>, Boolean>(new ArrayList<>(), false),
Expand Down Expand Up @@ -154,7 +164,7 @@ private Function<RoutingPair, RoutingPair> addSequence(AtomicInteger count) {
private Function<RoutingPair, Publisher<Pair<CoreEvent, EventProcessingException>>> processRoutePair(ProcessingStrategy processingStrategy,
int maxConcurrency,
boolean delayErrors,
long timeout,
Duration timeout,
reactor.core.scheduler.Scheduler timeoutScheduler,
ErrorType timeoutErrorType) {

Expand All @@ -164,7 +174,7 @@ private Function<RoutingPair, Publisher<Pair<CoreEvent, EventProcessingException
return from(processWithChildContextDontComplete(pair.getEvent(),
applyProcessingStrategy(processingStrategy, route, maxConcurrency),
empty()))
.timeout(ofMillis(timeout),
.timeout(timeout,
onTimeout(processingStrategy, delayErrors, timeoutErrorType,
pair),
timeoutScheduler)
Expand Down

0 comments on commit 21dc440

Please sign in to comment.