diff --git a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java index bed5dc9f..10476aa3 100644 --- a/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java +++ b/experimental/agentic/src/main/java/io/serverlessworkflow/impl/expressions/agentic/AgenticModelFactory.java @@ -24,6 +24,7 @@ import io.serverlessworkflow.impl.expressions.agentic.langchain4j.AgenticScopeRegistryAssessor; import java.time.OffsetDateTime; import java.util.Map; +import java.util.stream.Collectors; class AgenticModelFactory implements WorkflowModelFactory { @@ -60,10 +61,11 @@ public WorkflowModel fromAny(WorkflowModel prev, Object obj) { @Override public WorkflowModel combine(Map workflowVariables) { - // TODO: create a new agenticScope object in the AgenticScopeRegistryAssessor per branch - // TODO: Since we share the same agenticScope object, both branches are updating the same - // instance, so for now we return the first key. - return workflowVariables.values().iterator().next(); + Map combinedState = + workflowVariables.entrySet().stream() + .map(e -> Map.entry(e.getKey(), e.getValue().asJavaObject())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + return newAgenticModel(combinedState); } @Override diff --git a/experimental/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/WorkflowTests.java b/experimental/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/WorkflowTests.java index cfbaf545..3d411888 100644 --- a/experimental/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/WorkflowTests.java +++ b/experimental/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/WorkflowTests.java @@ -174,6 +174,15 @@ public void testParallel() throws ExecutionException, InterruptedException { Map topic = new HashMap<>(); topic.put("style", "sci-fi"); + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Map result = + app.workflowDefinition(workflow).instance(topic).start().get().asMap().orElseThrow(); + + assertEquals("Fake conflict response", result.get("setting").toString()); + assertEquals("Fake hero response", result.get("hero").toString()); + assertEquals("Fake setting response", result.get("conflict").toString()); + } + try (WorkflowApplication app = WorkflowApplication.builder().build()) { AgenticScope result = app.workflowDefinition(workflow) @@ -183,9 +192,9 @@ public void testParallel() throws ExecutionException, InterruptedException { .as(AgenticScope.class) .orElseThrow(); - assertEquals("Fake conflict response", result.readState("setting")); - assertEquals("Fake hero response", result.readState("hero")); - assertEquals("Fake setting response", result.readState("conflict")); + assertEquals("Fake conflict response", result.readState("setting").toString()); + assertEquals("Fake hero response", result.readState("hero").toString()); + assertEquals("Fake setting response", result.readState("conflict").toString()); } } @@ -223,6 +232,14 @@ public void testSeqAndThenParallel() throws ExecutionException, InterruptedExcep Map topic = new HashMap<>(); topic.put("fact", "alien"); + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Map result = + app.workflowDefinition(workflow).instance(topic).start().get().asMap().orElseThrow(); + + assertEquals(cultureTraits, result.get("culture")); + assertEquals(technologyTraits, result.get("technology")); + } + try (WorkflowApplication app = WorkflowApplication.builder().build()) { AgenticScope result = app.workflowDefinition(workflow)