From 77143c197e5ff5f7776b45b3044d3f8b4f787338 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matheus=20Andr=C3=A9?= Date: Tue, 19 May 2026 12:58:42 -0300 Subject: [PATCH] feat: Implement correlation on event filter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add CorrelationPredicate for evaluating correlation expressions - Add correlate support in AbstractEventFilterBuilder and AbstractEventFilterSpec - Update TypeEventRegistration and TypeEventRegistrationBuilder with correlation predicates - Implement correlation matching in AbstractTypeConsumer - Add CorrelationTest and listen-correlate.yaml - Add correlate tests in WorkflowBuilderTest and DSLTest Signed-off-by: Matheus André --- .../spec/AbstractEventFilterBuilder.java | 7 +- .../spec/dsl/AbstractEventFilterSpec.java | 13 +- .../fluent/spec/WorkflowBuilderTest.java | 13 +- .../fluent/spec/dsl/DSLTest.java | 15 +- .../impl/events/AbstractTypeConsumer.java | 50 +++++- .../impl/events/CorrelationPredicate.java | 95 ++++++++++ .../impl/events/TypeEventRegistration.java | 15 +- .../events/TypeEventRegistrationBuilder.java | 15 +- .../impl/test/CorrelationTest.java | 166 ++++++++++++++++++ .../workflows-samples/listen-correlate.yaml | 18 ++ 10 files changed, 387 insertions(+), 20 deletions(-) create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/events/CorrelationPredicate.java create mode 100644 impl/test/src/test/java/io/serverlessworkflow/impl/test/CorrelationTest.java create mode 100644 impl/test/src/test/resources/workflows-samples/listen-correlate.yaml diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventFilterBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventFilterBuilder.java index 80bfba154..051cd02bf 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventFilterBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventFilterBuilder.java @@ -37,8 +37,11 @@ public SELF with(Consumer

c) { } public SELF correlate(String key, Consumer c) { - throw new UnsupportedOperationException( - "correlate is not supported in the engine level: https://github.com/serverlessworkflow/sdk-java/issues/1206"); + ListenTaskBuilder.CorrelatePropertyBuilder cb = + new ListenTaskBuilder.CorrelatePropertyBuilder(); + c.accept(cb); + correlate.setAdditionalProperty(key, cb.build()); + return self(); } public EventFilter build() { diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/AbstractEventFilterSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/AbstractEventFilterSpec.java index 29799c801..61ee5b814 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/AbstractEventFilterSpec.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/AbstractEventFilterSpec.java @@ -17,6 +17,7 @@ import io.serverlessworkflow.fluent.spec.AbstractEventFilterBuilder; import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder; +import io.serverlessworkflow.fluent.spec.AbstractListenTaskBuilder; import java.util.ArrayList; import java.util.List; import java.util.function.Consumer; @@ -41,13 +42,11 @@ protected List> getFilterSteps() { return filterSteps; } - // TODO: "correlate is not supported in the engine level: - // https://github.com/serverlessworkflow/sdk-java/issues/1206". Keeping the code for a future - // reference. - // public SELF correlate(String key, Consumer c) { - // filterSteps.add(f -> f.correlate(key, c)); - // return self(); - // } + public SELF correlate( + String key, Consumer c) { + addFilterStep(f -> f.correlate(key, c)); + return self(); + } @Override public void accept(EVENT_FILTER filterBuilder) { diff --git a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/WorkflowBuilderTest.java b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/WorkflowBuilderTest.java index 32243f28d..c350392da 100644 --- a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/WorkflowBuilderTest.java +++ b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/WorkflowBuilderTest.java @@ -37,6 +37,7 @@ import io.serverlessworkflow.api.types.AuthenticationPolicyUnion; import io.serverlessworkflow.api.types.CallHTTP; import io.serverlessworkflow.api.types.CatchErrors; +import io.serverlessworkflow.api.types.CorrelateProperty; import io.serverlessworkflow.api.types.Document; import io.serverlessworkflow.api.types.EmitEventDefinition; import io.serverlessworkflow.api.types.EmitTask; @@ -310,8 +311,12 @@ void testDoTaskListenOne() { to -> to.one( f -> - f.with( - p -> p.type("com.fake.pet").source("mySource")))))) + f.with(p -> p.type("com.fake.pet").source("mySource")) + .correlate( + "orderId", + c -> + c.from("$.data.orderId") + .expect("$.input.orderId")))))) .build(); List items = wf.getDo(); @@ -327,6 +332,10 @@ void testDoTaskListenOne() { EventFilter filter = one.getOne(); assertNotNull(filter, "EventFilter should be present"); assertEquals("com.fake.pet", filter.getWith().getType(), "Filter type should match"); + CorrelateProperty correlate = filter.getCorrelate().getAdditionalProperties().get("orderId"); + assertNotNull(correlate, "Correlate property should be present"); + assertEquals("$.data.orderId", correlate.getFrom(), "Correlate from should match"); + assertEquals("$.input.orderId", correlate.getExpect(), "Correlate expect should match"); } @Test diff --git a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/DSLTest.java b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/DSLTest.java index eba88fe7f..b78ce6da6 100644 --- a/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/DSLTest.java +++ b/fluent/spec/src/test/java/io/serverlessworkflow/fluent/spec/dsl/DSLTest.java @@ -30,6 +30,7 @@ import static io.serverlessworkflow.fluent.spec.dsl.DSL.workflow; import static org.assertj.core.api.Assertions.assertThat; +import io.serverlessworkflow.api.types.CorrelateProperty; import io.serverlessworkflow.api.types.HTTPArguments; import io.serverlessworkflow.api.types.ListenTaskConfiguration; import io.serverlessworkflow.api.types.RunTaskConfiguration; @@ -166,7 +167,15 @@ public void when_listen_any_with_until() { public void when_listen_one() { Workflow wf = WorkflowBuilder.workflow("f", "ns", "1") - .tasks(t -> t.listen(to().one(event().type("only-once")))) + .tasks( + t -> + t.listen( + to().one( + event() + .type("only-once") + .correlate( + "workflowInstanceId", + c -> c.from("$.metadata.instanceId"))))) .build(); var to = wf.getDo().get(0).getTask().getListenTask().getListen().getTo(); @@ -178,6 +187,10 @@ public void when_listen_one() { var one = to.getOneEventConsumptionStrategy().getOne(); assertThat(one.getWith()).isNotNull(); assertThat(one.getWith().getType()).isEqualTo("only-once"); + CorrelateProperty correlate = + one.getCorrelate().getAdditionalProperties().get("workflowInstanceId"); + assertThat(correlate).isNotNull(); + assertThat(correlate.getFrom()).isEqualTo("$.metadata.instanceId"); } @Test diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java index d7c6ac1eb..32330f8c8 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/AbstractTypeConsumer.java @@ -16,12 +16,15 @@ package io.serverlessworkflow.impl.events; import io.cloudevents.CloudEvent; +import io.serverlessworkflow.api.types.CorrelateProperty; import io.serverlessworkflow.api.types.EventFilter; +import io.serverlessworkflow.api.types.EventFilterCorrelate; import io.serverlessworkflow.api.types.EventProperties; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowContext; import java.util.AbstractCollection; +import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -52,8 +55,26 @@ public TypeEventRegistrationBuilder listen( EventFilter register, WorkflowApplication application) { EventProperties properties = register.getWith(); String type = properties.getType(); - return new TypeEventRegistrationBuilder( - type, application.cloudEventPredicateFactory().build(application, properties)); + CloudEventPredicate cePredicate = + application.cloudEventPredicateFactory().build(application, properties); + Collection correlationPredicates = + buildCorrelationPredicates(register.getCorrelate(), application); + return correlationPredicates.isEmpty() + ? new TypeEventRegistrationBuilder(type, cePredicate) + : new TypeEventRegistrationBuilder(type, cePredicate, correlationPredicates); + } + + private Collection buildCorrelationPredicates( + EventFilterCorrelate correlate, WorkflowApplication application) { + if (correlate == null || correlate.getAdditionalProperties().isEmpty()) { + return List.of(); + } + Collection predicates = new ArrayList<>(); + for (Map.Entry entry : + correlate.getAdditionalProperties().entrySet()) { + predicates.add(CorrelationPredicate.from(entry.getKey(), entry.getValue(), application)); + } + return predicates; } @Override @@ -67,14 +88,27 @@ private static class CloudEventConsumer extends AbstractCollection predicates = registration.correlationPredicates(); + if (predicates.isEmpty()) { + return true; + } + for (CorrelationPredicate pred : predicates) { + if (!pred.test(ce, registration.workflow(), registration.task())) { + return false; + } + } + return true; + } + @Override public boolean add(TypeEventRegistration registration) { return registrations.add(registration); @@ -107,7 +141,13 @@ public TypeEventRegistration register( return new TypeEventRegistration(null, ce, null, workflow, task); } else { TypeEventRegistration registration = - new TypeEventRegistration(builder.type(), ce, builder.cePredicate(), workflow, task); + new TypeEventRegistration( + builder.type(), + ce, + builder.cePredicate(), + builder.correlationPredicates(), + workflow, + task); registrations .computeIfAbsent( registration.type(), diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/CorrelationPredicate.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/CorrelationPredicate.java new file mode 100644 index 000000000..30486b556 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/CorrelationPredicate.java @@ -0,0 +1,95 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.events; + +import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowModelFactory; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import io.serverlessworkflow.impl.expressions.ExpressionDescriptor; +import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CorrelationPredicate { + + private static final Logger logger = LoggerFactory.getLogger(CorrelationPredicate.class); + + private final String key; + private final WorkflowModelFactory modelFactory; + private final WorkflowValueResolver fromResolver; + private final WorkflowValueResolver expectResolver; + + private CorrelationPredicate( + String key, + WorkflowModelFactory modelFactory, + WorkflowValueResolver fromResolver, + WorkflowValueResolver expectResolver) { + this.key = key; + this.modelFactory = modelFactory; + this.fromResolver = fromResolver; + this.expectResolver = expectResolver; + } + + public static CorrelationPredicate from( + String key, io.serverlessworkflow.api.types.CorrelateProperty prop, WorkflowApplication app) { + WorkflowValueResolver fromResolver = + app.expressionFactory().resolveString(ExpressionDescriptor.from(prop.getFrom())); + WorkflowValueResolver expectResolver = + prop.getExpect() != null + ? app.expressionFactory().resolveString(ExpressionDescriptor.from(prop.getExpect())) + : null; + return new CorrelationPredicate(key, app.modelFactory(), fromResolver, expectResolver); + } + + public boolean test(CloudEvent event, WorkflowContext workflow, TaskContext task) { + String eventValue = extractFromEvent(event, workflow, task); + if (eventValue == null) { + logger.debug("Correlation key '{}': from expression returned null for event {}", key, event); + return false; + } + + if (expectResolver == null) { + logger.debug( + "Correlation key '{}': no expect expression, accepting event value '{}'", + key, + eventValue); + return true; + } + + String expectedValue = expectResolver.apply(workflow, task, task.input()); + boolean result = Objects.equals(eventValue, expectedValue); + logger.debug( + "Correlation key '{}': eventValue='{}', expectedValue='{}', match={}", + key, + eventValue, + expectedValue, + result); + return result; + } + + private String extractFromEvent(CloudEvent event, WorkflowContext workflow, TaskContext task) { + WorkflowModel eventModel = modelFactory.from(event); + return fromResolver.apply(workflow, task, eventModel); + } + + public String key() { + return key; + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistration.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistration.java index 288fbbe7c..6c75a8d32 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistration.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistration.java @@ -18,12 +18,25 @@ import io.cloudevents.CloudEvent; import io.serverlessworkflow.impl.TaskContext; import io.serverlessworkflow.impl.WorkflowContext; +import java.util.Collection; +import java.util.Collections; import java.util.function.Consumer; public record TypeEventRegistration( String type, Consumer consumer, CloudEventPredicate predicate, + Collection correlationPredicates, WorkflowContext workflow, TaskContext task) - implements EventRegistration {} + implements EventRegistration { + + public TypeEventRegistration( + String type, + Consumer consumer, + CloudEventPredicate predicate, + WorkflowContext workflow, + TaskContext task) { + this(type, consumer, predicate, Collections.emptyList(), workflow, task); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistrationBuilder.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistrationBuilder.java index 39a2a6998..d3ab2f438 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistrationBuilder.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/TypeEventRegistrationBuilder.java @@ -15,5 +15,16 @@ */ package io.serverlessworkflow.impl.events; -public record TypeEventRegistrationBuilder(String type, CloudEventPredicate cePredicate) - implements EventRegistrationBuilder {} +import java.util.Collection; +import java.util.Collections; + +public record TypeEventRegistrationBuilder( + String type, + CloudEventPredicate cePredicate, + Collection correlationPredicates) + implements EventRegistrationBuilder { + + public TypeEventRegistrationBuilder(String type, CloudEventPredicate cePredicate) { + this(type, cePredicate, Collections.emptyList()); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/CorrelationTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/CorrelationTest.java new file mode 100644 index 000000000..62c2756bf --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/CorrelationTest.java @@ -0,0 +1,166 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test; + +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; +import static io.serverlessworkflow.fluent.spec.dsl.DSL.*; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.jackson.JsonCloudEventData; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.spec.WorkflowBuilder; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.jackson.JsonUtils; +import java.io.IOException; +import java.net.URI; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +class CorrelationTest { + + private static WorkflowApplication appl; + private static int idCounter; + + @BeforeAll + static void init() { + appl = WorkflowApplication.builder().build(); + } + + @AfterAll + static void tearDown() { + appl.close(); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("correlateWorkflowSources") + void testCorrelateMatch(String sourceName, Workflow workflow) throws Exception { + WorkflowDefinition def = appl.workflowDefinition(workflow); + WorkflowInstance instance = def.instance(Map.of("patientId", "P123")); + CompletableFuture future = instance.start(); + + await() + .pollDelay(Duration.ofMillis(5)) + .atMost(Duration.ofMillis(500)) + .until(() -> instance.status() == WorkflowStatus.WAITING); + + appl.eventPublishers() + .forEach( + p -> + p.publish( + buildCloudEvent( + "com.example.hospital.patient.admitted", + Map.of("patientId", "P123", "name", "John")))); + + WorkflowModel result = future.get(2, TimeUnit.SECONDS); + List output = (List) JsonUtils.toJavaValue(JsonUtils.modelToJson(result)); + assertThat(output).hasSize(1); + Map eventData = (Map) output.get(0); + assertThat(eventData).containsEntry("patientId", "P123"); + assertThat(instance.status()).isEqualTo(WorkflowStatus.COMPLETED); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("correlateWorkflowSources") + void testCorrelateNoMatch(String sourceName, Workflow workflow) throws Exception { + assertCorrelateNoMatch(workflow); + } + + @Test + void testCorrelateNoMatchDsl() throws Exception { + assertCorrelateNoMatch(listenCorrelateWorkflow()); + } + + private static Stream correlateWorkflowSources() throws IOException { + return Stream.of( + readWorkflowFromClasspath("workflows-samples/listen-correlate.yaml"), + listenCorrelateWorkflow()) + .map(wf -> Arguments.of(wf.getDocument().getName(), wf)); + } + + private static Workflow listenCorrelateWorkflow() { + return WorkflowBuilder.workflow("listen-correlate-java-dsl", "test", "0.1.0") + .input(i -> i.from("{ patientId: .patientId }")) + .tasks( + doTasks( + listen( + "waitForPatient", + l -> + l.to( + listenTo -> + listenTo.one( + filter -> + filter + .with( + props -> + props.type( + "com.example.hospital.patient.admitted")) + .correlate( + "patientId", + cp -> + cp.from(".data.patientId") + .expect(".patientId"))))))) + .build(); + } + + private void assertCorrelateNoMatch(Workflow workflow) throws Exception { + WorkflowDefinition def = appl.workflowDefinition(workflow); + WorkflowInstance instance = def.instance(Map.of("patientId", "P123")); + CompletableFuture future = instance.start(); + + await() + .pollDelay(Duration.ofMillis(5)) + .atMost(Duration.ofMillis(500)) + .until(() -> instance.status() == WorkflowStatus.WAITING); + + appl.eventPublishers() + .forEach( + p -> + p.publish( + buildCloudEvent( + "com.example.hospital.patient.admitted", + Map.of("patientId", "P456", "name", "Jane")))); + + assertThat(instance.status()).isEqualTo(WorkflowStatus.WAITING); + assertThat(future.isDone()).isFalse(); + instance.cancel(); + } + + private static CloudEvent buildCloudEvent(String type, Object data) { + return CloudEventBuilder.v1() + .withId(Integer.toString(++idCounter)) + .withType(type) + .withSource(URI.create("http://www.example.com")) + .withData(JsonCloudEventData.wrap(JsonUtils.fromValue(data))) + .build(); + } +} diff --git a/impl/test/src/test/resources/workflows-samples/listen-correlate.yaml b/impl/test/src/test/resources/workflows-samples/listen-correlate.yaml new file mode 100644 index 000000000..926816c46 --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/listen-correlate.yaml @@ -0,0 +1,18 @@ +document: + dsl: '1.0.0-alpha5' + namespace: test + name: listen-correlate + version: '0.1.0' +input: + from: '{ patientId: .patientId }' +do: + - waitForPatient: + listen: + to: + one: + with: + type: com.example.hospital.patient.admitted + correlate: + patientId: + from: .data.patientId + expect: .patientId