Skip to content

Commit

Permalink
feat: Support for adding the task based on predicate as a condition (#37
Browse files Browse the repository at this point in the history
)
  • Loading branch information
jatin14493 committed Nov 30, 2023
1 parent 70058d5 commit 9b80227
Show file tree
Hide file tree
Showing 4 changed files with 488 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>

</dependencies>

<build>
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/com/intuit/async/execution/Chain.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
import com.intuit.async.execution.request.State;

import io.reactivex.Observable;
import org.apache.commons.lang3.tuple.Pair;

import java.util.List;
import java.util.function.Predicate;


/** @author Nishant-Sehgal */
public interface Chain {
Expand All @@ -23,6 +28,12 @@ public interface Chain {
* @return current {@link Chain} reference
*/
Chain next(Task... tasks);
/**
* @param taskPredicate list of task with respective predicates
* @param <T>
* @return current {@link Chain} reference
*/
<T> Chain next(final Pair<Task, Pair<Predicate<T>, T>>... taskPredicate);

/**
* Depends on the caller to handle async or sync and can add call backs as well.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand All @@ -21,6 +24,8 @@
import com.intuit.async.execution.request.State;
import com.intuit.async.execution.util.RxExecutionChainAction;


import org.apache.commons.lang3.tuple.Pair;
import io.reactivex.Observable;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -47,6 +52,11 @@ public RxExecutionChain(State inputReq, Task... tasks) {
populateTasks(tasks);
}

public <T> RxExecutionChain(State inputReq, final Pair<Task, Pair<Predicate<T>, T>>... taskPredicatePair) {
this.chainState = inputReq;
populateTasks(taskPredicatePair);
}

public RxExecutionChain(State inputReq) {
this.chainState = inputReq;
}
Expand All @@ -68,6 +78,11 @@ public RxExecutionChain next(Task... tasks) {
return this;
}

public <T> RxExecutionChain next(final Pair<Task, Pair<Predicate<T>, T>>... taskPredicatePair) {
populateTasks(taskPredicatePair);
return this;
}

/**
* executes the tasks submitted to the execution chain.
*
Expand Down Expand Up @@ -152,6 +167,29 @@ private void populateTasks(Task... tasks) {
rollbackObv.put(key, mergedRollbackObv);
}

/**
* Based on supplied predicate, evaluates the condition and add to the task list
*
* @param taskPredicatePairs: List of A Pair of Tasks and Supplied Predicates
* @param <T> : Type Parameter of Passed Predicate
*/
private <T> void populateTasks(final Pair<Task, Pair<Predicate<T>, T>>... taskPredicatePairs) {
// if predicate resolves to true then add to chain
if (isNull(taskPredicatePairs) && taskPredicatePairs.length == 0) {
return;
}

// Tasks which are evaluating to True gets filtered and supplied to populateTasks Method
final Task[] tasks =
Arrays.asList(taskPredicatePairs).stream()
.map(pair -> pair.getValue().getKey().test(pair.getValue().getValue()) ? pair.getKey() : null)
.filter(Objects::nonNull)
.toArray(Task[]::new);

// Calling populate tasks
populateTasks(tasks);
}

/**
* maps the given input task to Observable by subscribing it on new thread.
*
Expand Down
Loading

0 comments on commit 9b80227

Please sign in to comment.