Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.serverlessworkflow.impl.resources.DefaultResourceLoaderFactory;
import io.serverlessworkflow.impl.resources.ResourceLoaderFactory;
import io.serverlessworkflow.impl.resources.StaticResource;
import io.serverlessworkflow.impl.scheduler.DefaultWorkflowScheduler;
import io.serverlessworkflow.impl.schema.SchemaValidator;
import io.serverlessworkflow.impl.schema.SchemaValidatorFactory;
import java.util.ArrayList;
Expand Down Expand Up @@ -59,6 +60,7 @@ public class WorkflowApplication implements AutoCloseable {
private final Collection<EventPublisher> eventPublishers;
private final boolean lifeCycleCEPublishingEnabled;
private final WorkflowModelFactory modelFactory;
private final WorkflowScheduler scheduler;

private WorkflowApplication(Builder builder) {
this.taskFactory = builder.taskFactory;
Expand All @@ -75,6 +77,7 @@ private WorkflowApplication(Builder builder) {
this.eventPublishers = builder.eventPublishers;
this.lifeCycleCEPublishingEnabled = builder.lifeCycleCEPublishingEnabled;
this.modelFactory = builder.modelFactory;
this.scheduler = builder.scheduler;
}

public TaskExecutorFactory taskFactory() {
Expand Down Expand Up @@ -142,6 +145,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
private SchemaValidatorFactory schemaValidatorFactory;
private WorkflowPositionFactory positionFactory = () -> new QueueWorkflowPosition();
private WorkflowInstanceIdFactory idFactory;
private WorkflowScheduler scheduler;
private ExecutorServiceFactory executorFactory = new DefaultExecutorServiceFactory();
private EventConsumer<?, ?> eventConsumer;
private Collection<EventPublisher> eventPublishers = new ArrayList<>();
Expand All @@ -167,6 +171,11 @@ public Builder withExpressionFactory(ExpressionFactory factory) {
return this;
}

public Builder withScheduler(WorkflowScheduler scheduler) {
this.scheduler = scheduler;
return this;
}

public Builder withResourceLoaderFactory(ResourceLoaderFactory resourceLoader) {
this.resourceLoaderFactory = resourceLoader;
return this;
Expand Down Expand Up @@ -257,6 +266,9 @@ public WorkflowApplication build() {
if (idFactory == null) {
idFactory = new MonotonicUlidWorkflowInstanceIdFactory();
}
if (scheduler == null) {
scheduler = new DefaultWorkflowScheduler();
}
return new WorkflowApplication(this);
}
}
Expand Down Expand Up @@ -313,4 +325,8 @@ public ExecutorService executorService() {
public boolean isLifeCycleCEPublishingEnabled() {
return lifeCycleCEPublishingEnabled;
}

public WorkflowScheduler scheduler() {
return scheduler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@
import static io.serverlessworkflow.impl.WorkflowUtils.*;

import io.serverlessworkflow.api.types.Input;
import io.serverlessworkflow.api.types.ListenTo;
import io.serverlessworkflow.api.types.Output;
import io.serverlessworkflow.api.types.Schedule;
import io.serverlessworkflow.api.types.Workflow;
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
import io.serverlessworkflow.impl.events.EventRegistrationInfo;
import io.serverlessworkflow.impl.executors.TaskExecutor;
import io.serverlessworkflow.impl.executors.TaskExecutorHelper;
import io.serverlessworkflow.impl.resources.ResourceLoader;
import io.serverlessworkflow.impl.scheduler.ScheduledEventConsumer;
import io.serverlessworkflow.impl.schema.SchemaValidator;
import java.nio.file.Path;
import java.util.HashMap;
Expand All @@ -46,14 +51,16 @@ private WorkflowDefinition(
this.workflow = workflow;
this.application = application;
this.resourceLoader = resourceLoader;
if (workflow.getInput() != null) {
Input input = workflow.getInput();

Input input = workflow.getInput();
if (input != null) {
this.inputSchemaValidator =
getSchemaValidator(application.validatorFactory(), resourceLoader, input.getSchema());
this.inputFilter = buildWorkflowFilter(application, input.getFrom());
}
if (workflow.getOutput() != null) {
Output output = workflow.getOutput();

Output output = workflow.getOutput();
if (output != null) {
this.outputSchemaValidator =
getSchemaValidator(application.validatorFactory(), resourceLoader, output.getSchema());
this.outputFilter = buildWorkflowFilter(application, output.getAs());
Expand All @@ -68,8 +75,23 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow)
}

static WorkflowDefinition of(WorkflowApplication application, Workflow workflow, Path path) {
return new WorkflowDefinition(
application, workflow, application.resourceLoaderFactory().getResourceLoader(path));
WorkflowDefinition definition =
new WorkflowDefinition(
application, workflow, application.resourceLoaderFactory().getResourceLoader(path));
Schedule schedule = workflow.getSchedule();
if (schedule != null) {
ListenTo to = schedule.getOn();
if (to != null) {
EventRegistrationBuilderInfo builderInfo =
EventRegistrationBuilderInfo.from(application, to, x -> null);
ScheduledEventConsumer consumer = application.scheduler().eventConsumer(definition);
EventRegistrationInfo info =
EventRegistrationInfo.combine(
builderInfo.registrations(), consumer, application.eventConsumer());
info.completableFuture().thenAccept(consumer::whenDone);
}
}
return definition;
}

public WorkflowInstance instance(Object input) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl;

import io.serverlessworkflow.impl.scheduler.ScheduledEventConsumer;
import java.util.Collection;

public interface WorkflowScheduler {
Collection<WorkflowInstance> scheduledInstances();

ScheduledEventConsumer eventConsumer(WorkflowDefinition definition);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.events;

import java.util.Collection;

public record EventRegistrationBuilderCollection(
Collection<EventRegistrationBuilder> registrations, boolean isAnd) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.events;

import io.serverlessworkflow.api.types.AllEventConsumptionStrategy;
import io.serverlessworkflow.api.types.AnyEventConsumptionStrategy;
import io.serverlessworkflow.api.types.EventConsumptionStrategy;
import io.serverlessworkflow.api.types.EventFilter;
import io.serverlessworkflow.api.types.ListenTo;
import io.serverlessworkflow.api.types.OneEventConsumptionStrategy;
import io.serverlessworkflow.api.types.Until;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowPredicate;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public record EventRegistrationBuilderInfo(
EventRegistrationBuilderCollection registrations,
EventRegistrationBuilderCollection untilRegistrations,
WorkflowPredicate until) {

public static EventRegistrationBuilderInfo from(
WorkflowApplication application,
ListenTo to,
Function<Until, WorkflowPredicate> predBuilder) {
EventRegistrationBuilderCollection registrations;
EventRegistrationBuilderCollection untilRegistrations = null;
WorkflowPredicate until = null;
if (to.getAllEventConsumptionStrategy() != null) {
registrations = allEvents(to.getAllEventConsumptionStrategy(), application);
} else if (to.getAnyEventConsumptionStrategy() != null) {
AnyEventConsumptionStrategy any = to.getAnyEventConsumptionStrategy();
registrations = anyEvents(any, application);
Until untilDesc = any.getUntil();
if (untilDesc != null) {
until = predBuilder.apply(untilDesc);
if (until == null) {
if (untilDesc.getAnyEventUntilConsumed() != null) {
EventConsumptionStrategy strategy = untilDesc.getAnyEventUntilConsumed();
if (strategy.getAllEventConsumptionStrategy() != null) {
untilRegistrations =
allEvents(strategy.getAllEventConsumptionStrategy(), application);
} else if (strategy.getAnyEventConsumptionStrategy() != null) {
untilRegistrations =
anyEvents(strategy.getAnyEventConsumptionStrategy(), application);
} else if (strategy.getOneEventConsumptionStrategy() != null) {
untilRegistrations = oneEvent(strategy.getOneEventConsumptionStrategy(), application);
}
}
}
}
} else {
registrations = oneEvent(to.getOneEventConsumptionStrategy(), application);
}
return new EventRegistrationBuilderInfo(registrations, untilRegistrations, until);
}

private static EventRegistrationBuilderCollection allEvents(
AllEventConsumptionStrategy allStrategy, WorkflowApplication application) {
return new EventRegistrationBuilderCollection(from(allStrategy.getAll(), application), true);
}

private static EventRegistrationBuilderCollection anyEvents(
AnyEventConsumptionStrategy anyStrategy, WorkflowApplication application) {
List<EventFilter> eventFilters = anyStrategy.getAny();
return new EventRegistrationBuilderCollection(
eventFilters.isEmpty() ? registerToAll(application) : from(eventFilters, application),
false);
}

private static EventRegistrationBuilderCollection oneEvent(
OneEventConsumptionStrategy oneStrategy, WorkflowApplication application) {
return new EventRegistrationBuilderCollection(
List.of(from(oneStrategy.getOne(), application)), true);
}

private static Collection<EventRegistrationBuilder> registerToAll(
WorkflowApplication application) {
return application.eventConsumer().listenToAll(application);
}

private static Collection<EventRegistrationBuilder> from(
List<EventFilter> filters, WorkflowApplication application) {
return filters.stream().map(filter -> from(filter, application)).collect(Collectors.toList());
}

private static EventRegistrationBuilder from(
EventFilter filter, WorkflowApplication application) {
return application.eventConsumer().listen(filter, application);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.events;

import io.cloudevents.CloudEvent;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public record EventRegistrationInfo(
CompletableFuture<?> completableFuture, Collection<EventRegistration> registrations) {

public static final <T> EventRegistrationInfo build(
EventRegistrationBuilderCollection builderInfo,
BiConsumer<CloudEvent, CompletableFuture<T>> consumer,
EventConsumer eventConsumer) {
Collection<EventRegistration> registrations = new ArrayList();
CompletableFuture<T>[] futures =
builderInfo.registrations().stream()
.map(reg -> toCompletable(reg, registrations, consumer, eventConsumer))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture<?> future =
builderInfo.isAnd() ? CompletableFuture.allOf(futures) : CompletableFuture.anyOf(futures);
return new EventRegistrationInfo(future, registrations);
}

public static final <T> EventRegistrationInfo combine(
EventRegistrationBuilderCollection builderInfo,
BiConsumer<CloudEvent, CompletableFuture<T>> consumer,
EventConsumer eventConsumer) {
Collection<EventRegistration> registrations = new ArrayList();
CompletableFuture<T>[] futures =
builderInfo.registrations().stream()
.map(reg -> toCompletable(reg, registrations, consumer, eventConsumer))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture<?> future =
builderInfo.isAnd() ? CompletableFuture.allOf(futures) : CompletableFuture.anyOf(futures);
return new EventRegistrationInfo(future, registrations);
}

private static final <T> CompletableFuture<T> toCompletable(
EventRegistrationBuilder regBuilder,
Collection<EventRegistration> registrations,
BiConsumer<CloudEvent, CompletableFuture<T>> ceConsumer,
EventConsumer eventConsumer) {
final CompletableFuture<T> future = new CompletableFuture<>();
registrations.add(
eventConsumer.register(regBuilder, ce -> ceConsumer.accept((CloudEvent) ce, future)));
return future;
}
}
Loading
Loading