From 1bacce398c3f32ad24c6eae4ccfa49f38b54a16a Mon Sep 17 00:00:00 2001 From: fjtirado Date: Fri, 10 Oct 2025 16:52:04 +0200 Subject: [PATCH] [Fix #847] Implementing schedule.on Signed-off-by: fjtirado --- .../impl/WorkflowApplication.java | 16 ++ .../impl/WorkflowDefinition.java | 40 ++++- .../impl/WorkflowScheduler.java | 28 +++ .../EventRegistrationBuilderCollection.java | 21 +++ .../events/EventRegistrationBuilderInfo.java | 106 ++++++++++++ .../impl/events/EventRegistrationInfo.java | 63 +++++++ .../impl/executors/ListenExecutor.java | 159 ++++-------------- .../scheduler/DefaultWorkflowScheduler.java | 47 ++++++ .../scheduler/ScheduledEventConsumer.java | 51 ++++++ .../workflows-samples/listen-start.yaml | 15 ++ 10 files changed, 415 insertions(+), 131 deletions(-) create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowScheduler.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationBuilderCollection.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationBuilderInfo.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationInfo.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java create mode 100644 impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java create mode 100644 impl/test/src/test/resources/workflows-samples/listen-start.yaml diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index f589938f..73fb74dc 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -30,6 +30,7 @@ import io.serverlessworkflow.impl.resources.DefaultResourceLoaderFactory; import io.serverlessworkflow.impl.resources.ResourceLoaderFactory; import io.serverlessworkflow.impl.resources.StaticResource; +import io.serverlessworkflow.impl.scheduler.DefaultWorkflowScheduler; import io.serverlessworkflow.impl.schema.SchemaValidator; import io.serverlessworkflow.impl.schema.SchemaValidatorFactory; import java.util.ArrayList; @@ -59,6 +60,7 @@ public class WorkflowApplication implements AutoCloseable { private final Collection eventPublishers; private final boolean lifeCycleCEPublishingEnabled; private final WorkflowModelFactory modelFactory; + private final WorkflowScheduler scheduler; private WorkflowApplication(Builder builder) { this.taskFactory = builder.taskFactory; @@ -75,6 +77,7 @@ private WorkflowApplication(Builder builder) { this.eventPublishers = builder.eventPublishers; this.lifeCycleCEPublishingEnabled = builder.lifeCycleCEPublishingEnabled; this.modelFactory = builder.modelFactory; + this.scheduler = builder.scheduler; } public TaskExecutorFactory taskFactory() { @@ -142,6 +145,7 @@ public SchemaValidator getValidator(SchemaInline inline) { private SchemaValidatorFactory schemaValidatorFactory; private WorkflowPositionFactory positionFactory = () -> new QueueWorkflowPosition(); private WorkflowInstanceIdFactory idFactory; + private WorkflowScheduler scheduler; private ExecutorServiceFactory executorFactory = new DefaultExecutorServiceFactory(); private EventConsumer eventConsumer; private Collection eventPublishers = new ArrayList<>(); @@ -167,6 +171,11 @@ public Builder withExpressionFactory(ExpressionFactory factory) { return this; } + public Builder withScheduler(WorkflowScheduler scheduler) { + this.scheduler = scheduler; + return this; + } + public Builder withResourceLoaderFactory(ResourceLoaderFactory resourceLoader) { this.resourceLoaderFactory = resourceLoader; return this; @@ -257,6 +266,9 @@ public WorkflowApplication build() { if (idFactory == null) { idFactory = new MonotonicUlidWorkflowInstanceIdFactory(); } + if (scheduler == null) { + scheduler = new DefaultWorkflowScheduler(); + } return new WorkflowApplication(this); } } @@ -313,4 +325,8 @@ public ExecutorService executorService() { public boolean isLifeCycleCEPublishingEnabled() { return lifeCycleCEPublishingEnabled; } + + public WorkflowScheduler scheduler() { + return scheduler; + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java index b79d86bb..4a1afeab 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowDefinition.java @@ -18,11 +18,16 @@ import static io.serverlessworkflow.impl.WorkflowUtils.*; import io.serverlessworkflow.api.types.Input; +import io.serverlessworkflow.api.types.ListenTo; import io.serverlessworkflow.api.types.Output; +import io.serverlessworkflow.api.types.Schedule; import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo; +import io.serverlessworkflow.impl.events.EventRegistrationInfo; import io.serverlessworkflow.impl.executors.TaskExecutor; import io.serverlessworkflow.impl.executors.TaskExecutorHelper; import io.serverlessworkflow.impl.resources.ResourceLoader; +import io.serverlessworkflow.impl.scheduler.ScheduledEventConsumer; import io.serverlessworkflow.impl.schema.SchemaValidator; import java.nio.file.Path; import java.util.HashMap; @@ -46,14 +51,16 @@ private WorkflowDefinition( this.workflow = workflow; this.application = application; this.resourceLoader = resourceLoader; - if (workflow.getInput() != null) { - Input input = workflow.getInput(); + + Input input = workflow.getInput(); + if (input != null) { this.inputSchemaValidator = getSchemaValidator(application.validatorFactory(), resourceLoader, input.getSchema()); this.inputFilter = buildWorkflowFilter(application, input.getFrom()); } - if (workflow.getOutput() != null) { - Output output = workflow.getOutput(); + + Output output = workflow.getOutput(); + if (output != null) { this.outputSchemaValidator = getSchemaValidator(application.validatorFactory(), resourceLoader, output.getSchema()); this.outputFilter = buildWorkflowFilter(application, output.getAs()); @@ -68,8 +75,29 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow) } static WorkflowDefinition of(WorkflowApplication application, Workflow workflow, Path path) { - return new WorkflowDefinition( - application, workflow, application.resourceLoaderFactory().getResourceLoader(path)); + WorkflowDefinition definition = + new WorkflowDefinition( + application, workflow, application.resourceLoaderFactory().getResourceLoader(path)); + Schedule schedule = workflow.getSchedule(); + if (schedule != null) { + ListenTo to = schedule.getOn(); + if (to != null) { + EventRegistrationBuilderInfo builderInfo = + EventRegistrationBuilderInfo.from(application, to, x -> null); + ScheduledEventConsumer consumer = + application.scheduler().eventConsumer(definition, application.modelFactory()::from); + WorkflowModelCollection model = application.modelFactory().createCollection(); + EventRegistrationInfo.combine( + model, + EventRegistrationInfo.build( + builderInfo.registrations(), + (ce, f) -> consumer.accept(ce, f, model), + application.eventConsumer()), + application.eventConsumer()) + .thenAccept(consumer::whenDone); + } + } + return definition; } public WorkflowInstance instance(Object input) { diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowScheduler.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowScheduler.java new file mode 100644 index 00000000..e3dde4ad --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowScheduler.java @@ -0,0 +1,28 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl; + +import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.scheduler.ScheduledEventConsumer; +import java.util.Collection; +import java.util.function.Function; + +public interface WorkflowScheduler { + Collection scheduledInstances(); + + ScheduledEventConsumer eventConsumer( + WorkflowDefinition definition, Function converter); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationBuilderCollection.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationBuilderCollection.java new file mode 100644 index 00000000..501fa7df --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationBuilderCollection.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.events; + +import java.util.Collection; + +public record EventRegistrationBuilderCollection( + Collection registrations, boolean isAnd) {} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationBuilderInfo.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationBuilderInfo.java new file mode 100644 index 00000000..77a41396 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationBuilderInfo.java @@ -0,0 +1,106 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.events; + +import io.serverlessworkflow.api.types.AllEventConsumptionStrategy; +import io.serverlessworkflow.api.types.AnyEventConsumptionStrategy; +import io.serverlessworkflow.api.types.EventConsumptionStrategy; +import io.serverlessworkflow.api.types.EventFilter; +import io.serverlessworkflow.api.types.ListenTo; +import io.serverlessworkflow.api.types.OneEventConsumptionStrategy; +import io.serverlessworkflow.api.types.Until; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowPredicate; +import java.util.Collection; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; + +public record EventRegistrationBuilderInfo( + EventRegistrationBuilderCollection registrations, + EventRegistrationBuilderCollection untilRegistrations, + WorkflowPredicate until) { + + public static EventRegistrationBuilderInfo from( + WorkflowApplication application, + ListenTo to, + Function predBuilder) { + EventRegistrationBuilderCollection registrations; + EventRegistrationBuilderCollection untilRegistrations = null; + WorkflowPredicate until = null; + if (to.getAllEventConsumptionStrategy() != null) { + registrations = allEvents(to.getAllEventConsumptionStrategy(), application); + } else if (to.getAnyEventConsumptionStrategy() != null) { + AnyEventConsumptionStrategy any = to.getAnyEventConsumptionStrategy(); + registrations = anyEvents(any, application); + Until untilDesc = any.getUntil(); + if (untilDesc != null) { + until = predBuilder.apply(untilDesc); + if (until == null) { + if (untilDesc.getAnyEventUntilConsumed() != null) { + EventConsumptionStrategy strategy = untilDesc.getAnyEventUntilConsumed(); + if (strategy.getAllEventConsumptionStrategy() != null) { + untilRegistrations = + allEvents(strategy.getAllEventConsumptionStrategy(), application); + } else if (strategy.getAnyEventConsumptionStrategy() != null) { + untilRegistrations = + anyEvents(strategy.getAnyEventConsumptionStrategy(), application); + } else if (strategy.getOneEventConsumptionStrategy() != null) { + untilRegistrations = oneEvent(strategy.getOneEventConsumptionStrategy(), application); + } + } + } + } + } else { + registrations = oneEvent(to.getOneEventConsumptionStrategy(), application); + } + return new EventRegistrationBuilderInfo(registrations, untilRegistrations, until); + } + + private static EventRegistrationBuilderCollection allEvents( + AllEventConsumptionStrategy allStrategy, WorkflowApplication application) { + return new EventRegistrationBuilderCollection(from(allStrategy.getAll(), application), true); + } + + private static EventRegistrationBuilderCollection anyEvents( + AnyEventConsumptionStrategy anyStrategy, WorkflowApplication application) { + List eventFilters = anyStrategy.getAny(); + return new EventRegistrationBuilderCollection( + eventFilters.isEmpty() ? registerToAll(application) : from(eventFilters, application), + false); + } + + private static EventRegistrationBuilderCollection oneEvent( + OneEventConsumptionStrategy oneStrategy, WorkflowApplication application) { + return new EventRegistrationBuilderCollection( + List.of(from(oneStrategy.getOne(), application)), true); + } + + private static Collection registerToAll( + WorkflowApplication application) { + return application.eventConsumer().listenToAll(application); + } + + private static Collection from( + List filters, WorkflowApplication application) { + return filters.stream().map(filter -> from(filter, application)).collect(Collectors.toList()); + } + + private static EventRegistrationBuilder from( + EventFilter filter, WorkflowApplication application) { + return application.eventConsumer().listen(filter, application); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationInfo.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationInfo.java new file mode 100644 index 00000000..44afcdd7 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/EventRegistrationInfo.java @@ -0,0 +1,63 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.events; + +import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowModelCollection; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; + +public record EventRegistrationInfo( + CompletableFuture completableFuture, Collection registrations) { + + public static final CompletableFuture combine( + WorkflowModelCollection output, EventRegistrationInfo info, EventConsumer eventConsumer) { + return info.completableFuture() + .thenApply( + v -> { + info.registrations().forEach(reg -> eventConsumer.unregister(reg)); + return output; + }); + } + + public static final EventRegistrationInfo build( + EventRegistrationBuilderCollection builderInfo, + BiConsumer> consumer, + EventConsumer eventConsumer) { + Collection registrations = new ArrayList(); + CompletableFuture[] futures = + builderInfo.registrations().stream() + .map(reg -> toCompletable(reg, registrations, consumer, eventConsumer)) + .toArray(size -> new CompletableFuture[size]); + CompletableFuture future = + builderInfo.isAnd() ? CompletableFuture.allOf(futures) : CompletableFuture.anyOf(futures); + return new EventRegistrationInfo(future, registrations); + } + + private static final CompletableFuture toCompletable( + EventRegistrationBuilder regBuilder, + Collection registrations, + BiConsumer> ceConsumer, + EventConsumer eventConsumer) { + final CompletableFuture future = new CompletableFuture<>(); + registrations.add( + eventConsumer.register(regBuilder, ce -> ceConsumer.accept((CloudEvent) ce, future))); + return future; + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java index 5fb29901..c5eccd47 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ListenExecutor.java @@ -16,15 +16,9 @@ package io.serverlessworkflow.impl.executors; import io.cloudevents.CloudEvent; -import io.serverlessworkflow.api.types.AllEventConsumptionStrategy; -import io.serverlessworkflow.api.types.AnyEventConsumptionStrategy; -import io.serverlessworkflow.api.types.EventConsumptionStrategy; -import io.serverlessworkflow.api.types.EventFilter; import io.serverlessworkflow.api.types.ListenTask; import io.serverlessworkflow.api.types.ListenTaskConfiguration; import io.serverlessworkflow.api.types.ListenTaskConfiguration.ListenAndReadAs; -import io.serverlessworkflow.api.types.ListenTo; -import io.serverlessworkflow.api.types.OneEventConsumptionStrategy; import io.serverlessworkflow.api.types.SubscriptionIterator; import io.serverlessworkflow.api.types.Until; import io.serverlessworkflow.impl.TaskContext; @@ -38,79 +32,34 @@ import io.serverlessworkflow.impl.WorkflowStatus; import io.serverlessworkflow.impl.WorkflowUtils; import io.serverlessworkflow.impl.events.EventConsumer; -import io.serverlessworkflow.impl.events.EventRegistration; -import io.serverlessworkflow.impl.events.EventRegistrationBuilder; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; +import io.serverlessworkflow.impl.events.EventRegistrationBuilderCollection; +import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo; +import io.serverlessworkflow.impl.events.EventRegistrationInfo; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; import java.util.function.Function; -import java.util.stream.Collectors; public abstract class ListenExecutor extends RegularTaskExecutor { - protected final EventRegistrationBuilderCollection regBuilders; + protected final EventRegistrationBuilderInfo builderRegistrationInfo; protected final Optional> loop; protected final Function converter; protected final EventConsumer eventConsumer; - private static record EventRegistrationBuilderCollection( - Collection registrations, boolean isAnd) {} - public static class ListenExecutorBuilder extends RegularTaskExecutorBuilder { - private EventRegistrationBuilderCollection registrations; - private WorkflowPredicate until; - private EventRegistrationBuilderCollection untilRegistrations; + private EventRegistrationBuilderInfo registrationInfo; private TaskExecutor loop; private Function converter = ce -> application.modelFactory().from(ce.getData()); - private EventRegistrationBuilderCollection allEvents(AllEventConsumptionStrategy allStrategy) { - return new EventRegistrationBuilderCollection(from(allStrategy.getAll()), true); - } - - private EventRegistrationBuilderCollection anyEvents(AnyEventConsumptionStrategy anyStrategy) { - List eventFilters = anyStrategy.getAny(); - return new EventRegistrationBuilderCollection( - eventFilters.isEmpty() ? registerToAll() : from(eventFilters), false); - } - - private EventRegistrationBuilderCollection oneEvent(OneEventConsumptionStrategy oneStrategy) { - return new EventRegistrationBuilderCollection(List.of(from(oneStrategy.getOne())), true); - } - protected ListenExecutorBuilder( WorkflowMutablePosition position, ListenTask task, WorkflowDefinition definition) { super(position, task, definition); ListenTaskConfiguration listen = task.getListen(); - ListenTo to = listen.getTo(); - if (to.getAllEventConsumptionStrategy() != null) { - registrations = allEvents(to.getAllEventConsumptionStrategy()); - } else if (to.getAnyEventConsumptionStrategy() != null) { - AnyEventConsumptionStrategy any = to.getAnyEventConsumptionStrategy(); - registrations = anyEvents(any); - Until untilDesc = any.getUntil(); - if (untilDesc != null) { - until = buildUntilPredicate(untilDesc); - if (until == null) { - if (untilDesc.getAnyEventUntilConsumed() != null) { - EventConsumptionStrategy strategy = untilDesc.getAnyEventUntilConsumed(); - if (strategy.getAllEventConsumptionStrategy() != null) { - untilRegistrations = allEvents(strategy.getAllEventConsumptionStrategy()); - } else if (strategy.getAnyEventConsumptionStrategy() != null) { - untilRegistrations = anyEvents(strategy.getAnyEventConsumptionStrategy()); - } else if (strategy.getOneEventConsumptionStrategy() != null) { - untilRegistrations = oneEvent(strategy.getOneEventConsumptionStrategy()); - } - } - } - } - } else if (to.getOneEventConsumptionStrategy() != null) { - registrations = oneEvent(to.getOneEventConsumptionStrategy()); - } + registrationInfo = + EventRegistrationBuilderInfo.from(application, listen.getTo(), this::buildUntilPredicate); SubscriptionIterator forEach = task.getForeach(); if (forEach != null) { loop = TaskExecutorHelper.createExecutorList(position, forEach.getDo(), definition); @@ -134,21 +83,11 @@ protected WorkflowPredicate buildUntilPredicate(Until until) { : null; } - private Collection registerToAll() { - return application.eventConsumer().listenToAll(application); - } - - private Collection from(List filters) { - return filters.stream().map(this::from).collect(Collectors.toList()); - } - - private EventRegistrationBuilder from(EventFilter filter) { - return application.eventConsumer().listen(filter, application); - } - @Override public ListenExecutor buildInstance() { - return registrations.isAnd() ? new AndListenExecutor(this) : new OrListenExecutor(this); + return registrationInfo.registrations().isAnd() + ? new AndListenExecutor(this) + : new OrListenExecutor(this); } } @@ -176,28 +115,27 @@ public static class OrListenExecutor extends ListenExecutor { public OrListenExecutor(ListenExecutorBuilder builder) { super(builder); - this.until = Optional.ofNullable(builder.until); - this.untilRegBuilders = builder.untilRegistrations; + this.until = Optional.ofNullable(builder.registrationInfo.until()); + this.untilRegBuilders = builder.registrationInfo.untilRegistrations(); } @Override - protected CompletableFuture buildFuture( - EventRegistrationBuilderCollection regCollection, - Collection registrations, + protected EventRegistrationInfo buildInfo( BiConsumer> consumer) { - CompletableFuture combinedFuture = - super.buildFuture(regCollection, registrations, consumer); + EventRegistrationInfo info = super.buildInfo(consumer); if (untilRegBuilders != null) { - Collection untilRegistrations = new ArrayList<>(); - CompletableFuture untilFuture = - combine(untilRegBuilders, untilRegistrations, (ce, f) -> f.complete(null)); - untilFuture.thenAccept( - v -> { - combinedFuture.complete(null); - untilRegistrations.forEach(reg -> eventConsumer.unregister(reg)); - }); + EventRegistrationInfo untilInfo = + EventRegistrationInfo.build( + untilRegBuilders, (ce, f) -> f.complete(null), eventConsumer); + untilInfo + .completableFuture() + .thenAccept( + v -> { + info.completableFuture().complete(null); + untilInfo.registrations().forEach(reg -> eventConsumer.unregister(reg)); + }); } - return combinedFuture; + return info; } protected void internalProcessCe( @@ -228,49 +166,20 @@ protected CompletableFuture internalExecute( WorkflowContext workflow, TaskContext taskContext) { WorkflowModelCollection output = workflow.definition().application().modelFactory().createCollection(); - Collection registrations = new ArrayList<>(); ((WorkflowMutableInstance) workflow.instance()).status(WorkflowStatus.WAITING); - return buildFuture( - regBuilders, - registrations, + return EventRegistrationInfo.combine( + output, + buildInfo( (BiConsumer>) ((ce, future) -> - processCe(converter.apply(ce), output, workflow, taskContext, future))) - .thenApply( - v -> { - registrations.forEach(reg -> eventConsumer.unregister(reg)); - return output; - }); + processCe(converter.apply(ce), output, workflow, taskContext, future))), + eventConsumer); } - protected CompletableFuture buildFuture( - EventRegistrationBuilderCollection regCollection, - Collection registrations, + protected EventRegistrationInfo buildInfo( BiConsumer> consumer) { - return combine(regCollection, registrations, consumer); - } - - protected final CompletableFuture combine( - EventRegistrationBuilderCollection regCollection, - Collection registrations, - BiConsumer> consumer) { - CompletableFuture[] futures = - regCollection.registrations().stream() - .map(reg -> toCompletable(reg, registrations, consumer)) - .toArray(size -> new CompletableFuture[size]); - return regCollection.isAnd() - ? CompletableFuture.allOf(futures) - : CompletableFuture.anyOf(futures); - } - - private CompletableFuture toCompletable( - EventRegistrationBuilder regBuilder, - Collection registrations, - BiConsumer> ceConsumer) { - final CompletableFuture future = new CompletableFuture<>(); - registrations.add( - eventConsumer.register(regBuilder, ce -> ceConsumer.accept((CloudEvent) ce, future))); - return future; + return EventRegistrationInfo.build( + builderRegistrationInfo.registrations(), consumer, eventConsumer); } private void processCe( @@ -299,7 +208,7 @@ private void processCe( protected ListenExecutor(ListenExecutorBuilder builder) { super(builder); this.eventConsumer = builder.application.eventConsumer(); - this.regBuilders = builder.registrations; + this.builderRegistrationInfo = builder.registrationInfo; this.loop = Optional.ofNullable(builder.loop); this.converter = builder.converter; } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java new file mode 100644 index 00000000..eb553e7c --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/DefaultWorkflowScheduler.java @@ -0,0 +1,47 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.scheduler; + +import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowScheduler; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.function.Function; + +public class DefaultWorkflowScheduler implements WorkflowScheduler { + + private Collection instances = new ArrayList<>(); + + @Override + public Collection scheduledInstances() { + return Collections.unmodifiableCollection(instances); + } + + @Override + public ScheduledEventConsumer eventConsumer( + WorkflowDefinition definition, Function converter) { + return new ScheduledEventConsumer(definition, converter) { + @Override + protected void addScheduledInstance(WorkflowInstance instance) { + instances.add(instance); + } + }; + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java new file mode 100644 index 00000000..fafcfdf6 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/scheduler/ScheduledEventConsumer.java @@ -0,0 +1,51 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.scheduler; + +import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowModelCollection; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +public abstract class ScheduledEventConsumer { + + private final Function converter; + private final WorkflowDefinition definition; + + protected ScheduledEventConsumer( + WorkflowDefinition definition, Function converter) { + this.definition = definition; + this.converter = converter; + } + + public void accept( + CloudEvent t, CompletableFuture u, WorkflowModelCollection col) { + WorkflowModel model = converter.apply(t); + u.complete(model); + col.add(model); + } + + public void whenDone(WorkflowModel model) { + WorkflowInstance instance = definition.instance(model); + addScheduledInstance(instance); + instance.start(); + } + + protected abstract void addScheduledInstance(WorkflowInstance instace); +} diff --git a/impl/test/src/test/resources/workflows-samples/listen-start.yaml b/impl/test/src/test/resources/workflows-samples/listen-start.yaml new file mode 100644 index 00000000..3b5babeb --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/listen-start.yaml @@ -0,0 +1,15 @@ +document: + dsl: '1.0.1' + namespace: examples + name: event-driven-schedule + version: '0.1.0' +schedule: + on: + one: + with: + type: com.example.hospital.events.patients.recover +do: + - recovered: + set: + name: ${ $workflow.input[0].data.name} + \ No newline at end of file