diff --git a/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/configurer/ListenConfigurer.java b/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/configurers/AgentListenConfigurer.java similarity index 84% rename from experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/configurer/ListenConfigurer.java rename to experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/configurers/AgentListenConfigurer.java index b8db664c..78df2921 100644 --- a/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/configurer/ListenConfigurer.java +++ b/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/configurers/AgentListenConfigurer.java @@ -13,10 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.fluent.agentic.configurer; +package io.serverlessworkflow.fluent.agentic.configurers; import io.serverlessworkflow.fluent.agentic.AgentListenTaskBuilder; import java.util.function.Consumer; @FunctionalInterface -public interface ListenConfigurer extends Consumer {} +public interface AgentListenConfigurer extends Consumer {} diff --git a/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/configurer/AgentTaskConfigurer.java b/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/configurers/AgentTaskConfigurer.java similarity index 93% rename from experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/configurer/AgentTaskConfigurer.java rename to experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/configurers/AgentTaskConfigurer.java index 9473eb1a..d0d2417e 100644 --- a/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/configurer/AgentTaskConfigurer.java +++ b/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/configurers/AgentTaskConfigurer.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.fluent.agentic.configurer; +package io.serverlessworkflow.fluent.agentic.configurers; import io.serverlessworkflow.fluent.agentic.AgentDoTaskBuilder; import java.util.function.Consumer; diff --git a/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/dsl/AgentListenSpec.java b/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/dsl/AgentListenSpec.java new file mode 100644 index 00000000..1f75daa3 --- /dev/null +++ b/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/dsl/AgentListenSpec.java @@ -0,0 +1,40 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.agentic.dsl; + +import io.serverlessworkflow.fluent.agentic.AgentListenTaskBuilder; +import io.serverlessworkflow.fluent.agentic.configurers.AgentListenConfigurer; +import io.serverlessworkflow.fluent.func.dsl.BaseFuncListenSpec; +import io.serverlessworkflow.fluent.spec.AbstractListenTaskBuilder; + +public final class AgentListenSpec + extends BaseFuncListenSpec + implements AgentListenConfigurer { + + public AgentListenSpec() { + super(AbstractListenTaskBuilder::to); + } + + @Override + protected AgentListenSpec self() { + return this; + } + + @Override + public void accept(AgentListenTaskBuilder agentListenTaskBuilder) { + acceptInto(agentListenTaskBuilder); + } +} diff --git a/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/dsl/AgenticDSL.java b/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/dsl/AgenticDSL.java index ccf77f37..495b26a2 100644 --- a/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/dsl/AgenticDSL.java +++ b/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/dsl/AgenticDSL.java @@ -19,12 +19,15 @@ import io.cloudevents.CloudEventData; import io.serverlessworkflow.api.types.FlowDirectiveEnum; import io.serverlessworkflow.fluent.agentic.AgentDoTaskBuilder; -import io.serverlessworkflow.fluent.agentic.configurer.AgentTaskConfigurer; -import io.serverlessworkflow.fluent.agentic.configurer.FuncPredicateEventConfigurer; -import io.serverlessworkflow.fluent.agentic.configurer.SwitchCaseConfigurer; +import io.serverlessworkflow.fluent.agentic.configurers.AgentTaskConfigurer; import io.serverlessworkflow.fluent.func.FuncCallTaskBuilder; import io.serverlessworkflow.fluent.func.FuncEmitTaskBuilder; import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder; +import io.serverlessworkflow.fluent.func.configurers.FuncPredicateEventConfigurer; +import io.serverlessworkflow.fluent.func.configurers.SwitchCaseConfigurer; +import io.serverlessworkflow.fluent.func.dsl.ReflectionUtils; +import io.serverlessworkflow.fluent.func.dsl.SwitchCaseSpec; +import io.serverlessworkflow.fluent.func.dsl.internal.CommonFuncOps; import java.util.List; import java.util.Objects; import java.util.function.Consumer; @@ -33,79 +36,75 @@ public final class AgenticDSL { + private static final CommonFuncOps OPS = new CommonFuncOps() {}; + private AgenticDSL() {} public static Consumer fn( Function function, Class argClass) { - return f -> f.function(function, argClass); + return OPS.fn(function, argClass); } public static Consumer fn(Function function) { - return f -> f.function(function); + return OPS.fn(function); } public static Consumer cases(SwitchCaseConfigurer... cases) { - return s -> { - for (SwitchCaseConfigurer c : cases) { - s.onPredicate(c); - } - }; + return OPS.cases(cases); } - public static SwitchCaseSpec on(Predicate when, Class whenClass) { - return new SwitchCaseSpec().when(when, whenClass); + public static SwitchCaseSpec caseOf(Predicate when, Class whenClass) { + return OPS.caseOf(when, whenClass); } - public static SwitchCaseSpec on(Predicate when) { - return new SwitchCaseSpec().when(when); + public static SwitchCaseSpec caseOf(Predicate when) { + return OPS.caseOf(when); } - public static SwitchCaseConfigurer onDefault(String task) { - return s -> s.then(task); + public static SwitchCaseConfigurer caseDefault(String task) { + return OPS.caseDefault(task); } - public static SwitchCaseConfigurer onDefault(FlowDirectiveEnum directive) { - return s -> s.then(directive); + public static SwitchCaseConfigurer caseDefault(FlowDirectiveEnum directive) { + return OPS.caseDefault(directive); } - public static ListenSpec to() { - return new ListenSpec(); + public static AgentListenSpec to() { + return new AgentListenSpec(); } - public static ListenSpec toOne(String type) { - return new ListenSpec().one(e -> e.type(type)); + public static AgentListenSpec toOne(String type) { + return new AgentListenSpec().one(e -> e.type(type)); } - public static ListenSpec toAll(String... types) { + public static AgentListenSpec toAll(String... types) { FuncPredicateEventConfigurer[] events = new FuncPredicateEventConfigurer[types.length]; for (int i = 0; i < types.length; i++) { events[i] = event(types[i]); } - return new ListenSpec().all(events); + return new AgentListenSpec().all(events); } - public static ListenSpec toAny(String... types) { + public static AgentListenSpec toAny(String... types) { FuncPredicateEventConfigurer[] events = new FuncPredicateEventConfigurer[types.length]; for (int i = 0; i < types.length; i++) { events[i] = event(types[i]); } - return new ListenSpec().any(events); + return new AgentListenSpec().any(events); } public static FuncPredicateEventConfigurer event(String type) { - return e -> e.type(type); + return OPS.event(type); } - // TODO: expand the `event` static ref with more attributes based on community feedback - public static Consumer event( String type, Function function) { - return event -> event.event(e -> e.type(type).data(function)); + return OPS.event(type, function); } public static Consumer event( String type, Function function, Class clazz) { - return event -> event.event(e -> e.type(type).data(function, clazz)); + return OPS.event(type, function, clazz); } // -------- Agentic Workflow Patterns -------- // @@ -148,7 +147,8 @@ public static AgentTaskConfigurer function(Function function, Class } public static AgentTaskConfigurer function(Function function) { - return list -> list.callFn(fn(function)); + Class clazz = ReflectionUtils.inferInputType(function); + return list -> list.callFn(fn(function, clazz)); } public static AgentTaskConfigurer agent(Object agent) { diff --git a/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/dsl/ListenSpec.java b/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/dsl/ListenSpec.java deleted file mode 100644 index 7e890ae4..00000000 --- a/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/dsl/ListenSpec.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.fluent.agentic.dsl; - -import io.serverlessworkflow.fluent.agentic.AgentListenTaskBuilder; -import io.serverlessworkflow.fluent.agentic.configurer.FuncPredicateEventConfigurer; -import io.serverlessworkflow.fluent.agentic.configurer.ListenConfigurer; -import io.serverlessworkflow.fluent.func.FuncEventFilterBuilder; -import io.serverlessworkflow.fluent.func.FuncListenToBuilder; -import java.util.Objects; -import java.util.function.Consumer; - -public class ListenSpec implements ListenConfigurer { - - private Consumer strategyStep; - private Consumer untilStep; - - @SuppressWarnings("unchecked") - private static Consumer[] asFilters( - FuncPredicateEventConfigurer[] events) { - Consumer[] filters = new Consumer[events.length]; - for (int i = 0; i < events.length; i++) { - FuncPredicateEventConfigurer ev = Objects.requireNonNull(events[i], "events[" + i + "]"); - filters[i] = f -> f.with(ev); - } - return filters; - } - - public final ListenSpec all(FuncPredicateEventConfigurer... events) { - strategyStep = t -> t.all(asFilters(events)); - return this; - } - - public ListenSpec one(FuncPredicateEventConfigurer e) { - strategyStep = t -> t.one(f -> f.with(e)); - return this; - } - - public final ListenSpec any(FuncPredicateEventConfigurer... events) { - strategyStep = t -> t.any(asFilters(events)); - return this; - } - - public ListenSpec until(String expression) { - untilStep = t -> t.until(expression); - return this; - } - - @Override - public void accept(AgentListenTaskBuilder agentListenTaskBuilder) { - agentListenTaskBuilder.to( - t -> { - strategyStep.accept(t); - if (untilStep != null) { - untilStep.accept(t); - } - }); - } -} diff --git a/experimental/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/EmailDrafterIT.java b/experimental/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/EmailDrafterIT.java index 35022e27..1e6069c4 100644 --- a/experimental/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/EmailDrafterIT.java +++ b/experimental/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/EmailDrafterIT.java @@ -18,8 +18,6 @@ import static io.serverlessworkflow.fluent.agentic.dsl.AgenticDSL.cases; import static io.serverlessworkflow.fluent.agentic.dsl.AgenticDSL.event; import static io.serverlessworkflow.fluent.agentic.dsl.AgenticDSL.fn; -import static io.serverlessworkflow.fluent.agentic.dsl.AgenticDSL.on; -import static io.serverlessworkflow.fluent.agentic.dsl.AgenticDSL.onDefault; import static io.serverlessworkflow.fluent.agentic.dsl.AgenticDSL.toAny; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.fail; @@ -30,6 +28,7 @@ import io.serverlessworkflow.api.types.EventFilter; import io.serverlessworkflow.api.types.EventProperties; import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.agentic.dsl.AgenticDSL; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowStatus; import io.serverlessworkflow.impl.jackson.JsonUtils; @@ -68,11 +67,11 @@ void email_drafter_agent() { .switchCase( "needsHumanReview?", cases( - on( + AgenticDSL.caseOf( d -> !EmailPolicies.Decision.AUTO_SEND.equals(d.decision()), PolicyDecision.class) .then("requestReview"), - onDefault("emailFinished"))) + AgenticDSL.caseDefault("emailFinished"))) .emit( "requestReview", event( diff --git a/experimental/fluent/func/pom.xml b/experimental/fluent/func/pom.xml index 6f2ea803..9b652a18 100644 --- a/experimental/fluent/func/pom.xml +++ b/experimental/fluent/func/pom.xml @@ -21,6 +21,10 @@ io.serverlessworkflow serverlessworkflow-experimental-types + + io.serverlessworkflow + serverlessworkflow-impl-json + io.serverlessworkflow serverlessworkflow-fluent-spec diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncCallTaskBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncCallTaskBuilder.java index 30d874f0..56301a1e 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncCallTaskBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncCallTaskBuilder.java @@ -18,12 +18,12 @@ import io.serverlessworkflow.api.types.func.CallJava; import io.serverlessworkflow.api.types.func.CallTaskJava; import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder; -import io.serverlessworkflow.fluent.func.spi.FuncTransformations; +import io.serverlessworkflow.fluent.func.spi.FuncTaskTransformations; import io.serverlessworkflow.fluent.spec.TaskBaseBuilder; import java.util.function.Function; public class FuncCallTaskBuilder extends TaskBaseBuilder - implements FuncTransformations, + implements FuncTaskTransformations, ConditionalTaskBuilder { private CallTaskJava callTaskJava; diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncDoTaskBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncDoTaskBuilder.java index 613f76a2..2b03dedf 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncDoTaskBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncDoTaskBuilder.java @@ -17,12 +17,12 @@ import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder; import io.serverlessworkflow.fluent.func.spi.FuncDoFluent; -import io.serverlessworkflow.fluent.func.spi.FuncTransformations; +import io.serverlessworkflow.fluent.func.spi.FuncTaskTransformations; import io.serverlessworkflow.fluent.spec.BaseDoTaskBuilder; import java.util.function.Consumer; public class FuncDoTaskBuilder extends BaseDoTaskBuilder - implements FuncTransformations, + implements FuncTaskTransformations, ConditionalTaskBuilder, FuncDoFluent { diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitTaskBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitTaskBuilder.java index 2db03ae0..5f63d8d4 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitTaskBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitTaskBuilder.java @@ -16,13 +16,13 @@ package io.serverlessworkflow.fluent.func; import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder; -import io.serverlessworkflow.fluent.func.spi.FuncTransformations; +import io.serverlessworkflow.fluent.func.spi.FuncTaskTransformations; import io.serverlessworkflow.fluent.spec.AbstractEmitTaskBuilder; public class FuncEmitTaskBuilder extends AbstractEmitTaskBuilder implements ConditionalTaskBuilder, - FuncTransformations { + FuncTaskTransformations { FuncEmitTaskBuilder() { super(); } diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForTaskBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForTaskBuilder.java index d8c63e6f..98b9883c 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForTaskBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForTaskBuilder.java @@ -25,7 +25,7 @@ import io.serverlessworkflow.api.types.func.LoopPredicate; import io.serverlessworkflow.api.types.func.LoopPredicateIndex; import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder; -import io.serverlessworkflow.fluent.func.spi.FuncTransformations; +import io.serverlessworkflow.fluent.func.spi.FuncTaskTransformations; import io.serverlessworkflow.fluent.spec.TaskBaseBuilder; import io.serverlessworkflow.fluent.spec.spi.ForEachTaskFluent; import java.util.ArrayList; @@ -36,7 +36,7 @@ import java.util.function.Function; public class FuncForTaskBuilder extends TaskBaseBuilder - implements FuncTransformations, + implements FuncTaskTransformations, ConditionalTaskBuilder, ForEachTaskFluent { diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForkTaskBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForkTaskBuilder.java index 372da744..8db4e136 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForkTaskBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncForkTaskBuilder.java @@ -22,7 +22,7 @@ import io.serverlessworkflow.api.types.func.CallJava; import io.serverlessworkflow.api.types.func.CallTaskJava; import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder; -import io.serverlessworkflow.fluent.func.spi.FuncTransformations; +import io.serverlessworkflow.fluent.func.spi.FuncTaskTransformations; import io.serverlessworkflow.fluent.spec.TaskBaseBuilder; import io.serverlessworkflow.fluent.spec.spi.ForkTaskFluent; import java.util.ArrayList; @@ -32,7 +32,7 @@ import java.util.function.Function; public class FuncForkTaskBuilder extends TaskBaseBuilder - implements FuncTransformations, + implements FuncTaskTransformations, ConditionalTaskBuilder, ForkTaskFluent { diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenTaskBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenTaskBuilder.java index b5168f62..a539414e 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenTaskBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenTaskBuilder.java @@ -19,14 +19,14 @@ import io.serverlessworkflow.api.types.ListenTask; import io.serverlessworkflow.api.types.func.UntilPredicate; import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder; -import io.serverlessworkflow.fluent.func.spi.FuncTransformations; +import io.serverlessworkflow.fluent.func.spi.FuncTaskTransformations; import io.serverlessworkflow.fluent.spec.AbstractListenTaskBuilder; import java.util.function.Predicate; public class FuncListenTaskBuilder extends AbstractListenTaskBuilder implements ConditionalTaskBuilder, - FuncTransformations { + FuncTaskTransformations { private UntilPredicate untilPredicate; diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenToBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenToBuilder.java index a21315cc..8162d078 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenToBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenToBuilder.java @@ -20,7 +20,9 @@ import io.serverlessworkflow.api.types.ListenTo; import io.serverlessworkflow.api.types.OneEventConsumptionStrategy; import io.serverlessworkflow.api.types.Until; +import io.serverlessworkflow.api.types.func.UntilPredicate; import io.serverlessworkflow.fluent.spec.AbstractEventConsumptionStrategyBuilder; +import java.util.function.Predicate; public class FuncListenToBuilder extends AbstractEventConsumptionStrategyBuilder< @@ -56,7 +58,12 @@ protected ListenTo getEventConsumptionStrategy() { } @Override - protected void setUntil(Until until) { + protected void setUntilForAny(Until until) { this.listenTo.getAnyEventConsumptionStrategy().setUntil(until); } + + public FuncListenToBuilder until(Predicate predicate, Class predClass) { + this.setUntil(new UntilPredicate().withPredicate(predicate, predClass)); + return this; + } } diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncSetTaskBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncSetTaskBuilder.java index fc9753b0..a9d1bd6c 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncSetTaskBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncSetTaskBuilder.java @@ -15,11 +15,28 @@ */ package io.serverlessworkflow.fluent.func; +import io.serverlessworkflow.api.types.Set; +import io.serverlessworkflow.api.types.SetTask; +import io.serverlessworkflow.api.types.func.MapSetTaskConfiguration; import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder; import io.serverlessworkflow.fluent.spec.SetTaskBuilder; +import java.util.Map; public class FuncSetTaskBuilder extends SetTaskBuilder implements ConditionalTaskBuilder { - FuncSetTaskBuilder() {} + private final SetTask task; + + FuncSetTaskBuilder() { + this.task = new SetTask(); + this.setTask(task); + } + + public FuncSetTaskBuilder expr(Map map) { + if (this.task.getSet() == null) { + this.task.setSet(new Set()); + } + this.task.getSet().withSetTaskConfiguration(new MapSetTaskConfiguration(map)); + return this; + } } diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncSwitchTaskBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncSwitchTaskBuilder.java index c4ee2b09..c6d51409 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncSwitchTaskBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncSwitchTaskBuilder.java @@ -22,7 +22,7 @@ import io.serverlessworkflow.api.types.SwitchTask; import io.serverlessworkflow.api.types.func.SwitchCaseFunction; import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder; -import io.serverlessworkflow.fluent.func.spi.FuncTransformations; +import io.serverlessworkflow.fluent.func.spi.FuncTaskTransformations; import io.serverlessworkflow.fluent.spec.TaskBaseBuilder; import io.serverlessworkflow.fluent.spec.spi.SwitchTaskFluent; import java.util.ArrayList; @@ -33,7 +33,7 @@ import java.util.function.Predicate; public class FuncSwitchTaskBuilder extends TaskBaseBuilder - implements FuncTransformations, + implements FuncTaskTransformations, ConditionalTaskBuilder, SwitchTaskFluent { diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java index 6ef8d7b0..e31faf60 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncTaskItemListBuilder.java @@ -46,7 +46,7 @@ protected FuncTaskItemListBuilder newItemListBuilder() { @Override public FuncTaskItemListBuilder callFn(String name, Consumer consumer) { - this.requireNameAndConfig(name, consumer); + name = this.defaultNameAndRequireConfig(name, consumer); final FuncCallTaskBuilder callTaskJavaBuilder = new FuncCallTaskBuilder(); consumer.accept(callTaskJavaBuilder); return addTaskItem(new TaskItem(name, new Task().withCallTask(callTaskJavaBuilder.build()))); @@ -59,7 +59,7 @@ public FuncTaskItemListBuilder callFn(Consumer consumer) { @Override public FuncTaskItemListBuilder set(String name, Consumer itemsConfigurer) { - this.requireNameAndConfig(name, itemsConfigurer); + name = this.defaultNameAndRequireConfig(name, itemsConfigurer); final FuncSetTaskBuilder funcSetTaskBuilder = new FuncSetTaskBuilder(); itemsConfigurer.accept(funcSetTaskBuilder); return this.addTaskItem(new TaskItem(name, new Task().withSetTask(funcSetTaskBuilder.build()))); @@ -72,7 +72,7 @@ public FuncTaskItemListBuilder set(String name, String expr) { @Override public FuncTaskItemListBuilder emit(String name, Consumer itemsConfigurer) { - this.requireNameAndConfig(name, itemsConfigurer); + name = this.defaultNameAndRequireConfig(name, itemsConfigurer); final FuncEmitTaskBuilder emitTaskJavaBuilder = new FuncEmitTaskBuilder(); itemsConfigurer.accept(emitTaskJavaBuilder); return this.addTaskItem( @@ -82,7 +82,7 @@ public FuncTaskItemListBuilder emit(String name, Consumer i @Override public FuncTaskItemListBuilder listen( String name, Consumer itemsConfigurer) { - this.requireNameAndConfig(name, itemsConfigurer); + name = this.defaultNameAndRequireConfig(name, itemsConfigurer); final FuncListenTaskBuilder listenTaskJavaBuilder = new FuncListenTaskBuilder(); itemsConfigurer.accept(listenTaskJavaBuilder); return this.addTaskItem( @@ -92,7 +92,7 @@ public FuncTaskItemListBuilder listen( @Override public FuncTaskItemListBuilder forEach( String name, Consumer itemsConfigurer) { - this.requireNameAndConfig(name, itemsConfigurer); + name = this.defaultNameAndRequireConfig(name, itemsConfigurer); final FuncForTaskBuilder forTaskJavaBuilder = new FuncForTaskBuilder(); itemsConfigurer.accept(forTaskJavaBuilder); return this.addTaskItem(new TaskItem(name, new Task().withForTask(forTaskJavaBuilder.build()))); @@ -101,7 +101,7 @@ public FuncTaskItemListBuilder forEach( @Override public FuncTaskItemListBuilder switchCase( String name, Consumer itemsConfigurer) { - this.requireNameAndConfig(name, itemsConfigurer); + name = this.defaultNameAndRequireConfig(name, itemsConfigurer); final FuncSwitchTaskBuilder funcSwitchTaskBuilder = new FuncSwitchTaskBuilder(); itemsConfigurer.accept(funcSwitchTaskBuilder); return this.addTaskItem( @@ -110,7 +110,7 @@ public FuncTaskItemListBuilder switchCase( @Override public FuncTaskItemListBuilder fork(String name, Consumer itemsConfigurer) { - this.requireNameAndConfig(name, itemsConfigurer); + name = this.defaultNameAndRequireConfig(name, itemsConfigurer); final FuncForkTaskBuilder forkTaskJavaBuilder = new FuncForkTaskBuilder(); itemsConfigurer.accept(forkTaskJavaBuilder); return this.addTaskItem( diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncEmitConfigurer.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncEmitConfigurer.java new file mode 100644 index 00000000..44e5fbd9 --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncEmitConfigurer.java @@ -0,0 +1,22 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.configurers; + +import io.serverlessworkflow.fluent.func.FuncEmitTaskBuilder; +import java.util.function.Consumer; + +@FunctionalInterface +public interface FuncEmitConfigurer extends Consumer {} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncEventConfigurer.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncEventConfigurer.java new file mode 100644 index 00000000..aac39074 --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncEventConfigurer.java @@ -0,0 +1,22 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.configurers; + +import io.serverlessworkflow.fluent.func.FuncEventPropertiesBuilder; +import java.util.function.Consumer; + +@FunctionalInterface +public interface FuncEventConfigurer extends Consumer {} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncListenConfigurer.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncListenConfigurer.java new file mode 100644 index 00000000..ee18a05b --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncListenConfigurer.java @@ -0,0 +1,22 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.configurers; + +import io.serverlessworkflow.fluent.func.FuncListenTaskBuilder; +import java.util.function.Consumer; + +@FunctionalInterface +public interface FuncListenConfigurer extends Consumer {} diff --git a/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/configurer/FuncPredicateEventConfigurer.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncPredicateEventConfigurer.java similarity index 93% rename from experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/configurer/FuncPredicateEventConfigurer.java rename to experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncPredicateEventConfigurer.java index fce8875d..5507193f 100644 --- a/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/configurer/FuncPredicateEventConfigurer.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncPredicateEventConfigurer.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.fluent.agentic.configurer; +package io.serverlessworkflow.fluent.func.configurers; import io.serverlessworkflow.fluent.func.FuncPredicateEventPropertiesBuilder; import java.util.function.Consumer; diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncTaskConfigurer.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncTaskConfigurer.java new file mode 100644 index 00000000..e3815ad7 --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/FuncTaskConfigurer.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.configurers; + +import io.serverlessworkflow.fluent.func.FuncTaskItemListBuilder; +import java.util.function.Consumer; + +public interface FuncTaskConfigurer extends Consumer {} diff --git a/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/configurer/SwitchCaseConfigurer.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/SwitchCaseConfigurer.java similarity index 93% rename from experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/configurer/SwitchCaseConfigurer.java rename to experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/SwitchCaseConfigurer.java index 3e302ff2..55cf25c5 100644 --- a/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/configurer/SwitchCaseConfigurer.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/configurers/SwitchCaseConfigurer.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.fluent.agentic.configurer; +package io.serverlessworkflow.fluent.func.configurers; import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder; import java.util.function.Consumer; diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/BaseFuncListenSpec.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/BaseFuncListenSpec.java new file mode 100644 index 00000000..6c7f2d11 --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/BaseFuncListenSpec.java @@ -0,0 +1,52 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.dsl; + +import io.serverlessworkflow.fluent.func.FuncEventFilterBuilder; +import io.serverlessworkflow.fluent.func.FuncListenToBuilder; +import io.serverlessworkflow.fluent.func.configurers.FuncPredicateEventConfigurer; +import io.serverlessworkflow.fluent.spec.dsl.BaseListenSpec; +import java.util.Objects; +import java.util.function.Consumer; +import java.util.function.Predicate; + +public abstract class BaseFuncListenSpec + extends BaseListenSpec< + SELF, LB, FuncListenToBuilder, FuncEventFilterBuilder, FuncPredicateEventConfigurer> { + + protected BaseFuncListenSpec(ToInvoker toInvoker) { + super( + toInvoker, + FuncEventFilterBuilder::with, + // allApplier + (tb, filters) -> tb.all(castFilters(filters)), + // anyApplier + (tb, filters) -> tb.any(castFilters(filters)), + // oneApplier + FuncListenToBuilder::one); + } + + @SuppressWarnings("unchecked") + private static Consumer[] castFilters(Consumer[] arr) { + return (Consumer[]) arr; + } + + public SELF until(Predicate predicate, Class predClass) { + Objects.requireNonNull(predicate, "predicate"); + this.setUntilStep(u -> u.until(predicate, predClass)); + return self(); + } +} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/EmitStep.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/EmitStep.java new file mode 100644 index 00000000..89c2402d --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/EmitStep.java @@ -0,0 +1,51 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.dsl; + +import io.serverlessworkflow.fluent.func.FuncEmitTaskBuilder; +import io.serverlessworkflow.fluent.func.FuncTaskItemListBuilder; +import java.util.Objects; +import java.util.function.Consumer; + +/** Chainable emit step; applies FuncEmitSpec then queued export/when. */ +public final class EmitStep extends Step { + + private final String name; // nullable + private final Consumer cfg; + + EmitStep(String name, Consumer cfg) { + this.name = name; + this.cfg = Objects.requireNonNull(cfg, "cfg"); + } + + @Override + protected void configure(FuncTaskItemListBuilder list, Consumer postApply) { + if (name == null) { + list.emit( + e -> { + cfg.accept(e); + postApply.accept(e); + }); + } else { + list.emit( + name, + e -> { + cfg.accept(e); + postApply.accept(e); + }); + } + } +} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncCallStep.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncCallStep.java new file mode 100644 index 00000000..d4cff435 --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncCallStep.java @@ -0,0 +1,56 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.dsl; + +import io.serverlessworkflow.fluent.func.FuncCallTaskBuilder; +import io.serverlessworkflow.fluent.func.FuncTaskItemListBuilder; +import java.util.function.Consumer; +import java.util.function.Function; + +// FuncCallStep +public final class FuncCallStep extends Step, FuncCallTaskBuilder> { + private final String name; // may be null + private final Function fn; + private final Class argClass; + + FuncCallStep(Function fn, Class argClass) { + this(null, fn, argClass); + } + + FuncCallStep(String name, Function fn, Class argClass) { + this.name = name; + this.fn = fn; + this.argClass = argClass; + } + + @Override + protected void configure(FuncTaskItemListBuilder list, Consumer post) { + if (name == null) { + list.callFn( + cb -> { + cb.function(fn, argClass); + post.accept(cb); + }); + } else { + list.callFn( + name, + cb -> { + cb.function(fn, argClass); + post.accept(cb); + }); + } + } +} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java new file mode 100644 index 00000000..9a8bb046 --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java @@ -0,0 +1,249 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.dsl; + +import io.cloudevents.CloudEventData; +import io.serverlessworkflow.api.types.FlowDirectiveEnum; +import io.serverlessworkflow.fluent.func.FuncCallTaskBuilder; +import io.serverlessworkflow.fluent.func.FuncEmitTaskBuilder; +import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder; +import io.serverlessworkflow.fluent.func.FuncTaskItemListBuilder; +import io.serverlessworkflow.fluent.func.configurers.FuncPredicateEventConfigurer; +import io.serverlessworkflow.fluent.func.configurers.FuncTaskConfigurer; +import io.serverlessworkflow.fluent.func.configurers.SwitchCaseConfigurer; +import io.serverlessworkflow.fluent.func.dsl.internal.CommonFuncOps; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; + +public final class FuncDSL { + private static final CommonFuncOps OPS = new CommonFuncOps() {}; + + public static Consumer fn( + Function function, Class argClass) { + return OPS.fn(function, argClass); + } + + public static Consumer fn(Function function) { + return f -> f.function(function); + } + + public static Consumer cases(SwitchCaseConfigurer... cases) { + return OPS.cases(cases); + } + + public static SwitchCaseSpec caseOf(Predicate when, Class whenClass) { + return OPS.caseOf(when, whenClass); + } + + public static SwitchCaseSpec caseOf(Predicate when) { + return OPS.caseOf(when); + } + + public static SwitchCaseConfigurer caseDefault(String task) { + return OPS.caseDefault(task); + } + + public static SwitchCaseConfigurer caseDefault(FlowDirectiveEnum directive) { + return OPS.caseDefault(directive); + } + + public static FuncListenSpec to() { + return new FuncListenSpec(); + } + + public static FuncListenSpec toOne(String type) { + return new FuncListenSpec().one(e -> e.type(type)); + } + + public static FuncListenSpec toAll(String... types) { + FuncPredicateEventConfigurer[] events = new FuncPredicateEventConfigurer[types.length]; + for (int i = 0; i < types.length; i++) { + events[i] = event(types[i]); + } + return new FuncListenSpec().all(events); + } + + public static FuncListenSpec toAny(String... types) { + FuncPredicateEventConfigurer[] events = new FuncPredicateEventConfigurer[types.length]; + for (int i = 0; i < types.length; i++) { + events[i] = event(types[i]); + } + return new FuncListenSpec().any(events); + } + + public static Consumer event( + String type, Function function) { + return OPS.event(type, function); + } + + public static Consumer event( + String type, Function function, Class clazz) { + return OPS.event(type, function, clazz); + } + + /** Emit a JSON CloudEvent (PojoCloudEventData) from a POJO payload. */ + public static Consumer eventJson(String type, Class clazz) { + return b -> new FuncEmitSpec().type(type).jsonData(clazz).accept(b); + } + + public static Consumer eventBytes( + String type, Function serializer, Class clazz) { + return b -> new FuncEmitSpec().type(type).bytesData(serializer, clazz).accept(b); + } + + public static Consumer eventBytesUtf8(String type) { + return b -> new FuncEmitSpec().type(type).bytesDataUtf8().accept(b); + } + + public static FuncPredicateEventConfigurer event(String type) { + return OPS.event(type); + } + + public static FuncCallStep function(Function fn, Class clazz) { + return new FuncCallStep<>(fn, clazz); + } + + public static FuncCallStep function(Function fn) { + Class clazz = ReflectionUtils.inferInputType(fn); + return new FuncCallStep<>(fn, clazz); + } + + public static FuncCallStep function(String name, Function fn) { + Class clazz = ReflectionUtils.inferInputType(fn); + return new FuncCallStep<>(name, fn, clazz); + } + + public static FuncCallStep function(String name, Function fn, Class clazz) { + return new FuncCallStep<>(name, fn, clazz); + } + + // ------------------ tasks ---------------- // + + public static Consumer tasks(FuncTaskConfigurer... steps) { + Objects.requireNonNull(steps, "Steps in a tasks are required"); + final List snapshot = List.of(steps.clone()); + return list -> snapshot.forEach(s -> s.accept(list)); + } + + public static EmitStep emit(Consumer cfg) { + return new EmitStep(null, cfg); + } + + public static EmitStep emit(String name, Consumer cfg) { + return new EmitStep(name, cfg); + } + + public static EmitStep emit(String type, Function fn) { + // `event(type, fn)` is your Consumer for EMIT + return new EmitStep(null, event(type, fn)); + } + + public static EmitStep emit(String name, String type, Function fn) { + return new EmitStep(name, event(type, fn)); + } + + public static EmitStep emit( + String name, String type, Function serializer, Class clazz) { + return new EmitStep(name, eventBytes(type, serializer, clazz)); + } + + public static EmitStep emit(String type, Function serializer, Class clazz) { + return new EmitStep(null, eventBytes(type, serializer, clazz)); + } + + public static EmitStep emitJson(String type, Class clazz) { + return new EmitStep(null, eventJson(type, clazz)); + } + + public static EmitStep emitJson(String name, String type, Class clazz) { + return new EmitStep(name, eventJson(type, clazz)); + } + + public static ListenStep listen(FuncListenSpec spec) { + return new ListenStep(null, spec); + } + + public static ListenStep listen(String name, FuncListenSpec spec) { + return new ListenStep(name, spec); + } + + public static FuncTaskConfigurer switchCase( + String taskName, Consumer switchCase) { + return list -> list.switchCase(taskName, switchCase); + } + + public static FuncTaskConfigurer switchCase(Consumer switchCase) { + return list -> list.switchCase(switchCase); + } + + public static FuncTaskConfigurer switchCase(SwitchCaseConfigurer... cases) { + return switchCase(null, cases); + } + + public static FuncTaskConfigurer switchCase(String taskName, SwitchCaseConfigurer... cases) { + Objects.requireNonNull(cases, "cases are required"); + final List snapshot = List.of(cases.clone()); + return list -> list.switchCase(taskName, s -> snapshot.forEach(s::onPredicate)); + } + + // Single predicate -> then task + public static FuncTaskConfigurer switchWhen(Predicate pred, String thenTask) { + return list -> list.switchCase(cases(caseOf(pred).then(thenTask))); + } + + // With default directive + public static FuncTaskConfigurer switchWhenOrElse( + Predicate pred, String thenTask, FlowDirectiveEnum otherwise) { + return list -> + list.switchCase(FuncDSL.cases(caseOf(pred).then(thenTask), caseDefault(otherwise))); + } + + public static FuncTaskConfigurer switchWhenOrElse( + Predicate pred, String thenTask, String otherwiseTask) { + return list -> + list.switchCase(FuncDSL.cases(caseOf(pred).then(thenTask), caseDefault(otherwiseTask))); + } + + public static FuncTaskConfigurer forEach( + Function> collection, Consumer body) { + return list -> list.forEach(j -> j.collection(collection).tasks(body)); + } + + public static FuncTaskConfigurer forEach( + Collection collection, Consumer body) { + Function> f = ctx -> (Collection) collection; + return list -> list.forEach(j -> j.collection(f).tasks(body)); + } + + // Overload with simple constant collection + public static FuncTaskConfigurer forEach( + List collection, Consumer body) { + return list -> list.forEach(j -> j.collection(ctx -> collection).tasks(body)); + } + + public static FuncTaskConfigurer set(String expr) { + return list -> list.set(expr); + } + + public static FuncTaskConfigurer set(Map map) { + return list -> list.set(s -> s.expr(map)); + } +} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEmitSpec.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEmitSpec.java new file mode 100644 index 00000000..115419ad --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEmitSpec.java @@ -0,0 +1,32 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.dsl; + +import io.serverlessworkflow.fluent.func.FuncEmitTaskBuilder; +import io.serverlessworkflow.fluent.func.configurers.FuncEmitConfigurer; + +public class FuncEmitSpec extends FuncEventFilterSpec implements FuncEmitConfigurer { + + @Override + public void accept(FuncEmitTaskBuilder funcEmitTaskBuilder) { + funcEmitTaskBuilder.event(e -> getSteps().forEach(step -> step.accept(e))); + } + + @Override + protected FuncEmitSpec self() { + return this; + } +} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEventFilterSpec.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEventFilterSpec.java new file mode 100644 index 00000000..1e254467 --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEventFilterSpec.java @@ -0,0 +1,70 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.dsl; + +import io.cloudevents.CloudEventData; +import io.cloudevents.core.data.BytesCloudEventData; +import io.cloudevents.core.data.PojoCloudEventData; +import io.serverlessworkflow.api.types.func.EventDataFunction; +import io.serverlessworkflow.fluent.func.FuncEventPropertiesBuilder; +import io.serverlessworkflow.fluent.spec.dsl.EventFilterSpec; +import io.serverlessworkflow.impl.jackson.JsonUtils; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.function.Function; + +public abstract class FuncEventFilterSpec + extends EventFilterSpec { + + FuncEventFilterSpec() { + super(new ArrayList<>()); + } + + /** Sets the event data and the contentType to `application/json` */ + public SELF jsonData(Function function) { + Class clazz = ReflectionUtils.inferInputType(function); + addStep(e -> e.data(new EventDataFunction().withFunction(function, clazz))); + return JSON(); + } + + /** Sets the event data and the contentType to `application/octet-stream` */ + public SELF bytesData(Function serializer, Class clazz) { + addStep(e -> e.data(payload -> BytesCloudEventData.wrap(serializer.apply(payload)), clazz)); + return OCTET_STREAM(); + } + + public SELF bytesDataUtf8() { + return bytesData((String s) -> s.getBytes(StandardCharsets.UTF_8), String.class); + } + + /** Sets the event data and the contentType to `application/json` */ + public SELF jsonData(Function function, Class clazz) { + addStep(e -> e.data(new EventDataFunction().withFunction(function, clazz))); + return JSON(); + } + + /** JSON with default mapper (PojoCloudEventData + application/json). */ + public SELF jsonData(Class clazz) { + addStep( + e -> + e.data( + payload -> + PojoCloudEventData.wrap( + payload, p -> JsonUtils.mapper().writeValueAsString(p).getBytes()), + clazz)); + return JSON(); + } +} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncListenSpec.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncListenSpec.java new file mode 100644 index 00000000..334a219a --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncListenSpec.java @@ -0,0 +1,37 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.dsl; + +import io.serverlessworkflow.fluent.func.FuncListenTaskBuilder; +import io.serverlessworkflow.fluent.func.configurers.FuncListenConfigurer; + +public final class FuncListenSpec extends BaseFuncListenSpec + implements FuncListenConfigurer { + + public FuncListenSpec() { + super(FuncListenTaskBuilder::to); + } + + @Override + protected FuncListenSpec self() { + return this; + } + + @Override + public void accept(FuncListenTaskBuilder funcListenTaskBuilder) { + acceptInto(funcListenTaskBuilder); + } +} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/ListenStep.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/ListenStep.java new file mode 100644 index 00000000..debd3b96 --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/ListenStep.java @@ -0,0 +1,51 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.dsl; + +import io.serverlessworkflow.fluent.func.FuncListenTaskBuilder; +import io.serverlessworkflow.fluent.func.FuncTaskItemListBuilder; +import java.util.function.Consumer; + +/** Chainable listen step; applies FuncListenSpec then queued export/when. */ +public final class ListenStep extends Step { + + private final String name; // nullable + private final FuncListenSpec spec; + + ListenStep(String name, FuncListenSpec spec) { + this.name = name; + this.spec = spec; + } + + @Override + protected void configure( + FuncTaskItemListBuilder list, Consumer postApply) { + if (name == null) { + list.listen( + lb -> { + spec.accept(lb); + postApply.accept(lb); + }); + } else { + list.listen( + name, + lb -> { + spec.accept(lb); + postApply.accept(lb); + }); + } + } +} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/ReflectionUtils.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/ReflectionUtils.java new file mode 100644 index 00000000..d9f3793a --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/ReflectionUtils.java @@ -0,0 +1,82 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.dsl; + +import java.lang.invoke.MethodHandleInfo; +import java.lang.invoke.MethodType; +import java.lang.reflect.Method; +import java.util.Optional; +import java.util.function.Function; + +/** + * Specially used by {@link Function} parameters in the Java Function. + * + * @see Serialize a Lambda in Java + */ +public final class ReflectionUtils { + + private ReflectionUtils() {} + + @SuppressWarnings("unchecked") + static Optional> safeInferInputType(Function fn) { + try { + Method m = fn.getClass().getDeclaredMethod("writeReplace"); + m.setAccessible(true); + java.lang.invoke.SerializedLambda sl = (java.lang.invoke.SerializedLambda) m.invoke(fn); + + // Owner class of the referenced implementation + String ownerName = sl.getImplClass().replace('/', '.'); + ClassLoader cl = fn.getClass().getClassLoader(); + Class owner = Class.forName(ownerName, false, cl); + + // Parse the impl method descriptor to get raw param types + MethodType mt = MethodType.fromMethodDescriptorString(sl.getImplMethodSignature(), cl); + Class[] params = mt.parameterArray(); + int kind = sl.getImplMethodKind(); + + switch (kind) { + case MethodHandleInfo.REF_invokeStatic: + case MethodHandleInfo.REF_newInvokeSpecial: + // static method or constructor: T is the first parameter + return params.length >= 1 ? Optional.of((Class) params[0]) : Optional.empty(); + + case MethodHandleInfo.REF_invokeVirtual: + case MethodHandleInfo.REF_invokeInterface: + case MethodHandleInfo.REF_invokeSpecial: + // instance method ref like Foo::bar + // For Function, if bar has no params, T is the receiver type (owner). + // If bar has one param, that pattern would usually map to BiFunction, not Function, + // but keep a defensive branch anyway: + return (params.length == 0) + ? Optional.of((Class) owner) + : Optional.of((Class) params[0]); + + default: + return Optional.empty(); + } + } catch (Exception ignore) { + return Optional.empty(); + } + } + + public static Class inferInputType(Function fn) { + return safeInferInputType(fn) + .orElseThrow( + () -> + new IllegalStateException( + "Cannot infer input type from lambda. Pass Class or use a method reference.")); + } +} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/Step.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/Step.java new file mode 100644 index 00000000..07d14104 --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/Step.java @@ -0,0 +1,182 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.dsl; + +import io.serverlessworkflow.api.types.func.JavaContextFunction; +import io.serverlessworkflow.api.types.func.JavaFilterFunction; +import io.serverlessworkflow.fluent.func.FuncTaskItemListBuilder; +import io.serverlessworkflow.fluent.func.configurers.FuncTaskConfigurer; +import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder; +import io.serverlessworkflow.fluent.func.spi.FuncTaskTransformations; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; + +/** A deferred configurer that can chain when/inputFrom/outputAs/exportAs and apply them later. */ +abstract class Step, B> implements FuncTaskConfigurer { + + private final List> postConfigurers = new ArrayList<>(); + + @SuppressWarnings("unchecked") + protected final SELF self() { + return (SELF) this; + } + + // ---------- ConditionalTaskBuilder passthroughs ---------- + + /** Queue a ConditionalTaskBuilder#when(Predicate) to be applied on the concrete builder. */ + public SELF when(Predicate predicate) { + postConfigurers.add(b -> ((ConditionalTaskBuilder) b).when(predicate)); + return self(); + } + + /** Queue a ConditionalTaskBuilder#when(Predicate, Class) to be applied later. */ + public SELF when(Predicate predicate, Class argClass) { + postConfigurers.add(b -> ((ConditionalTaskBuilder) b).when(predicate, argClass)); + return self(); + } + + // ---------- FuncTaskTransformations passthroughs: exportAs ---------- + + /** Queue a FuncTaskTransformations#exportAs(Function) to be applied later. */ + public SELF exportAs(Function function) { + postConfigurers.add(b -> ((FuncTaskTransformations) b).exportAs(function)); + return self(); + } + + /** Queue a FuncTaskTransformations#exportAs(Function, Class) to be applied later. */ + public SELF exportAs(Function function, Class argClass) { + postConfigurers.add(b -> ((FuncTaskTransformations) b).exportAs(function, argClass)); + return self(); + } + + /** Queue a FuncTaskTransformations#exportAs(JavaFilterFunction) to be applied later. */ + public SELF exportAs(JavaFilterFunction function) { + postConfigurers.add(b -> ((FuncTaskTransformations) b).exportAs(function)); + return self(); + } + + /** Queue a FuncTaskTransformations#exportAs(JavaFilterFunction, Class) to be applied later. */ + public SELF exportAs(JavaFilterFunction function, Class argClass) { + postConfigurers.add(b -> ((FuncTaskTransformations) b).exportAs(function, argClass)); + return self(); + } + + /** Queue a FuncTaskTransformations#exportAs(JavaContextFunction) to be applied later. */ + public SELF exportAs(JavaContextFunction function) { + postConfigurers.add(b -> ((FuncTaskTransformations) b).exportAs(function)); + return self(); + } + + /** Queue a FuncTaskTransformations#exportAs(JavaContextFunction, Class) to be applied later. */ + public SELF exportAs(JavaContextFunction function, Class argClass) { + postConfigurers.add(b -> ((FuncTaskTransformations) b).exportAs(function, argClass)); + return self(); + } + + // ---------- FuncTaskTransformations passthroughs: outputAs ---------- + + /** Queue a FuncTaskTransformations#outputAs(Function) to be applied later. */ + public SELF outputAs(Function function) { + postConfigurers.add(b -> ((FuncTaskTransformations) b).outputAs(function)); + return self(); + } + + /** Queue a FuncTaskTransformations#outputAs(Function, Class) to be applied later. */ + public SELF outputAs(Function function, Class argClass) { + postConfigurers.add(b -> ((FuncTaskTransformations) b).outputAs(function, argClass)); + return self(); + } + + /** Queue a FuncTaskTransformations#outputAs(JavaFilterFunction) to be applied later. */ + public SELF outputAs(JavaFilterFunction function) { + postConfigurers.add(b -> ((FuncTaskTransformations) b).outputAs(function)); + return self(); + } + + /** Queue a FuncTaskTransformations#outputAs(JavaFilterFunction, Class) to be applied later. */ + public SELF outputAs(JavaFilterFunction function, Class argClass) { + postConfigurers.add(b -> ((FuncTaskTransformations) b).outputAs(function, argClass)); + return self(); + } + + /** Queue a FuncTaskTransformations#outputAs(JavaContextFunction) to be applied later. */ + public SELF outputAs(JavaContextFunction function) { + postConfigurers.add(b -> ((FuncTaskTransformations) b).outputAs(function)); + return self(); + } + + /** Queue a FuncTaskTransformations#outputAs(JavaContextFunction, Class) to be applied later. */ + public SELF outputAs(JavaContextFunction function, Class argClass) { + postConfigurers.add(b -> ((FuncTaskTransformations) b).outputAs(function, argClass)); + return self(); + } + + // ---------- FuncTaskTransformations passthroughs: inputFrom ---------- + + /** Queue a FuncTaskTransformations#inputFrom(Function) to be applied later. */ + public SELF inputFrom(Function function) { + postConfigurers.add(b -> ((FuncTaskTransformations) b).inputFrom(function)); + return self(); + } + + /** Queue a FuncTaskTransformations#inputFrom(Function, Class) to be applied later. */ + public SELF inputFrom(Function function, Class argClass) { + postConfigurers.add(b -> ((FuncTaskTransformations) b).inputFrom(function, argClass)); + return self(); + } + + /** Queue a FuncTaskTransformations#inputFrom(JavaFilterFunction) to be applied later. */ + public SELF inputFrom(JavaFilterFunction function) { + postConfigurers.add(b -> ((FuncTaskTransformations) b).inputFrom(function)); + return self(); + } + + /** Queue a FuncTaskTransformations#inputFrom(JavaFilterFunction, Class) to be applied later. */ + public SELF inputFrom(JavaFilterFunction function, Class argClass) { + postConfigurers.add(b -> ((FuncTaskTransformations) b).inputFrom(function, argClass)); + return self(); + } + + /** Queue a FuncTaskTransformations#inputFrom(JavaContextFunction) to be applied later. */ + public SELF inputFrom(JavaContextFunction function) { + postConfigurers.add(b -> ((FuncTaskTransformations) b).inputFrom(function)); + return self(); + } + + /** Queue a FuncTaskTransformations#inputFrom(JavaContextFunction, Class) to be applied later. */ + public SELF inputFrom(JavaContextFunction function, Class argClass) { + postConfigurers.add(b -> ((FuncTaskTransformations) b).inputFrom(function, argClass)); + return self(); + } + + // ---------- wiring into the underlying list/builder ---------- + + @Override + public final void accept(FuncTaskItemListBuilder list) { + configure(list, this::applyPost); + } + + /** Implement per-step to attach to the correct builder and run {@code post} at the end. */ + protected abstract void configure(FuncTaskItemListBuilder list, Consumer post); + + /** Applies all queued post-configurers to the concrete builder. */ + private void applyPost(B builder) { + for (Consumer c : postConfigurers) c.accept(builder); + } +} diff --git a/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/dsl/SwitchCaseSpec.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/SwitchCaseSpec.java similarity index 92% rename from experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/dsl/SwitchCaseSpec.java rename to experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/SwitchCaseSpec.java index 171d722e..63d05a16 100644 --- a/experimental/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/dsl/SwitchCaseSpec.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/SwitchCaseSpec.java @@ -13,10 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.fluent.agentic.dsl; +package io.serverlessworkflow.fluent.func.dsl; -import io.serverlessworkflow.fluent.agentic.configurer.SwitchCaseConfigurer; import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder; +import io.serverlessworkflow.fluent.func.configurers.SwitchCaseConfigurer; import java.util.function.Predicate; public class SwitchCaseSpec implements SwitchCaseConfigurer { diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/internal/CommonFuncOps.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/internal/CommonFuncOps.java new file mode 100644 index 00000000..fe0c05dd --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/internal/CommonFuncOps.java @@ -0,0 +1,79 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.dsl.internal; + +import io.cloudevents.CloudEventData; +import io.serverlessworkflow.api.types.FlowDirectiveEnum; +import io.serverlessworkflow.fluent.func.FuncCallTaskBuilder; +import io.serverlessworkflow.fluent.func.FuncEmitTaskBuilder; +import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder; +import io.serverlessworkflow.fluent.func.configurers.FuncPredicateEventConfigurer; +import io.serverlessworkflow.fluent.func.configurers.SwitchCaseConfigurer; +import io.serverlessworkflow.fluent.func.dsl.ReflectionUtils; +import io.serverlessworkflow.fluent.func.dsl.SwitchCaseSpec; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; + +public interface CommonFuncOps { + + default Consumer fn(Function function, Class argClass) { + return f -> f.function(function, argClass); + } + + default Consumer fn(Function function) { + Class clazz = ReflectionUtils.inferInputType(function); + return f -> f.function(function, clazz); + } + + default Consumer cases(SwitchCaseConfigurer... cases) { + return s -> { + for (SwitchCaseConfigurer c : cases) { + s.onPredicate(c); + } + }; + } + + default SwitchCaseSpec caseOf(Predicate when, Class whenClass) { + return new SwitchCaseSpec().when(when, whenClass); + } + + default SwitchCaseSpec caseOf(Predicate when) { + return new SwitchCaseSpec().when(when); + } + + default SwitchCaseConfigurer caseDefault(String task) { + return s -> s.then(task); + } + + default SwitchCaseConfigurer caseDefault(FlowDirectiveEnum directive) { + return s -> s.then(directive); + } + + default Consumer event( + String type, Function function) { + return event -> event.event(e -> e.type(type).data(function)); + } + + default Consumer event( + String type, Function function, Class clazz) { + return event -> event.event(e -> e.type(type).data(function, clazz)); + } + + default FuncPredicateEventConfigurer event(String type) { + return e -> e.type(type); + } +} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/ConditionalTaskBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/ConditionalTaskBuilder.java index 383bf7f3..6c4ca97a 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/ConditionalTaskBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/ConditionalTaskBuilder.java @@ -24,11 +24,13 @@ public interface ConditionalTaskBuilder { TaskBase getTask(); + @SuppressWarnings("unchecked") default SELF when(Predicate predicate) { ConditionalTaskBuilderHelper.setMetadata(getTask(), predicate); return (SELF) this; } + @SuppressWarnings("unchecked") default SELF when(Predicate predicate, Class argClass) { Objects.requireNonNull(argClass); ConditionalTaskBuilderHelper.setMetadata(getTask(), new TypedPredicate<>(predicate, argClass)); diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/FuncTaskTransformations.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/FuncTaskTransformations.java new file mode 100644 index 00000000..16d2be28 --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/FuncTaskTransformations.java @@ -0,0 +1,63 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.spi; + +import io.serverlessworkflow.api.types.Export; +import io.serverlessworkflow.api.types.func.ExportAsFunction; +import io.serverlessworkflow.api.types.func.JavaContextFunction; +import io.serverlessworkflow.api.types.func.JavaFilterFunction; +import io.serverlessworkflow.fluent.spec.spi.TaskTransformationHandlers; +import java.util.function.Function; + +public interface FuncTaskTransformations> + extends TaskTransformationHandlers, FuncTransformations { + + @SuppressWarnings("unchecked") + default SELF exportAs(Function function) { + setExport(new Export().withAs(new ExportAsFunction().withFunction(function))); + return (SELF) this; + } + + @SuppressWarnings("unchecked") + default SELF exportAs(Function function, Class argClass) { + setExport(new Export().withAs(new ExportAsFunction().withFunction(function, argClass))); + return (SELF) this; + } + + @SuppressWarnings("unchecked") + default SELF exportAs(JavaFilterFunction function) { + setExport(new Export().withAs(new ExportAsFunction().withFunction(function))); + return (SELF) this; + } + + @SuppressWarnings("unchecked") + default SELF exportAs(JavaFilterFunction function, Class argClass) { + setExport(new Export().withAs(new ExportAsFunction().withFunction(function, argClass))); + return (SELF) this; + } + + @SuppressWarnings("unchecked") + default SELF exportAs(JavaContextFunction function) { + setExport(new Export().withAs(new ExportAsFunction().withFunction(function))); + return (SELF) this; + } + + @SuppressWarnings("unchecked") + default SELF exportAs(JavaContextFunction function, Class argClass) { + setExport(new Export().withAs(new ExportAsFunction().withFunction(function, argClass))); + return (SELF) this; + } +} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/FuncTransformations.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/FuncTransformations.java index db257dd0..c603b02c 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/FuncTransformations.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/spi/FuncTransformations.java @@ -15,11 +15,11 @@ */ package io.serverlessworkflow.fluent.func.spi; -import io.serverlessworkflow.api.types.Export; import io.serverlessworkflow.api.types.Input; import io.serverlessworkflow.api.types.Output; -import io.serverlessworkflow.api.types.func.ExportAsFunction; import io.serverlessworkflow.api.types.func.InputFromFunction; +import io.serverlessworkflow.api.types.func.JavaContextFunction; +import io.serverlessworkflow.api.types.func.JavaFilterFunction; import io.serverlessworkflow.api.types.func.OutputAsFunction; import io.serverlessworkflow.fluent.spec.spi.TransformationHandlers; import java.util.function.Function; @@ -27,33 +27,75 @@ public interface FuncTransformations> extends TransformationHandlers { - default SELF exportAsFn(Function function) { - setExport(new Export().withAs(new ExportAsFunction().withFunction(function))); + @SuppressWarnings("unchecked") + default SELF inputFrom(Function function) { + setInput(new Input().withFrom(new InputFromFunction().withFunction(function))); return (SELF) this; } - default SELF exportAsFn(Function function, Class argClass) { - setExport(new Export().withAs(new ExportAsFunction().withFunction(function, argClass))); + @SuppressWarnings("unchecked") + default SELF inputFrom(Function function, Class argClass) { + setInput(new Input().withFrom(new InputFromFunction().withFunction(function, argClass))); return (SELF) this; } - default SELF inputFrom(Function function) { + @SuppressWarnings("unchecked") + default SELF inputFrom(JavaFilterFunction function) { setInput(new Input().withFrom(new InputFromFunction().withFunction(function))); return (SELF) this; } - default SELF inputFrom(Function function, Class argClass) { + @SuppressWarnings("unchecked") + default SELF inputFrom(JavaFilterFunction function, Class argClass) { setInput(new Input().withFrom(new InputFromFunction().withFunction(function, argClass))); return (SELF) this; } + @SuppressWarnings("unchecked") + default SELF inputFrom(JavaContextFunction function) { + setInput(new Input().withFrom(new InputFromFunction().withFunction(function))); + return (SELF) this; + } + + @SuppressWarnings("unchecked") + default SELF inputFrom(JavaContextFunction function, Class argClass) { + setInput(new Input().withFrom(new InputFromFunction().withFunction(function, argClass))); + return (SELF) this; + } + + @SuppressWarnings("unchecked") default SELF outputAs(Function function) { setOutput(new Output().withAs(new OutputAsFunction().withFunction(function))); return (SELF) this; } + @SuppressWarnings("unchecked") default SELF outputAs(Function function, Class argClass) { setOutput(new Output().withAs(new OutputAsFunction().withFunction(function, argClass))); return (SELF) this; } + + @SuppressWarnings("unchecked") + default SELF outputAs(JavaFilterFunction function) { + setOutput(new Output().withAs(new OutputAsFunction().withFunction(function))); + return (SELF) this; + } + + @SuppressWarnings("unchecked") + default SELF outputAs(JavaFilterFunction function, Class argClass) { + setOutput(new Output().withAs(new OutputAsFunction().withFunction(function, argClass))); + return (SELF) this; + } + + @SuppressWarnings("unchecked") + default SELF outputAs(JavaContextFunction function) { + setOutput(new Output().withAs(new OutputAsFunction().withFunction(function))); + return (SELF) this; + } + + @SuppressWarnings("unchecked") + default SELF outputAs(JavaContextFunction function, Class argClass) { + setOutput(new Output().withAs(new OutputAsFunction().withFunction(function, argClass))); + return (SELF) this; + } } diff --git a/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLTest.java b/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLTest.java new file mode 100644 index 00000000..eed79dcd --- /dev/null +++ b/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLTest.java @@ -0,0 +1,176 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func; + +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.emit; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.event; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.listen; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.toOne; +import static org.junit.jupiter.api.Assertions.*; + +import io.cloudevents.core.data.BytesCloudEventData; +import io.serverlessworkflow.api.types.Export; +import io.serverlessworkflow.api.types.Task; +import io.serverlessworkflow.api.types.TaskItem; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.api.types.func.CallJava; +import io.serverlessworkflow.api.types.func.JavaFilterFunction; +import io.serverlessworkflow.fluent.func.dsl.FuncEmitSpec; +import io.serverlessworkflow.fluent.func.dsl.FuncListenSpec; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +/** Tests for Step chaining (exportAs/when) over function/emit/listen. */ +class FuncDSLTest { + + @Test + void function_step_exportAs_function_sets_export() { + Workflow wf = + FuncWorkflowBuilder.workflow("step-function-export") + .tasks( + // call + chain exportAs + function(String::trim, String.class) + .exportAs((String s) -> Map.of("len", s.length()))) + .build(); + + List items = wf.getDo(); + assertEquals(1, items.size()); + + Task t = items.get(0).getTask(); + assertNotNull(t.getCallTask(), "CallTask expected"); + Export ex = ((CallJava) t.getCallTask().get()).getExport(); + assertNotNull(ex, "Export should be set via Step.exportAs(Function)"); + assertNotNull(ex.getAs(), "'as' should be populated"); + // functional export should not produce a literal string + assertNull( + ex.getAs().getString(), "Export 'as' must not be a literal string when using Function"); + } + + @Test + void function_step_when_compiles_and_builds() { + Workflow wf = + FuncWorkflowBuilder.workflow("step-function-when") + .tasks( + function((Integer v) -> v + 1, Integer.class) + .when((Integer v) -> v > 0, Integer.class)) + .build(); + + List items = wf.getDo(); + assertEquals(1, items.size()); + assertNotNull(items.get(0).getTask().getCallTask(), "CallTask should still be present"); + // We don't assert internal predicate storage details; just ensure build success & presence. + } + + @Test + void emit_step_exportAs_javaFilter_sets_export() { + // Build an emit spec using your DSL (type + data function) + FuncEmitSpec spec = + new FuncEmitSpec() + .type("org.acme.signal") + .bytesData((String s) -> s.getBytes(StandardCharsets.UTF_8), String.class); + + // JavaFilterFunction is (T, WorkflowContextData, TaskContextData) -> R + JavaFilterFunction> jf = + (val, wfCtx, taskCtx) -> Map.of("wrapped", val, "wfId", wfCtx.instanceData().id()); + + Workflow wf = + FuncWorkflowBuilder.workflow("step-emit-export") + .tasks(emit("emitWrapped", spec).exportAs(jf)) // chaining on Step + .build(); + + List items = wf.getDo(); + assertEquals(1, items.size()); + Task t = items.get(0).getTask(); + assertNotNull(t.getEmitTask(), "EmitTask expected"); + + // Export is attached to Task + Export ex = t.getEmitTask().getExport(); + assertNotNull(ex, "Export should be set via Step.exportAs(JavaFilterFunction)"); + assertNotNull(ex.getAs(), "'as' should be populated"); + assertNull( + ex.getAs().getString(), "Export 'as' must not be a literal string when using function"); + } + + @Test + @DisplayName("listen(spec).exportAs(Function) sets Export on ListenTask holder") + void listen_step_exportAs_function_sets_export() { + FuncListenSpec spec = toOne("org.acme.review.done"); // using your existing DSL helper + + Workflow wf = + FuncWorkflowBuilder.workflow("step-listen-export") + .tasks(listen("waitHumanReview", spec).exportAs((Object e) -> Map.of("seen", true))) + .build(); + + List items = wf.getDo(); + assertEquals(1, items.size()); + Task t = items.get(0).getTask(); + assertNotNull(t.getListenTask(), "ListenTask expected"); + + Export ex = t.getListenTask().getExport(); + assertNotNull(ex, "Export should be set via Step.exportAs(Function)"); + assertNotNull(ex.getAs(), "'as' should be populated"); + assertNull( + ex.getAs().getString(), "Export 'as' must not be a literal string when using function"); + } + + @Test + @DisplayName("emit(event(type, fn)).when(...) -> still an EmitTask and builds") + void emit_step_when_compiles_and_builds() { + Workflow wf = + FuncWorkflowBuilder.workflow("step-emit-when") + .tasks( + emit(event("org.acme.sig", (String s) -> BytesCloudEventData.wrap(s.getBytes()))) + .when((Object ctx) -> true)) + .build(); + + List items = wf.getDo(); + assertEquals(1, items.size()); + assertNotNull(items.get(0).getTask().getEmitTask(), "EmitTask should still be present"); + } + + @Test + @DisplayName("Mixed chaining: function.exportAs -> emit.when -> listen.exportAs") + void mixed_chaining_order_and_exports() { + Workflow wf = + FuncWorkflowBuilder.workflow("step-mixed") + .tasks( + function(String::strip, String.class).exportAs((String s) -> Map.of("s", s)), + emit(event( + "org.acme.kickoff", (String s) -> BytesCloudEventData.wrap(s.getBytes()))) + .when((Object ignore) -> true), + listen(toOne("org.acme.done")).exportAs((Object e) -> Map.of("ok", true))) + .build(); + + List items = wf.getDo(); + assertEquals(3, items.size()); + + Task t0 = items.get(0).getTask(); + Task t1 = items.get(1).getTask(); + Task t2 = items.get(2).getTask(); + + assertNotNull(t0.getCallTask()); + assertNotNull(t1.getEmitTask()); + assertNotNull(t2.getListenTask()); + + assertNotNull( + ((CallJava) t0.getCallTask().get()).getExport(), "function step should carry export"); + assertNotNull(t2.getListenTask().getExport(), "listen step should carry export"); + } +} diff --git a/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/JavaWorkflowBuilderTest.java b/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/JavaWorkflowBuilderTest.java deleted file mode 100644 index 24de3e7d..00000000 --- a/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/JavaWorkflowBuilderTest.java +++ /dev/null @@ -1,281 +0,0 @@ -/* - * Copyright 2020-Present The Serverless Workflow Specification Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.serverlessworkflow.fluent.func; - -import static org.junit.jupiter.api.Assertions.*; - -import io.serverlessworkflow.api.types.Document; -import io.serverlessworkflow.api.types.Export; -import io.serverlessworkflow.api.types.Output; -import io.serverlessworkflow.api.types.SetTask; -import io.serverlessworkflow.api.types.Task; -import io.serverlessworkflow.api.types.TaskBase; -import io.serverlessworkflow.api.types.TaskItem; -import io.serverlessworkflow.api.types.Workflow; -import io.serverlessworkflow.api.types.func.*; -import io.serverlessworkflow.fluent.spec.BaseWorkflowBuilder; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; - -/** Tests for FuncWorkflowBuilder + Java DSL extensions. */ -class JavaWorkflowBuilderTest { - - @Test - @DisplayName("Default Java workflow has auto-generated name and default namespace/version") - void testDefaults() { - Workflow wf = FuncWorkflowBuilder.workflow().build(); - assertNotNull(wf); - Document doc = wf.getDocument(); - assertNotNull(doc); - assertEquals(BaseWorkflowBuilder.DEFAULT_NAMESPACE, doc.getNamespace()); - assertEquals(BaseWorkflowBuilder.DEFAULT_VERSION, doc.getVersion()); - assertEquals(BaseWorkflowBuilder.DSL, doc.getDsl()); - assertNotNull(doc.getName()); - } - - @Test - @DisplayName("Spec style forE still works inside Java workflow") - void testSpecForEachInJavaWorkflow() { - Workflow wf = - FuncWorkflowBuilder.workflow("specLoopFlow") - .tasks( - d -> - d.forEach(f -> f.each("pet").in("$.pets")) - .set("markDone", s -> s.expr("$.done = true"))) - .build(); - - List items = wf.getDo(); - assertEquals(2, items.size()); - - TaskItem loopItem = items.get(0); - assertNotNull(loopItem.getTask().getForTask(), "Spec ForTask should be present"); - - TaskItem setItem = items.get(1); - assertNotNull(setItem.getTask().getSetTask()); - SetTask st = setItem.getTask().getSetTask(); - assertEquals("$.done = true", st.getSet().getString()); - } - - @Test - @DisplayName("Java style forE with collection + whileC builds ForTaskFunction") - void testJavaForEach() { - Workflow wf = - FuncWorkflowBuilder.workflow("javaLoopFlow") - .tasks( - d -> - d.forEach( - j -> - j.collection(ctx -> List.of("a", "b", "c")) - .whileC((String val, Object ctx) -> !val.equals("c")) - .tasks( - inner -> inner.set("loopFlag", s -> s.expr("$.flag = true"))))) - .build(); - - List items = wf.getDo(); - assertEquals(1, items.size()); - - TaskItem loopItem = items.get(0); - Task task = loopItem.getTask(); - - assertNotNull(task.getForTask(), "Java ForTaskFunction should be present"); - - // Basic structural checks on nested do inside the function loop - ForTaskFunction fn = (ForTaskFunction) task.getForTask(); - assertNotNull(fn.getDo(), "Nested 'do' list inside ForTaskFunction should be populated"); - assertEquals(1, fn.getDo().size()); - Task nested = fn.getDo().get(0).getTask(); - assertNotNull(nested.getSetTask()); - } - - @Test - @DisplayName("Mixed spec and Java loops in one workflow") - void testMixedLoops() { - Workflow wf = - FuncWorkflowBuilder.workflow("mixed") - .tasks( - d -> - d.forEach(f -> f.each("item").in("$.array")) // spec - .forEach(j -> j.collection(ctx -> List.of(1, 2, 3))) // java - ) - .build(); - - List items = wf.getDo(); - assertEquals(2, items.size()); - - Task specLoop = items.get(0).getTask(); - Task javaLoop = items.get(1).getTask(); - - assertNotNull(specLoop.getForTask()); - assertNotNull(javaLoop.getForTask()); - } - - @Test - @DisplayName("Java functional exportAsFn/inputFrom/outputAs set function wrappers (not literals)") - void testJavaFunctionalIO() { - AtomicBoolean exportCalled = new AtomicBoolean(false); - AtomicBoolean inputCalled = new AtomicBoolean(false); - AtomicBoolean outputCalled = new AtomicBoolean(false); - - Workflow wf = - FuncWorkflowBuilder.workflow("fnIO") - .tasks( - d -> - d.set("init", s -> s.expr("$.x = 1")) - .forEach( - j -> - j.collection( - ctx -> { - inputCalled.set(true); - return List.of("x", "y"); - }) - .tasks(inner -> inner.set("calc", s -> s.expr("$.y = $.x + 1"))) - .exportAsFn( - item -> { - exportCalled.set(true); - return Map.of("computed", 42); - }) - .outputAs( - item -> { - outputCalled.set(true); - return Map.of("out", true); - }))) - .build(); - - // Top-level 'do' structure - assertEquals(2, wf.getDo().size()); - - // Find nested forTaskFunction - Task forTaskFnHolder = wf.getDo().get(1).getTask(); - ForTaskFunction fn = (ForTaskFunction) forTaskFnHolder.getForTask(); - assertNotNull(fn); - - // Inspect nested branche inside the function loop - List nested = fn.getDo(); - assertEquals(1, nested.size()); - TaskBase nestedTask = nested.get(0).getTask().getSetTask(); - assertNotNull(nestedTask); - - // Because functions are likely stored as opaque objects, we check that - // export / output structures exist and are not expression-based. - Export export = fn.getExport(); - assertNotNull(export, "Export should be set via functional variant"); - assertNull( - export.getAs() != null ? export.getAs().getString() : null, - "Export 'as' should not be a plain string when using function variant"); - - Output out = fn.getOutput(); - // If functional output maps to an OutputAsFunction wrapper, adapt the checks: - if (out != null && out.getAs() != null) { - // Expect no literal string if function used - assertNull(out.getAs().getString(), "Output 'as' should not be a literal string"); - } - - // We can't *invoke* lambdas here (unless your runtime exposes them), - // but we verified structural placement. Flipping AtomicBooleans in creation lambdas - // (collection) at least shows one function executed during build (if it is executed now; - // if they are deferred, remove those assertions.) - } - - @Test - @DisplayName("callFn task added and retains name + CallTask union") - void testCallJavaTask() { - Workflow wf = - FuncWorkflowBuilder.workflow("callJavaFlow") - .tasks( - d -> - d.callFn( - "invokeHandler", - cj -> { - // configure your FuncCallTaskBuilder here - // e.g., cj.className("com.acme.Handler").arg("key", "value"); - })) - .build(); - - List items = wf.getDo(); - assertEquals(1, items.size()); - TaskItem ti = items.get(0); - - assertEquals("invokeHandler", ti.getName()); - Task task = ti.getTask(); - assertNotNull(task.getCallTask(), "CallTask should be present for callFn"); - // Additional assertions if FuncCallTaskBuilder populates fields - // e.g., assertEquals("com.acme.Handler", task.getCallTask().getCallJava().getClassName()); - } - - @Test - @DisplayName("switchCaseFn (Java variant) coexists with spec branche") - void testSwitchCaseJava() { - Workflow wf = - FuncWorkflowBuilder.workflow("switchJava") - .tasks( - d -> - d.set("prepare", s -> s.expr("$.ready = true")) - .switchCase( - sw -> { - // configure Java switch builder (cases / predicates) - })) - .build(); - - List items = wf.getDo(); - assertEquals(2, items.size()); - - Task specSet = items.get(0).getTask(); - Task switchTask = items.get(1).getTask(); - - assertNotNull(specSet.getSetTask()); - assertNotNull(switchTask.getSwitchTask(), "SwitchTask union should be present"); - } - - @Test - @DisplayName("Combined: spec set + java forE + callFn inside nested do") - void testCompositeScenario() { - Workflow wf = - FuncWorkflowBuilder.workflow("composite") - .tasks( - d -> - d.set("init", s -> s.expr("$.val = 0")) - .forEach( - j -> - j.collection(ctx -> List.of("a", "b")) - .tasks( - inner -> - inner - .callFn( - cj -> { - // customizing Java call - }) - .set("flag", s -> s.expr("$.flag = true"))))) - .build(); - - assertEquals(2, wf.getDo().size()); - - Task loopHolder = wf.getDo().get(1).getTask(); - ForTaskFunction fn = (ForTaskFunction) loopHolder.getForTask(); - assertNotNull(fn); - - List nested = fn.getDo(); - assertEquals(2, nested.size()); - - Task nestedCall = nested.get(0).getTask(); - Task nestedSet = nested.get(1).getTask(); - - assertNotNull(nestedCall.getCallTask()); - assertNotNull(nestedSet.getSetTask()); - } -} diff --git a/experimental/fluent/pom.xml b/experimental/fluent/pom.xml index 3156456e..57154e0b 100644 --- a/experimental/fluent/pom.xml +++ b/experimental/fluent/pom.xml @@ -28,6 +28,11 @@ serverlessworkflow-fluent-spec ${project.version} + + io.serverlessworkflow + serverlessworkflow-impl-json + ${project.version} + io.serverlessworkflow serverlessworkflow-experimental-agentic diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventConsumptionStrategyBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventConsumptionStrategyBuilder.java index 2857cd7e..2b539471 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventConsumptionStrategyBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventConsumptionStrategyBuilder.java @@ -134,12 +134,16 @@ public final T build() { } if (anySet) { - this.setUntil(until); + this.setUntilForAny(until); } return this.getEventConsumptionStrategy(); } protected abstract T getEventConsumptionStrategy(); - protected abstract void setUntil(Until until); + protected abstract void setUntilForAny(Until until); + + protected void setUntil(Until until) { + this.until = until; + } } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventPropertiesBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventPropertiesBuilder.java index dc471399..b9d9baa5 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventPropertiesBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventPropertiesBuilder.java @@ -72,7 +72,12 @@ public SELF data(String expr) { } public SELF data(Object obj) { - eventProperties.setData(new EventData().withObject(obj)); + if (obj instanceof EventData) { + eventProperties.setData((EventData) obj); + } else { + eventProperties.setData(new EventData().withObject(obj)); + } + return self(); } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/BaseDoTaskBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/BaseDoTaskBuilder.java index 476e0e26..94e0c873 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/BaseDoTaskBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/BaseDoTaskBuilder.java @@ -16,6 +16,8 @@ package io.serverlessworkflow.fluent.spec; import io.serverlessworkflow.api.types.DoTask; +import java.util.Objects; +import java.util.function.Consumer; public abstract class BaseDoTaskBuilder< SELF extends BaseDoTaskBuilder, LIST extends BaseTaskItemListBuilder> @@ -39,6 +41,13 @@ protected final LIST listBuilder() { return (LIST) itemsListBuilder; } + @SuppressWarnings("unchecked") + public SELF tasks(Consumer itemsConfigurer) { + Objects.requireNonNull(itemsConfigurer, "itemsConfigurer is required"); + itemsConfigurer.accept(this.listBuilder()); + return (SELF) this; + } + public DoTask build() { doTask.setDo(itemsListBuilder.build()); return doTask; diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/BaseTaskItemListBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/BaseTaskItemListBuilder.java index 33a424fc..d6d2e229 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/BaseTaskItemListBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/BaseTaskItemListBuilder.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.UUID; import java.util.function.Consumer; /** @@ -58,9 +59,12 @@ protected final SELF addTaskItem(TaskItem taskItem) { return self(); } - protected final void requireNameAndConfig(String name, Consumer cfg) { - Objects.requireNonNull(name, "Task name must not be null"); + protected final String defaultNameAndRequireConfig(String name, Consumer cfg) { + if (name == null || name.isBlank()) { + name = UUID.randomUUID().toString(); + } Objects.requireNonNull(cfg, "Configurer must not be null"); + return name; } /** diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/BaseWorkflowBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/BaseWorkflowBuilder.java index 993f325f..b81b2c22 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/BaseWorkflowBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/BaseWorkflowBuilder.java @@ -16,14 +16,12 @@ package io.serverlessworkflow.fluent.spec; import io.serverlessworkflow.api.types.Document; -import io.serverlessworkflow.api.types.Export; import io.serverlessworkflow.api.types.Input; import io.serverlessworkflow.api.types.Output; import io.serverlessworkflow.api.types.TaskItem; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.fluent.spec.spi.TransformationHandlers; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.function.Consumer; @@ -63,13 +61,6 @@ public void setOutput(Output output) { this.workflow.setOutput(output); } - @Override - public void setExport(Export export) { - // TODO: build another interface with only Output and Input - throw new UnsupportedOperationException( - "export() is not supported on the workflow root; only tasks may export"); - } - @Override public void setInput(Input input) { this.workflow.setInput(input); @@ -89,15 +80,36 @@ public SELF use(Consumer useBuilderConsumer) { } public SELF tasks(Consumer doTaskConsumer) { - final DBuilder doTaskBuilder = newDo(); - doTaskConsumer.accept(doTaskBuilder); - if (this.workflow.getDo() == null) { - this.workflow.setDo(doTaskBuilder.build().getDo()); - } else { - List existingTasks = new ArrayList<>(this.workflow.getDo()); - existingTasks.addAll(doTaskBuilder.build().getDo()); - this.workflow.setDo(Collections.unmodifiableList(existingTasks)); - } + return appendDo(doTaskConsumer); + } + + @SafeVarargs + public final SELF tasks(Consumer... tasks) { + // Snapshot and adapt IListBuilder-consumers into a single DBuilder-consumer + final Consumer configurer = + db -> { + if (tasks == null || tasks.length == 0) return; + for (Consumer c : List.of(tasks.clone())) { + if (c != null) db.tasks(c); + } + }; + return appendDo(configurer); + } + + private SELF appendDo(Consumer configurer) { + if (configurer == null) return self(); + + final DBuilder doBuilder = newDo(); + configurer.accept(doBuilder); + + final List newItems = doBuilder.build().getDo(); + if (newItems == null || newItems.isEmpty()) return self(); + + final List merged = + new ArrayList<>(this.workflow.getDo() != null ? this.workflow.getDo() : List.of()); + merged.addAll(newItems); + + this.workflow.setDo(List.copyOf(merged)); return self(); } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/EventConsumptionStrategyBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/EventConsumptionStrategyBuilder.java index b4591e0d..c9bb9883 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/EventConsumptionStrategyBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/EventConsumptionStrategyBuilder.java @@ -55,7 +55,7 @@ protected EventConsumptionStrategy getEventConsumptionStrategy() { } @Override - protected void setUntil(Until until) { + protected void setUntilForAny(Until until) { this.eventConsumptionStrategy.getAnyEventConsumptionStrategy().setUntil(until); } } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/ListenToBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/ListenToBuilder.java index ca01805e..876be816 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/ListenToBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/ListenToBuilder.java @@ -54,7 +54,7 @@ protected ListenTo getEventConsumptionStrategy() { } @Override - protected void setUntil(Until until) { + protected void setUntilForAny(Until until) { this.listenTo.getAnyEventConsumptionStrategy().setUntil(until); } } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/SetTaskBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/SetTaskBuilder.java index ede8f202..6ee2cbf8 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/SetTaskBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/SetTaskBuilder.java @@ -21,13 +21,13 @@ public class SetTaskBuilder extends TaskBaseBuilder { - private final SetTask setTask; + private final SetTask task; private final SetTaskConfiguration setTaskConfiguration; public SetTaskBuilder() { - this.setTask = new SetTask(); + this.task = new SetTask(); this.setTaskConfiguration = new SetTaskConfiguration(); - this.setTask(setTask); + this.setTask(task); } @Override @@ -36,7 +36,7 @@ protected SetTaskBuilder self() { } public SetTaskBuilder expr(String expression) { - this.setTask.setSet(new Set().withString(expression)); + this.task.setSet(new Set().withString(expression)); return this; } @@ -46,10 +46,10 @@ public SetTaskBuilder put(String key, Object value) { } public SetTask build() { - if (this.setTask.getSet() == null) { - this.setTask.setSet(new Set().withSetTaskConfiguration(setTaskConfiguration)); + if (this.task.getSet() == null) { + this.task.setSet(new Set().withSetTaskConfiguration(setTaskConfiguration)); } - return setTask; + return task; } } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/SubscriptionIteratorBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/SubscriptionIteratorBuilder.java index 0c9d509b..fe7201a9 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/SubscriptionIteratorBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/SubscriptionIteratorBuilder.java @@ -67,7 +67,7 @@ public SubscriptionIteratorBuilder export(Consumer exportConsu } @Override - public SubscriptionIteratorBuilder exportAs(Object exportAs) { + public SubscriptionIteratorBuilder exportAs(String exportAs) { this.subscriptionIterator.setExport(new ExportBuilder().as(exportAs).build()); return this; } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskBaseBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskBaseBuilder.java index cd6e3a8e..e7c41695 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskBaseBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskBaseBuilder.java @@ -22,11 +22,11 @@ import io.serverlessworkflow.api.types.Output; import io.serverlessworkflow.api.types.TaskBase; import io.serverlessworkflow.fluent.spec.spi.OutputFluent; -import io.serverlessworkflow.fluent.spec.spi.TransformationHandlers; +import io.serverlessworkflow.fluent.spec.spi.TaskTransformationHandlers; import java.util.function.Consumer; public abstract class TaskBaseBuilder> - implements TransformationHandlers, OutputFluent { + implements TaskTransformationHandlers, OutputFluent { private TaskBase task; protected TaskBaseBuilder() {} @@ -80,7 +80,7 @@ public T then(String taskName) { return self(); } - public T exportAs(Object exportAs) { + public T exportAs(String exportAs) { this.task.setExport(new ExportBuilder().as(exportAs).build()); return self(); } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskItemListBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskItemListBuilder.java index 4c82f62a..5cfba36c 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskItemListBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskItemListBuilder.java @@ -45,7 +45,7 @@ protected TaskItemListBuilder newItemListBuilder() { @Override public TaskItemListBuilder set(String name, Consumer itemsConfigurer) { - requireNameAndConfig(name, itemsConfigurer); + name = defaultNameAndRequireConfig(name, itemsConfigurer); final SetTaskBuilder setBuilder = new SetTaskBuilder(); itemsConfigurer.accept(setBuilder); return addTaskItem(new TaskItem(name, new Task().withSetTask(setBuilder.build()))); @@ -59,7 +59,7 @@ public TaskItemListBuilder set(String name, final String expr) { @Override public TaskItemListBuilder forEach( String name, Consumer> itemsConfigurer) { - requireNameAndConfig(name, itemsConfigurer); + name = defaultNameAndRequireConfig(name, itemsConfigurer); final ForEachTaskBuilder forBuilder = new ForEachTaskBuilder<>(newItemListBuilder()); itemsConfigurer.accept(forBuilder); @@ -68,7 +68,7 @@ public TaskItemListBuilder forEach( @Override public TaskItemListBuilder switchCase(String name, Consumer itemsConfigurer) { - requireNameAndConfig(name, itemsConfigurer); + name = defaultNameAndRequireConfig(name, itemsConfigurer); final SwitchTaskBuilder switchBuilder = new SwitchTaskBuilder(); itemsConfigurer.accept(switchBuilder); return addTaskItem(new TaskItem(name, new Task().withSwitchTask(switchBuilder.build()))); @@ -76,7 +76,7 @@ public TaskItemListBuilder switchCase(String name, Consumer i @Override public TaskItemListBuilder raise(String name, Consumer itemsConfigurer) { - requireNameAndConfig(name, itemsConfigurer); + name = defaultNameAndRequireConfig(name, itemsConfigurer); final RaiseTaskBuilder raiseBuilder = new RaiseTaskBuilder(); itemsConfigurer.accept(raiseBuilder); return addTaskItem(new TaskItem(name, new Task().withRaiseTask(raiseBuilder.build()))); @@ -84,7 +84,7 @@ public TaskItemListBuilder raise(String name, Consumer itemsCo @Override public TaskItemListBuilder fork(String name, Consumer itemsConfigurer) { - requireNameAndConfig(name, itemsConfigurer); + name = defaultNameAndRequireConfig(name, itemsConfigurer); final ForkTaskBuilder forkBuilder = new ForkTaskBuilder(); itemsConfigurer.accept(forkBuilder); return addTaskItem(new TaskItem(name, new Task().withForkTask(forkBuilder.build()))); @@ -92,7 +92,7 @@ public TaskItemListBuilder fork(String name, Consumer itemsConf @Override public TaskItemListBuilder listen(String name, Consumer itemsConfigurer) { - requireNameAndConfig(name, itemsConfigurer); + name = defaultNameAndRequireConfig(name, itemsConfigurer); final ListenTaskBuilder listenBuilder = new ListenTaskBuilder(); itemsConfigurer.accept(listenBuilder); return addTaskItem(new TaskItem(name, new Task().withListenTask(listenBuilder.build()))); @@ -100,7 +100,7 @@ public TaskItemListBuilder listen(String name, Consumer items @Override public TaskItemListBuilder emit(String name, Consumer itemsConfigurer) { - requireNameAndConfig(name, itemsConfigurer); + name = defaultNameAndRequireConfig(name, itemsConfigurer); final EmitTaskBuilder emitBuilder = new EmitTaskBuilder(); itemsConfigurer.accept(emitBuilder); return addTaskItem(new TaskItem(name, new Task().withEmitTask(emitBuilder.build()))); @@ -109,7 +109,7 @@ public TaskItemListBuilder emit(String name, Consumer itemsConf @Override public TaskItemListBuilder tryCatch( String name, Consumer> itemsConfigurer) { - requireNameAndConfig(name, itemsConfigurer); + name = defaultNameAndRequireConfig(name, itemsConfigurer); final TryTaskBuilder tryBuilder = new TryTaskBuilder<>(this.newItemListBuilder()); itemsConfigurer.accept(tryBuilder); @@ -118,7 +118,7 @@ public TaskItemListBuilder tryCatch( @Override public TaskItemListBuilder callHTTP(String name, Consumer itemsConfigurer) { - requireNameAndConfig(name, itemsConfigurer); + name = defaultNameAndRequireConfig(name, itemsConfigurer); final CallHTTPTaskBuilder callHTTPBuilder = new CallHTTPTaskBuilder(); itemsConfigurer.accept(callHTTPBuilder); return addTaskItem( diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/BaseListenSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/BaseListenSpec.java new file mode 100644 index 00000000..fbf45545 --- /dev/null +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/BaseListenSpec.java @@ -0,0 +1,123 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.spec.dsl; + +import java.util.Objects; +import java.util.function.Consumer; + +/** + * Generic base for Listen specs. + * + *

Type params: SELF - fluent self type (the concrete spec) LB - ListenTaskBuilder type (e.g., + * ListenTaskBuilder, AgentListenTaskBuilder, FuncListenTaskBuilder) TB - ListenToBuilder type + * (e.g., ListenToBuilder, FuncListenToBuilder) FB - EventFilterBuilder type (e.g., + * EventFilterBuilder, FuncEventFilterBuilder) EC - Event configurer type (e.g., EventConfigurer, + * FuncPredicateEventConfigurer) + */ +public abstract class BaseListenSpec { + + @FunctionalInterface + public interface ToInvoker { + void to(LB listenTaskBuilder, Consumer toStep); + } + + @FunctionalInterface + public interface WithApplier { + void with(FB filterBuilder, EC eventConfigurer); + } + + @FunctionalInterface + public interface FiltersApplier { + void apply(TB toBuilder, @SuppressWarnings("rawtypes") Consumer[] filters); + } + + @FunctionalInterface + public interface OneFilterApplier { + void apply(TB toBuilder, Consumer filter); + } + + private final ToInvoker toInvoker; + private final WithApplier withApplier; + private final FiltersApplier allApplier; + private final FiltersApplier anyApplier; + private final OneFilterApplier oneApplier; + + private Consumer strategyStep; + private Consumer untilStep; + + protected BaseListenSpec( + ToInvoker toInvoker, + WithApplier withApplier, + FiltersApplier allApplier, + FiltersApplier anyApplier, + OneFilterApplier oneApplier) { + + this.toInvoker = Objects.requireNonNull(toInvoker, "toInvoker"); + this.withApplier = Objects.requireNonNull(withApplier, "withApplier"); + this.allApplier = Objects.requireNonNull(allApplier, "allApplier"); + this.anyApplier = Objects.requireNonNull(anyApplier, "anyApplier"); + this.oneApplier = Objects.requireNonNull(oneApplier, "oneApplier"); + } + + protected abstract SELF self(); + + protected final void setUntilStep(Consumer untilStep) { + this.untilStep = untilStep; + } + + /** Convert EC[] -> Consumer[] that call `filterBuilder.with(event)` */ + @SuppressWarnings("unchecked") + protected final Consumer[] asFilters(EC... events) { + Objects.requireNonNull(events, "events"); + Consumer[] filters = new Consumer[events.length]; + for (int i = 0; i < events.length; i++) { + EC ev = Objects.requireNonNull(events[i], "events[" + i + "]"); + filters[i] = fb -> withApplier.with(fb, ev); + } + return filters; + } + + @SafeVarargs + public final SELF all(EC... events) { + strategyStep = t -> allApplier.apply(t, asFilters(events)); + return self(); + } + + @SafeVarargs + public final SELF any(EC... events) { + strategyStep = t -> anyApplier.apply(t, asFilters(events)); + return self(); + } + + public final SELF one(EC event) { + Objects.requireNonNull(event, "event"); + strategyStep = t -> oneApplier.apply(t, fb -> withApplier.with(fb, event)); + return self(); + } + + /** Concrete 'accept' should delegate here with its concrete LB. */ + protected final void acceptInto(LB listenTaskBuilder) { + Objects.requireNonNull(strategyStep, "listening strategy must be set (all/any/one)"); + toInvoker.to( + listenTaskBuilder, + t -> { + strategyStep.accept(t); + if (untilStep != null) { + untilStep.accept(t); + } + }); + } +} diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/DSL.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/DSL.java index 8da5bdbe..0863ef25 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/DSL.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/DSL.java @@ -16,6 +16,7 @@ package io.serverlessworkflow.fluent.spec.dsl; import io.serverlessworkflow.api.types.OAuth2AuthenticationData; +import io.serverlessworkflow.fluent.spec.DoTaskBuilder; import io.serverlessworkflow.fluent.spec.EmitTaskBuilder; import io.serverlessworkflow.fluent.spec.ForkTaskBuilder; import io.serverlessworkflow.fluent.spec.TaskItemListBuilder; @@ -236,7 +237,15 @@ public static TasksConfigurer tryCatch(TryConfigurer configurer) { } // ----- Tasks that requires tasks list --// - public static Consumer tasks(TasksConfigurer... steps) { + + /** Main task list to be used in `workflow().tasks()` consumer. */ + public static Consumer doTasks(TasksConfigurer... steps) { + final Consumer tasks = tasks(steps); + return d -> d.tasks(tasks); + } + + /** Task list for tasks that requires it such as `for`, `try`, and so on. */ + public static TasksConfigurer tasks(TasksConfigurer... steps) { Objects.requireNonNull(steps, "Steps in a tasks are required"); final List snapshot = List.of(steps.clone()); return list -> snapshot.forEach(s -> s.accept(list)); diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EmitSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EmitSpec.java index 8c4115bc..f07dd77c 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EmitSpec.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EmitSpec.java @@ -17,9 +17,8 @@ import io.serverlessworkflow.fluent.spec.EmitTaskBuilder; import io.serverlessworkflow.fluent.spec.configurers.EmitConfigurer; -import io.serverlessworkflow.fluent.spec.configurers.EventConfigurer; -public final class EmitSpec extends EventFilterSpec implements EmitConfigurer { +public final class EmitSpec extends ExprEventFilterSpec implements EmitConfigurer { @Override protected EmitSpec self() { @@ -28,11 +27,6 @@ protected EmitSpec self() { @Override public void accept(EmitTaskBuilder emitTaskBuilder) { - emitTaskBuilder.event( - e -> { - for (EventConfigurer step : steps) { - step.accept(e); - } - }); + emitTaskBuilder.event(e -> getSteps().forEach(step -> step.accept(e))); } } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventFilterSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventFilterSpec.java index 7c47a07b..87388369 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventFilterSpec.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventFilterSpec.java @@ -15,21 +15,32 @@ */ package io.serverlessworkflow.fluent.spec.dsl; -import io.serverlessworkflow.fluent.spec.configurers.EventConfigurer; +import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder; import java.net.URI; import java.time.Instant; -import java.util.ArrayList; import java.util.Date; import java.util.List; -import java.util.Map; import java.util.UUID; +import java.util.function.Consumer; -public abstract class EventFilterSpec { +public abstract class EventFilterSpec> { - protected final List steps = new ArrayList<>(); + private final List> steps; + + protected EventFilterSpec(List> steps) { + this.steps = steps; + } protected abstract SELF self(); + protected void addStep(Consumer step) { + steps.add(step); + } + + protected List> getSteps() { + return steps; + } + public SELF type(String eventType) { steps.add(e -> e.type(eventType)); return self(); @@ -47,22 +58,20 @@ public SELF now() { return self(); } + public SELF contentType(String ct) { + steps.add(e -> e.dataContentType(ct)); + return self(); + } + /** Sets the CloudEvent dataContentType to `application/json` */ public SELF JSON() { steps.add(e -> e.dataContentType("application/json")); return self(); } - /** Sets the event data and the contentType to `application/json` */ - public SELF jsonData(String expr) { - steps.add(e -> e.data(expr)); - return JSON(); - } - - /** Sets the event data and the contentType to `application/json` */ - public SELF jsonData(Map data) { - steps.add(e -> e.data(data)); - return JSON(); + public SELF OCTET_STREAM() { + steps.add(e -> e.dataContentType("application/octet-stream")); + return self(); } public SELF source(String source) { diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventSpec.java index a7a41a80..3e93c9d2 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventSpec.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/EventSpec.java @@ -18,7 +18,7 @@ import io.serverlessworkflow.fluent.spec.EventPropertiesBuilder; import io.serverlessworkflow.fluent.spec.configurers.EventConfigurer; -public final class EventSpec extends EventFilterSpec implements EventConfigurer { +public final class EventSpec extends ExprEventFilterSpec implements EventConfigurer { @Override protected EventSpec self() { @@ -27,6 +27,6 @@ protected EventSpec self() { @Override public void accept(EventPropertiesBuilder eventPropertiesBuilder) { - steps.forEach(step -> step.accept(eventPropertiesBuilder)); + getSteps().forEach(step -> step.accept(eventPropertiesBuilder)); } } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ExprEventFilterSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ExprEventFilterSpec.java new file mode 100644 index 00000000..4d485038 --- /dev/null +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ExprEventFilterSpec.java @@ -0,0 +1,40 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.spec.dsl; + +import io.serverlessworkflow.fluent.spec.EventPropertiesBuilder; +import java.util.ArrayList; +import java.util.Map; + +public abstract class ExprEventFilterSpec + extends EventFilterSpec { + + ExprEventFilterSpec() { + super(new ArrayList<>()); + } + + /** Sets the event data and the contentType to `application/json` */ + public SELF jsonData(String expr) { + addStep(e -> e.data(expr)); + return JSON(); + } + + /** Sets the event data and the contentType to `application/json` */ + public SELF jsonData(Map data) { + addStep(e -> e.data(data)); + return JSON(); + } +} diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ListenSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ListenSpec.java index ca538911..56d7a38f 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ListenSpec.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/ListenSpec.java @@ -15,57 +15,53 @@ */ package io.serverlessworkflow.fluent.spec.dsl; +import io.serverlessworkflow.fluent.spec.AbstractEventConsumptionStrategyBuilder; +import io.serverlessworkflow.fluent.spec.AbstractEventFilterBuilder; +import io.serverlessworkflow.fluent.spec.AbstractListenTaskBuilder; import io.serverlessworkflow.fluent.spec.EventFilterBuilder; import io.serverlessworkflow.fluent.spec.ListenTaskBuilder; import io.serverlessworkflow.fluent.spec.ListenToBuilder; import io.serverlessworkflow.fluent.spec.configurers.EventConfigurer; -import io.serverlessworkflow.fluent.spec.configurers.ListenConfigurer; import java.util.Objects; import java.util.function.Consumer; -public final class ListenSpec implements ListenConfigurer { +public final class ListenSpec + extends BaseListenSpec< + ListenSpec, ListenTaskBuilder, ListenToBuilder, EventFilterBuilder, EventConfigurer> + implements io.serverlessworkflow.fluent.spec.configurers.ListenConfigurer { - private Consumer strategyStep; - private Consumer untilStep; - - @SuppressWarnings("unchecked") - private static Consumer[] asFilters(EventConfigurer[] events) { - Consumer[] filters = new Consumer[events.length]; - for (int i = 0; i < events.length; i++) { - EventConfigurer ev = Objects.requireNonNull(events[i], "events[" + i + "]"); - filters[i] = f -> f.with(ev); - } - return filters; - } - - public ListenSpec all(EventConfigurer... events) { - strategyStep = t -> t.all(asFilters(events)); - return this; + public ListenSpec() { + super( + // toInvoker + AbstractListenTaskBuilder::to, + // withApplier + AbstractEventFilterBuilder::with, + // allApplier + (tb, filters) -> tb.all(castFilters(filters)), + // anyApplier + (tb, filters) -> tb.any(castFilters(filters)), + // oneApplier + AbstractEventConsumptionStrategyBuilder::one); } - public ListenSpec one(EventConfigurer e) { - strategyStep = t -> t.one(f -> f.with(e)); - return this; + @SuppressWarnings("unchecked") + private static Consumer[] castFilters(Consumer[] arr) { + return (Consumer[]) arr; } - public ListenSpec any(EventConfigurer... events) { - strategyStep = t -> t.any(asFilters(events)); + @Override + protected ListenSpec self() { return this; } public ListenSpec until(String expression) { - untilStep = t -> t.until(expression); - return this; + Objects.requireNonNull(expression, "expression"); + this.setUntilStep(u -> u.until(expression)); + return self(); } @Override public void accept(ListenTaskBuilder listenTaskBuilder) { - listenTaskBuilder.to( - t -> { - strategyStep.accept(t); - if (untilStep != null) { - untilStep.accept(t); - } - }); + acceptInto(listenTaskBuilder); } } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/OutputFluent.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/OutputFluent.java index cc49a43c..b66bbd75 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/OutputFluent.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/OutputFluent.java @@ -25,5 +25,5 @@ public interface OutputFluent { SELF export(Consumer exportConsumer); - SELF exportAs(Object exportAs); + SELF exportAs(String exportAs); } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/TaskTransformationHandlers.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/TaskTransformationHandlers.java new file mode 100644 index 00000000..531e02b9 --- /dev/null +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/TaskTransformationHandlers.java @@ -0,0 +1,23 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.spec.spi; + +import io.serverlessworkflow.api.types.Export; + +public interface TaskTransformationHandlers extends TransformationHandlers { + + void setExport(final Export export); +} diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/TransformationHandlers.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/TransformationHandlers.java index 5894109f..c4183ef3 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/TransformationHandlers.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/TransformationHandlers.java @@ -15,15 +15,11 @@ */ package io.serverlessworkflow.fluent.spec.spi; -import io.serverlessworkflow.api.types.Export; import io.serverlessworkflow.api.types.Input; import io.serverlessworkflow.api.types.Output; public interface TransformationHandlers { - void setOutput(final Output output); - void setExport(final Export export); - void setInput(final Input input); }