From 392693cf586cfc13911b033e61ff5cf747315044 Mon Sep 17 00:00:00 2001 From: Ricardo Zanini Date: Wed, 20 Aug 2025 17:16:08 -0400 Subject: [PATCH] Add HITL Use Cases; Fix DSL Event Consumption Signed-off-by: Ricardo Zanini --- .../workflow/impl/FluentDSLCallTest.java | 2 +- .../api/types/func/EventDataFunction.java | 34 ++++ .../api/types/func/EventDataPredicate.java | 34 ++++ fluent/agentic/pom.xml | 10 ++ .../agentic/AgentListenTaskBuilder.java | 18 ++- .../fluent/agentic/Agents.java | 153 ++++++++++++------ .../fluent/agentic/ChatBotIT.java | 145 +++++------------ .../agentic/CloudEventsTestBuilder.java | 42 +++++ .../fluent/agentic/EmailDrafterIT.java | 148 +++++++++++++++++ .../fluent/agentic/MixedWorkflowIT.java | 104 ++++++++++++ .../src/test/java/org/acme/EmailDraft.java | 20 +++ .../src/test/java/org/acme/EmailDrafts.java | 38 +++++ .../src/test/java/org/acme/EmailPolicies.java | 69 ++++++++ .../test/java/org/acme/PolicyDecision.java | 21 +++ fluent/func/pom.xml | 4 + .../fluent/func/FuncEmitTaskBuilder.java | 15 +- .../fluent/func/FuncEventFilterBuilder.java | 33 ++++ .../func/FuncEventPropertiesBuilder.java | 40 +++++ .../fluent/func/FuncListenTaskBuilder.java | 15 +- .../fluent/func/FuncListenToBuilder.java | 62 +++++++ .../FuncPredicateEventPropertiesBuilder.java | 35 ++++ .../fluent/func/FuncSwitchTaskBuilder.java | 26 +-- .../fluent/spec/AbstractEmitTaskBuilder.java | 48 ++++++ ...stractEventConsumptionStrategyBuilder.java | 54 +++++-- .../spec/AbstractEventFilterBuilder.java | 51 ++++++ .../spec/AbstractEventPropertiesBuilder.java | 82 ++++++++++ .../spec/AbstractListenTaskBuilder.java | 87 ++++++++++ .../fluent/spec/DoTaskBuilder.java | 3 +- .../fluent/spec/EmitTaskBuilder.java | 29 +--- .../spec/EventConsumptionStrategyBuilder.java | 17 +- .../fluent/spec/EventFilterBuilder.java | 33 +--- .../fluent/spec/EventPropertiesBuilder.java | 60 +------ .../fluent/spec/ListenTaskBuilder.java | 69 +------- .../fluent/spec/ListenToBuilder.java | 19 ++- .../fluent/spec/SwitchTaskBuilder.java | 2 +- .../fluent/spec/TaskItemListBuilder.java | 6 +- .../fluent/spec/spi/DoFluent.java | 2 +- .../spi/EventConsumptionStrategyFluent.java | 12 +- .../fluent/spec/spi/SwitchTaskFluent.java | 20 ++- impl/pom.xml | 11 -- pom.xml | 11 ++ 41 files changed, 1285 insertions(+), 399 deletions(-) create mode 100644 experimental/types/src/main/java/io/serverlessworkflow/api/types/func/EventDataFunction.java create mode 100644 experimental/types/src/main/java/io/serverlessworkflow/api/types/func/EventDataPredicate.java create mode 100644 fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/CloudEventsTestBuilder.java create mode 100644 fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/EmailDrafterIT.java create mode 100644 fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/MixedWorkflowIT.java create mode 100644 fluent/agentic/src/test/java/org/acme/EmailDraft.java create mode 100644 fluent/agentic/src/test/java/org/acme/EmailDrafts.java create mode 100644 fluent/agentic/src/test/java/org/acme/EmailPolicies.java create mode 100644 fluent/agentic/src/test/java/org/acme/PolicyDecision.java create mode 100644 fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterBuilder.java create mode 100644 fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventPropertiesBuilder.java create mode 100644 fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenToBuilder.java create mode 100644 fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncPredicateEventPropertiesBuilder.java create mode 100644 fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEmitTaskBuilder.java create mode 100644 fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventFilterBuilder.java create mode 100644 fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventPropertiesBuilder.java create mode 100644 fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractListenTaskBuilder.java diff --git a/experimental/lambda/src/test/java/io/serverless/workflow/impl/FluentDSLCallTest.java b/experimental/lambda/src/test/java/io/serverless/workflow/impl/FluentDSLCallTest.java index cc34108d..a3f08c1e 100644 --- a/experimental/lambda/src/test/java/io/serverless/workflow/impl/FluentDSLCallTest.java +++ b/experimental/lambda/src/test/java/io/serverless/workflow/impl/FluentDSLCallTest.java @@ -82,7 +82,7 @@ void testSwitch() throws InterruptedException, ExecutionException { tasks .switchCase( switchOdd -> - switchOdd.functions( + switchOdd.onPredicate( item -> item.when(CallTest::isOdd).then(FlowDirectiveEnum.END))) .callFn(callJava -> callJava.function(CallTest::zero))) diff --git a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/EventDataFunction.java b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/EventDataFunction.java new file mode 100644 index 00000000..7c719389 --- /dev/null +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/EventDataFunction.java @@ -0,0 +1,34 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.api.types.func; + +import io.serverlessworkflow.api.types.EventData; +import java.util.Objects; +import java.util.function.Function; + +public class EventDataFunction extends EventData { + + public EventData withFunction(Function value) { + setObject(value); + return this; + } + + public EventData withFunction(Function value, Class argClass) { + Objects.requireNonNull(argClass); + setObject(new TypedFunction<>(value, argClass)); + return this; + } +} diff --git a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/EventDataPredicate.java b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/EventDataPredicate.java new file mode 100644 index 00000000..e42612ca --- /dev/null +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/EventDataPredicate.java @@ -0,0 +1,34 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.api.types.func; + +import io.serverlessworkflow.api.types.EventData; +import java.util.Objects; +import java.util.function.Predicate; + +public class EventDataPredicate extends EventData { + + public EventDataPredicate withPredicate(Predicate predicate) { + setObject(predicate); + return this; + } + + public EventDataPredicate withPredicate(Predicate predicate, Class clazz) { + Objects.requireNonNull(clazz); + setObject(new TypedPredicate<>(predicate, clazz)); + return this; + } +} diff --git a/fluent/agentic/pom.xml b/fluent/agentic/pom.xml index 8946e189..e168d9bd 100644 --- a/fluent/agentic/pom.xml +++ b/fluent/agentic/pom.xml @@ -57,6 +57,16 @@ serverlessworkflow-experimental-agentic test + + io.cloudevents + cloudevents-json-jackson + test + + + io.serverlessworkflow + serverlessworkflow-impl-jackson + ${project.version} + diff --git a/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentListenTaskBuilder.java b/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentListenTaskBuilder.java index 5a9b9359..141e711c 100644 --- a/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentListenTaskBuilder.java +++ b/fluent/agentic/src/main/java/io/serverlessworkflow/fluent/agentic/AgentListenTaskBuilder.java @@ -18,10 +18,14 @@ import io.serverlessworkflow.api.types.AnyEventConsumptionStrategy; import io.serverlessworkflow.api.types.ListenTask; import io.serverlessworkflow.api.types.func.UntilPredicate; -import io.serverlessworkflow.fluent.spec.ListenTaskBuilder; +import io.serverlessworkflow.fluent.func.FuncListenToBuilder; +import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder; +import io.serverlessworkflow.fluent.spec.AbstractListenTaskBuilder; import java.util.function.Predicate; -public class AgentListenTaskBuilder extends ListenTaskBuilder { +public class AgentListenTaskBuilder + extends AbstractListenTaskBuilder + implements ConditionalTaskBuilder { private UntilPredicate untilPredicate; @@ -29,6 +33,16 @@ public AgentListenTaskBuilder() { super(new AgentTaskItemListBuilder()); } + @Override + protected AgentListenTaskBuilder self() { + return this; + } + + @Override + protected FuncListenToBuilder newEventConsumptionStrategyBuilder() { + return new FuncListenToBuilder(); + } + public AgentListenTaskBuilder until(Predicate predicate, Class predClass) { untilPredicate = new UntilPredicate().withPredicate(predicate, predClass); return this; diff --git a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/Agents.java b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/Agents.java index e8b53006..5c10764e 100644 --- a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/Agents.java +++ b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/Agents.java @@ -26,9 +26,9 @@ public interface Agents { interface ChatBot { @UserMessage( """ - You are a happy chat bot, reply to my message: - {{userInput}}. - """) + You are a happy chat bot, reply to my message: + {{userInput}}. + """) @Agent String chat(@V("userInput") String userInput); } @@ -37,11 +37,11 @@ interface MovieExpert { @UserMessage( """ - You are a great evening planner. - Propose a list of 3 movies matching the given mood. - The mood is {{mood}}. - Provide a list with the 3 items and nothing else. - """) + You are a great evening planner. + Propose a list of 3 movies matching the given mood. + The mood is {{mood}}. + Provide a list with the 3 items and nothing else. + """) @Agent List findMovie(@V("mood") String mood); } @@ -50,10 +50,10 @@ interface SettingAgent extends AgentSpecification { @UserMessage( """ - Create a vivid {{style}} setting. It should include the time period, the state of technology, - key locations, and a brief description of the world’s political or social situation. - Make it imaginative, atmospheric, and suitable for a {{style}} novel. - """) + Create a vivid {{style}} setting. It should include the time period, the state of technology, + key locations, and a brief description of the world’s political or social situation. + Make it imaginative, atmospheric, and suitable for a {{style}} novel. + """) @Agent( "Generates an imaginative setting including timeline, technology level, and world structure") String invoke(@V("style") String style); @@ -63,9 +63,9 @@ interface HeroAgent extends AgentSpecification { @UserMessage( """ - Invent a compelling protagonist for a {{style}} story. Describe their background, personality, - motivations, and any unique skills or traits. - """) + Invent a compelling protagonist for a {{style}} story. Describe their background, personality, + motivations, and any unique skills or traits. + """) @Agent("Creates a unique and relatable protagonist with rich backstory and motivations.") String invoke(@V("style") String style); } @@ -74,10 +74,10 @@ interface ConflictAgent extends AgentSpecification { @UserMessage( """ - Generate a central conflict or threat for a {{style}} plot. It can be external or - internal (e.g. moral dilemma, personal transformation). - Make it high-stakes and thematically rich. - """) + Generate a central conflict or threat for a {{style}} plot. It can be external or + internal (e.g. moral dilemma, personal transformation). + Make it high-stakes and thematically rich. + """) @Agent("Proposes a central conflict or dramatic tension to drive a compelling narrative.") String invoke(@V("style") String style); } @@ -86,8 +86,8 @@ interface FactAgent extends AgentSpecification { @UserMessage( """ - Generate a unique sci-fi fact about an alien civilization's {{goal}} environment or evolutionary history. Make it imaginative and specific. - """) + Generate a unique sci-fi fact about an alien civilization's {{goal}} environment or evolutionary history. Make it imaginative and specific. + """) @Agent("Generates a core fact that defines the foundation of an civilization.") String invoke(@V("fact") String fact); } @@ -96,10 +96,10 @@ interface CultureAgent extends AgentSpecification { @UserMessage( """ - Given the following sci-fi fact about an civilization, describe 3–5 unique cultural traits, traditions, or societal structures that naturally emerge from this environment. - Fact: - {{fact}} - """) + Given the following sci-fi fact about an civilization, describe 3–5 unique cultural traits, traditions, or societal structures that naturally emerge from this environment. + Fact: + {{fact}} + """) @Agent("Derives cultural traits from the environmental/evolutionary fact.") List invoke(@V("fact") String fact); } @@ -108,10 +108,10 @@ interface TechnologyAgent extends AgentSpecification { @UserMessage( """ - Given the following sci-fi fact about an alien civilization, describe 3–5 technologies or engineering solutions they might have developed. Focus on tools, transportation, communication, and survival systems. - Fact: - {{fact}} - """) + Given the following sci-fi fact about an alien civilization, describe 3–5 technologies or engineering solutions they might have developed. Focus on tools, transportation, communication, and survival systems. + Fact: + {{fact}} + """) @Agent("Derives plausible technological inventions from the fact.") List invoke(@V("fact") String fact); } @@ -120,9 +120,9 @@ interface StorySeedAgent extends AgentSpecification { @UserMessage( """ - You are a science fiction writer. Given the following title, come up with a short story premise. Describe the world, the central concept, and the thematic direction (e.g., dystopia, exploration, AI ethics). - Title: {{title}} - """) + You are a science fiction writer. Given the following title, come up with a short story premise. Describe the world, the central concept, and the thematic direction (e.g., dystopia, exploration, AI ethics). + Title: {{title}} + """) @Agent("Generates a high-level sci-fi premise based on a title.") String invoke(@V("title") String title); } @@ -131,10 +131,10 @@ interface PlotAgent extends AgentSpecification { @UserMessage( """ - Using the following premise, outline a three-act structure for a science fiction short story. Include a brief description of the main character, the inciting incident, the rising conflict, and the resolution. - Premise: - {{premise}} - """) + Using the following premise, outline a three-act structure for a science fiction short story. Include a brief description of the main character, the inciting incident, the rising conflict, and the resolution. + Premise: + {{premise}} + """) @Agent("Transforms a premise into a structured sci-fi plot.") String invoke(@V("premise") String premise); } @@ -143,10 +143,10 @@ interface SceneAgent extends AgentSpecification { @UserMessage( """ - Write the opening scene of a science fiction short story based on the following plot outline. Introduce the main character and immerse the reader in the setting. Use vivid, cinematic language. - Plot: - {{plot}} - """) + Write the opening scene of a science fiction short story based on the following plot outline. Introduce the main character and immerse the reader in the setting. Use vivid, cinematic language. + Plot: + {{plot}} + """) @Agent("Generates the opening scene of the story from a plot outline.") String invoke(@V("plot") String plot); } @@ -155,13 +155,13 @@ interface MeetingInvitationDraft extends AgentSpecification { @UserMessage( """ - You are a professional meeting invitation writer. Draft a concise and clear meeting invitation email based on the following details: - Subject: {{subject}} - Date: {{date}} - Time: {{time}} - Location: {{location}} - Agenda: {{agenda}} - """) + You are a professional meeting invitation writer. Draft a concise and clear meeting invitation email based on the following details: + Subject: {{subject}} + Date: {{date}} + Time: {{time}} + Location: {{location}} + Agenda: {{agenda}} + """) @Agent("Drafts a professional meeting invitation email.") String invoke( @V("subject") String subject, @@ -175,10 +175,63 @@ interface MeetingInvitationStyle extends AgentSpecification { @UserMessage( """ - You are a professional meeting invitation writer. Rewrite the following meeting invitation email to better fit the {{style}} style: - Original Invitation: {{invitation}} - """) + You are a professional meeting invitation writer. Rewrite the following meeting invitation email to better fit the {{style}} style: + Original Invitation: {{invitation}} + """) @Agent("Edits a meeting invitation email to better fit a given style.") String invoke(@V("invitation") String invitation, @V("style") String style); } + + interface EmailDrafter { + + @UserMessage( + """ + You are a precise email drafting assistant. + + GOAL + - Draft a professional email that achieves the stated purpose. + - Keep it concise and skimmable. + + INPUT + recipient_name: {{recipientName}} + sender_name: {{senderName}} + purpose: {{purpose}} // e.g., follow-up, scheduling, proposal, apology, onboarding + key_points: {{keyPoints}} // bullet list or comma-separated facts + tone: {{tone}} // e.g., friendly, neutral, formal + length: {{length}} // short|medium + call_to_action: {{cta}} // e.g., "reply with a time", "confirm receipt", or empty + signature: {{signature}} // prebuilt block; do NOT invent + allowed_domains: {{allowedDomains}} // e.g., ["acme.com","example.com"] + known_links: {{links}} // URLs you may use; if not in allowed_domains, do not include + + HARD RULES + - Never fabricate facts, prices, or promises. + - Only include links from allowed_domains and only those listed in known_links. + - Do not include internal/confidential URLs. + - If you lack a detail, write a neutral placeholder (e.g., "[DATE]"). + - Keep subject <= 60 characters if possible. + - One clear CTA max. + + OUTPUT + Return ONLY a compact JSON object with keys: + { + "subject": "...", + "body_plain": "...", + "links": ["..."] // subset of known_links, or empty + } + No markdown, no explanations, no extra text. + """) + @Agent("Drafts a new outbound email from structured inputs; returns JSON.") + String draftNew( + @V("recipientName") String recipientName, + @V("senderName") String senderName, + @V("purpose") String purpose, + @V("keyPoints") List keyPoints, + @V("tone") String tone, + @V("length") String length, + @V("cta") String cta, + @V("signature") String signature, + @V("allowedDomains") List allowedDomains, + @V("links") List links); + } } diff --git a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java index 8f696ef1..74e4c417 100644 --- a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java +++ b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/ChatBotIT.java @@ -17,46 +17,41 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.spy; +import com.fasterxml.jackson.databind.ObjectMapper; import dev.langchain4j.agentic.AgenticServices; -import dev.langchain4j.agentic.scope.AgenticScope; import dev.langchain4j.memory.chat.MessageWindowChatMemory; import io.cloudevents.CloudEvent; -import io.cloudevents.core.v1.CloudEventBuilder; +import io.cloudevents.jackson.JsonCloudEventData; import io.serverlessworkflow.api.types.Workflow; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowInstance; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowStatus; import io.serverlessworkflow.impl.events.InMemoryEvents; -import java.net.URI; -import java.time.OffsetDateTime; +import io.serverlessworkflow.impl.jackson.JsonUtils; import java.util.Map; -import java.util.Optional; -import java.util.UUID; +import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; public class ChatBotIT { @Test @SuppressWarnings("unchecked") - @Disabled("Figuring out event processing") void chat_bot() { + final ObjectMapper mapper = new ObjectMapper(); Agents.ChatBot chatBot = spy( AgenticServices.agentBuilder(Agents.ChatBot.class) .chatModel(Models.BASE_MODEL) - // .chatMemoryProvider(memoryId -> MessageWindowChatMemory.withMaxMessages(10)) + .chatMemoryProvider(memoryId -> MessageWindowChatMemory.withMaxMessages(10)) .outputName("conversation") .build()); BlockingQueue replyEvents = new LinkedBlockingQueue<>(); @@ -98,7 +93,23 @@ void chat_bot() { emit.event( e -> e.type( - "org.acme.chatbot.reply")))))) + "org.acme.chatbot.reply") + .data( + convo -> { + var node = + JsonUtils + .object() + .put( + "conversation", + convo + .getOrDefault( + "conversation", + "") + .toString()); + return JsonCloudEventData + .wrap(node); + }, + Map.class)))))) .emit(emit -> emit.event(e -> e.type("org.acme.chatbot.finished")))) .build(); @@ -119,12 +130,24 @@ void chat_bot() { assertEquals(WorkflowStatus.WAITING, waitingInstance.status()); // Publish the events - eventBroker.publish(newMessageEvent("Hello World!")); + + eventBroker.publish(newRequestMessage("Hi! Can you tell me a good duck joke?")); + CloudEvent reply = replyEvents.poll(60, TimeUnit.SECONDS); assertNotNull(reply); + eventBroker.publish(newRequestMessage("Oh I didn't like this one, please tell me another.")); + reply = replyEvents.poll(60, TimeUnit.SECONDS); + assertNotNull(reply); + assertThat( + ((JsonCloudEventData) Objects.requireNonNull(reply.getData())) + .getNode() + .get("conversation") + .asText()) + .contains("No worries"); + // Empty message completes the workflow - eventBroker.publish(newMessageEvent("", "org.acme.chatbot.finalize")); + eventBroker.publish(newFinalizeMessage()); CloudEvent finished = finishedEvents.poll(60, TimeUnit.SECONDS); assertNotNull(finished); assertThat(finishedEvents).isEmpty(); @@ -134,99 +157,15 @@ void chat_bot() { } catch (InterruptedException e) { fail(e.getMessage()); - } finally { } } - /** - * In this test we validate a workflow mixed with agents and regular Java calls - * - *

- * - *

    - *
  1. The first function prints the message input and converts the data into a Map for the - * agent ingestion - *
  2. Internally, our factories will add the output to a new AgenticScope since under the hood, - * we are call `as(AgenticScope)` - *
  3. The agent is then called with a scope with a state as `message="input"` - *
  4. The agent updates the state automatically in the AgenticScope and returns the message as - * a string, this string is then served to the next task - *
  5. The next task process the agent response and returns it ending the workflow. Meanwhile, - * the AgenticScope is always updated with the latest result from the given task. - *
- */ - @Test - void mixed_workflow() { - Agents.ChatBot chatBot = - spy( - AgenticServices.agentBuilder(Agents.ChatBot.class) - .chatModel(Models.BASE_MODEL) - .chatMemoryProvider(memoryId -> MessageWindowChatMemory.withMaxMessages(10)) - .outputName("userInput") - .build()); - - final Workflow mixedWorkflow = - AgentWorkflowBuilder.workflow("chat-bot") - .tasks( - t -> - t.callFn( - callJ -> - callJ.function( - input -> { - System.out.println(input); - return Map.of("userInput", input); - }, - String.class)) - .agent(chatBot) - .callFn( - callJ -> - callJ.function( - input -> { - System.out.println(input); - // Here, we are return a simple string so the internal - // AgenticScope will add it to the default `input` key - // If we want to really manipulate it, we could return a - // Map<>(message, input) - return "I've changed the input [" + input + "]"; - }, - String.class))) - .build(); - - try (WorkflowApplication app = WorkflowApplication.builder().build()) { - WorkflowModel model = - app.workflowDefinition(mixedWorkflow).instance("Hello World!").start().join(); - - Optional resultAsString = model.as(String.class); - - assertTrue(resultAsString.isPresent()); - assertFalse(resultAsString.get().isEmpty()); - assertTrue(resultAsString.get().contains("changed the input")); - - Optional resultAsScope = model.as(AgenticScope.class); - - assertTrue(resultAsScope.isPresent()); - assertFalse(resultAsScope.get().readState("input").toString().isEmpty()); - assertTrue(resultAsScope.get().readState("input").toString().contains("changed the input")); - } - } - - private CloudEvent newMessageEvent(String message) { - return newMessageEvent(message, null); + private CloudEvent newRequestMessage(String message) { + return CloudEventsTestBuilder.newMessage( + String.format("{\"userInput\": \"%s\"}", message), "org.acme.chatbot.request"); } - private CloudEvent newMessageEvent(String message, String type) { - if (type == null || type.isEmpty()) { - type = "org.acme.chatbot.request"; - } - - return new CloudEventBuilder() - .withData(String.format("{\"userInput\": \"%s\"}", message).getBytes()) - .withType(type) - .withId(UUID.randomUUID().toString()) - .withDataContentType("application/json") - .withSource(URI.create("test://localhost")) - .withSubject("A chatbot message") - .withTime(OffsetDateTime.now()) - .build(); + private CloudEvent newFinalizeMessage() { + return CloudEventsTestBuilder.newMessage("", "org.acme.chatbot.finalize"); } } diff --git a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/CloudEventsTestBuilder.java b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/CloudEventsTestBuilder.java new file mode 100644 index 00000000..e2fec1e7 --- /dev/null +++ b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/CloudEventsTestBuilder.java @@ -0,0 +1,42 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.agentic; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.v1.CloudEventBuilder; +import java.net.URI; +import java.time.OffsetDateTime; +import java.util.UUID; + +public final class CloudEventsTestBuilder { + + private CloudEventsTestBuilder() {} + + public static CloudEvent newMessage(String data, String type) { + if (data == null) { + data = ""; + } + return new CloudEventBuilder() + .withData(data.getBytes()) + .withType(type) + .withId(UUID.randomUUID().toString()) + .withDataContentType("application/json") + .withSource(URI.create("test://localhost")) + .withSubject("A chatbot message") + .withTime(OffsetDateTime.now()) + .build(); + } +} diff --git a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/EmailDrafterIT.java b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/EmailDrafterIT.java new file mode 100644 index 00000000..a70f7a72 --- /dev/null +++ b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/EmailDrafterIT.java @@ -0,0 +1,148 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.agentic; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.fail; + +import dev.langchain4j.agentic.AgenticServices; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.data.PojoCloudEventData; +import io.serverlessworkflow.api.types.EventFilter; +import io.serverlessworkflow.api.types.EventProperties; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.jackson.JsonUtils; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import org.acme.EmailDraft; +import org.acme.EmailDrafts; +import org.acme.EmailPolicies; +import org.acme.PolicyDecision; +import org.junit.jupiter.api.Test; + +public class EmailDrafterIT { + + @Test + @SuppressWarnings("unchecked") + void email_drafter_agent() { + Agents.EmailDrafter emailDrafter = + AgenticServices.agentBuilder(Agents.EmailDrafter.class) + .chatModel(Models.BASE_MODEL) + .outputName("email_draft") + .build(); + + BlockingQueue finishedEvents = new LinkedBlockingQueue<>(); + + final Workflow emailDrafterWorkflow = + AgentWorkflowBuilder.workflow("emailDrafterAgentic") + .tasks( + tasks -> + tasks + .agent(emailDrafter) + .callFn(c -> c.function(EmailDrafts::parse, String.class)) + .callFn(c -> c.function(EmailPolicies::policyCheck, EmailDraft.class)) + .switchCase( + s -> + s.onPredicate( + c -> + c.when( + decision -> + !EmailPolicies.Decision.AUTO_SEND.equals( + decision.decision()), + PolicyDecision.class) + .then("requestReview")) + .onDefault("emailReady")) + .emit( + "requestReview", + emit -> + emit.event( + e -> + e.type("org.acme.email.review.required") + .data( + payload -> + PojoCloudEventData.wrap( + payload, + p -> + JsonUtils.mapper() + .writeValueAsString(payload) + .getBytes()), + PolicyDecision.class))) + .listen( + "waitForReview", + listen -> + listen.to( + e -> + e.any( + any -> any.with(r -> r.type("org.acme.email.approved")), + any -> any.with(r -> r.type("org.acme.email.denied"))))) + .emit( + "emailReady", emit -> emit.event(e -> e.type("org.acme.email.ready")))) + .build(); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + // input + Map emailVars = + Map.ofEntries( + Map.entry("recipientName", "John Mars"), + Map.entry("senderName", "Rick Venus"), + Map.entry("purpose", "follow-up"), + Map.entry( + "keyPoints", + List.of( + "Thanks for the call yesterday", + "Attaching the one-page overview", + "Available Wed or Thu 2–4pm ET", + "Check the links for more")), + Map.entry("tone", "friendly"), // friendly | neutral | formal + Map.entry("length", "short"), // short | medium + Map.entry("cta", "Please reply with a 15-minute slot this week."), + Map.entry("signature", "Best regards,\nRick Venus\nEngineer\nAcme"), + Map.entry("allowedDomains", List.of("acme.com", "example.com")), + Map.entry( + "links", + List.of( + "https://acme.com/proposals/alpha", "https://example.com/schedule/rick"))); + + // Listen to de event + app.eventConsumer() + .register( + app.eventConsumer() + .listen( + new EventFilter() + .withWith(new EventProperties().withType("org.acme.email.ready")), + app), + ce -> finishedEvents.add((CloudEvent) ce)); + + var instance = app.workflowDefinition(emailDrafterWorkflow).instance(emailVars); + var running = instance.start().join(); + var policyDecision = running.as(PolicyDecision.class); + assertThat(policyDecision).isNotNull(); + assertThat(policyDecision.isPresent()).isTrue(); + assertThat(policyDecision.get().decision()).isEqualTo(EmailPolicies.Decision.AUTO_SEND); + assertThat(instance.status()).isEqualTo(WorkflowStatus.COMPLETED); + + CloudEvent finished = finishedEvents.poll(1, TimeUnit.SECONDS); + assertThat(finished).isNotNull(); + } catch (InterruptedException e) { + fail(e); + } + } +} diff --git a/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/MixedWorkflowIT.java b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/MixedWorkflowIT.java new file mode 100644 index 00000000..c551eb8a --- /dev/null +++ b/fluent/agentic/src/test/java/io/serverlessworkflow/fluent/agentic/MixedWorkflowIT.java @@ -0,0 +1,104 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.agentic; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.spy; + +import dev.langchain4j.agentic.AgenticServices; +import dev.langchain4j.agentic.scope.AgenticScope; +import dev.langchain4j.memory.chat.MessageWindowChatMemory; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowModel; +import java.util.Map; +import java.util.Optional; +import org.junit.jupiter.api.Test; + +public class MixedWorkflowIT { + /** + * In this test we validate a workflow mixed with agents and regular Java calls + * + *

+ * + *

    + *
  1. The first function prints the message input and converts the data into a Map for the + * agent ingestion + *
  2. Internally, our factories will add the output to a new AgenticScope since under the hood, + * we are call `as(AgenticScope)` + *
  3. The agent is then called with a scope with a state as `message="input"` + *
  4. The agent updates the state automatically in the AgenticScope and returns the message as + * a string, this string is then served to the next task + *
  5. The next task process the agent response and returns it ending the workflow. Meanwhile, + * the AgenticScope is always updated with the latest result from the given task. + *
+ */ + @Test + void mixed_workflow() { + Agents.ChatBot chatBot = + spy( + AgenticServices.agentBuilder(Agents.ChatBot.class) + .chatModel(Models.BASE_MODEL) + .chatMemoryProvider(memoryId -> MessageWindowChatMemory.withMaxMessages(10)) + .outputName("userInput") + .build()); + + final Workflow mixedWorkflow = + AgentWorkflowBuilder.workflow("chat-bot") + .tasks( + t -> + t.callFn( + callJ -> + callJ.function( + input -> { + System.out.println(input); + return Map.of("userInput", input); + }, + String.class)) + .agent(chatBot) + .callFn( + callJ -> + callJ.function( + input -> { + System.out.println(input); + // Here, we are return a simple string so the internal + // AgenticScope will add it to the default `input` key + // If we want to really manipulate it, we could return a + // Map<>(message, input) + return "I've changed the input [" + input + "]"; + }, + String.class))) + .build(); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + WorkflowModel model = + app.workflowDefinition(mixedWorkflow).instance("Hello World!").start().join(); + + Optional resultAsString = model.as(String.class); + + assertTrue(resultAsString.isPresent()); + assertFalse(resultAsString.get().isEmpty()); + assertTrue(resultAsString.get().contains("changed the input")); + + Optional resultAsScope = model.as(AgenticScope.class); + + assertTrue(resultAsScope.isPresent()); + assertFalse(resultAsScope.get().readState("input").toString().isEmpty()); + assertTrue(resultAsScope.get().readState("input").toString().contains("changed the input")); + } + } +} diff --git a/fluent/agentic/src/test/java/org/acme/EmailDraft.java b/fluent/agentic/src/test/java/org/acme/EmailDraft.java new file mode 100644 index 00000000..c73824fd --- /dev/null +++ b/fluent/agentic/src/test/java/org/acme/EmailDraft.java @@ -0,0 +1,20 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.acme; + +import java.util.List; + +public record EmailDraft(String subject, String bodyPlain, List links) {} diff --git a/fluent/agentic/src/test/java/org/acme/EmailDrafts.java b/fluent/agentic/src/test/java/org/acme/EmailDrafts.java new file mode 100644 index 00000000..c6ae51a4 --- /dev/null +++ b/fluent/agentic/src/test/java/org/acme/EmailDrafts.java @@ -0,0 +1,38 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.acme; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public final class EmailDrafts { + private static final ObjectMapper MAPPER = new ObjectMapper(); + + public static EmailDraft parse(String json) { + try { + var node = MAPPER.readTree(json); + var subject = node.path("subject").asText(""); + var bodyPlain = node.path("body_plain").asText(""); + var links = new ArrayList(); + node.path("links").forEach(n -> links.add(n.asText())); + return new EmailDraft(subject, bodyPlain, links); + } catch (IOException e) { + return new EmailDraft("", "", List.of()); + } + } +} diff --git a/fluent/agentic/src/test/java/org/acme/EmailPolicies.java b/fluent/agentic/src/test/java/org/acme/EmailPolicies.java new file mode 100644 index 00000000..8f2d72ab --- /dev/null +++ b/fluent/agentic/src/test/java/org/acme/EmailPolicies.java @@ -0,0 +1,69 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.acme; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.regex.Pattern; + +public final class EmailPolicies { + + public static PolicyDecision policyCheck(EmailDraft d) { + var notes = new ArrayList(); + var subject = safeTrim(d.subject()); + var body = safeTrim(d.bodyPlain()); + + if (subject.isEmpty()) notes.add("Missing subject"); + if (body.isEmpty()) notes.add("Missing body"); + + var secret = Pattern.compile("(?i)(api[-_]?key|secret|password|token)\\s*[:=]\\s*\\S+"); + if (secret.matcher(body).find() || secret.matcher(subject).find()) { + notes.add("Suspected secret detected"); + return new PolicyDecision(Decision.BLOCKED, d, notes); + } + + var allow = Set.of("example.com", "acme.com"); + var badLinks = new ArrayList(); + for (String url : d.links() == null ? List.of() : d.links()) { + try { + String host = URI.create(url).getHost(); + if (host == null || allow.stream().noneMatch(host::endsWith)) { + badLinks.add(url); + } + } catch (IllegalArgumentException ignored) { + badLinks.add(url); + } + } + if (!badLinks.isEmpty()) { + notes.add("Non-allowed or malformed links: " + badLinks); + } + + var decision = notes.isEmpty() ? Decision.AUTO_SEND : Decision.REVIEW; + return new PolicyDecision(decision, new EmailDraft(subject, body, d.links()), notes); + } + + private static String safeTrim(String s) { + return s == null ? "" : s.trim(); + } + + public enum Decision { + AUTO_SEND, + REVIEW, + BLOCKED + } +} diff --git a/fluent/agentic/src/test/java/org/acme/PolicyDecision.java b/fluent/agentic/src/test/java/org/acme/PolicyDecision.java new file mode 100644 index 00000000..2acfd435 --- /dev/null +++ b/fluent/agentic/src/test/java/org/acme/PolicyDecision.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.acme; + +import java.util.List; + +public record PolicyDecision( + EmailPolicies.Decision decision, EmailDraft draft, List notes) {} diff --git a/fluent/func/pom.xml b/fluent/func/pom.xml index 9f67dcce..0a90cf79 100644 --- a/fluent/func/pom.xml +++ b/fluent/func/pom.xml @@ -27,6 +27,10 @@ io.serverlessworkflow serverlessworkflow-fluent-spec + + io.cloudevents + cloudevents-core + org.junit.jupiter diff --git a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitTaskBuilder.java b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitTaskBuilder.java index 28c8b8a4..2db03ae0 100644 --- a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitTaskBuilder.java +++ b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEmitTaskBuilder.java @@ -17,12 +17,23 @@ import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder; import io.serverlessworkflow.fluent.func.spi.FuncTransformations; -import io.serverlessworkflow.fluent.spec.EmitTaskBuilder; +import io.serverlessworkflow.fluent.spec.AbstractEmitTaskBuilder; -public class FuncEmitTaskBuilder extends EmitTaskBuilder +public class FuncEmitTaskBuilder + extends AbstractEmitTaskBuilder implements ConditionalTaskBuilder, FuncTransformations { FuncEmitTaskBuilder() { super(); } + + @Override + protected FuncEmitTaskBuilder self() { + return this; + } + + @Override + protected FuncEventPropertiesBuilder newEventPropertiesBuilder() { + return new FuncEventPropertiesBuilder(); + } } diff --git a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterBuilder.java b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterBuilder.java new file mode 100644 index 00000000..cac572ac --- /dev/null +++ b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventFilterBuilder.java @@ -0,0 +1,33 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func; + +import io.serverlessworkflow.fluent.spec.AbstractEventFilterBuilder; + +public class FuncEventFilterBuilder + extends AbstractEventFilterBuilder< + FuncEventFilterBuilder, FuncPredicateEventPropertiesBuilder> { + + @Override + protected FuncEventFilterBuilder self() { + return this; + } + + @Override + protected FuncPredicateEventPropertiesBuilder newEventPropertiesBuilder() { + return new FuncPredicateEventPropertiesBuilder(); + } +} diff --git a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventPropertiesBuilder.java b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventPropertiesBuilder.java new file mode 100644 index 00000000..27c691ce --- /dev/null +++ b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncEventPropertiesBuilder.java @@ -0,0 +1,40 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func; + +import io.cloudevents.CloudEventData; +import io.serverlessworkflow.api.types.func.EventDataFunction; +import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder; +import java.util.function.Function; + +public class FuncEventPropertiesBuilder + extends AbstractEventPropertiesBuilder { + + @Override + protected FuncEventPropertiesBuilder self() { + return this; + } + + public FuncEventPropertiesBuilder data(Function function) { + this.eventProperties.setData(new EventDataFunction().withFunction(function)); + return this; + } + + public FuncEventPropertiesBuilder data(Function function, Class clazz) { + this.eventProperties.setData(new EventDataFunction().withFunction(function, clazz)); + return this; + } +} diff --git a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenTaskBuilder.java b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenTaskBuilder.java index 8b5a57fc..b5168f62 100644 --- a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenTaskBuilder.java +++ b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenTaskBuilder.java @@ -20,10 +20,11 @@ import io.serverlessworkflow.api.types.func.UntilPredicate; import io.serverlessworkflow.fluent.func.spi.ConditionalTaskBuilder; import io.serverlessworkflow.fluent.func.spi.FuncTransformations; -import io.serverlessworkflow.fluent.spec.ListenTaskBuilder; +import io.serverlessworkflow.fluent.spec.AbstractListenTaskBuilder; import java.util.function.Predicate; -public class FuncListenTaskBuilder extends ListenTaskBuilder +public class FuncListenTaskBuilder + extends AbstractListenTaskBuilder implements ConditionalTaskBuilder, FuncTransformations { @@ -38,6 +39,16 @@ public FuncListenTaskBuilder until(Predicate predicate, Class predClas return this; } + @Override + protected FuncListenTaskBuilder self() { + return this; + } + + @Override + protected FuncListenToBuilder newEventConsumptionStrategyBuilder() { + return new FuncListenToBuilder(); + } + @Override public ListenTask build() { ListenTask task = super.build(); diff --git a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenToBuilder.java b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenToBuilder.java new file mode 100644 index 00000000..a21315cc --- /dev/null +++ b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenToBuilder.java @@ -0,0 +1,62 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func; + +import io.serverlessworkflow.api.types.AllEventConsumptionStrategy; +import io.serverlessworkflow.api.types.AnyEventConsumptionStrategy; +import io.serverlessworkflow.api.types.ListenTo; +import io.serverlessworkflow.api.types.OneEventConsumptionStrategy; +import io.serverlessworkflow.api.types.Until; +import io.serverlessworkflow.fluent.spec.AbstractEventConsumptionStrategyBuilder; + +public class FuncListenToBuilder + extends AbstractEventConsumptionStrategyBuilder< + FuncListenToBuilder, ListenTo, FuncEventFilterBuilder> { + + private final ListenTo listenTo = new ListenTo(); + + @Override + protected FuncEventFilterBuilder newEventFilterBuilder() { + return new FuncEventFilterBuilder(); + } + + // TODO: move these methods to default on an interface + + @Override + protected void setOne(OneEventConsumptionStrategy strategy) { + this.listenTo.setOneEventConsumptionStrategy(strategy); + } + + @Override + protected void setAll(AllEventConsumptionStrategy strategy) { + this.listenTo.setAllEventConsumptionStrategy(strategy); + } + + @Override + protected void setAny(AnyEventConsumptionStrategy strategy) { + this.listenTo.setAnyEventConsumptionStrategy(strategy); + } + + @Override + protected ListenTo getEventConsumptionStrategy() { + return this.listenTo; + } + + @Override + protected void setUntil(Until until) { + this.listenTo.getAnyEventConsumptionStrategy().setUntil(until); + } +} diff --git a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncPredicateEventPropertiesBuilder.java b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncPredicateEventPropertiesBuilder.java new file mode 100644 index 00000000..a0692d33 --- /dev/null +++ b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncPredicateEventPropertiesBuilder.java @@ -0,0 +1,35 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func; + +import io.cloudevents.CloudEventData; +import io.serverlessworkflow.api.types.func.EventDataPredicate; +import io.serverlessworkflow.fluent.spec.AbstractEventPropertiesBuilder; +import java.util.function.Predicate; + +public class FuncPredicateEventPropertiesBuilder + extends AbstractEventPropertiesBuilder { + + @Override + protected FuncPredicateEventPropertiesBuilder self() { + return this; + } + + public FuncPredicateEventPropertiesBuilder data(Predicate predicate) { + this.eventProperties.setData(new EventDataPredicate().withPredicate(predicate)); + return this; + } +} diff --git a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncSwitchTaskBuilder.java b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncSwitchTaskBuilder.java index d1a1b642..f0294861 100644 --- a/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncSwitchTaskBuilder.java +++ b/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncSwitchTaskBuilder.java @@ -50,20 +50,20 @@ protected FuncSwitchTaskBuilder self() { return this; } - public FuncSwitchTaskBuilder functions(Consumer consumer) { - return this.functions(UUID.randomUUID().toString(), consumer); + public FuncSwitchTaskBuilder onPredicate(Consumer consumer) { + return this.onPredicate(UUID.randomUUID().toString(), consumer); } - public FuncSwitchTaskBuilder functions( - String name, Consumer consumer) { - final SwitchCaseFunctionBuilder switchCase = new SwitchCaseFunctionBuilder(); + public FuncSwitchTaskBuilder onPredicate( + String name, Consumer consumer) { + final SwitchCasePredicateBuilder switchCase = new SwitchCasePredicateBuilder(); consumer.accept(switchCase); this.switchItems.add(new SwitchItem(name, switchCase.build())); return this; } @Override - public FuncSwitchTaskBuilder items(String name, Consumer switchCaseConsumer) { + public FuncSwitchTaskBuilder on(String name, Consumer switchCaseConsumer) { final SwitchCaseBuilder switchCase = new SwitchCaseBuilder(); switchCaseConsumer.accept(switchCase); this.switchItems.add(new SwitchItem(name, switchCase.build())); @@ -75,29 +75,29 @@ public SwitchTask build() { return switchTask; } - public static final class SwitchCaseFunctionBuilder { + public static final class SwitchCasePredicateBuilder { private final SwitchCaseFunction switchCase; - SwitchCaseFunctionBuilder() { + SwitchCasePredicateBuilder() { this.switchCase = new SwitchCaseFunction(); } - public SwitchCaseFunctionBuilder when(Predicate when) { + public SwitchCasePredicateBuilder when(Predicate when) { this.switchCase.withPredicate(when); return this; } - public SwitchCaseFunctionBuilder when(Predicate when, Class whenClass) { + public SwitchCasePredicateBuilder when(Predicate when, Class whenClass) { this.switchCase.withPredicate(when, whenClass); return this; } - public SwitchCaseFunctionBuilder then(FlowDirective then) { - this.switchCase.setThen(then); + public SwitchCasePredicateBuilder then(String taskName) { + this.switchCase.setThen(new FlowDirective().withString(taskName)); return this; } - public SwitchCaseFunctionBuilder then(FlowDirectiveEnum then) { + public SwitchCasePredicateBuilder then(FlowDirectiveEnum then) { this.switchCase.setThen(new FlowDirective().withFlowDirectiveEnum(then)); return this; } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEmitTaskBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEmitTaskBuilder.java new file mode 100644 index 00000000..3c66c286 --- /dev/null +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEmitTaskBuilder.java @@ -0,0 +1,48 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.spec; + +import io.serverlessworkflow.api.types.EmitEventDefinition; +import io.serverlessworkflow.api.types.EmitTask; +import io.serverlessworkflow.api.types.EmitTaskConfiguration; +import java.util.function.Consumer; + +public abstract class AbstractEmitTaskBuilder< + SELF extends AbstractEmitTaskBuilder, F extends AbstractEventPropertiesBuilder> + extends TaskBaseBuilder> { + + protected final EmitTask emitTask = new EmitTask(); + + protected AbstractEmitTaskBuilder() { + super.setTask(emitTask); + } + + protected abstract F newEventPropertiesBuilder(); + + @SuppressWarnings("unchecked") + public SELF event(Consumer consumer) { + final F eventPropertiesBuilder = this.newEventPropertiesBuilder(); + consumer.accept(eventPropertiesBuilder); + this.emitTask.setEmit( + new EmitTaskConfiguration() + .withEvent(new EmitEventDefinition().withWith(eventPropertiesBuilder.build()))); + return (SELF) this; + } + + public EmitTask build() { + return emitTask; + } +} diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventConsumptionStrategyBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventConsumptionStrategyBuilder.java index bdcadb79..4c12d199 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventConsumptionStrategyBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventConsumptionStrategyBuilder.java @@ -17,31 +17,38 @@ import io.serverlessworkflow.api.types.AllEventConsumptionStrategy; import io.serverlessworkflow.api.types.AnyEventConsumptionStrategy; +import io.serverlessworkflow.api.types.EventFilter; import io.serverlessworkflow.api.types.OneEventConsumptionStrategy; import io.serverlessworkflow.api.types.Until; import io.serverlessworkflow.fluent.spec.spi.EventConsumptionStrategyFluent; import java.io.Serializable; +import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.function.Consumer; public abstract class AbstractEventConsumptionStrategyBuilder< - SELF extends EventConsumptionStrategyFluent, T extends Serializable> - implements EventConsumptionStrategyFluent { + SELF extends EventConsumptionStrategyFluent, + T extends Serializable, + F extends AbstractEventFilterBuilder> + implements EventConsumptionStrategyFluent { protected boolean oneSet, allSet, anySet; private Until until; - AbstractEventConsumptionStrategyBuilder() {} + protected AbstractEventConsumptionStrategyBuilder() {} @SuppressWarnings("unchecked") private SELF self() { return (SELF) this; } - public SELF one(Consumer c) { + protected abstract F newEventFilterBuilder(); + + public SELF one(Consumer c) { ensureNoneSet(); oneSet = true; - EventFilterBuilder fb = new EventFilterBuilder(); + F fb = this.newEventFilterBuilder(); c.accept(fb); OneEventConsumptionStrategy strat = new OneEventConsumptionStrategy(); strat.setOne(fb.build()); @@ -49,12 +56,12 @@ public SELF one(Consumer c) { return this.self(); } - abstract void setOne(OneEventConsumptionStrategy strategy); + protected abstract void setOne(OneEventConsumptionStrategy strategy); - public SELF all(Consumer c) { + public SELF all(Consumer c) { ensureNoneSet(); allSet = true; - EventFilterBuilder fb = new EventFilterBuilder(); + F fb = this.newEventFilterBuilder(); c.accept(fb); AllEventConsumptionStrategy strat = new AllEventConsumptionStrategy(); strat.setAll(List.of(fb.build())); @@ -62,20 +69,35 @@ public SELF all(Consumer c) { return this.self(); } - abstract void setAll(AllEventConsumptionStrategy strategy); + protected abstract void setAll(AllEventConsumptionStrategy strategy); - public SELF any(Consumer c) { + @SuppressWarnings("unchecked") + public SELF any(Consumer c) { + return (SELF) any(new Consumer[] {c}); + } + + @SuppressWarnings("unchecked") + @SafeVarargs + public final SELF any(Consumer... consumers) { ensureNoneSet(); anySet = true; - EventFilterBuilder fb = new EventFilterBuilder(); - c.accept(fb); + + List built = new ArrayList<>(consumers.length); // replace Object with your filter type + + for (Consumer c : consumers) { + Objects.requireNonNull(c, "consumer"); + F fb = this.newEventFilterBuilder(); // fresh builder per consumer + c.accept(fb); + built.add((T) fb.build()); + } + AnyEventConsumptionStrategy strat = new AnyEventConsumptionStrategy(); - strat.setAny(List.of(fb.build())); + strat.setAny((List) built); this.setAny(strat); return this.self(); } - abstract void setAny(AnyEventConsumptionStrategy strategy); + protected abstract void setAny(AnyEventConsumptionStrategy strategy); public SELF until(Consumer c) { final EventConsumptionStrategyBuilder eventConsumptionStrategyBuilder = @@ -108,7 +130,7 @@ public final T build() { return this.getEventConsumptionStrategy(); } - abstract T getEventConsumptionStrategy(); + protected abstract T getEventConsumptionStrategy(); - abstract void setUntil(Until until); + protected abstract void setUntil(Until until); } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventFilterBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventFilterBuilder.java new file mode 100644 index 00000000..9d9099ce --- /dev/null +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventFilterBuilder.java @@ -0,0 +1,51 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.spec; + +import io.serverlessworkflow.api.types.EventFilter; +import io.serverlessworkflow.api.types.EventFilterCorrelate; +import java.util.function.Consumer; + +public abstract class AbstractEventFilterBuilder< + SELF extends AbstractEventFilterBuilder, P extends AbstractEventPropertiesBuilder> { + + protected final EventFilter filter = new EventFilter(); + protected final EventFilterCorrelate correlate = new EventFilterCorrelate(); + + protected abstract SELF self(); + + protected abstract P newEventPropertiesBuilder(); + + public SELF with(Consumer

c) { + P pb = this.newEventPropertiesBuilder(); + c.accept(pb); + filter.setWith(pb.build()); + return self(); + } + + public SELF correlate(String key, Consumer c) { + ListenTaskBuilder.CorrelatePropertyBuilder cpb = + new ListenTaskBuilder.CorrelatePropertyBuilder(); + c.accept(cpb); + correlate.withAdditionalProperty(key, cpb.build()); + return self(); + } + + public EventFilter build() { + filter.setCorrelate(correlate); + return filter; + } +} diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventPropertiesBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventPropertiesBuilder.java new file mode 100644 index 00000000..dc471399 --- /dev/null +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractEventPropertiesBuilder.java @@ -0,0 +1,82 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.spec; + +import io.serverlessworkflow.api.types.EventData; +import io.serverlessworkflow.api.types.EventProperties; +import io.serverlessworkflow.api.types.EventSource; +import io.serverlessworkflow.api.types.EventTime; +import io.serverlessworkflow.api.types.UriTemplate; +import java.net.URI; +import java.util.Date; + +public abstract class AbstractEventPropertiesBuilder< + SELF extends AbstractEventPropertiesBuilder> { + + protected final EventProperties eventProperties = new EventProperties(); + + protected abstract SELF self(); + + public SELF id(String id) { + eventProperties.setId(id); + return self(); + } + + public SELF source(String expr) { + eventProperties.setSource(new EventSource().withRuntimeExpression(expr)); + return self(); + } + + public SELF source(URI uri) { + eventProperties.setSource( + new EventSource().withUriTemplate(new UriTemplate().withLiteralUri(uri))); + return self(); + } + + public SELF type(String type) { + eventProperties.setType(type); + return self(); + } + + public SELF time(Date time) { + eventProperties.setTime(new EventTime().withLiteralTime(time)); + return self(); + } + + public SELF subject(String subject) { + eventProperties.setSubject(subject); + return self(); + } + + public SELF dataContentType(String ct) { + eventProperties.setDatacontenttype(ct); + return self(); + } + + public SELF data(String expr) { + eventProperties.setData(new EventData().withRuntimeExpression(expr)); + return self(); + } + + public SELF data(Object obj) { + eventProperties.setData(new EventData().withObject(obj)); + return self(); + } + + public EventProperties build() { + return eventProperties; + } +} diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractListenTaskBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractListenTaskBuilder.java new file mode 100644 index 00000000..28458d31 --- /dev/null +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/AbstractListenTaskBuilder.java @@ -0,0 +1,87 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.spec; + +import io.serverlessworkflow.api.types.CorrelateProperty; +import io.serverlessworkflow.api.types.ListenTask; +import io.serverlessworkflow.api.types.ListenTaskConfiguration; +import io.serverlessworkflow.api.types.ListenTo; +import java.util.function.Consumer; + +public abstract class AbstractListenTaskBuilder< + T extends BaseTaskItemListBuilder, + F extends AbstractEventConsumptionStrategyBuilder> + extends TaskBaseBuilder> { + + private final ListenTask listenTask; + private final ListenTaskConfiguration config; + private final T taskItemListBuilder; + + public AbstractListenTaskBuilder(T taskItemListBuilder) { + super(); + this.listenTask = new ListenTask(); + this.config = new ListenTaskConfiguration(); + this.config.setTo(new ListenTo()); + this.listenTask.setListen(config); + this.taskItemListBuilder = taskItemListBuilder; + super.setTask(listenTask); + } + + protected abstract F newEventConsumptionStrategyBuilder(); + + public AbstractListenTaskBuilder forEach(Consumer> c) { + final SubscriptionIteratorBuilder iteratorBuilder = + new SubscriptionIteratorBuilder<>(this.taskItemListBuilder); + c.accept(iteratorBuilder); + this.listenTask.setForeach(iteratorBuilder.build()); + return this; + } + + public AbstractListenTaskBuilder read( + ListenTaskConfiguration.ListenAndReadAs listenAndReadAs) { + this.config.setRead(listenAndReadAs); + return this; + } + + public AbstractListenTaskBuilder to(Consumer c) { + final F listenToBuilder = this.newEventConsumptionStrategyBuilder(); + c.accept(listenToBuilder); + this.config.setTo((ListenTo) listenToBuilder.build()); + return this; + } + + public ListenTask build() { + return listenTask; + } + + public static final class CorrelatePropertyBuilder { + private final CorrelateProperty prop = new CorrelateProperty(); + + public ListenTaskBuilder.CorrelatePropertyBuilder from(String expr) { + prop.setFrom(expr); + return this; + } + + public ListenTaskBuilder.CorrelatePropertyBuilder expect(String val) { + prop.setExpect(val); + return this; + } + + public CorrelateProperty build() { + return prop; + } + } +} diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/DoTaskBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/DoTaskBuilder.java index 4fc04278..669b580f 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/DoTaskBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/DoTaskBuilder.java @@ -56,8 +56,7 @@ public DoTaskBuilder fork(String name, Consumer itemsConfigurer } @Override - public DoTaskBuilder listen( - String name, Consumer> itemsConfigurer) { + public DoTaskBuilder listen(String name, Consumer itemsConfigurer) { this.listBuilder().listen(name, itemsConfigurer); return this; } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/EmitTaskBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/EmitTaskBuilder.java index be4aff6e..823a80c2 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/EmitTaskBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/EmitTaskBuilder.java @@ -15,31 +15,12 @@ */ package io.serverlessworkflow.fluent.spec; -import io.serverlessworkflow.api.types.EmitEventDefinition; -import io.serverlessworkflow.api.types.EmitTask; -import io.serverlessworkflow.api.types.EmitTaskConfiguration; -import java.util.function.Consumer; +public class EmitTaskBuilder + extends AbstractEmitTaskBuilder { -public class EmitTaskBuilder extends TaskBaseBuilder { - - private final EmitTask emitTask; - - protected EmitTaskBuilder() { - this.emitTask = new EmitTask(); - super.setTask(emitTask); - } - - public EmitTaskBuilder event(Consumer consumer) { - final EventPropertiesBuilder eventPropertiesBuilder = new EventPropertiesBuilder(); - consumer.accept(eventPropertiesBuilder); - this.emitTask.setEmit( - new EmitTaskConfiguration() - .withEvent(new EmitEventDefinition().withWith(eventPropertiesBuilder.build()))); - return this; - } - - public EmitTask build() { - return emitTask; + @Override + protected EventPropertiesBuilder newEventPropertiesBuilder() { + return new EventPropertiesBuilder(); } @Override diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/EventConsumptionStrategyBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/EventConsumptionStrategyBuilder.java index a681ca0e..b4591e0d 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/EventConsumptionStrategyBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/EventConsumptionStrategyBuilder.java @@ -23,34 +23,39 @@ public class EventConsumptionStrategyBuilder extends AbstractEventConsumptionStrategyBuilder< - EventConsumptionStrategyBuilder, EventConsumptionStrategy> { + EventConsumptionStrategyBuilder, EventConsumptionStrategy, EventFilterBuilder> { private final EventConsumptionStrategy eventConsumptionStrategy = new EventConsumptionStrategy(); EventConsumptionStrategyBuilder() {} @Override - void setOne(OneEventConsumptionStrategy strategy) { + protected EventFilterBuilder newEventFilterBuilder() { + return new EventFilterBuilder(); + } + + @Override + protected void setOne(OneEventConsumptionStrategy strategy) { eventConsumptionStrategy.setOneEventConsumptionStrategy(strategy); } @Override - void setAll(AllEventConsumptionStrategy strategy) { + protected void setAll(AllEventConsumptionStrategy strategy) { eventConsumptionStrategy.setAllEventConsumptionStrategy(strategy); } @Override - void setAny(AnyEventConsumptionStrategy strategy) { + protected void setAny(AnyEventConsumptionStrategy strategy) { eventConsumptionStrategy.setAnyEventConsumptionStrategy(strategy); } @Override - EventConsumptionStrategy getEventConsumptionStrategy() { + protected EventConsumptionStrategy getEventConsumptionStrategy() { return this.eventConsumptionStrategy; } @Override - void setUntil(Until until) { + protected void setUntil(Until until) { this.eventConsumptionStrategy.getAnyEventConsumptionStrategy().setUntil(until); } } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/EventFilterBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/EventFilterBuilder.java index e99d524f..90706a5c 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/EventFilterBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/EventFilterBuilder.java @@ -15,35 +15,16 @@ */ package io.serverlessworkflow.fluent.spec; -import io.serverlessworkflow.api.types.EventFilter; -import io.serverlessworkflow.api.types.EventFilterCorrelate; -import java.util.function.Consumer; +public class EventFilterBuilder + extends AbstractEventFilterBuilder { -/** Builder for event filters used in consumption strategies. */ -public final class EventFilterBuilder { - private final EventFilter filter = new EventFilter(); - private final EventFilterCorrelate correlate = new EventFilterCorrelate(); - - /** Predicate to match event properties. */ - public EventFilterBuilder with(Consumer c) { - EventPropertiesBuilder pb = new EventPropertiesBuilder(); - c.accept(pb); - filter.setWith(pb.build()); - return this; - } - - /** Correlation property for the filter. */ - public EventFilterBuilder correlate( - String key, Consumer c) { - ListenTaskBuilder.CorrelatePropertyBuilder cpb = - new ListenTaskBuilder.CorrelatePropertyBuilder(); - c.accept(cpb); - correlate.withAdditionalProperty(key, cpb.build()); + @Override + protected EventFilterBuilder self() { return this; } - public EventFilter build() { - filter.setCorrelate(correlate); - return filter; + @Override + protected EventPropertiesBuilder newEventPropertiesBuilder() { + return new EventPropertiesBuilder(); } } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/EventPropertiesBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/EventPropertiesBuilder.java index ccfccfec..27fb3b47 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/EventPropertiesBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/EventPropertiesBuilder.java @@ -15,64 +15,10 @@ */ package io.serverlessworkflow.fluent.spec; -import io.serverlessworkflow.api.types.EventData; -import io.serverlessworkflow.api.types.EventProperties; -import io.serverlessworkflow.api.types.EventSource; -import io.serverlessworkflow.api.types.EventTime; -import io.serverlessworkflow.api.types.UriTemplate; -import java.net.URI; -import java.util.Date; +public class EventPropertiesBuilder extends AbstractEventPropertiesBuilder { -public final class EventPropertiesBuilder { - private final EventProperties properties = new EventProperties(); - - public EventPropertiesBuilder id(String id) { - properties.setId(id); - return this; - } - - public EventPropertiesBuilder source(String expr) { - - properties.setSource(new EventSource().withRuntimeExpression(expr)); - return this; - } - - public EventPropertiesBuilder source(URI uri) { - properties.setSource(new EventSource().withUriTemplate(new UriTemplate().withLiteralUri(uri))); - return this; - } - - public EventPropertiesBuilder type(String type) { - properties.setType(type); - return this; - } - - public EventPropertiesBuilder time(Date time) { - properties.setTime(new EventTime().withLiteralTime(time)); + @Override + protected EventPropertiesBuilder self() { return this; } - - public EventPropertiesBuilder subject(String subject) { - properties.setSubject(subject); - return this; - } - - public EventPropertiesBuilder dataContentType(String ct) { - properties.setDatacontenttype(ct); - return this; - } - - public EventPropertiesBuilder data(String expr) { - properties.setData(new EventData().withRuntimeExpression(expr)); - return this; - } - - public EventPropertiesBuilder data(Object obj) { - properties.setData(new EventData().withObject(obj)); - return this; - } - - public EventProperties build() { - return properties; - } } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/ListenTaskBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/ListenTaskBuilder.java index 43e52020..760bdc49 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/ListenTaskBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/ListenTaskBuilder.java @@ -15,77 +15,24 @@ */ package io.serverlessworkflow.fluent.spec; -import io.serverlessworkflow.api.types.CorrelateProperty; -import io.serverlessworkflow.api.types.ListenTask; -import io.serverlessworkflow.api.types.ListenTaskConfiguration; -import io.serverlessworkflow.api.types.ListenTo; -import java.util.function.Consumer; - /** * Fluent builder for a "listen" task in a Serverless Workflow. Enforces exactly one consumption * strategy: one, all, or any. */ -public class ListenTaskBuilder> - extends TaskBaseBuilder> { - - private final ListenTask listenTask; - private final ListenTaskConfiguration config; - private final T taskItemListBuilder; +public class ListenTaskBuilder + extends AbstractListenTaskBuilder { - public ListenTaskBuilder(T taskItemListBuilder) { - super(); - this.listenTask = new ListenTask(); - this.config = new ListenTaskConfiguration(); - this.config.setTo(new ListenTo()); - this.listenTask.setListen(config); - this.taskItemListBuilder = taskItemListBuilder; - super.setTask(listenTask); + protected ListenTaskBuilder() { + super(new TaskItemListBuilder()); } @Override - protected ListenTaskBuilder self() { - return this; - } - - public ListenTaskBuilder forEach(Consumer> c) { - final SubscriptionIteratorBuilder iteratorBuilder = - new SubscriptionIteratorBuilder<>(this.taskItemListBuilder); - c.accept(iteratorBuilder); - this.listenTask.setForeach(iteratorBuilder.build()); - return this; - } - - public ListenTaskBuilder read(ListenTaskConfiguration.ListenAndReadAs listenAndReadAs) { - this.config.setRead(listenAndReadAs); - return this; + protected ListenToBuilder newEventConsumptionStrategyBuilder() { + return new ListenToBuilder(); } - public ListenTaskBuilder to(Consumer c) { - final ListenToBuilder listenToBuilder = new ListenToBuilder(); - c.accept(listenToBuilder); - this.config.setTo(listenToBuilder.build()); + @Override + protected ListenTaskBuilder self() { return this; } - - public ListenTask build() { - return listenTask; - } - - public static final class CorrelatePropertyBuilder { - private final CorrelateProperty prop = new CorrelateProperty(); - - public CorrelatePropertyBuilder from(String expr) { - prop.setFrom(expr); - return this; - } - - public CorrelatePropertyBuilder expect(String val) { - prop.setExpect(val); - return this; - } - - public CorrelateProperty build() { - return prop; - } - } } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/ListenToBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/ListenToBuilder.java index f78f473c..ca01805e 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/ListenToBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/ListenToBuilder.java @@ -22,34 +22,39 @@ import io.serverlessworkflow.api.types.Until; public class ListenToBuilder - extends AbstractEventConsumptionStrategyBuilder { + extends AbstractEventConsumptionStrategyBuilder { private final ListenTo listenTo = new ListenTo(); - ListenToBuilder() {} + protected ListenToBuilder() {} @Override - void setOne(OneEventConsumptionStrategy strategy) { + protected EventFilterBuilder newEventFilterBuilder() { + return new EventFilterBuilder(); + } + + @Override + protected void setOne(OneEventConsumptionStrategy strategy) { this.listenTo.setOneEventConsumptionStrategy(strategy); } @Override - void setAll(AllEventConsumptionStrategy strategy) { + protected void setAll(AllEventConsumptionStrategy strategy) { this.listenTo.setAllEventConsumptionStrategy(strategy); } @Override - void setAny(AnyEventConsumptionStrategy strategy) { + protected void setAny(AnyEventConsumptionStrategy strategy) { this.listenTo.setAnyEventConsumptionStrategy(strategy); } @Override - ListenTo getEventConsumptionStrategy() { + protected ListenTo getEventConsumptionStrategy() { return this.listenTo; } @Override - void setUntil(Until until) { + protected void setUntil(Until until) { this.listenTo.getAnyEventConsumptionStrategy().setUntil(until); } } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/SwitchTaskBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/SwitchTaskBuilder.java index 8c6cc9a1..7d517fb1 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/SwitchTaskBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/SwitchTaskBuilder.java @@ -41,7 +41,7 @@ protected SwitchTaskBuilder self() { } @Override - public SwitchTaskBuilder items( + public SwitchTaskBuilder on( final String name, Consumer switchCaseConsumer) { final SwitchTaskFluent.SwitchCaseBuilder switchCaseBuilder = new SwitchTaskFluent.SwitchCaseBuilder(); diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskItemListBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskItemListBuilder.java index b7fef28b..4c82f62a 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskItemListBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/TaskItemListBuilder.java @@ -91,11 +91,9 @@ public TaskItemListBuilder fork(String name, Consumer itemsConf } @Override - public TaskItemListBuilder listen( - String name, Consumer> itemsConfigurer) { + public TaskItemListBuilder listen(String name, Consumer itemsConfigurer) { requireNameAndConfig(name, itemsConfigurer); - final ListenTaskBuilder listenBuilder = - new ListenTaskBuilder<>(newItemListBuilder()); + final ListenTaskBuilder listenBuilder = new ListenTaskBuilder(); itemsConfigurer.accept(listenBuilder); return addTaskItem(new TaskItem(name, new Task().withListenTask(listenBuilder.build()))); } diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/DoFluent.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/DoFluent.java index 11631f0b..a18a08bf 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/DoFluent.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/DoFluent.java @@ -41,5 +41,5 @@ public interface DoFluent EmitFluent, ForEachFluent, T>, ForkFluent, - ListenFluent, T>, + ListenFluent, RaiseFluent {} diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/EventConsumptionStrategyFluent.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/EventConsumptionStrategyFluent.java index 4db05af1..5ee72399 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/EventConsumptionStrategyFluent.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/EventConsumptionStrategyFluent.java @@ -15,19 +15,21 @@ */ package io.serverlessworkflow.fluent.spec.spi; +import io.serverlessworkflow.fluent.spec.AbstractEventFilterBuilder; import io.serverlessworkflow.fluent.spec.EventConsumptionStrategyBuilder; -import io.serverlessworkflow.fluent.spec.EventFilterBuilder; import java.io.Serializable; import java.util.function.Consumer; public interface EventConsumptionStrategyFluent< - SELF extends EventConsumptionStrategyFluent, T extends Serializable> { + SELF extends EventConsumptionStrategyFluent, + T extends Serializable, + F extends AbstractEventFilterBuilder> { - SELF one(Consumer c); + SELF one(Consumer c); - SELF all(Consumer c); + SELF all(Consumer c); - SELF any(Consumer c); + SELF any(Consumer c); SELF until(Consumer c); diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/SwitchTaskFluent.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/SwitchTaskFluent.java index 1543660b..8273876e 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/SwitchTaskFluent.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/spi/SwitchTaskFluent.java @@ -24,11 +24,21 @@ import java.util.function.Consumer; public interface SwitchTaskFluent> { - default SELF items(Consumer switchCaseConsumer) { - return this.items(UUID.randomUUID().toString(), switchCaseConsumer); + String DEFAULT_CASE = "default"; + + default SELF on(Consumer switchCaseConsumer) { + return this.on(UUID.randomUUID().toString(), switchCaseConsumer); } - SELF items(final String name, Consumer switchCaseConsumer); + SELF on(final String name, Consumer switchCaseConsumer); + + default SELF onDefault(String taskName) { + return this.on(DEFAULT_CASE, c -> c.then(taskName)); + } + + default SELF onDefault(FlowDirectiveEnum directiveEnum) { + return this.on(DEFAULT_CASE, c -> c.then(directiveEnum)); + } SwitchTask build(); @@ -44,8 +54,8 @@ public SwitchCaseBuilder when(String when) { return this; } - public SwitchCaseBuilder then(FlowDirective then) { - this.switchCase.setThen(then); + public SwitchCaseBuilder then(String then) { + this.switchCase.setThen(new FlowDirective().withString(then)); return this; } diff --git a/impl/pom.xml b/impl/pom.xml index aa2e4138..d98717ad 100644 --- a/impl/pom.xml +++ b/impl/pom.xml @@ -10,7 +10,6 @@ pom 3.1.11 - 4.0.1 1.5.0 5.2.3 4.0.0 @@ -37,16 +36,6 @@ serverlessworkflow-impl-jackson ${project.version} - - io.cloudevents - cloudevents-core - ${version.io.cloudevents} - - - io.cloudevents - cloudevents-json-jackson - ${version.io.cloudevents} - net.thisptr jackson-jq diff --git a/pom.xml b/pom.xml index 5cf1deb7..b91d9871 100644 --- a/pom.xml +++ b/pom.xml @@ -80,6 +80,7 @@ 2.19.2 1.5.8 5.1.0 + 4.0.1 3.1.1 1.5.2 3.27.4 @@ -137,6 +138,16 @@ jackson-annotations ${version.com.fasterxml.jackson} + + io.cloudevents + cloudevents-core + ${version.io.cloudevents} + + + io.cloudevents + cloudevents-json-jackson + ${version.io.cloudevents} + org.slf4j slf4j-api