Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionCompletableListener;
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener;
import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListenerAdapter;
import io.serverlessworkflow.impl.lifecycle.ce.DefaultLifeCycleCloudEventFactory;
import io.serverlessworkflow.impl.lifecycle.ce.WorkflowLifeCycleCloudEventFactory;
import io.serverlessworkflow.impl.resources.DefaultResourceLoaderFactory;
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
import io.serverlessworkflow.impl.resources.ResourceLoaderFactory;
Expand Down Expand Up @@ -96,6 +98,7 @@ public class WorkflowApplication implements AutoCloseable {
private final Collection<CallableTaskProxyBuilder> callableProxyBuilders;
private final CloudEventPredicateFactory cloudEventPredicateFactory;
private final AllStrategyCorrelationInfoFactory allStrategyCorrelationInfoFactory;
private final WorkflowLifeCycleCloudEventFactory lifeCycleCloudEventFactory;

private WorkflowApplication(Builder builder) {
this.taskFactory = builder.taskFactory;
Expand Down Expand Up @@ -126,6 +129,7 @@ private WorkflowApplication(Builder builder) {
this.callableProxyBuilders = builder.callableProxyBuilders;
this.cloudEventPredicateFactory = builder.cloudEventPredicateFactory;
this.allStrategyCorrelationInfoFactory = builder.allStrategyCorrelationInfoFactory;
this.lifeCycleCloudEventFactory = builder.lifeCycleCloudEventFactory;
}

public TaskExecutorFactory taskFactory() {
Expand Down Expand Up @@ -245,6 +249,7 @@ public SchemaValidator getValidator(SchemaInline inline) {
private URI defaultCatalogURI;
private CloudEventPredicateFactory cloudEventPredicateFactory;
private AllStrategyCorrelationInfoFactory allStrategyCorrelationInfoFactory;
private WorkflowLifeCycleCloudEventFactory lifeCycleCloudEventFactory;

private Builder() {
ServiceLoader.load(NamedWorkflowAdditionalObject.class)
Expand Down Expand Up @@ -383,6 +388,12 @@ public Builder withAllStrategyCorrelationInfoFactory(
return this;
}

public Builder withLifeCycleCloudEventFactory(
WorkflowLifeCycleCloudEventFactory lifeCycleCloudEventFactory) {
this.lifeCycleCloudEventFactory = lifeCycleCloudEventFactory;
return this;
}

public WorkflowApplication build() {

if (modelFactory == null) {
Expand Down Expand Up @@ -443,6 +454,9 @@ public WorkflowApplication build() {
loadFirst(CloudEventPredicateFactory.class)
.orElseGet(() -> new DefaultCloudEventPredicateFactory());
}
if (lifeCycleCloudEventFactory == null) {
lifeCycleCloudEventFactory = new DefaultLifeCycleCloudEventFactory();
}
if (allStrategyCorrelationInfoFactory == null) {
allStrategyCorrelationInfoFactory =
definition -> InMemoryAllStrategyCorrelationInfo.instance();
Expand Down Expand Up @@ -579,4 +593,8 @@ public Collection<CallableTaskProxyBuilder> callableProxyBuilders() {
public AllStrategyCorrelationInfoFactory allStrategyCorrelationInfoFactory() {
return allStrategyCorrelationInfoFactory;
}

public WorkflowLifeCycleCloudEventFactory lifeCycleCloudEventFactory() {
return lifeCycleCloudEventFactory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,16 @@
import static io.serverlessworkflow.impl.LifecycleEvents.WORKFLOW_STARTED;
import static io.serverlessworkflow.impl.LifecycleEvents.WORKFLOW_STATUS_CHANGED;
import static io.serverlessworkflow.impl.LifecycleEvents.WORKFLOW_SUSPENDED;
import static io.serverlessworkflow.impl.WorkflowError.error;
import static io.serverlessworkflow.impl.lifecycle.ce.WorkflowDefinitionCEData.ref;

import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.data.PojoCloudEventData;
import io.cloudevents.core.data.PojoCloudEventData.ToBytes;
import io.serverlessworkflow.impl.WorkflowApplication;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.events.CloudEventUtils;
import io.serverlessworkflow.impl.lifecycle.TaskCancelledEvent;
import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent;
import io.serverlessworkflow.impl.lifecycle.TaskEvent;
import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent;
import io.serverlessworkflow.impl.lifecycle.TaskResumedEvent;
import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent;
Expand Down Expand Up @@ -82,202 +78,177 @@ public static Collection<String> getLifeCycleTypes() {
WORKFLOW_STATUS_CHANGED);
}

private WorkflowLifeCycleCloudEventFactory lifeCycleFactory(WorkflowEvent ev) {
return ev.workflowContext().definition().application().lifeCycleCloudEventFactory();
}

@Override
public void onTaskStarted(TaskStartedEvent event) {
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
publish(
event,
ev ->
builder()
.withData(
cloudEventData(
new TaskStartedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()),
this::convert))
.withType(TASK_STARTED)
.build());
factory.build(
builder()
.withData(cloudEventData(factory.build(event), this::convert))
.withType(TASK_STARTED)));
}

@Override
public void onTaskRetried(TaskRetriedEvent event) {
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
publish(
event,
ev ->
builder()
.withData(
cloudEventData(
new TaskRetriedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()),
this::convert))
.withType(TASK_STARTED)
.build());
factory.build(
builder()
.withData(cloudEventData(factory.build(event), this::convert))
.withType(TASK_RETRIED)));
Comment thread
fjtirado marked this conversation as resolved.
}

@Override
public void onTaskCompleted(TaskCompletedEvent event) {
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
publish(
event,
ev ->
builder()
.withData(
cloudEventData(
new TaskCompletedCEData(
id(ev), pos(ev), ref(ev), ev.eventDate(), output(ev)),
this::convert))
.withType(TASK_COMPLETED)
.build());
factory.build(
builder()
.withData(cloudEventData(factory.build(event), this::convert))
.withType(TASK_COMPLETED)));
}

@Override
public void onTaskSuspended(TaskSuspendedEvent event) {
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
publish(
event,
ev ->
builder()
.withData(
cloudEventData(
new TaskSuspendedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()),
this::convert))
.withType(TASK_SUSPENDED)
.build());
factory.build(
builder()
.withData(cloudEventData(factory.build(event), this::convert))
.withType(TASK_SUSPENDED)));
}

@Override
public void onTaskResumed(TaskResumedEvent event) {
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
publish(
event,
ev ->
builder()
.withData(
cloudEventData(
new TaskResumedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()),
this::convert))
.withType(TASK_RESUMED)
.build());
factory.build(
builder()
.withData(cloudEventData(factory.build(event), this::convert))
.withType(TASK_RESUMED)));
}

@Override
public void onTaskCancelled(TaskCancelledEvent event) {
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
publish(
event,
ev ->
builder()
.withData(
cloudEventData(
new TaskCancelledCEData(id(ev), pos(ev), ref(ev), ev.eventDate()),
this::convert))
.withType(TASK_CANCELLED)
.build());
factory.build(
builder()
.withData(cloudEventData(factory.build(event), this::convert))
.withType(TASK_CANCELLED)));
}

@Override
public void onTaskFailed(TaskFailedEvent event) {
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
publish(
event,
ev ->
builder()
.withData(
cloudEventData(
new TaskFailedCEData(id(ev), pos(ev), ref(ev), ev.eventDate(), error(ev)),
this::convert))
.withType(TASK_FAULTED)
.build());
factory.build(
builder()
.withData(cloudEventData(factory.build(event), this::convert))
.withType(TASK_FAULTED)));
}

@Override
public void onWorkflowStarted(WorkflowStartedEvent event) {
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
publish(
event,
ev ->
builder()
.withData(
cloudEventData(
new WorkflowStartedCEData(id(ev), ref(ev), ev.eventDate()), this::convert))
.withType(WORKFLOW_STARTED)
.build());
factory.build(
builder()
.withData(cloudEventData(factory.build(event), this::convert))
.withType(WORKFLOW_STARTED)));
}

@Override
public void onWorkflowSuspended(WorkflowSuspendedEvent event) {
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
publish(
event,
ev ->
builder()
.withData(
cloudEventData(
new WorkflowSuspendedCEData(id(ev), ref(ev), ev.eventDate()),
this::convert))
.withType(WORKFLOW_SUSPENDED)
.build());
factory.build(
builder()
.withData(cloudEventData(factory.build(event), this::convert))
.withType(WORKFLOW_SUSPENDED)));
}

@Override
public void onWorkflowCancelled(WorkflowCancelledEvent event) {
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
publish(
event,
ev ->
builder()
.withData(
cloudEventData(
new WorkflowCancelledCEData(id(ev), ref(ev), ev.eventDate()),
this::convert))
.withType(WORKFLOW_CANCELLED)
.build());
factory.build(
builder()
.withData(cloudEventData(factory.build(event), this::convert))
.withType(WORKFLOW_CANCELLED)));
}

@Override
public void onWorkflowResumed(WorkflowResumedEvent event) {
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
publish(
event,
ev ->
builder()
.withData(
cloudEventData(
new WorkflowResumedCEData(id(ev), ref(ev), ev.eventDate()), this::convert))
.withType(WORKFLOW_RESUMED)
.build());
factory.build(
builder()
.withData(cloudEventData(factory.build(event), this::convert))
.withType(WORKFLOW_RESUMED)));
}

@Override
public void onWorkflowCompleted(WorkflowCompletedEvent event) {
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
publish(
event,
ev ->
builder()
.withData(
cloudEventData(
new WorkflowCompletedCEData(
id(ev), ref(ev), ev.eventDate(), from(event.output())),
this::convert))
.withType(WORKFLOW_COMPLETED)
.build());
factory.build(
builder()
.withData(cloudEventData(factory.build(event), this::convert))
.withType(WORKFLOW_COMPLETED)));
}

@Override
public void onWorkflowFailed(WorkflowFailedEvent event) {
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
publish(
event,
ev ->
builder()
.withData(
cloudEventData(
new WorkflowFailedCEData(id(ev), ref(ev), ev.eventDate(), error(ev)),
this::convert))
.withType(WORKFLOW_FAULTED)
.build());
factory.build(
builder()
.withData(cloudEventData(factory.build(event), this::convert))
.withType(WORKFLOW_FAULTED)));
}

@Override
public void onWorkflowStatusChanged(WorkflowStatusEvent event) {
if (appl(event).isStatusChangePublishingEnabled()) {
WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event);
publish(
event,
ev ->
builder()
.withData(
cloudEventData(
new WorkflowStatusCEDataEvent(
id(ev), ref(ev), ev.eventDate(), ev.status().toString()),
this::convert))
.withType(WORKFLOW_STATUS_CHANGED)
.build());
factory.build(
builder()
.withData(cloudEventData(factory.build(event), this::convert))
.withType(WORKFLOW_STATUS_CHANGED)));
}
}

Expand Down Expand Up @@ -367,20 +338,4 @@ private static CloudEventBuilder builder() {
private static WorkflowApplication appl(WorkflowEvent ev) {
return ev.workflowContext().definition().application();
}

private static String id(WorkflowEvent ev) {
return ev.workflowContext().instanceData().id();
}

private static String pos(TaskEvent ev) {
return ev.taskContext().position().jsonPointer();
}

private static Object output(TaskEvent ev) {
return from(ev.taskContext().output());
}

private static Object from(WorkflowModel model) {
return model.asJavaObject();
}
}
Loading
Loading