Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ public SELF with(Consumer<P> c) {
}

public SELF correlate(String key, Consumer<ListenTaskBuilder.CorrelatePropertyBuilder> c) {
throw new UnsupportedOperationException(
"correlate is not supported in the engine level: https://github.com/serverlessworkflow/sdk-java/issues/1206");
ListenTaskBuilder.CorrelatePropertyBuilder cb =
new ListenTaskBuilder.CorrelatePropertyBuilder();
c.accept(cb);
correlate.setAdditionalProperty(key, cb.build());
return self();
}

public EventFilter build() {
Expand Down
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The UseError builder also belongs to a different Pr, why wont you file first the PR with the DSL changes (exception correlation) and then, once it is merge, you file the correlation, which deserve separate discussion?

Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,17 @@
*/
package io.serverlessworkflow.fluent.spec;

import io.serverlessworkflow.api.types.Error;
import io.serverlessworkflow.api.types.ErrorDetails;
import io.serverlessworkflow.api.types.ErrorTitle;
import io.serverlessworkflow.api.types.ErrorType;
import io.serverlessworkflow.api.types.UriTemplate;
import io.serverlessworkflow.api.types.Use;
import io.serverlessworkflow.api.types.UseAuthentications;
import io.serverlessworkflow.api.types.UseErrors;
import io.serverlessworkflow.api.types.UseRetries;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.function.Consumer;

Expand Down Expand Up @@ -47,9 +56,99 @@ public UseBuilder authentications(Consumer<UseAuthenticationsBuilder> authentica
return this;
}

// TODO: implement the remaining `use` attributes
public UseBuilder errors(Consumer<UseErrorsBuilder> errorsConsumer) {
final UseErrorsBuilder builder = new UseErrorsBuilder();
errorsConsumer.accept(builder);
this.use.setErrors(builder.build());
return this;
}

public UseBuilder retries(Consumer<UseRetriesBuilder> retriesConsumer) {
final UseRetriesBuilder builder = new UseRetriesBuilder();
retriesConsumer.accept(builder);
this.use.setRetries(builder.build());
return this;
}

public Use build() {
return use;
}

public static final class UseErrorsBuilder {
private final UseErrors useErrors;

UseErrorsBuilder() {
this.useErrors = new UseErrors();
}

public UseErrorsBuilder error(String name, Consumer<UseErrorBuilder> errorConsumer) {
final UseErrorBuilder builder = new UseErrorBuilder();
errorConsumer.accept(builder);
this.useErrors.setAdditionalProperty(name, builder.build());
return this;
}

public UseErrors build() {
return useErrors;
}
}

public static final class UseErrorBuilder {
private final Error error;

UseErrorBuilder() {
this.error = new Error();
}

public UseErrorBuilder type(String expression) {
ErrorType errorType = new ErrorType();
try {
errorType.withLiteralErrorType(new UriTemplate().withLiteralUri(new URI(expression)));
} catch (URISyntaxException ex) {
errorType.withExpressionErrorType(expression);
}
this.error.setType(errorType);
return this;
}

public UseErrorBuilder status(int status) {
this.error.setStatus(status);
return this;
}

public UseErrorBuilder title(String expression) {
this.error.setTitle(new ErrorTitle().withExpressionErrorTitle(expression));
return this;
}

public UseErrorBuilder detail(String expression) {
this.error.setDetail(new ErrorDetails().withExpressionErrorDetails(expression));
return this;
}

public Error build() {
return error;
}
}

public static final class UseRetriesBuilder {
private final UseRetries useRetries;

UseRetriesBuilder() {
this.useRetries = new UseRetries();
}

public UseRetriesBuilder retry(
String name, Consumer<BaseTryTaskBuilder.RetryPolicyBuilder> retryConsumer) {
final BaseTryTaskBuilder.RetryPolicyBuilder builder =
new BaseTryTaskBuilder.RetryPolicyBuilder();
retryConsumer.accept(builder);
this.useRetries.setAdditionalProperty(name, builder.build());
return this;
}

public UseRetries build() {
return useRetries;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.serverlessworkflow.fluent.spec.AbstractEventFilterBuilder;
import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder;
import io.serverlessworkflow.fluent.spec.AbstractListenTaskBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
Expand All @@ -41,13 +42,11 @@ protected List<Consumer<EVENT_FILTER>> getFilterSteps() {
return filterSteps;
}

// TODO: "correlate is not supported in the engine level:
// https://github.com/serverlessworkflow/sdk-java/issues/1206". Keeping the code for a future
// reference.
// public SELF correlate(String key, Consumer<ListenTaskBuilder.CorrelatePropertyBuilder> c) {
// filterSteps.add(f -> f.correlate(key, c));
// return self();
// }
public SELF correlate(
String key, Consumer<AbstractListenTaskBuilder.CorrelatePropertyBuilder> c) {
addFilterStep(f -> f.correlate(key, c));
return self();
}

@Override
public void accept(EVENT_FILTER filterBuilder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.serverlessworkflow.fluent.spec.TaskItemListBuilder;
import io.serverlessworkflow.fluent.spec.TimeoutBuilder;
import io.serverlessworkflow.fluent.spec.TryTaskBuilder;
import io.serverlessworkflow.fluent.spec.UseBuilder;
import io.serverlessworkflow.fluent.spec.configurers.AuthenticationConfigurer;
import io.serverlessworkflow.fluent.spec.configurers.CallGrpcConfigurer;
import io.serverlessworkflow.fluent.spec.configurers.CallHttpConfigurer;
Expand Down Expand Up @@ -1227,4 +1228,14 @@ public static Consumer<ScheduleBuilder> on(
Consumer<EventFilterBuilder>... filters) {
return b -> ((AbstractEventConsumptionStrategyBuilder) b).any(filters);
}

public static Consumer<UseBuilder.UseErrorsBuilder> errors(
Consumer<UseBuilder.UseErrorsBuilder> errorsConsumer) {
return errorsConsumer;
}

public static Consumer<UseBuilder.UseRetriesBuilder> retries(
Consumer<UseBuilder.UseRetriesBuilder> retriesConsumer) {
return retriesConsumer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.serverlessworkflow.api.types.AuthenticationPolicyUnion;
import io.serverlessworkflow.api.types.CallHTTP;
import io.serverlessworkflow.api.types.CatchErrors;
import io.serverlessworkflow.api.types.CorrelateProperty;
import io.serverlessworkflow.api.types.Document;
import io.serverlessworkflow.api.types.EmitEventDefinition;
import io.serverlessworkflow.api.types.EmitTask;
Expand Down Expand Up @@ -310,8 +311,12 @@ void testDoTaskListenOne() {
to ->
to.one(
f ->
f.with(
p -> p.type("com.fake.pet").source("mySource"))))))
f.with(p -> p.type("com.fake.pet").source("mySource"))
.correlate(
"orderId",
c ->
c.from("$.data.orderId")
.expect("$.input.orderId"))))))
.build();

List<TaskItem> items = wf.getDo();
Expand All @@ -327,6 +332,10 @@ void testDoTaskListenOne() {
EventFilter filter = one.getOne();
assertNotNull(filter, "EventFilter should be present");
assertEquals("com.fake.pet", filter.getWith().getType(), "Filter type should match");
CorrelateProperty correlate = filter.getCorrelate().getAdditionalProperties().get("orderId");
assertNotNull(correlate, "Correlate property should be present");
assertEquals("$.data.orderId", correlate.getFrom(), "Correlate from should match");
assertEquals("$.input.orderId", correlate.getExpect(), "Correlate expect should match");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static io.serverlessworkflow.fluent.spec.dsl.DSL.workflow;
import static org.assertj.core.api.Assertions.assertThat;

import io.serverlessworkflow.api.types.CorrelateProperty;
import io.serverlessworkflow.api.types.HTTPArguments;
import io.serverlessworkflow.api.types.ListenTaskConfiguration;
import io.serverlessworkflow.api.types.RunTaskConfiguration;
Expand Down Expand Up @@ -166,7 +167,15 @@ public void when_listen_any_with_until() {
public void when_listen_one() {
Workflow wf =
WorkflowBuilder.workflow("f", "ns", "1")
.tasks(t -> t.listen(to().one(event().type("only-once"))))
.tasks(
t ->
t.listen(
to().one(
event()
.type("only-once")
.correlate(
"workflowInstanceId",
c -> c.from("$.metadata.instanceId")))))
.build();

var to = wf.getDo().get(0).getTask().getListenTask().getListen().getTo();
Expand All @@ -178,6 +187,10 @@ public void when_listen_one() {
var one = to.getOneEventConsumptionStrategy().getOne();
assertThat(one.getWith()).isNotNull();
assertThat(one.getWith().getType()).isEqualTo("only-once");
CorrelateProperty correlate =
one.getCorrelate().getAdditionalProperties().get("workflowInstanceId");
assertThat(correlate).isNotNull();
assertThat(correlate.getFrom()).isEqualTo("$.metadata.instanceId");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
package io.serverlessworkflow.impl.events;

import io.cloudevents.CloudEvent;
import io.serverlessworkflow.api.types.CorrelateProperty;
import io.serverlessworkflow.api.types.EventFilter;
import io.serverlessworkflow.api.types.EventFilterCorrelate;
import io.serverlessworkflow.api.types.EventProperties;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowContext;
import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -52,8 +55,31 @@ public TypeEventRegistrationBuilder listen(
EventFilter register, WorkflowApplication application) {
EventProperties properties = register.getWith();
String type = properties.getType();
return new TypeEventRegistrationBuilder(
type, application.cloudEventPredicateFactory().build(application, properties));
logger.info(
"listen: type={}, correlate={}, with.additionalProperties={}",
type,
register.getCorrelate(),
properties.getAdditionalProperties());
CloudEventPredicate cePredicate =
application.cloudEventPredicateFactory().build(application, properties);
Collection<CorrelationPredicate> correlationPredicates =
buildCorrelationPredicates(register.getCorrelate(), application);
return correlationPredicates.isEmpty()
? new TypeEventRegistrationBuilder(type, cePredicate)
: new TypeEventRegistrationBuilder(type, cePredicate, correlationPredicates);
}

private Collection<CorrelationPredicate> buildCorrelationPredicates(
EventFilterCorrelate correlate, WorkflowApplication application) {
if (correlate == null || correlate.getAdditionalProperties().isEmpty()) {
return List.of();
}
Collection<CorrelationPredicate> predicates = new ArrayList<>();
for (Map.Entry<String, CorrelateProperty> entry :
correlate.getAdditionalProperties().entrySet()) {
predicates.add(CorrelationPredicate.from(entry.getKey(), entry.getValue(), application));
}
return predicates;
}

@Override
Expand All @@ -67,12 +93,35 @@ private static class CloudEventConsumer extends AbstractCollection<TypeEventRegi

@Override
public void accept(CloudEvent ce) {
logger.debug("Received cloud event {}", ce);
for (TypeEventRegistration registration : registrations) {
if (registration.predicate().test(ce, registration.workflow(), registration.task())) {
registration.consumer().accept(ce);
try {
boolean predResult =
registration.predicate().test(ce, registration.workflow(), registration.task());
logger.info(
"Predicate result for '{}': {}, correlation={}",
registration.type(),
predResult,
registration.correlationPredicates().size());
if (predResult && testCorrelation(ce, registration)) {
registration.consumer().accept(ce);
}
} catch (Exception e) {
logger.error("Error processing event for registration type='{}'", registration.type(), e);
}
Comment on lines +108 to +110
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pretty suspicious pece of code, why did you add it?

}
}

private boolean testCorrelation(CloudEvent ce, TypeEventRegistration registration) {
Collection<CorrelationPredicate> predicates = registration.correlationPredicates();
if (predicates.isEmpty()) {
return true;
}
for (CorrelationPredicate pred : predicates) {
if (!pred.test(ce, registration.workflow(), registration.task())) {
return false;
}
}
return true;
}

@Override
Expand Down Expand Up @@ -107,7 +156,13 @@ public TypeEventRegistration register(
return new TypeEventRegistration(null, ce, null, workflow, task);
} else {
TypeEventRegistration registration =
new TypeEventRegistration(builder.type(), ce, builder.cePredicate(), workflow, task);
new TypeEventRegistration(
builder.type(),
ce,
builder.cePredicate(),
builder.correlationPredicates(),
workflow,
task);
registrations
.computeIfAbsent(
registration.type(),
Expand Down
Loading
Loading