From cee52bb7b9a1e72316f9d394b749e577a9c99224 Mon Sep 17 00:00:00 2001 From: ssenko Date: Tue, 9 Jul 2024 13:32:16 +0300 Subject: [PATCH 01/13] flow sync engine --- gradle/version.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/version.gradle b/gradle/version.gradle index c51ec84f..e709b0d9 100644 --- a/gradle/version.gradle +++ b/gradle/version.gradle @@ -1,6 +1,6 @@ import java.util.regex.Pattern -version = "2.3.23.23" +version = "2.3.23.24" logger.lifecycle("Project version: $version") String detectSemVersion() { From 8a62faf3cca7579b5d3b77604bc24bae357c772e Mon Sep 17 00:00:00 2001 From: ssenko Date: Tue, 9 Jul 2024 13:48:41 +0300 Subject: [PATCH 02/13] Update version to snapshot --- gradle/version.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/version.gradle b/gradle/version.gradle index e709b0d9..7c81dd6f 100644 --- a/gradle/version.gradle +++ b/gradle/version.gradle @@ -1,6 +1,6 @@ import java.util.regex.Pattern -version = "2.3.23.24" +version = "2.3.23.24-SNAPSHOT" logger.lifecycle("Project version: $version") String detectSemVersion() { From bd5d0533838e2d7ee509fb6ed1129b459b644345 Mon Sep 17 00:00:00 2001 From: ssenko Date: Tue, 9 Jul 2024 13:58:44 +0300 Subject: [PATCH 03/13] Fix lombock bug --- .../java/com/icthh/xm/commons/flow/service/FlowService.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/FlowService.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/FlowService.java index 2061c023..54646126 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/FlowService.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/FlowService.java @@ -33,8 +33,11 @@ public class FlowService { private final CodeSnippetService codeSnippetService; private final TriggerProcessor triggerProcessor; - @Setter(onMethod = @__(@Autowired)) private FlowService self; + @Autowired + public void setFlowService(FlowService flowService) { + this.self = flowService; + } @LogicExtensionPoint("GetFlows") public List getFlows() { From 66477d1958db7db2cc0fbfc34c6c12f8385db0b1 Mon Sep 17 00:00:00 2001 From: ssenko Date: Thu, 11 Jul 2024 17:54:31 +0300 Subject: [PATCH 04/13] Sync engine v1 --- .../icthh/xm/commons/flow/domain/Action.java | 7 +- .../xm/commons/flow/domain/Condition.java | 11 ++- .../icthh/xm/commons/flow/domain/Flow.java | 3 + .../icthh/xm/commons/flow/domain/Step.java | 2 + .../xm/commons/flow/engine/FlowExecutor.java | 81 +++++++++++++++++++ .../flow/engine/StepExecutorService.java | 48 +++++++++++ .../engine/context/FlowExecutionContext.java | 15 ++++ .../commons/flow/rest/FlowSpecResource.java | 3 +- .../xm/commons/flow/service/FlowService.java | 13 +++ .../service/resolver/StepKeyResolver.java | 17 ++++ .../flow/step/StepClassExecutor.groovy | 18 +++++ .../commons/flow/rest/FlowExecuteIntTest.java | 54 +++++++++++++ .../flow/rest/FlowResourceIntTest.java | 5 +- xm-commons-flow/src/test/resources/flow.yml | 7 +- 14 files changed, 271 insertions(+), 13 deletions(-) create mode 100644 xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/FlowExecutor.java create mode 100644 xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/StepExecutorService.java create mode 100644 xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/context/FlowExecutionContext.java create mode 100644 xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/resolver/StepKeyResolver.java create mode 100644 xm-commons-flow/src/main/resources/lep/default/flow/step/StepClassExecutor.groovy create mode 100644 xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowExecuteIntTest.java diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/domain/Action.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/domain/Action.java index 2e21f593..486a81fe 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/domain/Action.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/domain/Action.java @@ -11,5 +11,10 @@ public class Action extends Step { private Boolean isIterable; private String iterableJsonPath; private Boolean skipIterableJsonPathError; - private List next; + private String next; + + @Override + public String getNext(Object context) { + return next; + } } diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/domain/Condition.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/domain/Condition.java index 1b29c953..ef5a747c 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/domain/Condition.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/domain/Condition.java @@ -3,11 +3,14 @@ import lombok.Data; import lombok.EqualsAndHashCode; -import java.util.List; - @Data @EqualsAndHashCode(callSuper = true) public class Condition extends Step { - private List nextOnConditionTrue; - private List nextOnConditionFalse; + private String nextOnConditionTrue; + private String nextOnConditionFalse; + + @Override + public String getNext(Object context) { + return Boolean.TRUE.equals(context) ? nextOnConditionTrue : nextOnConditionFalse; + } } diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/domain/Flow.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/domain/Flow.java index c2b67a24..7c24399a 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/domain/Flow.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/domain/Flow.java @@ -3,6 +3,7 @@ import com.icthh.xm.commons.config.client.api.refreshable.ConfigWithKey; import lombok.Data; +import javax.validation.constraints.NotBlank; import java.util.List; @Data @@ -10,6 +11,8 @@ public class Flow implements ConfigWithKey { private String key; private String version; private String description; + @NotBlank + private String startStep; private List steps; private Trigger trigger; } diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/domain/Step.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/domain/Step.java index b32661cb..42f04859 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/domain/Step.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/domain/Step.java @@ -27,6 +27,8 @@ public abstract class Step { private Map snippets; private StepSpec.StepType type; + public abstract String getNext(Object context); + @Data public static class Snippet { private String content; diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/FlowExecutor.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/FlowExecutor.java new file mode 100644 index 00000000..ed9061be --- /dev/null +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/FlowExecutor.java @@ -0,0 +1,81 @@ +package com.icthh.xm.commons.flow.engine; + +import com.icthh.xm.commons.flow.domain.Action; +import com.icthh.xm.commons.flow.domain.Condition; +import com.icthh.xm.commons.flow.domain.Flow; +import com.icthh.xm.commons.flow.domain.Step; +import com.icthh.xm.commons.flow.engine.context.FlowExecutionContext; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.Map; + +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toMap; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + +@Slf4j +@Service +@RequiredArgsConstructor +public class FlowExecutor { + + private final StepExecutorService stepExecutorService; + + public Object execute(Flow flow, Object input) { + try { + return executeInternal(flow, input); + } catch (Throwable e) { + log.error("Error execute flow with error {}", flow, e); + throw e; + } + } + + private Object executeInternal(Flow flow, Object input) { + // to step map + Map steps = flow.getSteps().stream().collect(toMap(Step::getKey, identity())); + Step firstStep = steps.get(flow.getStartStep()); + FlowExecutionContext context = new FlowExecutionContext(); + context.setInput(input); + + Step currentStep = firstStep; + String lastActionKey = null; + + lastActionKey = currentStep instanceof Action ? currentStep.getKey() : lastActionKey; + + while(currentStep != null) { + if (log.isTraceEnabled()) { + log.trace("Execute step: {} with input: {}", currentStep.getKey(), input); + } + context.getStepInput().put(currentStep.getKey(), input); + Object result = executeStep(input, currentStep, context); + context.getStepOutput().put(currentStep.getKey(), result); + if (log.isTraceEnabled()) { + log.trace("Step: {} executed with result: {}", currentStep.getKey(), result); + } + + String nextStep = currentStep.getNext(result); + if (nextStep == null) { + return context.getStepOutput().get(lastActionKey); + } + currentStep = steps.get(nextStep); + if (currentStep == null && isNotBlank(nextStep)) { + log.error("Step for key: {} not found", nextStep); + } + } + + return context.getStepOutput().get(lastActionKey); + } + + private Object executeStep(Object input, Step currentStep, FlowExecutionContext context) { + // TODO refactor this to some pattern that follow open close principle + if (currentStep instanceof Action) { + return stepExecutorService.executeAction(input, (Action) currentStep, context); + } else if (currentStep instanceof Condition) { + return stepExecutorService.executeCondition(input, (Condition) currentStep, context); + } else { + throw new IllegalArgumentException("Unsupported step type: " + currentStep.getClass()); + } + } + +} diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/StepExecutorService.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/StepExecutorService.java new file mode 100644 index 00000000..f7ce19d1 --- /dev/null +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/StepExecutorService.java @@ -0,0 +1,48 @@ +package com.icthh.xm.commons.flow.engine; + +import com.icthh.xm.commons.flow.domain.Action; +import com.icthh.xm.commons.flow.domain.Condition; +import com.icthh.xm.commons.flow.domain.Step; +import com.icthh.xm.commons.flow.engine.context.FlowExecutionContext; +import com.icthh.xm.commons.flow.service.resolver.StepKeyResolver; +import com.icthh.xm.commons.flow.spec.step.StepSpec; +import com.icthh.xm.commons.flow.spec.step.StepSpecService; +import com.icthh.xm.commons.lep.LogicExtensionPoint; +import com.icthh.xm.commons.lep.spring.LepService; +import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.NotImplementedException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import static java.lang.Boolean.TRUE; + +@Component +@LepService(group = "flow.step") +@RequiredArgsConstructor +public class StepExecutorService { + + private final StepSpecService stepSpecService; + + private StepExecutorService self; + + @LogicExtensionPoint(value = "Action", resolver = StepKeyResolver.class) + public Object executeAction(Object input, Action step, FlowExecutionContext context) { + return self.executeStepByClassImpl(stepSpecService.getStepSpec(step.getTypeKey()), input, step, context); + } + + @LogicExtensionPoint(value = "Condition", resolver = StepKeyResolver.class) + public Boolean executeCondition(Object input, Condition step, FlowExecutionContext context) { + return TRUE.equals(self.executeStepByClassImpl(stepSpecService.getStepSpec(step.getTypeKey()), input, step, context)); + } + + @LogicExtensionPoint(value = "StepClassExecutor") + public Object executeStepByClassImpl(StepSpec stepSpec, Object input, Step step, FlowExecutionContext context) { + throw new NotImplementedException("Error resolve step. Pls check support default groovy lep."); + } + + @Autowired + public void setSelf(StepExecutorService self) { + this.self = self; + } + +} diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/context/FlowExecutionContext.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/context/FlowExecutionContext.java new file mode 100644 index 00000000..a1467c53 --- /dev/null +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/context/FlowExecutionContext.java @@ -0,0 +1,15 @@ +package com.icthh.xm.commons.flow.engine.context; + +import lombok.Data; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Data +public class FlowExecutionContext { + + private Object input; + private final Map stepInput = new ConcurrentHashMap<>(); + private final Map stepOutput = new ConcurrentHashMap<>(); + +} diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/rest/FlowSpecResource.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/rest/FlowSpecResource.java index 1715d319..236f4062 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/rest/FlowSpecResource.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/rest/FlowSpecResource.java @@ -3,7 +3,6 @@ import com.icthh.xm.commons.flow.spec.resource.TenantResourceType; import com.icthh.xm.commons.flow.spec.resource.TenantResourceTypeService; import com.icthh.xm.commons.flow.spec.step.StepSpec; -import com.icthh.xm.commons.flow.spec.step.StepSpec.StepType; import com.icthh.xm.commons.flow.spec.step.StepSpecService; import com.icthh.xm.commons.flow.spec.trigger.TriggerType; import com.icthh.xm.commons.flow.spec.trigger.TriggerTypeSpecService; @@ -25,7 +24,7 @@ public class FlowSpecResource { private final TriggerTypeSpecService triggerSpecService; @GetMapping("/steps") - public List getSteps(@RequestParam(name = "stepType", required = false) StepType stepType) { + public List getSteps(@RequestParam(name = "stepType", required = false) StepSpec.StepType stepType) { return stepSpecService.getSteps(stepType); } diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/FlowService.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/FlowService.java index 9b2a8ee4..23966e6b 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/FlowService.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/FlowService.java @@ -19,6 +19,7 @@ import java.util.Map; import static java.util.stream.Collectors.toList; +import static org.apache.commons.lang3.StringUtils.isNotBlank; @Slf4j @RequiredArgsConstructor @@ -51,12 +52,14 @@ public Flow getFlow(String flowKey) { @LogicExtensionPoint(value = "CreateFlow", resolver = FlowTypeLepKeyResolver.class) public void createFlow(Flow flow) { assertNotExits(flow); + assertStartStepExists(flow); self.saveFlowInternal(flow); } @LogicExtensionPoint(value = "UpdateFlow", resolver = FlowTypeLepKeyResolver.class) public void updateFlow(Flow flow) { assertExits(flow.getKey()); + assertStartStepExists(flow); self.saveFlowInternal(flow); } @@ -125,4 +128,14 @@ private List convertToConfiguration(Map upda .collect(toList()); } + private void assertStartStepExists(Flow flow) { + boolean isExists = isNotBlank(flow.getStartStep()) && flow.getSteps().stream().anyMatch(step -> + flow.getStartStep().equals(step.getKey()) + ); + + if (!isExists) { + throw new BusinessException("error.flow.start.step.not.found", "Start step with key " + flow.getStartStep() + " not found"); + } + } + } diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/resolver/StepKeyResolver.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/resolver/StepKeyResolver.java new file mode 100644 index 00000000..874f1fcc --- /dev/null +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/resolver/StepKeyResolver.java @@ -0,0 +1,17 @@ +package com.icthh.xm.commons.flow.service.resolver; + +import com.icthh.xm.commons.flow.domain.Step; +import com.icthh.xm.lep.api.LepKeyResolver; +import com.icthh.xm.lep.api.LepMethod; +import org.springframework.stereotype.Component; + +import java.util.List; + +@Component +public class StepKeyResolver implements LepKeyResolver { + @Override + public List segments(LepMethod method) { + String step = method.getParameter("step", Step.class).getTypeKey(); + return List.of(step); + } +} diff --git a/xm-commons-flow/src/main/resources/lep/default/flow/step/StepClassExecutor.groovy b/xm-commons-flow/src/main/resources/lep/default/flow/step/StepClassExecutor.groovy new file mode 100644 index 00000000..c313c185 --- /dev/null +++ b/xm-commons-flow/src/main/resources/lep/default/flow/step/StepClassExecutor.groovy @@ -0,0 +1,18 @@ +import com.icthh.xm.commons.flow.spec.step.StepSpec + +import java.util.concurrent.ConcurrentHashMap + +def step = lepContext.lepServices.getInstance(StepResolver.class).resolve(lepContext.inArgs.stepSpec) +return step.execute(lepContext) + +class StepResolver { + private final Map steps = new ConcurrentHashMap<>(); + + public def resolve(StepSpec spec) { + def step = steps.computeIfAbsent(spec.implementation) { + lepContext.lepServices.getInstance(Class.forName(spec.implementation)) + } + return step + } + +} diff --git a/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowExecuteIntTest.java b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowExecuteIntTest.java new file mode 100644 index 00000000..9e68e5a8 --- /dev/null +++ b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowExecuteIntTest.java @@ -0,0 +1,54 @@ +package com.icthh.xm.commons.flow.rest; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.icthh.xm.commons.config.domain.Configuration; +import com.icthh.xm.commons.flow.domain.Action; +import com.icthh.xm.commons.flow.domain.Flow; +import com.icthh.xm.commons.flow.domain.Step; +import com.icthh.xm.commons.flow.domain.Trigger; +import com.icthh.xm.commons.flow.service.FlowConfigService; +import com.icthh.xm.commons.flow.spec.step.StepSpec; +import com.icthh.xm.commons.lep.XmLepScriptConfigServerResourceLoader; +import lombok.SneakyThrows; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.web.servlet.ResultActions; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +import static com.icthh.xm.commons.flow.steps.StepsRefreshableConfigurationUnitTest.loadFile; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.verify; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put; +import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +public class FlowExecuteIntTest extends AbstractFlowIntTest { + + @Autowired + TestLepService testLepService; + + @Autowired + FlowConfigService flowConfigService; + + @Autowired + XmLepScriptConfigServerResourceLoader lep; + + @Test + public void test() { + + } +} diff --git a/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowResourceIntTest.java b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowResourceIntTest.java index a021ec1a..f0073a3a 100644 --- a/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowResourceIntTest.java +++ b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowResourceIntTest.java @@ -184,7 +184,7 @@ public Flow mockFlow() { step1.setTypeKey("actionkey"); step1.setParameters(Map.of("query", "select * from orders")); step1.setType(StepSpec.StepType.ACTION); - step1.setNext(List.of("step2")); + step1.setNext("step2"); // Snippets for Step 2 Step.Snippet precheckSnippetStep2 = new Step.Snippet(); @@ -202,7 +202,7 @@ public Flow mockFlow() { step2.setParameters(Map.of("query", "select * from users")); step2.setSnippets(Map.of("precheck", precheckSnippetStep2, "mapping", mappingSnippetStep2)); step2.setType(StepSpec.StepType.ACTION); - step2.setNext(List.of("step3")); + step2.setNext("step3"); // Snippets for Step 3 Step.Snippet precheckSnippetStep3 = new Step.Snippet(); @@ -234,6 +234,7 @@ public Flow mockFlow() { Flow flow = new Flow(); flow.setKey("my-flow"); flow.setTrigger(trigger); + flow.setStartStep("step1"); flow.setSteps(List.of(step1, step2, step3)); return flow; diff --git a/xm-commons-flow/src/test/resources/flow.yml b/xm-commons-flow/src/test/resources/flow.yml index de768c0e..a9d588ba 100644 --- a/xm-commons-flow/src/test/resources/flow.yml +++ b/xm-commons-flow/src/test/resources/flow.yml @@ -2,14 +2,14 @@ flows: - key: "my-flow" description: "Init description" + startStep: "step1" steps: - key: "step1" typeKey: "actionkey" parameters: query: "select * from orders" type: "ACTION" - next: - - "step2" + next: "step2" - key: "step2" typeKey: "actionkey" parameters: @@ -24,8 +24,7 @@ flows: \ false; }" extension: "js" type: "ACTION" - next: - - "step3" + next: "step3" - key: "step3" typeKey: "actionkey" parameters: From 579f60e842919a91678e34873f199e0bea42422c Mon Sep 17 00:00:00 2001 From: ssenko Date: Fri, 12 Jul 2024 21:53:35 +0300 Subject: [PATCH 05/13] Implement simple flow engine --- .../icthh/xm/commons/flow/api/Condition.java | 6 +- .../flow/api/FlowLepContextFields.java | 10 +- .../context/FlowLepAdditionalContext.java | 71 ++++++++ .../context/StepLepAdditionalContext.java | 93 +++++++++++ .../context/StepsLepAdditionalContext.java | 152 ++++++++++++++++++ .../TenantResourceLepAdditionalContext.java | 8 +- .../icthh/xm/commons/flow/domain/Action.java | 1 + .../icthh/xm/commons/flow/domain/Step.java | 1 - .../xm/commons/flow/engine/FlowExecutor.java | 72 +++++---- .../flow/engine/StepExecutorService.java | 17 +- .../engine/context/FlowExecutionContext.java | 8 +- .../flow/service/CodeSnippetExecutor.java | 6 +- .../flow/service/CodeSnippetService.java | 2 +- .../flow/spec/step/StepSpecService.java | 6 + .../flow/step/StepClassExecutor.groovy | 4 +- .../flow/rest/AbstractFlowIntTest.java | 55 ++++++- .../commons/flow/rest/FlowExecuteIntTest.java | 116 ++++++++++--- .../flow/rest/FlowResourceIntTest.java | 1 - .../xm/commons/flow/rest/TestLepContext.java | 3 +- ...StepsRefreshableConfigurationUnitTest.java | 2 +- .../test-flow-execute/DivideAction.groovy | 12 ++ .../NotIsZeroCondition.groovy | 12 ++ .../test-flow-execute/simple-flow.yml | 51 ++++++ .../resources/test-flow-execute/steps.yml | 14 ++ .../xm/commons/lep/api/BaseLepContext.java | 4 + .../xm/commons/lep/api/UseAsLepContext.java | 11 ++ .../lep/impl/LogicExtensionPointHandler.java | 13 +- .../commons/lep/impl/MethodSignatureImpl.java | 22 +++ .../com/icthh/xm/lep/api/MethodSignature.java | 3 + 29 files changed, 691 insertions(+), 85 deletions(-) create mode 100644 xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/FlowLepAdditionalContext.java create mode 100644 xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/StepLepAdditionalContext.java create mode 100644 xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/StepsLepAdditionalContext.java create mode 100644 xm-commons-flow/src/test/resources/test-flow-execute/DivideAction.groovy create mode 100644 xm-commons-flow/src/test/resources/test-flow-execute/NotIsZeroCondition.groovy create mode 100644 xm-commons-flow/src/test/resources/test-flow-execute/simple-flow.yml create mode 100644 xm-commons-flow/src/test/resources/test-flow-execute/steps.yml create mode 100644 xm-commons-lep/src/main/java/com/icthh/xm/commons/lep/api/UseAsLepContext.java diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/api/Condition.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/api/Condition.java index f914ba4e..ce02fe1f 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/api/Condition.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/api/Condition.java @@ -2,6 +2,10 @@ import com.icthh.xm.commons.lep.api.BaseLepContext; -public interface Condition { +public interface Condition extends Action { + @Override + default Boolean execute(T lepContext) { + return test(lepContext); + } Boolean test(T lepContext); } diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/api/FlowLepContextFields.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/api/FlowLepContextFields.java index 3c51f2a5..505b2c40 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/api/FlowLepContextFields.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/api/FlowLepContextFields.java @@ -1,6 +1,14 @@ package com.icthh.xm.commons.flow.api; +import com.icthh.xm.commons.flow.context.FlowLepAdditionalContext.FlowLepAdditionalContextField; +import com.icthh.xm.commons.flow.context.StepLepAdditionalContext.StepLepAdditionalContextField; +import com.icthh.xm.commons.flow.context.StepsLepAdditionalContext.StepsLepAdditionalContextField; import com.icthh.xm.commons.flow.context.TenantResourceLepAdditionalContext.TenantResourceLepAdditionalContextField; -public interface FlowLepContextFields extends TenantResourceLepAdditionalContextField { +public interface FlowLepContextFields extends + TenantResourceLepAdditionalContextField, + StepLepAdditionalContextField, + StepsLepAdditionalContextField, + FlowLepAdditionalContextField +{ } diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/FlowLepAdditionalContext.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/FlowLepAdditionalContext.java new file mode 100644 index 00000000..5f1d8612 --- /dev/null +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/FlowLepAdditionalContext.java @@ -0,0 +1,71 @@ +package com.icthh.xm.commons.flow.context; + +import com.icthh.xm.commons.flow.engine.context.FlowExecutionContext; +import com.icthh.xm.commons.flow.context.FlowLepAdditionalContext.FlowContext; +import com.icthh.xm.commons.lep.TargetProceedingLep; +import com.icthh.xm.commons.lep.api.BaseLepContext; +import com.icthh.xm.commons.lep.api.LepAdditionalContext; +import com.icthh.xm.commons.lep.api.LepAdditionalContextField; +import com.icthh.xm.commons.lep.api.LepBaseKey; +import com.icthh.xm.commons.lep.api.LepEngine; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.Optional; +import java.util.Set; + +import static com.icthh.xm.commons.flow.context.FlowLepAdditionalContext.FlowLepAdditionalContextField.FLOW; +import static com.icthh.xm.commons.flow.engine.StepExecutorService.ACTION; +import static com.icthh.xm.commons.flow.engine.StepExecutorService.CONDITION; +import static com.icthh.xm.commons.flow.engine.StepExecutorService.FLOW_STEP_GROUP; +import static com.icthh.xm.commons.flow.engine.StepExecutorService.STEP_CLASS_EXECUTOR; + +@Component +@RequiredArgsConstructor +public class FlowLepAdditionalContext implements LepAdditionalContext { + + private static final Set STEP_LEP = Set.of(ACTION, CONDITION, STEP_CLASS_EXECUTOR); + + @Override + public String additionalContextKey() { + return FLOW; + } + + @Override + public FlowContext additionalContextValue() { + return null; + } + + @Override + public Optional additionalContextValue(BaseLepContext lepContext, LepEngine lepEngine, TargetProceedingLep lepMethod) { + LepBaseKey lepBaseKey = lepMethod.getLepBaseKey(); + if (FLOW_STEP_GROUP.equals(lepBaseKey.getGroup()) && STEP_LEP.contains(lepBaseKey.getBaseKey())) { + FlowExecutionContext context = lepMethod.getParameter("context", FlowExecutionContext.class); + return Optional.of(new FlowContext(context.getInput())); + } else { + return Optional.empty(); + } + } + + @Override + public Class fieldAccessorInterface() { + return FlowLepAdditionalContextField.class; + } + + public interface FlowLepAdditionalContextField extends LepAdditionalContextField { + String FLOW = "flow"; + default FlowContext getFlow() { + return (FlowContext)get(FLOW); + } + } + + public static class FlowContext { + public final Object input; + public final Object context; + + public FlowContext(Object input) { + this.input = input; + this.context = input; + } + } +} diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/StepLepAdditionalContext.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/StepLepAdditionalContext.java new file mode 100644 index 00000000..863ef412 --- /dev/null +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/StepLepAdditionalContext.java @@ -0,0 +1,93 @@ +package com.icthh.xm.commons.flow.context; + +import com.icthh.xm.commons.flow.context.StepLepAdditionalContext.StepContext; +import com.icthh.xm.commons.flow.domain.Step; +import com.icthh.xm.commons.flow.engine.context.FlowExecutionContext; +import com.icthh.xm.commons.flow.service.CodeSnippetExecutor; +import com.icthh.xm.commons.lep.TargetProceedingLep; +import com.icthh.xm.commons.lep.api.BaseLepContext; +import com.icthh.xm.commons.lep.api.LepAdditionalContext; +import com.icthh.xm.commons.lep.api.LepAdditionalContextField; +import com.icthh.xm.commons.lep.api.LepBaseKey; +import com.icthh.xm.commons.lep.api.LepEngine; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static com.icthh.xm.commons.config.client.utils.Utils.nullSafeMap; +import static com.icthh.xm.commons.flow.context.StepLepAdditionalContext.StepLepAdditionalContextField.STEP; +import static com.icthh.xm.commons.flow.engine.StepExecutorService.ACTION; +import static com.icthh.xm.commons.flow.engine.StepExecutorService.CONDITION; +import static com.icthh.xm.commons.flow.engine.StepExecutorService.FLOW_STEP_GROUP; +import static com.icthh.xm.commons.flow.engine.StepExecutorService.STEP_CLASS_EXECUTOR; + +@Component +@RequiredArgsConstructor +public class StepLepAdditionalContext implements LepAdditionalContext { + + private static final Set STEP_LEP = Set.of(ACTION, CONDITION, STEP_CLASS_EXECUTOR); + + private final CodeSnippetExecutor snippetExecutor; + + @Override + public String additionalContextKey() { + return STEP; + } + + @Override + public StepContext additionalContextValue() { + return null; + } + + @Override + public Optional additionalContextValue(BaseLepContext lepContext, LepEngine lepEngine, TargetProceedingLep lepMethod) { + LepBaseKey lepBaseKey = lepMethod.getLepBaseKey(); + if (FLOW_STEP_GROUP.equals(lepBaseKey.getGroup()) && STEP_LEP.contains(lepBaseKey.getBaseKey())) { + FlowExecutionContext context = lepMethod.getParameter("context", FlowExecutionContext.class); + Step step = lepMethod.getParameter("step", Step.class); + return Optional.of(new StepContext(context, step, snippetExecutor)); + } else { + return Optional.empty(); + } + } + + @Override + public Class fieldAccessorInterface() { + return StepLepAdditionalContextField.class; + } + + public interface StepLepAdditionalContextField extends LepAdditionalContextField { + String STEP = "step"; + default StepContext getStep() { + return (StepContext)get(STEP); + } + } + + public static class StepContext { + public final Object input; + public final Object context; + public final Map parameters; + + private final String flowKey; + private final String stepKey; + private final CodeSnippetExecutor snippetExecutor; + + public StepContext(FlowExecutionContext context, Step step, CodeSnippetExecutor snippetExecutor) { + var input = context.getStepInput().get(step.getKey()); + this.input = input; + this.context = input; + this.flowKey = context.getFlowKey(); + this.stepKey = step.getKey(); + this.parameters = Map.copyOf(nullSafeMap(step.getParameters())); + this.snippetExecutor = snippetExecutor; + } + + public Object runSnippet(String snippetKey, BaseLepContext lepContext) { + return snippetExecutor.runCodeSnippet(lepContext, List.of(flowKey, stepKey, snippetKey)); + } + } +} diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/StepsLepAdditionalContext.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/StepsLepAdditionalContext.java new file mode 100644 index 00000000..97ea5dbd --- /dev/null +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/StepsLepAdditionalContext.java @@ -0,0 +1,152 @@ +package com.icthh.xm.commons.flow.context; + +import com.icthh.xm.commons.flow.context.StepsLepAdditionalContext.StepsContext; +import com.icthh.xm.commons.flow.engine.context.FlowExecutionContext; +import com.icthh.xm.commons.lep.TargetProceedingLep; +import com.icthh.xm.commons.lep.api.BaseLepContext; +import com.icthh.xm.commons.lep.api.LepAdditionalContext; +import com.icthh.xm.commons.lep.api.LepAdditionalContextField; +import com.icthh.xm.commons.lep.api.LepBaseKey; +import com.icthh.xm.commons.lep.api.LepEngine; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static com.icthh.xm.commons.flow.context.StepsLepAdditionalContext.StepsLepAdditionalContextField.STEPS; +import static com.icthh.xm.commons.flow.engine.StepExecutorService.ACTION; +import static com.icthh.xm.commons.flow.engine.StepExecutorService.CONDITION; +import static com.icthh.xm.commons.flow.engine.StepExecutorService.FLOW_STEP_GROUP; +import static com.icthh.xm.commons.flow.engine.StepExecutorService.STEP_CLASS_EXECUTOR; + +@Component +@RequiredArgsConstructor +public class StepsLepAdditionalContext implements LepAdditionalContext { + + private static final Set STEP_LEP = Set.of(ACTION, CONDITION, STEP_CLASS_EXECUTOR); + + @Override + public String additionalContextKey() { + return STEPS; + } + + @Override + public StepsContext additionalContextValue() { + return null; + } + + @Override + public Optional additionalContextValue(BaseLepContext lepContext, LepEngine lepEngine, TargetProceedingLep lepMethod) { + LepBaseKey lepBaseKey = lepMethod.getLepBaseKey(); + if (FLOW_STEP_GROUP.equals(lepBaseKey.getGroup()) && STEP_LEP.contains(lepBaseKey.getBaseKey())) { + FlowExecutionContext context = lepMethod.getParameter("context", FlowExecutionContext.class); + return Optional.of(new StepsContext(context)); + } else { + return Optional.empty(); + } + } + + @Override + public Class fieldAccessorInterface() { + return StepsLepAdditionalContextField.class; + } + + public interface StepsLepAdditionalContextField extends LepAdditionalContextField { + String STEPS = "steps"; + default StepsContext getSteps() { + return (StepsContext)get(STEPS); + } + } + + public static class StepLog { + public Object input; + public Object output; + } + + public static class StepsContext implements Map { + public final Map delegate; + + public StepsContext(FlowExecutionContext context) { + Set keys = new HashSet<>(); + keys.addAll(context.getStepInput().keySet()); + keys.addAll(context.getStepOutput().keySet()); + + Map steps = new HashMap<>(); + keys.forEach(key -> { + StepLog stepLog = new StepLog(); + stepLog.input = context.getStepInput().get(key); + stepLog.output = context.getStepOutput().get(key); + steps.put(key, stepLog); + }); + + this.delegate = Map.copyOf(steps); + } + + @Override + public int size() { + return delegate.size(); + } + + @Override + public boolean isEmpty() { + return delegate.isEmpty(); + } + + @Override + public boolean containsKey(Object key) { + return delegate.containsKey(key); + } + + @Override + public boolean containsValue(Object value) { + return delegate.containsValue(value); + } + + @Override + public StepLog get(Object key) { + return delegate.get(key); + } + + @Override + public StepLog put(String key, StepLog value) { + throw new UnsupportedOperationException(); + } + + @Override + public StepLog remove(Object key) { + throw new UnsupportedOperationException(); + } + + @Override + public void putAll(Map m) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + delegate.clear(); + } + + @Override + public Set keySet() { + return delegate.keySet(); + } + + @Override + public Collection values() { + return delegate.values(); + } + + @Override + public Set> entrySet() { + return delegate.entrySet(); + } + + } + +} diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/TenantResourceLepAdditionalContext.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/TenantResourceLepAdditionalContext.java index 94adc1b6..6ece0b4b 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/TenantResourceLepAdditionalContext.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/TenantResourceLepAdditionalContext.java @@ -8,7 +8,7 @@ import java.util.Map; -import static com.icthh.xm.commons.flow.context.TenantResourceLepAdditionalContext.TenantResourceLepAdditionalContextField.FIELD_NAME; +import static com.icthh.xm.commons.flow.context.TenantResourceLepAdditionalContext.TenantResourceLepAdditionalContextField.TENANT_RESOURCE; @Component @RequiredArgsConstructor @@ -18,7 +18,7 @@ public class TenantResourceLepAdditionalContext implements LepAdditionalContext< @Override public String additionalContextKey() { - return FIELD_NAME; + return TENANT_RESOURCE; } @Override @@ -32,9 +32,9 @@ public Class fieldAccessorInterface() { } public interface TenantResourceLepAdditionalContextField extends LepAdditionalContextField { - String FIELD_NAME = "resources"; + String TENANT_RESOURCE = "resources"; default Map>> getResources() { - return (Map>>)get(FIELD_NAME); + return (Map>>)get(TENANT_RESOURCE); } } } diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/domain/Action.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/domain/Action.java index 486a81fe..2b40668e 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/domain/Action.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/domain/Action.java @@ -11,6 +11,7 @@ public class Action extends Step { private Boolean isIterable; private String iterableJsonPath; private Boolean skipIterableJsonPathError; + private Boolean removeNullOutputForIterableResult; private String next; @Override diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/domain/Step.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/domain/Step.java index 42f04859..cafc97f2 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/domain/Step.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/domain/Step.java @@ -22,7 +22,6 @@ public abstract class Step { private String key; private String typeKey; - private List depends; private Map parameters; private Map snippets; private StepSpec.StepType type; diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/FlowExecutor.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/FlowExecutor.java index ed9061be..8d0b28d4 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/FlowExecutor.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/FlowExecutor.java @@ -22,49 +22,51 @@ public class FlowExecutor { private final StepExecutorService stepExecutorService; - public Object execute(Flow flow, Object input) { + public FlowExecutionContext execute(Flow flow, Object input) { + + FlowExecutionContext context = new FlowExecutionContext(flow.getKey()); + try { - return executeInternal(flow, input); - } catch (Throwable e) { - log.error("Error execute flow with error {}", flow, e); - throw e; - } - } + // to step map + Map steps = flow.getSteps().stream().collect(toMap(Step::getKey, identity())); + Step firstStep = steps.get(flow.getStartStep()); + context.setInput(input); - private Object executeInternal(Flow flow, Object input) { - // to step map - Map steps = flow.getSteps().stream().collect(toMap(Step::getKey, identity())); - Step firstStep = steps.get(flow.getStartStep()); - FlowExecutionContext context = new FlowExecutionContext(); - context.setInput(input); + Step currentStep = firstStep; + String lastActionKey = null; - Step currentStep = firstStep; - String lastActionKey = null; + while(currentStep != null) { + if (log.isTraceEnabled()) { + log.trace("Execute step: {} with input: {}", currentStep.getKey(), input); + } + lastActionKey = currentStep instanceof Action ? currentStep.getKey() : lastActionKey; - lastActionKey = currentStep instanceof Action ? currentStep.getKey() : lastActionKey; + context.getStepInput().put(currentStep.getKey(), input); + Object result = executeStep(input, currentStep, context); + context.getStepOutput().put(currentStep.getKey(), result); + if (log.isTraceEnabled()) { + log.trace("Step: {} executed with result: {}", currentStep.getKey(), result); + } - while(currentStep != null) { - if (log.isTraceEnabled()) { - log.trace("Execute step: {} with input: {}", currentStep.getKey(), input); - } - context.getStepInput().put(currentStep.getKey(), input); - Object result = executeStep(input, currentStep, context); - context.getStepOutput().put(currentStep.getKey(), result); - if (log.isTraceEnabled()) { - log.trace("Step: {} executed with result: {}", currentStep.getKey(), result); - } + input = result; - String nextStep = currentStep.getNext(result); - if (nextStep == null) { - return context.getStepOutput().get(lastActionKey); - } - currentStep = steps.get(nextStep); - if (currentStep == null && isNotBlank(nextStep)) { - log.error("Step for key: {} not found", nextStep); + String nextStep = currentStep.getNext(result); + if (nextStep == null) { + context.setOutput(context.getStepOutput().get(lastActionKey)); + return context; + } + currentStep = steps.get(nextStep); + if (currentStep == null && isNotBlank(nextStep)) { + log.error("Step for key: {} not found", nextStep); + } } - } - return context.getStepOutput().get(lastActionKey); + context.setOutput(context.getStepOutput().get(lastActionKey)); + return context; + } catch (Throwable e) { + log.error("Error execute flow with error {} | executionContext: {}", flow.getKey(), context, e); + throw e; + } } private Object executeStep(Object input, Step currentStep, FlowExecutionContext context) { diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/StepExecutorService.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/StepExecutorService.java index f7ce19d1..9fd08fa8 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/StepExecutorService.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/StepExecutorService.java @@ -11,32 +11,41 @@ import com.icthh.xm.commons.lep.spring.LepService; import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.NotImplementedException; +import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import static java.lang.Boolean.TRUE; @Component -@LepService(group = "flow.step") +@LepService(group = StepExecutorService.FLOW_STEP_GROUP) @RequiredArgsConstructor public class StepExecutorService { + public static final String ACTION = "Action"; + public static final String CONDITION = "Condition"; + public static final String STEP_CLASS_EXECUTOR = "StepClassExecutor"; + public static final String FLOW_STEP_GROUP = "flow.step"; + private final StepSpecService stepSpecService; private StepExecutorService self; - @LogicExtensionPoint(value = "Action", resolver = StepKeyResolver.class) + @LogicExtensionPoint(value = ACTION, resolver = StepKeyResolver.class) public Object executeAction(Object input, Action step, FlowExecutionContext context) { return self.executeStepByClassImpl(stepSpecService.getStepSpec(step.getTypeKey()), input, step, context); } - @LogicExtensionPoint(value = "Condition", resolver = StepKeyResolver.class) + @LogicExtensionPoint(value = CONDITION, resolver = StepKeyResolver.class) public Boolean executeCondition(Object input, Condition step, FlowExecutionContext context) { return TRUE.equals(self.executeStepByClassImpl(stepSpecService.getStepSpec(step.getTypeKey()), input, step, context)); } - @LogicExtensionPoint(value = "StepClassExecutor") + @LogicExtensionPoint(value = STEP_CLASS_EXECUTOR) public Object executeStepByClassImpl(StepSpec stepSpec, Object input, Step step, FlowExecutionContext context) { + if (StringUtils.isBlank(stepSpec.getImplementation())) { + throw new IllegalArgumentException("Step implementation is not defined for step: " + step.getTypeKey()); + } throw new NotImplementedException("Error resolve step. Pls check support default groovy lep."); } diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/context/FlowExecutionContext.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/context/FlowExecutionContext.java index a1467c53..9a620157 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/context/FlowExecutionContext.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/context/FlowExecutionContext.java @@ -2,14 +2,16 @@ import lombok.Data; +import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; @Data public class FlowExecutionContext { + private final String flowKey; private Object input; - private final Map stepInput = new ConcurrentHashMap<>(); - private final Map stepOutput = new ConcurrentHashMap<>(); + private Object output; + private final Map stepInput = new HashMap<>(); + private final Map stepOutput = new HashMap<>(); } diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/CodeSnippetExecutor.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/CodeSnippetExecutor.java index 5f4cc43b..46cd57eb 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/CodeSnippetExecutor.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/CodeSnippetExecutor.java @@ -3,6 +3,8 @@ import com.icthh.xm.commons.flow.service.resolver.SnippetListLepKeyResolver; import com.icthh.xm.commons.flow.service.resolver.SnippetVarargsLepKeyResolver; import com.icthh.xm.commons.lep.LogicExtensionPoint; +import com.icthh.xm.commons.lep.api.BaseLepContext; +import com.icthh.xm.commons.lep.api.UseAsLepContext; import com.icthh.xm.commons.lep.spring.LepService; import org.springframework.stereotype.Service; @@ -15,12 +17,12 @@ public class CodeSnippetExecutor { public static final String SNIPPET = "Snippet"; @LogicExtensionPoint(value = SNIPPET, resolver = SnippetVarargsLepKeyResolver.class) - public Object runCodeSnippet(Object input, String ...snippetKeys) { + public Object runCodeSnippet(@UseAsLepContext BaseLepContext lepContext, String ...snippetKeys) { return null; } @LogicExtensionPoint(value = SNIPPET, resolver = SnippetListLepKeyResolver.class) - public Object runCodeSnippet(Object input, List snippetKeys) { + public Object runCodeSnippet(@UseAsLepContext BaseLepContext lepContext, List snippetKeys) { return null; } } diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/CodeSnippetService.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/CodeSnippetService.java index 25809779..e3ca058c 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/CodeSnippetService.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/CodeSnippetService.java @@ -60,7 +60,7 @@ private String buildPath(String flowKey, String stepKey, String fileKey, Snippet throw new BusinessException("error.illegal.code.snippet.file.name", "File name can't contain '/' character"); } String tenantKey = tenantContextHolder.getTenantKey().toUpperCase(); - String folderPath = "/config/tenants/" + tenantKey + "/" + appName + "/flow/snippets/"; + String folderPath = "/config/tenants/" + tenantKey + "/" + appName + "/lep/flow/snippets/"; return folderPath + fileName; } } diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/spec/step/StepSpecService.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/spec/step/StepSpecService.java index d1cf1bab..5a458f79 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/spec/step/StepSpecService.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/spec/step/StepSpecService.java @@ -10,6 +10,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import static com.icthh.xm.commons.config.client.utils.Utils.nullSafeList; @@ -48,6 +49,11 @@ public JavaType configFileJavaType(TypeFactory factory) { } public StepSpec getStepSpec(String stepKey) { + return Optional.ofNullable(getStepsSpec().get(stepKey)) + .orElseThrow(() -> new IllegalArgumentException("Step not found: " + stepKey)); + } + + public StepSpec findStepSpec(String stepKey) { return getStepsSpec().get(stepKey); } diff --git a/xm-commons-flow/src/main/resources/lep/default/flow/step/StepClassExecutor.groovy b/xm-commons-flow/src/main/resources/lep/default/flow/step/StepClassExecutor.groovy index c313c185..3d49c3aa 100644 --- a/xm-commons-flow/src/main/resources/lep/default/flow/step/StepClassExecutor.groovy +++ b/xm-commons-flow/src/main/resources/lep/default/flow/step/StepClassExecutor.groovy @@ -2,13 +2,13 @@ import com.icthh.xm.commons.flow.spec.step.StepSpec import java.util.concurrent.ConcurrentHashMap -def step = lepContext.lepServices.getInstance(StepResolver.class).resolve(lepContext.inArgs.stepSpec) +def step = lepContext.lepServices.getInstance(StepResolver.class).resolve(lepContext, lepContext.inArgs.stepSpec) return step.execute(lepContext) class StepResolver { private final Map steps = new ConcurrentHashMap<>(); - public def resolve(StepSpec spec) { + public def resolve(def lepContext, StepSpec spec) { def step = steps.computeIfAbsent(spec.implementation) { lepContext.lepServices.getInstance(Class.forName(spec.implementation)) } diff --git a/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/AbstractFlowIntTest.java b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/AbstractFlowIntTest.java index d00b901f..550fca32 100644 --- a/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/AbstractFlowIntTest.java +++ b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/AbstractFlowIntTest.java @@ -1,7 +1,15 @@ package com.icthh.xm.commons.flow.rest; +import com.icthh.xm.commons.config.client.api.RefreshableConfiguration; import com.icthh.xm.commons.config.client.repository.TenantConfigRepository; +import com.icthh.xm.commons.config.domain.Configuration; +import com.icthh.xm.commons.flow.context.FlowLepAdditionalContext; +import com.icthh.xm.commons.flow.context.StepLepAdditionalContext; +import com.icthh.xm.commons.flow.context.StepsLepAdditionalContext; import com.icthh.xm.commons.flow.context.TenantResourceLepAdditionalContext; +import com.icthh.xm.commons.flow.engine.FlowExecutor; +import com.icthh.xm.commons.flow.engine.StepExecutorService; +import com.icthh.xm.commons.flow.service.CodeSnippetExecutor; import com.icthh.xm.commons.flow.service.CodeSnippetService; import com.icthh.xm.commons.flow.service.FlowConfigService; import com.icthh.xm.commons.flow.service.FlowService; @@ -9,6 +17,8 @@ import com.icthh.xm.commons.flow.service.TenantResourceService; import com.icthh.xm.commons.flow.service.YamlConverter; import com.icthh.xm.commons.flow.service.resolver.FlowTypeLepKeyResolver; +import com.icthh.xm.commons.flow.service.resolver.SnippetListLepKeyResolver; +import com.icthh.xm.commons.flow.service.resolver.StepKeyResolver; import com.icthh.xm.commons.flow.service.resolver.TenantResourceTypeLepKeyResolver; import com.icthh.xm.commons.flow.service.resolver.TriggerResolver; import com.icthh.xm.commons.flow.service.trigger.TriggerProcessor; @@ -23,18 +33,28 @@ import com.icthh.xm.commons.tenant.TenantContextHolder; import com.icthh.xm.commons.tenant.TenantContextUtils; import com.icthh.xm.commons.tenant.spring.config.TenantContextConfiguration; +import lombok.SneakyThrows; +import org.apache.commons.io.IOUtils; import org.junit.After; import org.junit.Before; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.core.io.ClassPathResource; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.setup.MockMvcBuilders; import org.springframework.web.context.WebApplicationContext; import org.springframework.web.servlet.config.annotation.EnableWebMvc; +import java.io.InputStream; +import java.util.List; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; + @RunWith(SpringRunner.class) @SpringBootTest( @@ -56,12 +76,20 @@ FlowResource.class, FlowService.class, CodeSnippetService.class, + CodeSnippetExecutor.class, + SnippetListLepKeyResolver.class, TriggerProcessor.class, FlowTypeLepKeyResolver.class, TriggerResolver.class, TriggerTypeSpecService.class, YamlConverter.class, - FlowConfigService.class + FlowConfigService.class, + FlowExecutor.class, + StepKeyResolver.class, + StepExecutorService.class, + StepLepAdditionalContext.class, + StepsLepAdditionalContext.class, + FlowLepAdditionalContext.class }, properties = {"spring.application.name=testApp"} ) @@ -77,6 +105,8 @@ public abstract class AbstractFlowIntTest { TenantContextHolder tenantContextHolder; @MockBean TenantConfigRepository tenantConfigRepository; + @Autowired + List configurations; @Before public void setup() { @@ -90,4 +120,27 @@ public void after() { lepManagementService.endThreadContext(); } + public void updateConfiguration(String path, String content) { + configurations.stream().filter(c -> c.isListeningConfiguration(path)).forEach(c -> c.onRefresh(path, content)); + } + + @SneakyThrows + public static String loadFile(String path) { + try (InputStream cfgInputStream = new ClassPathResource(path).getInputStream()) { + return IOUtils.toString(cfgInputStream, UTF_8); + } + } + + public void mockSendConfigToRefresh() { + doAnswer(invocation -> { + if (invocation.getArguments()[0] == null) { + return null; + } + List configurations = invocation.getArgument(0); + configurations.forEach(c -> updateConfiguration(c.getPath(), c.getContent())); + return null; + }).when(tenantConfigRepository).updateConfigurations(any()); + + } + } diff --git a/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowExecuteIntTest.java b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowExecuteIntTest.java index 9e68e5a8..625139a1 100644 --- a/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowExecuteIntTest.java +++ b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowExecuteIntTest.java @@ -2,39 +2,20 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import com.icthh.xm.commons.config.domain.Configuration; -import com.icthh.xm.commons.flow.domain.Action; import com.icthh.xm.commons.flow.domain.Flow; -import com.icthh.xm.commons.flow.domain.Step; -import com.icthh.xm.commons.flow.domain.Trigger; +import com.icthh.xm.commons.flow.engine.FlowExecutor; +import com.icthh.xm.commons.flow.engine.context.FlowExecutionContext; import com.icthh.xm.commons.flow.service.FlowConfigService; -import com.icthh.xm.commons.flow.spec.step.StepSpec; -import com.icthh.xm.commons.lep.XmLepScriptConfigServerResourceLoader; +import com.icthh.xm.commons.flow.service.FlowConfigService.FlowsConfig; +import com.icthh.xm.commons.flow.service.FlowService; import lombok.SneakyThrows; import org.junit.Test; -import org.mockito.ArgumentCaptor; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.web.servlet.ResultActions; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; +import java.io.Serializable; import java.util.Map; -import static com.icthh.xm.commons.flow.steps.StepsRefreshableConfigurationUnitTest.loadFile; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.verify; -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put; -import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; public class FlowExecuteIntTest extends AbstractFlowIntTest { @@ -45,10 +26,93 @@ public class FlowExecuteIntTest extends AbstractFlowIntTest { FlowConfigService flowConfigService; @Autowired - XmLepScriptConfigServerResourceLoader lep; + FlowService flowService; + + @Autowired + FlowExecutor flowExecutor; @Test - public void test() { + @SneakyThrows + public void testSimpleExecution() { + String simpleFlow = loadFile("test-flow-execute/simple-flow.yml"); + mockSendConfigToRefresh(); + updateConfiguration("/config/tenants/TEST/testApp/lep/flow/step/Action$$sum.groovy", + "return lepContext.flow.input.b + lepContext.step.parameters.a"); + updateConfiguration("/config/tenants/TEST/testApp/lep/flow/step/Condition$$groovyCondition.groovy", + "return lepContext.step.runSnippet('groovyCondition', lepContext)"); + updateConfiguration("/config/tenants/commons/lep/flow/conditions/NotIsZeroCondition.groovy", + loadFile("test-flow-execute/NotIsZeroCondition.groovy")); + updateConfiguration("/config/tenants/commons/lep/flow/actions/DivideAction.groovy", + loadFile("test-flow-execute/DivideAction.groovy")); + + updateConfiguration("/config/tenants/TEST/testApp/lep/flow/step/Action$$groovyAction.groovy", + "return lepContext.step.runSnippet('groovyAction', lepContext)"); + updateConfiguration("/config/tenants/TEST/testApp/lep/flow/step/Action$$minus.groovy", + "return (int) (lepContext.step.parameters.a - lepContext.flow.input.b)"); + + updateConfiguration("/config/tenants/TEST/testApp/flow/step-spec/steps.yml", loadFile("test-flow-execute/steps.yml")); + + FlowsConfig flows = new ObjectMapper(new YAMLFactory()).readValue(simpleFlow, FlowsConfig.class); + Flow flow = flows.getFlows().get(0); + flowService.createFlow(flow); + FlowExecutionContext executionContext = flowExecutor.execute(flow, Map.of("b", -1)); + assertEquals(0, executionContext.getOutput()); + assertEquals( + mockContext( + "simple-flow", + Map.of("b", -1), + 0, + Map.of("step1", Map.of("b", -1), "step2", 0, "step3", false), + Map.of("step1", 0, "step2", false, "step3", false) + ), + executionContext + ); + + executionContext = flowExecutor.execute(flow, Map.of("b", 2)); + assertEquals(8, executionContext.getOutput()); + assertEquals( + mockContext( + "simple-flow", + Map.of("b", 2), + 8, + Map.of("step1", Map.of("b", 2), "step2", 3, "step3", false, "step4", true), + Map.of("step1", 3, "step2", false, "step3", true, "step4", 8) + ), + executionContext + ); + + executionContext = flowExecutor.execute(flow, Map.of("b", 5)); + assertEquals(2, executionContext.getOutput()); + assertEquals( + mockContext( + "simple-flow", + Map.of("b", 5), + 2, + Map.of("step1", Map.of("b", 5), "step2", 6, "step5", true), + Map.of("step1", 6, "step2", true, "step5", 2) + ), + executionContext + ); } + + private FlowExecutionContext mockContext( + String key, + Map input, + Object output, + Map stepInput, + Map stepOutput + ) { + var context = new FlowExecutionContext(key); + context.setInput(input); + context.setOutput(output); + for (Map.Entry entry : stepInput.entrySet()) { + context.getStepInput().put(entry.getKey(), entry.getValue()); + } + for (Map.Entry entry : stepOutput.entrySet()) { + context.getStepOutput().put(entry.getKey(), entry.getValue()); + } + return context; + } + } diff --git a/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowResourceIntTest.java b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowResourceIntTest.java index f0073a3a..9111bbab 100644 --- a/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowResourceIntTest.java +++ b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowResourceIntTest.java @@ -21,7 +21,6 @@ import java.util.List; import java.util.Map; -import static com.icthh.xm.commons.flow.steps.StepsRefreshableConfigurationUnitTest.loadFile; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertEquals; diff --git a/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/TestLepContext.java b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/TestLepContext.java index 1e24d907..de39ee23 100644 --- a/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/TestLepContext.java +++ b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/TestLepContext.java @@ -1,6 +1,7 @@ package com.icthh.xm.commons.flow.rest; +import com.icthh.xm.commons.flow.api.FlowLepContextFields; import com.icthh.xm.commons.lep.api.BaseLepContext; -public class TestLepContext extends BaseLepContext { +public class TestLepContext extends BaseLepContext implements FlowLepContextFields { } diff --git a/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/steps/StepsRefreshableConfigurationUnitTest.java b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/steps/StepsRefreshableConfigurationUnitTest.java index aa744a8f..454da7f3 100644 --- a/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/steps/StepsRefreshableConfigurationUnitTest.java +++ b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/steps/StepsRefreshableConfigurationUnitTest.java @@ -25,7 +25,7 @@ public void testReadConfiguration() { when(mock.getTenantKey()).thenReturn("UNIT_TEST"); var service = new StepSpecService("test", mock); service.onRefresh("/config/tenants/UNIT_TEST/test/flow/step-spec/testreadspec.yml", loadFile("step-spec/testreadspec.yml")); - StepSpec actionSpec = service.getStepSpec("actionkey"); + StepSpec actionSpec = service.findStepSpec("actionkey"); assertThat(actionSpec, equalTo(mockAction())); } diff --git a/xm-commons-flow/src/test/resources/test-flow-execute/DivideAction.groovy b/xm-commons-flow/src/test/resources/test-flow-execute/DivideAction.groovy new file mode 100644 index 00000000..04c56a37 --- /dev/null +++ b/xm-commons-flow/src/test/resources/test-flow-execute/DivideAction.groovy @@ -0,0 +1,12 @@ +package commons.lep.flow.actions + +import com.icthh.xm.commons.flow.api.Action +import com.icthh.xm.commons.flow.api.FlowLepContextFields +import com.icthh.xm.commons.lep.api.BaseLepContext + +class DivideAction implements Action { + @Override + Object execute(T lepContext) { + return (int) (lepContext.step.parameters.c / lepContext.steps[lepContext.step.parameters.stepKey].output) + } +} diff --git a/xm-commons-flow/src/test/resources/test-flow-execute/NotIsZeroCondition.groovy b/xm-commons-flow/src/test/resources/test-flow-execute/NotIsZeroCondition.groovy new file mode 100644 index 00000000..5294e6ed --- /dev/null +++ b/xm-commons-flow/src/test/resources/test-flow-execute/NotIsZeroCondition.groovy @@ -0,0 +1,12 @@ +package commons.lep.flow.conditions + +import com.icthh.xm.commons.flow.api.Condition +import com.icthh.xm.commons.flow.api.FlowLepContextFields +import com.icthh.xm.commons.lep.api.BaseLepContext + +class NotIsZeroCondition implements Condition { + @Override + Boolean test(T lepContext) { + return lepContext.steps[lepContext.step.parameters.stepKey].output != 0 + } +} diff --git a/xm-commons-flow/src/test/resources/test-flow-execute/simple-flow.yml b/xm-commons-flow/src/test/resources/test-flow-execute/simple-flow.yml new file mode 100644 index 00000000..8cd74279 --- /dev/null +++ b/xm-commons-flow/src/test/resources/test-flow-execute/simple-flow.yml @@ -0,0 +1,51 @@ +--- +flows: +- key: "simple-flow" + startStep: "step1" + steps: + - key: "step1" + typeKey: "sum" + parameters: + a: 1 + type: "ACTION" + next: "step2" + - key: "step2" + typeKey: "groovyCondition" + parameters: + c: 5 + snippets: + groovyCondition: + content: "lepContext.step.context > lepContext.step.parameters.c" + extension: "groovy" + type: "CONDITION" + nextOnConditionFalse: "step3" + nextOnConditionTrue: "step5" + - key: "step3" + typeKey: "notIsZero" + type: "CONDITION" + parameters: + stepKey: "step1" + nextOnConditionTrue: "step4" + - key: "step4" + typeKey: "divide" + parameters: + c: 24 + stepKey: "step1" + type: "ACTION" + - key: "step5" + typeKey: "minus" + parameters: + a: 7 + type: "ACTION" + - key: "step6" + typeKey: "groovyAction" + type: "ACTION" + snippets: + groovyAction: + content: "(int)(lepContext.steps.step1.output * lepContext.steps.step5.output)" + extension: "groovy" + trigger: + typeKey: "httpkey" + parameters: + method: "GET" + url: "/api/orders" diff --git a/xm-commons-flow/src/test/resources/test-flow-execute/steps.yml b/xm-commons-flow/src/test/resources/test-flow-execute/steps.yml new file mode 100644 index 00000000..1ba16ed8 --- /dev/null +++ b/xm-commons-flow/src/test/resources/test-flow-execute/steps.yml @@ -0,0 +1,14 @@ +- key: sum + type: ACTION +- key: jsCondition + type: CONDITION +- key: notIsZero + type: CONDITION + implementation: commons.lep.flow.conditions.NotIsZeroCondition +- key: divide + type: ACTION + implementation: commons.lep.flow.actions.DivideAction +- key: minus + type: ACTION +- key: jsAction + type: ACTION diff --git a/xm-commons-lep/src/main/java/com/icthh/xm/commons/lep/api/BaseLepContext.java b/xm-commons-lep/src/main/java/com/icthh/xm/commons/lep/api/BaseLepContext.java index 72c24132..31edc49a 100644 --- a/xm-commons-lep/src/main/java/com/icthh/xm/commons/lep/api/BaseLepContext.java +++ b/xm-commons-lep/src/main/java/com/icthh/xm/commons/lep/api/BaseLepContext.java @@ -28,6 +28,10 @@ public Object get(Object additionalContextKey) { return additionalContext.get(additionalContextKey); } + public Object get(String additionalContextKey) { + return additionalContext.get(additionalContextKey); + } + public Object propertyMissing(String prop) { return get(prop); } diff --git a/xm-commons-lep/src/main/java/com/icthh/xm/commons/lep/api/UseAsLepContext.java b/xm-commons-lep/src/main/java/com/icthh/xm/commons/lep/api/UseAsLepContext.java new file mode 100644 index 00000000..89e37d9a --- /dev/null +++ b/xm-commons-lep/src/main/java/com/icthh/xm/commons/lep/api/UseAsLepContext.java @@ -0,0 +1,11 @@ +package com.icthh.xm.commons.lep.api; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.PARAMETER) +public @interface UseAsLepContext { +} diff --git a/xm-commons-lep/src/main/java/com/icthh/xm/commons/lep/impl/LogicExtensionPointHandler.java b/xm-commons-lep/src/main/java/com/icthh/xm/commons/lep/impl/LogicExtensionPointHandler.java index 34e2d2d5..0deab2f6 100644 --- a/xm-commons-lep/src/main/java/com/icthh/xm/commons/lep/impl/LogicExtensionPointHandler.java +++ b/xm-commons-lep/src/main/java/com/icthh/xm/commons/lep/impl/LogicExtensionPointHandler.java @@ -7,6 +7,7 @@ import com.icthh.xm.commons.lep.api.LepExecutor; import com.icthh.xm.commons.lep.api.LepKey; import com.icthh.xm.commons.lep.api.LepManagementService; +import com.icthh.xm.commons.lep.api.UseAsLepContext; import com.icthh.xm.commons.lep.spring.LepContextService; import com.icthh.xm.commons.lep.spring.LepService; import com.icthh.xm.lep.api.LepInvocationCauseException; @@ -18,6 +19,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.lang.reflect.Parameter; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -75,10 +77,19 @@ public Object handleLepMethod(Class targetType, Object target, Method method, private Object invokeLepMethod(LepEngine lepEngine, Object target, LepMethod lepMethod, LepKey lepKey) { TargetProceedingLep targetProceedingLep = new TargetProceedingLep(target, lepMethod, lepKey); - BaseLepContext lepContext = lepContextService.createLepContext(lepEngine, targetProceedingLep); + BaseLepContext lepContext = buildLepContext(lepEngine, lepMethod, targetProceedingLep); return lepEngine.invoke(lepKey, targetProceedingLep, lepContext); } + private BaseLepContext buildLepContext(LepEngine lepEngine, LepMethod lepMethod, TargetProceedingLep targetProceedingLep) { + String lepContextMethodParameter = lepMethod.getMethodSignature().getLepContextMethodParameter(); + if (lepContextMethodParameter == null) { + return lepContextService.createLepContext(lepEngine, targetProceedingLep); + } else { + return lepMethod.getParameter(lepContextMethodParameter, BaseLepContext.class); + } + } + @SneakyThrows private Object invokeOriginalMethod(Object target, LepMethod lepMethod, LepKey lepKey) { Method method = lepMethod.getMethodSignature().getMethod(); diff --git a/xm-commons-lep/src/main/java/com/icthh/xm/commons/lep/impl/MethodSignatureImpl.java b/xm-commons-lep/src/main/java/com/icthh/xm/commons/lep/impl/MethodSignatureImpl.java index 43904825..41b918ea 100644 --- a/xm-commons-lep/src/main/java/com/icthh/xm/commons/lep/impl/MethodSignatureImpl.java +++ b/xm-commons-lep/src/main/java/com/icthh/xm/commons/lep/impl/MethodSignatureImpl.java @@ -1,5 +1,7 @@ package com.icthh.xm.commons.lep.impl; +import com.icthh.xm.commons.lep.api.BaseLepContext; +import com.icthh.xm.commons.lep.api.UseAsLepContext; import com.icthh.xm.lep.api.MethodSignature; import java.lang.reflect.Method; @@ -9,6 +11,7 @@ import java.util.List; import java.util.Map; +import static java.util.Arrays.stream; import static java.util.Collections.emptyList; import static java.util.Collections.unmodifiableList; import static java.util.Collections.unmodifiableMap; @@ -23,9 +26,11 @@ public class MethodSignatureImpl implements MethodSignature { private final String[] parameterNamesArray; private final Method method; private final String declaringClassName; + private final String lepContextMethodParameter; public MethodSignatureImpl(Method method, Class targetType) { this.method = method; + this.lepContextMethodParameter = calculateLepContextMethodParameter(method); this.parameterNames = calculateParametersNames(method); this.parameterIndexes = calculateParametersIndexes(this.parameterNames); this.parameterNamesArray = this.parameterNames.toArray(STRINGS_EMPTY_ARRAY); @@ -33,6 +38,19 @@ public MethodSignatureImpl(Method method, Class targetType) { this.declaringClassName = (declaringClass != null) ? declaringClass.getName() : null; } + private String calculateLepContextMethodParameter(Method method) { + Parameter[] parameters = method.getParameters(); + if (parameters != null) { + return stream(parameters) + .filter(p -> BaseLepContext.class.isAssignableFrom(p.getType())) + .filter(p -> p.getAnnotation(UseAsLepContext.class) != null) + .findAny() + .map(Parameter::getName) + .orElse(null); + } + return null; + } + private Map calculateParametersIndexes(List parameterNames) { Map parameterIndexes = new HashMap<>(); for (int i = 0; i < parameterNames.size(); i++) { @@ -109,5 +127,9 @@ public Method getMethod() { return method; } + @Override + public String getLepContextMethodParameter() { + return lepContextMethodParameter; + } } diff --git a/xm-commons-lep/src/main/java/com/icthh/xm/lep/api/MethodSignature.java b/xm-commons-lep/src/main/java/com/icthh/xm/lep/api/MethodSignature.java index 07400c73..c26a7296 100644 --- a/xm-commons-lep/src/main/java/com/icthh/xm/lep/api/MethodSignature.java +++ b/xm-commons-lep/src/main/java/com/icthh/xm/lep/api/MethodSignature.java @@ -1,6 +1,8 @@ package com.icthh.xm.lep.api; +import java.lang.annotation.Annotation; import java.lang.reflect.Method; +import java.lang.reflect.Parameter; import java.util.List; /** @@ -116,4 +118,5 @@ public interface MethodSignature { */ Method getMethod(); + String getLepContextMethodParameter(); } From c6090678e3e68e6d4a06d70bf9e784d5c71bc405 Mon Sep 17 00:00:00 2001 From: ssenko Date: Sat, 13 Jul 2024 21:58:16 +0300 Subject: [PATCH 06/13] Implement sync engine --- xm-commons-flow/build.gradle | 1 + .../context/FlowLepAdditionalContext.java | 16 +++- .../xm/commons/flow/engine/FlowExecutor.java | 8 +- .../flow/rest/FlowExecuteResource.java | 58 +++++++++++++ .../xm/commons/flow/rest/FlowResource.java | 13 +++ .../commons/flow/rest/FlowSpecResource.java | 8 ++ .../flow/rest/TenantResourceResource.java | 13 +++ .../xm/commons/flow/service/FlowService.java | 36 ++++++++ .../resolver/FlowKeyLepKeyResolver.java | 15 ++++ .../flow/rest/AbstractFlowIntTest.java | 12 +++ .../commons/flow/rest/FlowExecuteIntTest.java | 83 ++++++++++++++----- .../flow/rest/FlowResourceIntTest.java | 10 +-- 12 files changed, 238 insertions(+), 35 deletions(-) create mode 100644 xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/rest/FlowExecuteResource.java create mode 100644 xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/resolver/FlowKeyLepKeyResolver.java diff --git a/xm-commons-flow/build.gradle b/xm-commons-flow/build.gradle index 70e502dc..d2aeb21c 100644 --- a/xm-commons-flow/build.gradle +++ b/xm-commons-flow/build.gradle @@ -11,6 +11,7 @@ dependencies { compile project(":xm-commons-config") compile project(":xm-commons-i18n") compile project(":xm-commons-lep") + compile project(":xm-commons-permission") implementation("com.github.java-json-tools:json-schema-validator:2.2.8") { exclude group: 'javax.mail', module: 'mailapi' diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/FlowLepAdditionalContext.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/FlowLepAdditionalContext.java index 5f1d8612..836f6a33 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/FlowLepAdditionalContext.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/FlowLepAdditionalContext.java @@ -1,7 +1,9 @@ package com.icthh.xm.commons.flow.context; +import com.icthh.xm.commons.flow.engine.FlowExecutor; import com.icthh.xm.commons.flow.engine.context.FlowExecutionContext; import com.icthh.xm.commons.flow.context.FlowLepAdditionalContext.FlowContext; +import com.icthh.xm.commons.flow.service.FlowService; import com.icthh.xm.commons.lep.TargetProceedingLep; import com.icthh.xm.commons.lep.api.BaseLepContext; import com.icthh.xm.commons.lep.api.LepAdditionalContext; @@ -26,6 +28,8 @@ public class FlowLepAdditionalContext implements LepAdditionalContext STEP_LEP = Set.of(ACTION, CONDITION, STEP_CLASS_EXECUTOR); + private final FlowService flowService; + @Override public String additionalContextKey() { return FLOW; @@ -41,9 +45,9 @@ public Optional additionalContextValue(BaseLepContext lepContext, L LepBaseKey lepBaseKey = lepMethod.getLepBaseKey(); if (FLOW_STEP_GROUP.equals(lepBaseKey.getGroup()) && STEP_LEP.contains(lepBaseKey.getBaseKey())) { FlowExecutionContext context = lepMethod.getParameter("context", FlowExecutionContext.class); - return Optional.of(new FlowContext(context.getInput())); + return Optional.of(new FlowContext(context.getInput(), flowService)); } else { - return Optional.empty(); + return Optional.of(new FlowContext(null, flowService)); } } @@ -62,10 +66,16 @@ default FlowContext getFlow() { public static class FlowContext { public final Object input; public final Object context; + private final FlowService flowService; - public FlowContext(Object input) { + public FlowContext(Object input, FlowService flowService) { this.input = input; this.context = input; + this.flowService = flowService; + } + + public FlowExecutionContext executeFlow(String flowKey, Object input) { + return flowService.runFlowInternal(flowKey, input); } } } diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/FlowExecutor.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/FlowExecutor.java index 8d0b28d4..1630f6c1 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/FlowExecutor.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/FlowExecutor.java @@ -36,16 +36,16 @@ public FlowExecutionContext execute(Flow flow, Object input) { String lastActionKey = null; while(currentStep != null) { - if (log.isTraceEnabled()) { - log.trace("Execute step: {} with input: {}", currentStep.getKey(), input); + if (log.isDebugEnabled()) { + log.debug("Execute step: {} with input: {}", currentStep.getKey(), input); } lastActionKey = currentStep instanceof Action ? currentStep.getKey() : lastActionKey; context.getStepInput().put(currentStep.getKey(), input); Object result = executeStep(input, currentStep, context); context.getStepOutput().put(currentStep.getKey(), result); - if (log.isTraceEnabled()) { - log.trace("Step: {} executed with result: {}", currentStep.getKey(), result); + if (log.isDebugEnabled()) { + log.debug("Step: {} executed with result: {}", currentStep.getKey(), result); } input = result; diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/rest/FlowExecuteResource.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/rest/FlowExecuteResource.java new file mode 100644 index 00000000..bc6831f9 --- /dev/null +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/rest/FlowExecuteResource.java @@ -0,0 +1,58 @@ +package com.icthh.xm.commons.flow.rest; + +import com.icthh.xm.commons.flow.service.FlowService; +import com.icthh.xm.commons.permission.annotation.PrivilegeDescription; +import lombok.RequiredArgsConstructor; +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.PutMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestHeader; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.util.Map; + +@RestController +@RequestMapping("/api/flow") +@RequiredArgsConstructor +public class FlowExecuteResource { + + private final FlowService flowService; + + @GetMapping("/{flowKey}/execute") + @PreAuthorize("hasPermission({'flowKey': #flowKey, 'queryParams': #queryParams}, 'FLOW.EXECUTE.GET')") + @PrivilegeDescription("Privilege to execute the flow by key using get method") + public Object executeFlowGet(@PathVariable("flowKey") String flowKey, @RequestParam Map queryParams) { + return executeFlow(flowKey, queryParams); + } + + @PutMapping("/{flowKey}/execute") + @PreAuthorize("hasPermission({'flowKey': #flowKey, 'body': #body, 'queryParams': #queryParams}, 'FLOW.EXECUTE.PUT')") + @PrivilegeDescription("Privilege to execute the flow by key using put method") + public Object executeFlowPut(@PathVariable String flowKey, @RequestBody Map body) { + return executeFlow(flowKey, body); + } + + @PostMapping(path = "/{flowKey}/execute", consumes = "application/json") + @PreAuthorize("hasPermission({'flowKey': #flowKey, 'body': #body, 'queryParams': #queryParams}, 'FLOW.EXECUTE.POST_JSON')") + @PrivilegeDescription("Privilege to execute the flow by key using post json method") + public Object executeFlowPostJson(@PathVariable String flowKey, @RequestBody Map body) { + return executeFlow(flowKey, body); + } + + @PostMapping(path = "/{flowKey}/execute", consumes = "application/x-www-form-urlencoded") + @PreAuthorize("hasPermission({'flowKey': #flowKey, 'formData': #formData}, 'FLOW.EXECUTE.POST_FORM')") + @PrivilegeDescription("Privilege to execute the flow by key using post url encoded method") + public Object executeFlowPostUrlEncoded(@PathVariable String flowKey, @RequestParam Map formData) { + return executeFlow(flowKey, formData); + } + + private Object executeFlow(String flowKey, Object body) { + return flowService.runFlow(flowKey, body); + } + +} diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/rest/FlowResource.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/rest/FlowResource.java index 39bd3d39..b31c4c68 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/rest/FlowResource.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/rest/FlowResource.java @@ -2,7 +2,10 @@ import com.icthh.xm.commons.flow.domain.Flow; import com.icthh.xm.commons.flow.service.FlowService; +import com.icthh.xm.commons.permission.annotation.PrivilegeDescription; import lombok.RequiredArgsConstructor; +import org.springframework.security.access.prepost.PostAuthorize; +import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; @@ -22,26 +25,36 @@ public class FlowResource { private final FlowService flowService; @GetMapping("/{flowKey}") + @PostAuthorize("hasPermission({'returnObject': returnObject.body}, 'FLOW.GET_ITEM')") + @PrivilegeDescription("Privilege to get the flow by key") public Flow getFlow(@PathVariable("flowKey") String flowKey) { return flowService.getFlow(flowKey); } @GetMapping() + @PostAuthorize("hasPermission({'returnObject': returnObject.body}, 'FLOW.GET_LIST')") + @PrivilegeDescription("Privilege to get all flows") public List getFlows() { return flowService.getFlows(); } @PostMapping() + @PreAuthorize("hasPermission({'flow': #flow}, 'FLOW.CREATE')") + @PrivilegeDescription("Privilege to create a new flow") public void createFlow(@RequestBody Flow flow) { flowService.createFlow(flow); } @PutMapping() + @PreAuthorize("hasPermission({'flow': #flow}, 'FLOW.UPDATE')") + @PrivilegeDescription("Privilege to update the flow") public void updateFlow(@RequestBody Flow flow) { flowService.updateFlow(flow); } @DeleteMapping("/{flowKey}") + @PreAuthorize("hasPermission({'flowKey': #flowKey}, 'FLOW.DELETE')") + @PrivilegeDescription("Privilege to delete the flow") public void deleteFlow(@PathVariable("flowKey") String flowKey) { flowService.deleteFlow(flowKey); } diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/rest/FlowSpecResource.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/rest/FlowSpecResource.java index 236f4062..277385bf 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/rest/FlowSpecResource.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/rest/FlowSpecResource.java @@ -6,7 +6,9 @@ import com.icthh.xm.commons.flow.spec.step.StepSpecService; import com.icthh.xm.commons.flow.spec.trigger.TriggerType; import com.icthh.xm.commons.flow.spec.trigger.TriggerTypeSpecService; +import com.icthh.xm.commons.permission.annotation.PrivilegeDescription; import lombok.RequiredArgsConstructor; +import org.springframework.security.access.prepost.PostAuthorize; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; @@ -24,16 +26,22 @@ public class FlowSpecResource { private final TriggerTypeSpecService triggerSpecService; @GetMapping("/steps") + @PostAuthorize("hasPermission({'returnObject': returnObject.body}, 'FLOW.STEP_SPEC.GET_LIST')") + @PrivilegeDescription("Privilege to get all step specs") public List getSteps(@RequestParam(name = "stepType", required = false) StepSpec.StepType stepType) { return stepSpecService.getSteps(stepType); } @GetMapping("/resource-types") + @PostAuthorize("hasPermission({'returnObject': returnObject.body}, 'FLOW.RESOURCE_TYPE.GET_LIST')") + @PrivilegeDescription("Privilege to get all resource types") public List getResourceTypes() { return resourceTypeService.resourceTypes(); } @GetMapping("/trigger-types") + @PostAuthorize("hasPermission({'returnObject': returnObject.body}, 'FLOW.TRIGGER_TYPE.GET_LIST')") + @PrivilegeDescription("Privilege to get all trigger types") public List getTriggerTypes() { return triggerSpecService.triggerTypes(); } diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/rest/TenantResourceResource.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/rest/TenantResourceResource.java index 4346848f..f67f5341 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/rest/TenantResourceResource.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/rest/TenantResourceResource.java @@ -2,7 +2,10 @@ import com.icthh.xm.commons.flow.domain.TenantResource; import com.icthh.xm.commons.flow.service.TenantResourceService; +import com.icthh.xm.commons.permission.annotation.PrivilegeDescription; import lombok.RequiredArgsConstructor; +import org.springframework.security.access.prepost.PostAuthorize; +import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; @@ -24,26 +27,36 @@ public class TenantResourceResource { private final TenantResourceService resourceService; @GetMapping("/{resourceKey}") + @PostAuthorize("hasPermission({'returnObject': returnObject.body}, 'FLOW.RESOURCE.GET_ITEM')") + @PrivilegeDescription("Privilege to get the resource by resourceKey") public TenantResource getResource(@PathVariable("resourceKey") String resourceKey) { return resourceService.getResource(resourceKey); } @GetMapping() + @PostAuthorize("hasPermission({'returnObject': returnObject.body}, 'FLOW.RESOURCE.GET_LIST')") + @PrivilegeDescription("Privilege to get all resources") public List getResources(@RequestParam(name = "resourceType", required = false) String resourceType) { return resourceService.getResources(resourceType); } @PostMapping() + @PreAuthorize("hasPermission({'resource': #resource}, 'FLOW.RESOURCE.CREATE')") + @PrivilegeDescription("Privilege to create a new resource") public void createResource(@RequestBody TenantResource resource) { resourceService.createResource(resource); } @PutMapping() + @PreAuthorize("hasPermission({'resource': #resource}, 'FLOW.RESOURCE.UPDATE')") + @PrivilegeDescription("Privilege to update the resource") public void updateResource(@RequestBody TenantResource resource) { resourceService.updateResource(resource); } @DeleteMapping("/{resourceKey}") + @PreAuthorize("hasPermission({'resourceKey': #resourceKey}, 'FLOW.RESOURCE.DELETE')") + @PrivilegeDescription("Privilege to delete the resource") public void deleteResource(@PathVariable("resourceKey") String resourceKey) { resourceService.deleteResource(resourceKey); } diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/FlowService.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/FlowService.java index 23966e6b..8d3e8377 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/FlowService.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/FlowService.java @@ -3,8 +3,12 @@ import com.icthh.xm.commons.config.client.repository.TenantConfigRepository; import com.icthh.xm.commons.config.domain.Configuration; import com.icthh.xm.commons.exceptions.BusinessException; +import com.icthh.xm.commons.exceptions.EntityNotFoundException; import com.icthh.xm.commons.flow.domain.Flow; +import com.icthh.xm.commons.flow.engine.FlowExecutor; +import com.icthh.xm.commons.flow.engine.context.FlowExecutionContext; import com.icthh.xm.commons.flow.service.FlowConfigService.FlowsConfig; +import com.icthh.xm.commons.flow.service.resolver.FlowKeyLepKeyResolver; import com.icthh.xm.commons.flow.service.resolver.FlowTypeLepKeyResolver; import com.icthh.xm.commons.flow.service.trigger.TriggerProcessor; import com.icthh.xm.commons.lep.LogicExtensionPoint; @@ -17,6 +21,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import static java.util.stream.Collectors.toList; import static org.apache.commons.lang3.StringUtils.isNotBlank; @@ -32,6 +37,7 @@ public class FlowService { private final TenantConfigRepository tenantConfigRepository; private final CodeSnippetService codeSnippetService; private final TriggerProcessor triggerProcessor; + private final FlowExecutor flowExecutor; private FlowService self; @Autowired @@ -138,4 +144,34 @@ private void assertStartStepExists(Flow flow) { } } + @LogicExtensionPoint(value = "RunFlow", resolver = FlowKeyLepKeyResolver.class) + public Object runFlow(String flowKey, Object input) { + return self.runFlowInternal(flowKey, input).getOutput(); + } + + @LogicExtensionPoint("RunFlow") + public FlowExecutionContext runFlowInternal(String flowKey, Object input) { + Flow flow = getRequiredFlow(flowKey); + FlowExecutionContext executionContext = self.executeFlow(flow, input); + if (log.isDebugEnabled()) { + log.debug("Flow execution context: {}", executionContext); + } + return executionContext; + } + + @LogicExtensionPoint(value = "ExecuteFlow", resolver = FlowTypeLepKeyResolver.class) + public FlowExecutionContext executeFlow(Flow flow, Object input) { + return self.executeFlowInternal(flow, input); + } + + @LogicExtensionPoint(value = "ExecuteFlow") + public FlowExecutionContext executeFlowInternal(Flow flow, Object input) { + return flowExecutor.execute(flow, input); + } + + private Flow getRequiredFlow(String flowKey) { + return Optional.ofNullable(flowConfigService.getFlow(flowKey)) + .orElseThrow(() -> new EntityNotFoundException("Flow with key " + flowKey + " not found")); + } + } diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/resolver/FlowKeyLepKeyResolver.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/resolver/FlowKeyLepKeyResolver.java new file mode 100644 index 00000000..f6c0b285 --- /dev/null +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/resolver/FlowKeyLepKeyResolver.java @@ -0,0 +1,15 @@ +package com.icthh.xm.commons.flow.service.resolver; + +import com.icthh.xm.lep.api.LepKeyResolver; +import com.icthh.xm.lep.api.LepMethod; +import org.springframework.stereotype.Component; + +import java.util.List; + +@Component +public class FlowKeyLepKeyResolver implements LepKeyResolver { + @Override + public List segments(LepMethod method) { + return List.of(method.getParameter("flowKey", String.class)); + } +} diff --git a/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/AbstractFlowIntTest.java b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/AbstractFlowIntTest.java index 550fca32..d8dc68ee 100644 --- a/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/AbstractFlowIntTest.java +++ b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/AbstractFlowIntTest.java @@ -16,6 +16,7 @@ import com.icthh.xm.commons.flow.service.TenantResourceConfigService; import com.icthh.xm.commons.flow.service.TenantResourceService; import com.icthh.xm.commons.flow.service.YamlConverter; +import com.icthh.xm.commons.flow.service.resolver.FlowKeyLepKeyResolver; import com.icthh.xm.commons.flow.service.resolver.FlowTypeLepKeyResolver; import com.icthh.xm.commons.flow.service.resolver.SnippetListLepKeyResolver; import com.icthh.xm.commons.flow.service.resolver.StepKeyResolver; @@ -49,6 +50,7 @@ import org.springframework.web.servlet.config.annotation.EnableWebMvc; import java.io.InputStream; +import java.util.ArrayList; import java.util.List; import static java.nio.charset.StandardCharsets.UTF_8; @@ -81,6 +83,7 @@ TriggerProcessor.class, FlowTypeLepKeyResolver.class, TriggerResolver.class, + FlowKeyLepKeyResolver.class, TriggerTypeSpecService.class, YamlConverter.class, FlowConfigService.class, @@ -108,19 +111,28 @@ public abstract class AbstractFlowIntTest { @Autowired List configurations; + List updateConfigs; + @Before public void setup() { this.mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).build(); TenantContextUtils.setTenant(tenantContextHolder, "TEST"); lepManagementService.beginThreadContext(); + updateConfigs = new ArrayList<>(); } @After public void after() { + updateConfigs.forEach(c -> updateConfig(c, null)); lepManagementService.endThreadContext(); } public void updateConfiguration(String path, String content) { + updateConfigs.add(path); + updateConfig(path, content); + } + + private void updateConfig(String path, String content) { configurations.stream().filter(c -> c.isListeningConfiguration(path)).forEach(c -> c.onRefresh(path, content)); } diff --git a/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowExecuteIntTest.java b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowExecuteIntTest.java index 625139a1..ceede15c 100644 --- a/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowExecuteIntTest.java +++ b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowExecuteIntTest.java @@ -12,7 +12,6 @@ import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; -import java.io.Serializable; import java.util.Map; import static org.junit.Assert.assertEquals; @@ -34,27 +33,10 @@ public class FlowExecuteIntTest extends AbstractFlowIntTest { @Test @SneakyThrows public void testSimpleExecution() { - String simpleFlow = loadFile("test-flow-execute/simple-flow.yml"); - mockSendConfigToRefresh(); - updateConfiguration("/config/tenants/TEST/testApp/lep/flow/step/Action$$sum.groovy", - "return lepContext.flow.input.b + lepContext.step.parameters.a"); - updateConfiguration("/config/tenants/TEST/testApp/lep/flow/step/Condition$$groovyCondition.groovy", - "return lepContext.step.runSnippet('groovyCondition', lepContext)"); - updateConfiguration("/config/tenants/commons/lep/flow/conditions/NotIsZeroCondition.groovy", - loadFile("test-flow-execute/NotIsZeroCondition.groovy")); - updateConfiguration("/config/tenants/commons/lep/flow/actions/DivideAction.groovy", - loadFile("test-flow-execute/DivideAction.groovy")); - - updateConfiguration("/config/tenants/TEST/testApp/lep/flow/step/Action$$groovyAction.groovy", - "return lepContext.step.runSnippet('groovyAction', lepContext)"); - updateConfiguration("/config/tenants/TEST/testApp/lep/flow/step/Action$$minus.groovy", - "return (int) (lepContext.step.parameters.a - lepContext.flow.input.b)"); - - updateConfiguration("/config/tenants/TEST/testApp/flow/step-spec/steps.yml", loadFile("test-flow-execute/steps.yml")); + String simpleFlow = initFlow(); FlowsConfig flows = new ObjectMapper(new YAMLFactory()).readValue(simpleFlow, FlowsConfig.class); Flow flow = flows.getFlows().get(0); - flowService.createFlow(flow); FlowExecutionContext executionContext = flowExecutor.execute(flow, Map.of("b", -1)); assertEquals(0, executionContext.getOutput()); @@ -94,14 +76,69 @@ public void testSimpleExecution() { ), executionContext ); + + flowService.deleteFlow(flow.getKey()); + } + + @Test + public void testRunFlowFromLep() { + initFlow(); + + updateConfiguration("/config/tenants/TEST/testApp/lep/test/Test.groovy", "lepContext.flow.executeFlow('simple-flow', [b: 5])"); + + FlowExecutionContext result = (FlowExecutionContext) testLepService.test(); + assertEquals(2, result.getOutput()); + assertEquals( + mockContext( + "simple-flow", + Map.of("b", 5), + 2, + Map.of("step1", Map.of("b", 5), "step2", 6, "step5", true), + Map.of("step1", 6, "step2", true, "step5", 2) + ), + result + ); + + flowService.deleteFlow("simple-flow"); + } + + @SneakyThrows + private String initFlow() { + String simpleFlow = loadFile("test-flow-execute/simple-flow.yml"); + mockSendConfigToRefresh(); + // test action lep + updateConfiguration("/config/tenants/TEST/testApp/lep/flow/step/Action$$sum.groovy", + "return lepContext.flow.input.b + lepContext.step.parameters.a"); + // test condition lep and run snippet + updateConfiguration("/config/tenants/TEST/testApp/lep/flow/step/Condition$$groovyCondition.groovy", + "return lepContext.step.runSnippet('groovyCondition', lepContext)"); + // test class condition and get previous step output + updateConfiguration("/config/tenants/commons/lep/flow/conditions/NotIsZeroCondition.groovy", + loadFile("test-flow-execute/NotIsZeroCondition.groovy")); + // test class action and get previous step output + updateConfiguration("/config/tenants/commons/lep/flow/actions/DivideAction.groovy", + loadFile("test-flow-execute/DivideAction.groovy")); + // test run snippet + updateConfiguration("/config/tenants/TEST/testApp/lep/flow/step/Action$$groovyAction.groovy", + "return lepContext.step.runSnippet('groovyAction', lepContext)"); + // test get from step patams and flow input + updateConfiguration("/config/tenants/TEST/testApp/lep/flow/step/Action$$minus.groovy", + "return (int) (lepContext.step.parameters.a - lepContext.flow.input.b)"); + + updateConfiguration("/config/tenants/TEST/testApp/flow/step-spec/steps.yml", loadFile("test-flow-execute/steps.yml")); + + FlowsConfig flows = new ObjectMapper(new YAMLFactory()).readValue(simpleFlow, FlowsConfig.class); + Flow flow = flows.getFlows().get(0); + flowService.createFlow(flow); + return simpleFlow; } private FlowExecutionContext mockContext( String key, - Map input, - Object output, - Map stepInput, - Map stepOutput + Map input, + Object output, + Map stepInput, + Map stepOutput ) { var context = new FlowExecutionContext(key); context.setInput(input); diff --git a/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowResourceIntTest.java b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowResourceIntTest.java index 9111bbab..f3b06fac 100644 --- a/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowResourceIntTest.java +++ b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowResourceIntTest.java @@ -102,15 +102,15 @@ public void testFlowCrud() { assertEquals("foofoofoo", value.get(0).getContent()); assertEquals("/config/tenants/TEST/test/somespec.yml", value.get(1).getPath()); assertEquals("blablabla", value.get(1).getContent()); - assertEquals("/config/tenants/TEST/testApp/flow/snippets/Snippet$$my-flow$$step2$$mapping.js", value.get(2).getPath()); + assertEquals("/config/tenants/TEST/testApp/lep/flow/snippets/Snippet$$my-flow$$step2$$mapping.js", value.get(2).getPath()); assertEquals("return context.get('orders').map(order => { return { id: order.id, name: order.name }; })", value.get(2).getContent()); - assertEquals("/config/tenants/TEST/testApp/flow/snippets/Snippet$$my-flow$$step2$$precheck.js", value.get(3).getPath()); + assertEquals("/config/tenants/TEST/testApp/lep/flow/snippets/Snippet$$my-flow$$step2$$precheck.js", value.get(3).getPath()); assertEquals("if (context.get('orders').length > 0) { return true; } else { return false; }", value.get(3).getContent()); - assertEquals("/config/tenants/TEST/testApp/flow/snippets/Snippet$$my-flow$$step3$$mapping.js", value.get(4).getPath()); + assertEquals("/config/tenants/TEST/testApp/lep/flow/snippets/Snippet$$my-flow$$step3$$mapping.js", value.get(4).getPath()); assertEquals("return context.get('users').map(user => { return { id: user.id, name: user.name }; })", value.get(4).getContent()); - assertEquals("/config/tenants/TEST/testApp/flow/snippets/Snippet$$my-flow$$step3$$postcheck.js", value.get(5).getPath()); + assertEquals("/config/tenants/TEST/testApp/lep/flow/snippets/Snippet$$my-flow$$step3$$postcheck.js", value.get(5).getPath()); assertEquals("if (context.get('votes').length > 0) { return true; } else { return false; }", value.get(5).getContent()); - assertEquals("/config/tenants/TEST/testApp/flow/snippets/Snippet$$my-flow$$step3$$precheck.js", value.get(6).getPath()); + assertEquals("/config/tenants/TEST/testApp/lep/flow/snippets/Snippet$$my-flow$$step3$$precheck.js", value.get(6).getPath()); assertEquals("if (context.get('users').length > 0) { return true; } else { return false; }", value.get(6).getContent()); ResultActions flowGet = mockMvc.perform(get("/api/flow/my-flow")) From fc80602a10c5589458c49707dda2a42e6d713c63 Mon Sep 17 00:00:00 2001 From: ssenko Date: Sat, 13 Jul 2024 22:24:04 +0300 Subject: [PATCH 07/13] Implement sync engine --- .../src/main/java/com/icthh/xm/lep/api/MethodSignature.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/xm-commons-lep/src/main/java/com/icthh/xm/lep/api/MethodSignature.java b/xm-commons-lep/src/main/java/com/icthh/xm/lep/api/MethodSignature.java index c26a7296..1ba325a3 100644 --- a/xm-commons-lep/src/main/java/com/icthh/xm/lep/api/MethodSignature.java +++ b/xm-commons-lep/src/main/java/com/icthh/xm/lep/api/MethodSignature.java @@ -1,8 +1,6 @@ package com.icthh.xm.lep.api; -import java.lang.annotation.Annotation; import java.lang.reflect.Method; -import java.lang.reflect.Parameter; import java.util.List; /** From d0ac3ef7c5dc3bf68e2e7b3a4b8b4c4df14587a8 Mon Sep 17 00:00:00 2001 From: ssenko Date: Sat, 13 Jul 2024 22:26:29 +0300 Subject: [PATCH 08/13] clean up --- .../icthh/xm/commons/lep/impl/LogicExtensionPointHandler.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/xm-commons-lep/src/main/java/com/icthh/xm/commons/lep/impl/LogicExtensionPointHandler.java b/xm-commons-lep/src/main/java/com/icthh/xm/commons/lep/impl/LogicExtensionPointHandler.java index 0deab2f6..af50d121 100644 --- a/xm-commons-lep/src/main/java/com/icthh/xm/commons/lep/impl/LogicExtensionPointHandler.java +++ b/xm-commons-lep/src/main/java/com/icthh/xm/commons/lep/impl/LogicExtensionPointHandler.java @@ -7,7 +7,6 @@ import com.icthh.xm.commons.lep.api.LepExecutor; import com.icthh.xm.commons.lep.api.LepKey; import com.icthh.xm.commons.lep.api.LepManagementService; -import com.icthh.xm.commons.lep.api.UseAsLepContext; import com.icthh.xm.commons.lep.spring.LepContextService; import com.icthh.xm.commons.lep.spring.LepService; import com.icthh.xm.lep.api.LepInvocationCauseException; @@ -19,7 +18,6 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.lang.reflect.Parameter; import java.util.HashMap; import java.util.List; import java.util.Map; From f989a9f07725cb0e6af3711df574774e667bb7f7 Mon Sep 17 00:00:00 2001 From: ssenko Date: Sun, 14 Jul 2024 17:36:29 +0300 Subject: [PATCH 09/13] Add snc flow --- .../context/FlowLepAdditionalContext.java | 1 - .../context/StepLepAdditionalContext.java | 11 ++ .../xm/commons/flow/engine/FlowExecutor.java | 83 ----------- .../flow/engine/FlowExecutorService.java | 140 ++++++++++++++++++ .../engine/context/FlowExecutionContext.java | 15 +- .../xm/commons/flow/service/FlowService.java | 4 +- .../flow/rest/AbstractFlowIntTest.java | 4 +- .../commons/flow/rest/FlowExecuteIntTest.java | 51 ++++++- .../test-flow-execute/count-words-flow.yml | 36 +++++ 9 files changed, 251 insertions(+), 94 deletions(-) delete mode 100644 xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/FlowExecutor.java create mode 100644 xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/FlowExecutorService.java create mode 100644 xm-commons-flow/src/test/resources/test-flow-execute/count-words-flow.yml diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/FlowLepAdditionalContext.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/FlowLepAdditionalContext.java index 836f6a33..37b8e9c2 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/FlowLepAdditionalContext.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/FlowLepAdditionalContext.java @@ -1,6 +1,5 @@ package com.icthh.xm.commons.flow.context; -import com.icthh.xm.commons.flow.engine.FlowExecutor; import com.icthh.xm.commons.flow.engine.context.FlowExecutionContext; import com.icthh.xm.commons.flow.context.FlowLepAdditionalContext.FlowContext; import com.icthh.xm.commons.flow.service.FlowService; diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/StepLepAdditionalContext.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/StepLepAdditionalContext.java index 863ef412..1a41a7c4 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/StepLepAdditionalContext.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/context/StepLepAdditionalContext.java @@ -72,6 +72,11 @@ public static class StepContext { public final Object context; public final Map parameters; + public final Integer iteration; + public final Object iterationItem; + public final List iterationsInput; + public final List iterationsOutput; + private final String flowKey; private final String stepKey; private final CodeSnippetExecutor snippetExecutor; @@ -80,6 +85,12 @@ public StepContext(FlowExecutionContext context, Step step, CodeSnippetExecutor var input = context.getStepInput().get(step.getKey()); this.input = input; this.context = input; + + this.iteration = context.getIteration(); + this.iterationItem = context.getIterationItem(); + this.iterationsInput = context.getIterationsInput(); + this.iterationsOutput = context.getIterationsOutput(); + this.flowKey = context.getFlowKey(); this.stepKey = step.getKey(); this.parameters = Map.copyOf(nullSafeMap(step.getParameters())); diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/FlowExecutor.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/FlowExecutor.java deleted file mode 100644 index 1630f6c1..00000000 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/FlowExecutor.java +++ /dev/null @@ -1,83 +0,0 @@ -package com.icthh.xm.commons.flow.engine; - -import com.icthh.xm.commons.flow.domain.Action; -import com.icthh.xm.commons.flow.domain.Condition; -import com.icthh.xm.commons.flow.domain.Flow; -import com.icthh.xm.commons.flow.domain.Step; -import com.icthh.xm.commons.flow.engine.context.FlowExecutionContext; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Service; - -import java.util.Map; - -import static java.util.function.Function.identity; -import static java.util.stream.Collectors.toMap; -import static org.apache.commons.lang3.StringUtils.isNotBlank; - -@Slf4j -@Service -@RequiredArgsConstructor -public class FlowExecutor { - - private final StepExecutorService stepExecutorService; - - public FlowExecutionContext execute(Flow flow, Object input) { - - FlowExecutionContext context = new FlowExecutionContext(flow.getKey()); - - try { - // to step map - Map steps = flow.getSteps().stream().collect(toMap(Step::getKey, identity())); - Step firstStep = steps.get(flow.getStartStep()); - context.setInput(input); - - Step currentStep = firstStep; - String lastActionKey = null; - - while(currentStep != null) { - if (log.isDebugEnabled()) { - log.debug("Execute step: {} with input: {}", currentStep.getKey(), input); - } - lastActionKey = currentStep instanceof Action ? currentStep.getKey() : lastActionKey; - - context.getStepInput().put(currentStep.getKey(), input); - Object result = executeStep(input, currentStep, context); - context.getStepOutput().put(currentStep.getKey(), result); - if (log.isDebugEnabled()) { - log.debug("Step: {} executed with result: {}", currentStep.getKey(), result); - } - - input = result; - - String nextStep = currentStep.getNext(result); - if (nextStep == null) { - context.setOutput(context.getStepOutput().get(lastActionKey)); - return context; - } - currentStep = steps.get(nextStep); - if (currentStep == null && isNotBlank(nextStep)) { - log.error("Step for key: {} not found", nextStep); - } - } - - context.setOutput(context.getStepOutput().get(lastActionKey)); - return context; - } catch (Throwable e) { - log.error("Error execute flow with error {} | executionContext: {}", flow.getKey(), context, e); - throw e; - } - } - - private Object executeStep(Object input, Step currentStep, FlowExecutionContext context) { - // TODO refactor this to some pattern that follow open close principle - if (currentStep instanceof Action) { - return stepExecutorService.executeAction(input, (Action) currentStep, context); - } else if (currentStep instanceof Condition) { - return stepExecutorService.executeCondition(input, (Condition) currentStep, context); - } else { - throw new IllegalArgumentException("Unsupported step type: " + currentStep.getClass()); - } - } - -} diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/FlowExecutorService.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/FlowExecutorService.java new file mode 100644 index 00000000..c05cafa0 --- /dev/null +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/FlowExecutorService.java @@ -0,0 +1,140 @@ +package com.icthh.xm.commons.flow.engine; + +import com.google.common.collect.Lists; +import com.icthh.xm.commons.flow.domain.Action; +import com.icthh.xm.commons.flow.domain.Condition; +import com.icthh.xm.commons.flow.domain.Flow; +import com.icthh.xm.commons.flow.domain.Step; +import com.icthh.xm.commons.flow.engine.context.FlowExecutionContext; +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.JsonPathException; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.time.StopWatch; +import org.springframework.stereotype.Service; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static java.lang.Boolean.TRUE; +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; +import static java.util.stream.IntStream.range; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + +@Slf4j +@Service +@RequiredArgsConstructor +public class FlowExecutorService { + + private final StepExecutorService stepExecutorService; + + public FlowExecutionContext execute(Flow flow, Object input) { + FlowExecutionContext context = new FlowExecutionContext(flow.getKey(), input); + Map steps = flow.getSteps().stream().collect(toMap(Step::getKey, identity())); + String lastActionKey = null; + try { + + Step currentStep = steps.get(flow.getStartStep()); + while(currentStep != null) { + lastActionKey = currentStep instanceof Action ? currentStep.getKey() : lastActionKey; + input = runStep(currentStep, input, context); + currentStep = getNextStep(currentStep, input, steps); + } + + context.setOutput(context.getStepOutput().get(lastActionKey)); + return context; + } catch (Throwable e) { + log.error("Error execute flow with error {} | executionContext: {}", flow.getKey(), context, e); + throw e; + } + } + + private Object runStep(Step currentStep, Object input, FlowExecutionContext context) { + StopWatch stopWatch = StopWatch.createStarted(); + log.debug("Execute step: {} with input: {}", currentStep.getKey(), input); + + context.getStepInput().put(currentStep.getKey(), input); + Object result = executeStep(currentStep, input, context); + context.getStepOutput().put(currentStep.getKey(), result); + + log.debug("Step: {} executed with result: {}, {}ms", currentStep.getKey(), result, stopWatch.getTime(MICROSECONDS)); + return result; + } + + private Object executeStep(Step currentStep, Object input, FlowExecutionContext context) { + // TODO refactor this to some pattern that follow open close principle + if (currentStep instanceof Action) { + Action action = (Action) currentStep; + if (TRUE.equals(action.getIsIterable())) { + return executeIterableAction(action, input, context); + } + return stepExecutorService.executeAction(input, action, context); + } else if (currentStep instanceof Condition) { + return stepExecutorService.executeCondition(input, (Condition) currentStep, context); + } else { + throw new IllegalArgumentException("Unsupported step type: " + currentStep.getClass()); + } + } + + @SneakyThrows + private Object executeIterableAction(Action action, Object input, FlowExecutionContext context) { + List items = readActionArray(action, input); + context.resetIteration(); + context.setIterationsInput(items); + for (int i = 0; i < items.size(); i++) { + Object iterationItem = items.get(i); + context.setIteration(i); + context.setIterationItem(iterationItem); + + Object result = stepExecutorService.executeAction(iterationItem, action, context); + context.getIterationsOutput().add(result); + } + + List output = context.getIterationsOutput(); + if (TRUE.equals(action.getRemoveNullOutputForIterableResult())) { + output = output.stream().filter(Objects::nonNull).collect(toList()); + } + + context.resetIteration(); + return output; + } + + private static List readActionArray(Action action, Object input) { + try { + Object value = JsonPath.read(input, action.getIterableJsonPath()); + if (value instanceof Number) { + Number iterations = (Number) value; + return range(0, iterations.intValue()).boxed().collect(toList()); + } else if (value instanceof List) { + return (List) value; + } else if (value.getClass().isArray()) { + return Arrays.asList((Object[]) value); + } else { + Iterable iterable = (Iterable) value; + return Lists.newArrayList(iterable); + } + } catch (JsonPathException e) { + log.error("Error read from action {} by json path {}", action, action.getIterableJsonPath(), e); + if (TRUE.equals(action.getSkipIterableJsonPathError())) { + return List.of(); + } + throw e; + } + } + + private Step getNextStep(Step currentStep, Object result, Map steps) { + String nextStep = currentStep.getNext(result); + currentStep = nextStep != null ? steps.get(nextStep): null; + if (currentStep == null && isNotBlank(nextStep)) { + log.error("Step for key: {} not found", nextStep); + } + return currentStep; + } + +} diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/context/FlowExecutionContext.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/context/FlowExecutionContext.java index 9a620157..eacf48e3 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/context/FlowExecutionContext.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/engine/context/FlowExecutionContext.java @@ -2,16 +2,29 @@ import lombok.Data; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; @Data public class FlowExecutionContext { private final String flowKey; - private Object input; + private final Object input; private Object output; private final Map stepInput = new HashMap<>(); private final Map stepOutput = new HashMap<>(); + private Integer iteration; + private Object iterationItem; + private List iterationsInput; + private List iterationsOutput; + + public void resetIteration() { + iteration = null; + iterationItem = null; + iterationsInput = new ArrayList<>(); + iterationsOutput = new ArrayList<>(); + } } diff --git a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/FlowService.java b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/FlowService.java index 8d3e8377..613ca0c5 100644 --- a/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/FlowService.java +++ b/xm-commons-flow/src/main/java/com/icthh/xm/commons/flow/service/FlowService.java @@ -5,7 +5,7 @@ import com.icthh.xm.commons.exceptions.BusinessException; import com.icthh.xm.commons.exceptions.EntityNotFoundException; import com.icthh.xm.commons.flow.domain.Flow; -import com.icthh.xm.commons.flow.engine.FlowExecutor; +import com.icthh.xm.commons.flow.engine.FlowExecutorService; import com.icthh.xm.commons.flow.engine.context.FlowExecutionContext; import com.icthh.xm.commons.flow.service.FlowConfigService.FlowsConfig; import com.icthh.xm.commons.flow.service.resolver.FlowKeyLepKeyResolver; @@ -37,7 +37,7 @@ public class FlowService { private final TenantConfigRepository tenantConfigRepository; private final CodeSnippetService codeSnippetService; private final TriggerProcessor triggerProcessor; - private final FlowExecutor flowExecutor; + private final FlowExecutorService flowExecutor; private FlowService self; @Autowired diff --git a/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/AbstractFlowIntTest.java b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/AbstractFlowIntTest.java index d8dc68ee..c5e50d55 100644 --- a/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/AbstractFlowIntTest.java +++ b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/AbstractFlowIntTest.java @@ -7,7 +7,7 @@ import com.icthh.xm.commons.flow.context.StepLepAdditionalContext; import com.icthh.xm.commons.flow.context.StepsLepAdditionalContext; import com.icthh.xm.commons.flow.context.TenantResourceLepAdditionalContext; -import com.icthh.xm.commons.flow.engine.FlowExecutor; +import com.icthh.xm.commons.flow.engine.FlowExecutorService; import com.icthh.xm.commons.flow.engine.StepExecutorService; import com.icthh.xm.commons.flow.service.CodeSnippetExecutor; import com.icthh.xm.commons.flow.service.CodeSnippetService; @@ -87,7 +87,7 @@ TriggerTypeSpecService.class, YamlConverter.class, FlowConfigService.class, - FlowExecutor.class, + FlowExecutorService.class, StepKeyResolver.class, StepExecutorService.class, StepLepAdditionalContext.class, diff --git a/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowExecuteIntTest.java b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowExecuteIntTest.java index ceede15c..800b2fe3 100644 --- a/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowExecuteIntTest.java +++ b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowExecuteIntTest.java @@ -3,7 +3,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.icthh.xm.commons.flow.domain.Flow; -import com.icthh.xm.commons.flow.engine.FlowExecutor; +import com.icthh.xm.commons.flow.engine.FlowExecutorService; import com.icthh.xm.commons.flow.engine.context.FlowExecutionContext; import com.icthh.xm.commons.flow.service.FlowConfigService; import com.icthh.xm.commons.flow.service.FlowConfigService.FlowsConfig; @@ -12,6 +12,7 @@ import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; +import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; @@ -28,7 +29,7 @@ public class FlowExecuteIntTest extends AbstractFlowIntTest { FlowService flowService; @Autowired - FlowExecutor flowExecutor; + FlowExecutorService flowExecutor; @Test @SneakyThrows @@ -102,6 +103,40 @@ public void testRunFlowFromLep() { flowService.deleteFlow("simple-flow"); } + @Test + @SneakyThrows + public void testRunIterableTaskLep() { + String countWordsFlow = loadFile("test-flow-execute/count-words-flow.yml"); + mockSendConfigToRefresh(); + + updateConfiguration("/config/tenants/TEST/testApp/lep/flow/step/Action$$groovyAction.groovy", + "return lepContext.step.runSnippet('groovyAction', lepContext)"); + updateConfiguration("/config/tenants/TEST/testApp/flow/step-spec/steps.yml", loadFile("test-flow-execute/steps.yml")); + + FlowsConfig flows = new ObjectMapper(new YAMLFactory()).readValue(countWordsFlow, FlowsConfig.class); + Flow flow = flows.getFlows().get(0); + flowService.createFlow(flow); + + String input = "Yes, some text words to count"; + FlowExecutionContext result = flowExecutor.execute(flow, input); + assertEquals( + remap(mockContext( + "count-long-words-flow", + input, + 18, + Map.of("sum_chars", List.of(4, 4, 5, 5), "split_words", input, "words_to_length", Map.of( + "a", Map.of("b", new String[]{"Yes", "some", "text", "words", "to", "count"}) + )), + Map.of("words_to_length", List.of(4, 4, 5, 5), "sum_chars", 18, "split_words", Map.of( + "a", Map.of("b", new String[]{"Yes", "some", "text", "words", "to", "count"}) + )) + )), + remap(result) + ); + + flowService.deleteFlow("count-words-flow"); + } + @SneakyThrows private String initFlow() { String simpleFlow = loadFile("test-flow-execute/simple-flow.yml"); @@ -135,13 +170,13 @@ private String initFlow() { private FlowExecutionContext mockContext( String key, - Map input, + Object input, Object output, Map stepInput, Map stepOutput ) { - var context = new FlowExecutionContext(key); - context.setInput(input); + var context = new FlowExecutionContext(key, input); + context.resetIteration(); context.setOutput(output); for (Map.Entry entry : stepInput.entrySet()) { context.getStepInput().put(entry.getKey(), entry.getValue()); @@ -152,4 +187,10 @@ private FlowExecutionContext mockContext( return context; } + @SneakyThrows + public Map remap(Object obj) { + ObjectMapper objectMapper = new ObjectMapper(); + return objectMapper.readValue(objectMapper.writeValueAsString(obj), Map.class); + } + } diff --git a/xm-commons-flow/src/test/resources/test-flow-execute/count-words-flow.yml b/xm-commons-flow/src/test/resources/test-flow-execute/count-words-flow.yml new file mode 100644 index 00000000..74b88c8f --- /dev/null +++ b/xm-commons-flow/src/test/resources/test-flow-execute/count-words-flow.yml @@ -0,0 +1,36 @@ +--- +flows: +- key: "count-long-words-flow" + startStep: "split_words" + steps: + - key: "split_words" + typeKey: "groovyAction" + type: "ACTION" + snippets: + groovyAction: + content: "[a:[b:lepContext.flow.input.split('\\\\W+')]]" + extension: "groovy" + next: "words_to_length" + - key: "words_to_length" + typeKey: "groovyAction" + type: "ACTION" + isIterable: true + iterableJsonPath: "$.a.b" + removeNullOutputForIterableResult: true + snippets: + groovyAction: + content: "lepContext.step.iterationItem.length() >= 4 ? lepContext.step.iterationItem.length() : null" + extension: "groovy" + next: "sum_chars" + - key: "sum_chars" + typeKey: "groovyAction" + type: "ACTION" + snippets: + groovyAction: + content: "lepContext.step.input.sum()" + extension: "groovy" + trigger: + typeKey: "httpkey" + parameters: + method: "GET" + url: "/api/orders" From 98883d629b7f4692ffc6c5343c6dc9b5b48b88b3 Mon Sep 17 00:00:00 2001 From: sergeysenja1992 Date: Sun, 14 Jul 2024 17:37:24 +0300 Subject: [PATCH 10/13] Update version.gradle --- gradle/version.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/version.gradle b/gradle/version.gradle index 7c81dd6f..184feb57 100644 --- a/gradle/version.gradle +++ b/gradle/version.gradle @@ -1,6 +1,6 @@ import java.util.regex.Pattern -version = "2.3.23.24-SNAPSHOT" +version = "2.3.23.25-SNAPSHOT" logger.lifecycle("Project version: $version") String detectSemVersion() { From 6fcbfa7ad7934dddba471ce5c3b96c3f7acfb0b0 Mon Sep 17 00:00:00 2001 From: ssenko Date: Sun, 14 Jul 2024 17:59:59 +0300 Subject: [PATCH 11/13] Fix tests --- .../commons/flow/rest/FlowExecuteIntTest.java | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowExecuteIntTest.java b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowExecuteIntTest.java index 800b2fe3..b8c02923 100644 --- a/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowExecuteIntTest.java +++ b/xm-commons-flow/src/test/java/com/icthh/xm/commons/flow/rest/FlowExecuteIntTest.java @@ -119,18 +119,20 @@ public void testRunIterableTaskLep() { String input = "Yes, some text words to count"; FlowExecutionContext result = flowExecutor.execute(flow, input); - assertEquals( - remap(mockContext( - "count-long-words-flow", - input, - 18, - Map.of("sum_chars", List.of(4, 4, 5, 5), "split_words", input, "words_to_length", Map.of( - "a", Map.of("b", new String[]{"Yes", "some", "text", "words", "to", "count"}) - )), - Map.of("words_to_length", List.of(4, 4, 5, 5), "sum_chars", 18, "split_words", Map.of( - "a", Map.of("b", new String[]{"Yes", "some", "text", "words", "to", "count"}) - )) + FlowExecutionContext mockedContext = mockContext( + "count-long-words-flow", + input, + 18, + Map.of("sum_chars", List.of(4, 4, 5, 5), "split_words", input, "words_to_length", Map.of( + "a", Map.of("b", new String[]{"Yes", "some", "text", "words", "to", "count"}) )), + Map.of("words_to_length", List.of(4, 4, 5, 5), "sum_chars", 18, "split_words", Map.of( + "a", Map.of("b", new String[]{"Yes", "some", "text", "words", "to", "count"}) + )) + ); + mockedContext.resetIteration(); + assertEquals( + remap(mockedContext), remap(result) ); @@ -176,7 +178,6 @@ private FlowExecutionContext mockContext( Map stepOutput ) { var context = new FlowExecutionContext(key, input); - context.resetIteration(); context.setOutput(output); for (Map.Entry entry : stepInput.entrySet()) { context.getStepInput().put(entry.getKey(), entry.getValue()); From 750c8fd9d254c994980d56b41e2fe9284e5a90cc Mon Sep 17 00:00:00 2001 From: ssenko Date: Mon, 15 Jul 2024 21:16:58 +0300 Subject: [PATCH 12/13] remove local build --- build.gradle | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/build.gradle b/build.gradle index fbadaa0b..97c47ce9 100644 --- a/build.gradle +++ b/build.gradle @@ -291,16 +291,6 @@ subprojects { } } - apply plugin: 'maven-publish' - - publishing { - publications { - maven(MavenPublication) { - from components.java - } - } - } - } boolean isRelease(Project project) { From e631a6854d9f6a28d7f37ab0dafeb4b2d2ae41ce Mon Sep 17 00:00:00 2001 From: ssenko Date: Mon, 15 Jul 2024 21:30:36 +0300 Subject: [PATCH 13/13] sync --- gradle/version.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle/version.gradle b/gradle/version.gradle index bc85718f..6d30d5dc 100644 --- a/gradle/version.gradle +++ b/gradle/version.gradle @@ -1,6 +1,6 @@ import java.util.regex.Pattern -version = "2.3.24-SNAPSHOT" +version = "2.3.24" logger.lifecycle("Project version: $version") String detectSemVersion() {