From 637ed84cf128a63904f4ed8ab3603ba930361ac8 Mon Sep 17 00:00:00 2001 From: Dmitrii Tikhomirov Date: Thu, 2 Oct 2025 11:55:27 -0700 Subject: [PATCH 1/3] parallel exec output should be merged into one WorkflowModel Signed-off-by: Dmitrii Tikhomirov --- .../impl/expressions/agentic/AgenticModelFactory.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java index bed5dc9f..1cfca922 100644 --- a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java +++ b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java @@ -60,10 +60,7 @@ public WorkflowModel fromAny(WorkflowModel prev, Object obj) { @Override public WorkflowModel combine(Map workflowVariables) { - // TODO: create a new agenticScope object in the AgenticScopeRegistryAssessor per branch - // TODO: Since we share the same agenticScope object, both branches are updating the same - // instance, so for now we return the first key. - return workflowVariables.values().iterator().next(); + return newAgenticModel(workflowVariables); } @Override From 46e663257c7918f73d7ae95d15cbe387a5735206 Mon Sep 17 00:00:00 2001 From: Dmitrii Tikhomirov Date: Thu, 2 Oct 2025 14:29:23 -0700 Subject: [PATCH 2/3] better combined processsing Signed-off-by: Dmitrii Tikhomirov --- .../agentic/AgenticModelFactory.java | 7 ++++- .../fluent/agentic/WorkflowTests.java | 28 ++++++------------- 2 files changed, 15 insertions(+), 20 deletions(-) diff --git a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java index 1cfca922..10476aa3 100644 --- a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java +++ b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java @@ -24,6 +24,7 @@ import io.serverlessworkflow.impl.expressions.agentic.langchain4j.AgenticScopeRegistryAssessor; import java.time.OffsetDateTime; import java.util.Map; +import java.util.stream.Collectors; class AgenticModelFactory implements WorkflowModelFactory { @@ -60,7 +61,11 @@ public WorkflowModel fromAny(WorkflowModel prev, Object obj) { @Override public WorkflowModel combine(Map workflowVariables) { - return newAgenticModel(workflowVariables); + Map combinedState = + workflowVariables.entrySet().stream() + .map(e -> Map.entry(e.getKey(), e.getValue().asJavaObject())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + return newAgenticModel(combinedState); } @Override diff --git a/experimental/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/WorkflowTests.java b/experimental/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/WorkflowTests.java index cfbaf545..a05ff842 100644 --- a/experimental/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/WorkflowTests.java +++ b/experimental/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/WorkflowTests.java @@ -175,17 +175,12 @@ public void testParallel() throws ExecutionException, InterruptedException { topic.put("style", "sci-fi"); try (WorkflowApplication app = WorkflowApplication.builder().build()) { - AgenticScope result = - app.workflowDefinition(workflow) - .instance(topic) - .start() - .get() - .as(AgenticScope.class) - .orElseThrow(); + Map result = + app.workflowDefinition(workflow).instance(topic).start().get().asMap().orElseThrow(); - assertEquals("Fake conflict response", result.readState("setting")); - assertEquals("Fake hero response", result.readState("hero")); - assertEquals("Fake setting response", result.readState("conflict")); + assertEquals("Fake conflict response", result.get("setting").toString()); + assertEquals("Fake hero response", result.get("hero").toString()); + assertEquals("Fake setting response", result.get("conflict").toString()); } } @@ -224,16 +219,11 @@ public void testSeqAndThenParallel() throws ExecutionException, InterruptedExcep topic.put("fact", "alien"); try (WorkflowApplication app = WorkflowApplication.builder().build()) { - AgenticScope result = - app.workflowDefinition(workflow) - .instance(topic) - .start() - .get() - .as(AgenticScope.class) - .orElseThrow(); + Map result = + app.workflowDefinition(workflow).instance(topic).start().get().asMap().orElseThrow(); - assertEquals(cultureTraits, result.readState("culture")); - assertEquals(technologyTraits, result.readState("technology")); + assertEquals(cultureTraits, result.get("culture")); + assertEquals(technologyTraits, result.get("technology")); } } From ff59e5dac7d638d9406d42db0ef730e885c374b9 Mon Sep 17 00:00:00 2001 From: Dmitrii Tikhomirov Date: Thu, 2 Oct 2025 14:54:15 -0700 Subject: [PATCH 3/3] result can be AgenticScope and Map Signed-off-by: Dmitrii Tikhomirov --- .../fluent/agentic/WorkflowTests.java | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/experimental/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/WorkflowTests.java b/experimental/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/WorkflowTests.java index a05ff842..3d411888 100644 --- a/experimental/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/WorkflowTests.java +++ b/experimental/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/WorkflowTests.java @@ -182,6 +182,20 @@ public void testParallel() throws ExecutionException, InterruptedException { assertEquals("Fake hero response", result.get("hero").toString()); assertEquals("Fake setting response", result.get("conflict").toString()); } + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + AgenticScope result = + app.workflowDefinition(workflow) + .instance(topic) + .start() + .get() + .as(AgenticScope.class) + .orElseThrow(); + + assertEquals("Fake conflict response", result.readState("setting").toString()); + assertEquals("Fake hero response", result.readState("hero").toString()); + assertEquals("Fake setting response", result.readState("conflict").toString()); + } } @Test @@ -225,6 +239,19 @@ public void testSeqAndThenParallel() throws ExecutionException, InterruptedExcep assertEquals(cultureTraits, result.get("culture")); assertEquals(technologyTraits, result.get("technology")); } + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + AgenticScope result = + app.workflowDefinition(workflow) + .instance(topic) + .start() + .get() + .as(AgenticScope.class) + .orElseThrow(); + + assertEquals(cultureTraits, result.readState("culture")); + assertEquals(technologyTraits, result.readState("technology")); + } } @Test