Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
import io.serverlessworkflow.impl.resources.ResourceLoaderFactory;
import io.serverlessworkflow.impl.resources.URITemplateResolver;
import io.serverlessworkflow.impl.scheduler.AllStrategyCorrelationInfoFactory;
import io.serverlessworkflow.impl.scheduler.DefaultWorkflowScheduler;
import io.serverlessworkflow.impl.scheduler.InMemoryAllStrategyCorrelationInfo;
import io.serverlessworkflow.impl.scheduler.WorkflowScheduler;
import io.serverlessworkflow.impl.schema.SchemaValidator;
import io.serverlessworkflow.impl.schema.SchemaValidatorFactory;
Expand Down Expand Up @@ -93,6 +95,7 @@ public class WorkflowApplication implements AutoCloseable {
private final URI defaultCatalogURI;
private final Collection<CallableTaskProxyBuilder> callableProxyBuilders;
private final CloudEventPredicateFactory cloudEventPredicateFactory;
private final AllStrategyCorrelationInfoFactory allStrategyCorrelationInfoFactory;

private WorkflowApplication(Builder builder) {
this.taskFactory = builder.taskFactory;
Expand Down Expand Up @@ -122,6 +125,7 @@ private WorkflowApplication(Builder builder) {
this.id = builder.id;
this.callableProxyBuilders = builder.callableProxyBuilders;
this.cloudEventPredicateFactory = builder.cloudEventPredicateFactory;
this.allStrategyCorrelationInfoFactory = builder.allStrategyCorrelationInfoFactory;
}

public TaskExecutorFactory taskFactory() {
Expand Down Expand Up @@ -240,6 +244,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
private Optional<FunctionReader> functionReader;
private URI defaultCatalogURI;
private CloudEventPredicateFactory cloudEventPredicateFactory;
private AllStrategyCorrelationInfoFactory allStrategyCorrelationInfoFactory;

private Builder() {
ServiceLoader.load(NamedWorkflowAdditionalObject.class)
Expand Down Expand Up @@ -372,6 +377,12 @@ public Builder withCloudEventPredicateFactory(
return this;
}

public Builder withAllStrategyCorrelationInfoFactory(
AllStrategyCorrelationInfoFactory allStrategyCorrelationInfoFactory) {
this.allStrategyCorrelationInfoFactory = allStrategyCorrelationInfoFactory;
return this;
}
Comment thread
fjtirado marked this conversation as resolved.

public WorkflowApplication build() {

if (modelFactory == null) {
Expand Down Expand Up @@ -432,6 +443,10 @@ public WorkflowApplication build() {
loadFirst(CloudEventPredicateFactory.class)
.orElseGet(() -> new DefaultCloudEventPredicateFactory());
}
if (allStrategyCorrelationInfoFactory == null) {
allStrategyCorrelationInfoFactory = definition -> new InMemoryAllStrategyCorrelationInfo();
}

if (defaultCatalogURI == null) {
defaultCatalogURI = URI.create("https://github.com/serverlessworkflow/catalog");
}
Expand Down Expand Up @@ -559,4 +574,8 @@ public <T> Optional<T> additionalObject(
public Collection<CallableTaskProxyBuilder> callableProxyBuilders() {
return callableProxyBuilders;
}

public AllStrategyCorrelationInfoFactory allStrategyCorrelationInfoFactory() {
return allStrategyCorrelationInfoFactory;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.scheduler;

import io.cloudevents.CloudEvent;
import io.serverlessworkflow.impl.events.EventRegistrationBuilder;
import java.util.Collection;
import java.util.function.Consumer;

public interface AllStrategyCorrelationInfo extends AutoCloseable {
void correlate(
EventRegistrationBuilder reg, CloudEvent event, Consumer<Collection<CloudEvent>> starter);

void register(EventRegistrationBuilder reg);

default void close() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.scheduler;

import io.serverlessworkflow.impl.WorkflowDefinition;
import java.util.function.Function;

public interface AllStrategyCorrelationInfoFactory
extends Function<WorkflowDefinition, AllStrategyCorrelationInfo> {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.scheduler;

import io.cloudevents.CloudEvent;
import io.serverlessworkflow.impl.events.EventRegistrationBuilder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class InMemoryAllStrategyCorrelationInfo implements AllStrategyCorrelationInfo {

private Map<EventRegistrationBuilder, List<CloudEvent>> correlatedEvents;

@Override
public void correlate(
EventRegistrationBuilder reg, CloudEvent event, Consumer<Collection<CloudEvent>> starter) {
Collection<CloudEvent> collection = new ArrayList<>();
// to minimize the critical section, conversion is done later, here we are
// performing just collection, if any
synchronized (correlatedEvents) {
correlatedEvents.get(reg).add(event);
Collection<List<CloudEvent>> events = correlatedEvents.values();
if (satisfyCondition(events)) {
for (List<CloudEvent> values : events) {
collection.add(values.remove(0));
}
}
}
if (!collection.isEmpty()) {
starter.accept(collection);
Comment thread
fjtirado marked this conversation as resolved.
}
}

@Override
public void register(EventRegistrationBuilder reg) {
if (correlatedEvents == null) {
correlatedEvents = new HashMap<>();
}
correlatedEvents.put(reg, new ArrayList<CloudEvent>());
}

private boolean satisfyCondition(Collection<List<CloudEvent>> events) {
for (List<CloudEvent> values : events) {
if (values.isEmpty()) {
return false;
}
}
return true;
}

@Override
public void close() {
if (correlatedEvents != null) {
correlatedEvents.clear();
}
Comment thread
fjtirado marked this conversation as resolved.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,17 @@
*/
package io.serverlessworkflow.impl.scheduler;

import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;

import io.cloudevents.CloudEvent;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.WorkflowModelCollection;
import io.serverlessworkflow.impl.events.EventConsumer;
import io.serverlessworkflow.impl.events.EventRegistration;
import io.serverlessworkflow.impl.events.EventRegistrationBuilder;
import io.serverlessworkflow.impl.events.EventRegistrationBuilderInfo;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class ScheduledEventConsumer implements AutoCloseable {
Expand All @@ -37,8 +35,8 @@ public class ScheduledEventConsumer implements AutoCloseable {
private final EventRegistrationBuilderInfo builderInfo;
private final EventConsumer eventConsumer;
private final ScheduledInstanceRunnable instanceRunner;
private Map<EventRegistrationBuilder, List<CloudEvent>> correlatedEvents;
private Collection<EventRegistration> registrations = new ArrayList<>();
private final Collection<EventRegistration> registrations = new ArrayList<>();
private AllStrategyCorrelationInfo allStrategyCorrelationInfo;

public ScheduledEventConsumer(
WorkflowDefinition definition,
Expand All @@ -50,17 +48,23 @@ public ScheduledEventConsumer(
this.builderInfo = builderInfo;
this.instanceRunner = instanceRunner;
this.eventConsumer = definition.application().eventConsumer();

if (builderInfo.registrations().isAnd()
Comment thread
fjtirado marked this conversation as resolved.
&& builderInfo.registrations().registrations().size() > 1) {
this.correlatedEvents = new HashMap<>();
this.allStrategyCorrelationInfo =
definition.application().allStrategyCorrelationInfoFactory().apply(definition);
builderInfo
.registrations()
.registrations()
.forEach(
reg -> {
correlatedEvents.put(reg, new ArrayList<>());
allStrategyCorrelationInfo.register(reg);
registrations.add(
eventConsumer.register(reg, ce -> consumeEvent(reg, (CloudEvent) ce)));
eventConsumer.register(
reg,
ce ->
allStrategyCorrelationInfo.correlate(
reg, (CloudEvent) ce, this::start)));
});
} else {
builderInfo
Expand All @@ -71,34 +75,6 @@ public ScheduledEventConsumer(
}
}

private void consumeEvent(EventRegistrationBuilder reg, CloudEvent ce) {
Collection<Collection<CloudEvent>> collections = new ArrayList<>();
// to minimize the critical section, conversion is done later, here we are
// performing
// just collection, if any
synchronized (correlatedEvents) {
correlatedEvents.get(reg).add((CloudEvent) ce);
while (satisfyCondition()) {
Collection<CloudEvent> collection = new ArrayList<>();
for (List<CloudEvent> values : correlatedEvents.values()) {
collection.add(values.remove(0));
}
collections.add(collection);
}
}
// convert and start outside synchronized
collections.forEach(this::start);
}

private boolean satisfyCondition() {
for (List<CloudEvent> values : correlatedEvents.values()) {
if (values.isEmpty()) {
return false;
}
}
return true;
}

protected void start(CloudEvent ce) {
WorkflowModelCollection model = definition.application().modelFactory().createCollection();
model.add(converter.apply(ce));
Expand All @@ -112,9 +88,7 @@ protected void start(Collection<CloudEvent> ces) {
}

public void close() {
if (correlatedEvents != null) {
correlatedEvents.clear();
}
registrations.forEach(eventConsumer::unregister);
safeClose(allStrategyCorrelationInfo);
}
}
Loading