diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index c4d4f11c3..c70134cd3 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -39,6 +39,8 @@ import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionCompletableListener; import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener; import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListenerAdapter; +import io.serverlessworkflow.impl.lifecycle.ce.DefaultLifeCycleCloudEventFactory; +import io.serverlessworkflow.impl.lifecycle.ce.WorkflowLifeCycleCloudEventFactory; import io.serverlessworkflow.impl.resources.DefaultResourceLoaderFactory; import io.serverlessworkflow.impl.resources.ExternalResourceHandler; import io.serverlessworkflow.impl.resources.ResourceLoaderFactory; @@ -96,6 +98,7 @@ public class WorkflowApplication implements AutoCloseable { private final Collection callableProxyBuilders; private final CloudEventPredicateFactory cloudEventPredicateFactory; private final AllStrategyCorrelationInfoFactory allStrategyCorrelationInfoFactory; + private final WorkflowLifeCycleCloudEventFactory lifeCycleCloudEventFactory; private WorkflowApplication(Builder builder) { this.taskFactory = builder.taskFactory; @@ -126,6 +129,7 @@ private WorkflowApplication(Builder builder) { this.callableProxyBuilders = builder.callableProxyBuilders; this.cloudEventPredicateFactory = builder.cloudEventPredicateFactory; this.allStrategyCorrelationInfoFactory = builder.allStrategyCorrelationInfoFactory; + this.lifeCycleCloudEventFactory = builder.lifeCycleCloudEventFactory; } public TaskExecutorFactory taskFactory() { @@ -245,6 +249,7 @@ public SchemaValidator getValidator(SchemaInline inline) { private URI defaultCatalogURI; private CloudEventPredicateFactory cloudEventPredicateFactory; private AllStrategyCorrelationInfoFactory allStrategyCorrelationInfoFactory; + private WorkflowLifeCycleCloudEventFactory lifeCycleCloudEventFactory; private Builder() { ServiceLoader.load(NamedWorkflowAdditionalObject.class) @@ -383,6 +388,12 @@ public Builder withAllStrategyCorrelationInfoFactory( return this; } + public Builder withLifeCycleCloudEventFactory( + WorkflowLifeCycleCloudEventFactory lifeCycleCloudEventFactory) { + this.lifeCycleCloudEventFactory = lifeCycleCloudEventFactory; + return this; + } + public WorkflowApplication build() { if (modelFactory == null) { @@ -443,6 +454,9 @@ public WorkflowApplication build() { loadFirst(CloudEventPredicateFactory.class) .orElseGet(() -> new DefaultCloudEventPredicateFactory()); } + if (lifeCycleCloudEventFactory == null) { + lifeCycleCloudEventFactory = new DefaultLifeCycleCloudEventFactory(); + } if (allStrategyCorrelationInfoFactory == null) { allStrategyCorrelationInfoFactory = definition -> InMemoryAllStrategyCorrelationInfo.instance(); @@ -579,4 +593,8 @@ public Collection callableProxyBuilders() { public AllStrategyCorrelationInfoFactory allStrategyCorrelationInfoFactory() { return allStrategyCorrelationInfoFactory; } + + public WorkflowLifeCycleCloudEventFactory lifeCycleCloudEventFactory() { + return lifeCycleCloudEventFactory; + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java index 05b776831..2fbb44e51 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/AbstractLifeCyclePublisher.java @@ -29,8 +29,6 @@ import static io.serverlessworkflow.impl.LifecycleEvents.WORKFLOW_STARTED; import static io.serverlessworkflow.impl.LifecycleEvents.WORKFLOW_STATUS_CHANGED; import static io.serverlessworkflow.impl.LifecycleEvents.WORKFLOW_SUSPENDED; -import static io.serverlessworkflow.impl.WorkflowError.error; -import static io.serverlessworkflow.impl.lifecycle.ce.WorkflowDefinitionCEData.ref; import io.cloudevents.CloudEvent; import io.cloudevents.CloudEventData; @@ -38,11 +36,9 @@ import io.cloudevents.core.data.PojoCloudEventData; import io.cloudevents.core.data.PojoCloudEventData.ToBytes; import io.serverlessworkflow.impl.WorkflowApplication; -import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.events.CloudEventUtils; import io.serverlessworkflow.impl.lifecycle.TaskCancelledEvent; import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; -import io.serverlessworkflow.impl.lifecycle.TaskEvent; import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent; import io.serverlessworkflow.impl.lifecycle.TaskResumedEvent; import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent; @@ -82,202 +78,177 @@ public static Collection getLifeCycleTypes() { WORKFLOW_STATUS_CHANGED); } + private WorkflowLifeCycleCloudEventFactory lifeCycleFactory(WorkflowEvent ev) { + return ev.workflowContext().definition().application().lifeCycleCloudEventFactory(); + } + @Override public void onTaskStarted(TaskStartedEvent event) { + WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event); publish( event, ev -> - builder() - .withData( - cloudEventData( - new TaskStartedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()), - this::convert)) - .withType(TASK_STARTED) - .build()); + factory.build( + builder() + .withData(cloudEventData(factory.build(event), this::convert)) + .withType(TASK_STARTED))); } @Override public void onTaskRetried(TaskRetriedEvent event) { + WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event); publish( event, ev -> - builder() - .withData( - cloudEventData( - new TaskRetriedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()), - this::convert)) - .withType(TASK_STARTED) - .build()); + factory.build( + builder() + .withData(cloudEventData(factory.build(event), this::convert)) + .withType(TASK_RETRIED))); } @Override public void onTaskCompleted(TaskCompletedEvent event) { + WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event); publish( event, ev -> - builder() - .withData( - cloudEventData( - new TaskCompletedCEData( - id(ev), pos(ev), ref(ev), ev.eventDate(), output(ev)), - this::convert)) - .withType(TASK_COMPLETED) - .build()); + factory.build( + builder() + .withData(cloudEventData(factory.build(event), this::convert)) + .withType(TASK_COMPLETED))); } @Override public void onTaskSuspended(TaskSuspendedEvent event) { + WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event); publish( event, ev -> - builder() - .withData( - cloudEventData( - new TaskSuspendedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()), - this::convert)) - .withType(TASK_SUSPENDED) - .build()); + factory.build( + builder() + .withData(cloudEventData(factory.build(event), this::convert)) + .withType(TASK_SUSPENDED))); } @Override public void onTaskResumed(TaskResumedEvent event) { + WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event); publish( event, ev -> - builder() - .withData( - cloudEventData( - new TaskResumedCEData(id(ev), pos(ev), ref(ev), ev.eventDate()), - this::convert)) - .withType(TASK_RESUMED) - .build()); + factory.build( + builder() + .withData(cloudEventData(factory.build(event), this::convert)) + .withType(TASK_RESUMED))); } @Override public void onTaskCancelled(TaskCancelledEvent event) { + WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event); publish( event, ev -> - builder() - .withData( - cloudEventData( - new TaskCancelledCEData(id(ev), pos(ev), ref(ev), ev.eventDate()), - this::convert)) - .withType(TASK_CANCELLED) - .build()); + factory.build( + builder() + .withData(cloudEventData(factory.build(event), this::convert)) + .withType(TASK_CANCELLED))); } @Override public void onTaskFailed(TaskFailedEvent event) { + WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event); publish( event, ev -> - builder() - .withData( - cloudEventData( - new TaskFailedCEData(id(ev), pos(ev), ref(ev), ev.eventDate(), error(ev)), - this::convert)) - .withType(TASK_FAULTED) - .build()); + factory.build( + builder() + .withData(cloudEventData(factory.build(event), this::convert)) + .withType(TASK_FAULTED))); } @Override public void onWorkflowStarted(WorkflowStartedEvent event) { + WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event); publish( event, ev -> - builder() - .withData( - cloudEventData( - new WorkflowStartedCEData(id(ev), ref(ev), ev.eventDate()), this::convert)) - .withType(WORKFLOW_STARTED) - .build()); + factory.build( + builder() + .withData(cloudEventData(factory.build(event), this::convert)) + .withType(WORKFLOW_STARTED))); } @Override public void onWorkflowSuspended(WorkflowSuspendedEvent event) { + WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event); publish( event, ev -> - builder() - .withData( - cloudEventData( - new WorkflowSuspendedCEData(id(ev), ref(ev), ev.eventDate()), - this::convert)) - .withType(WORKFLOW_SUSPENDED) - .build()); + factory.build( + builder() + .withData(cloudEventData(factory.build(event), this::convert)) + .withType(WORKFLOW_SUSPENDED))); } @Override public void onWorkflowCancelled(WorkflowCancelledEvent event) { + WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event); publish( event, ev -> - builder() - .withData( - cloudEventData( - new WorkflowCancelledCEData(id(ev), ref(ev), ev.eventDate()), - this::convert)) - .withType(WORKFLOW_CANCELLED) - .build()); + factory.build( + builder() + .withData(cloudEventData(factory.build(event), this::convert)) + .withType(WORKFLOW_CANCELLED))); } @Override public void onWorkflowResumed(WorkflowResumedEvent event) { + WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event); publish( event, ev -> - builder() - .withData( - cloudEventData( - new WorkflowResumedCEData(id(ev), ref(ev), ev.eventDate()), this::convert)) - .withType(WORKFLOW_RESUMED) - .build()); + factory.build( + builder() + .withData(cloudEventData(factory.build(event), this::convert)) + .withType(WORKFLOW_RESUMED))); } @Override public void onWorkflowCompleted(WorkflowCompletedEvent event) { + WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event); publish( event, ev -> - builder() - .withData( - cloudEventData( - new WorkflowCompletedCEData( - id(ev), ref(ev), ev.eventDate(), from(event.output())), - this::convert)) - .withType(WORKFLOW_COMPLETED) - .build()); + factory.build( + builder() + .withData(cloudEventData(factory.build(event), this::convert)) + .withType(WORKFLOW_COMPLETED))); } @Override public void onWorkflowFailed(WorkflowFailedEvent event) { + WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event); publish( event, ev -> - builder() - .withData( - cloudEventData( - new WorkflowFailedCEData(id(ev), ref(ev), ev.eventDate(), error(ev)), - this::convert)) - .withType(WORKFLOW_FAULTED) - .build()); + factory.build( + builder() + .withData(cloudEventData(factory.build(event), this::convert)) + .withType(WORKFLOW_FAULTED))); } @Override public void onWorkflowStatusChanged(WorkflowStatusEvent event) { if (appl(event).isStatusChangePublishingEnabled()) { + WorkflowLifeCycleCloudEventFactory factory = lifeCycleFactory(event); publish( event, ev -> - builder() - .withData( - cloudEventData( - new WorkflowStatusCEDataEvent( - id(ev), ref(ev), ev.eventDate(), ev.status().toString()), - this::convert)) - .withType(WORKFLOW_STATUS_CHANGED) - .build()); + factory.build( + builder() + .withData(cloudEventData(factory.build(event), this::convert)) + .withType(WORKFLOW_STATUS_CHANGED))); } } @@ -367,20 +338,4 @@ private static CloudEventBuilder builder() { private static WorkflowApplication appl(WorkflowEvent ev) { return ev.workflowContext().definition().application(); } - - private static String id(WorkflowEvent ev) { - return ev.workflowContext().instanceData().id(); - } - - private static String pos(TaskEvent ev) { - return ev.taskContext().position().jsonPointer(); - } - - private static Object output(TaskEvent ev) { - return from(ev.taskContext().output()); - } - - private static Object from(WorkflowModel model) { - return model.asJavaObject(); - } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/DefaultLifeCycleCloudEventFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/DefaultLifeCycleCloudEventFactory.java new file mode 100644 index 000000000..e2e3ce184 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/DefaultLifeCycleCloudEventFactory.java @@ -0,0 +1,111 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.serverlessworkflow.impl.lifecycle.TaskCancelledEvent; +import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskResumedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskSuspendedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowStatusEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent; + +public class DefaultLifeCycleCloudEventFactory implements WorkflowLifeCycleCloudEventFactory { + + @Override + public CloudEvent build(CloudEventBuilder builder) { + return builder.build(); + } + + @Override + public TaskCompletedCEData build(TaskCompletedEvent ev) { + return new TaskCompletedCEData(ev); + } + + @Override + public TaskFailedCEData build(TaskFailedEvent ev) { + return new TaskFailedCEData(ev); + } + + @Override + public TaskCancelledCEData build(TaskCancelledEvent ev) { + return new TaskCancelledCEData(ev); + } + + @Override + public TaskResumedCEData build(TaskResumedEvent ev) { + return new TaskResumedCEData(ev); + } + + @Override + public TaskRetriedCEData build(TaskRetriedEvent ev) { + return new TaskRetriedCEData(ev); + } + + @Override + public TaskStartedCEData build(TaskStartedEvent ev) { + return new TaskStartedCEData(ev); + } + + @Override + public TaskSuspendedCEData build(TaskSuspendedEvent ev) { + return new TaskSuspendedCEData(ev); + } + + @Override + public WorkflowCancelledCEData build(WorkflowCancelledEvent ev) { + return new WorkflowCancelledCEData(ev); + } + + @Override + public WorkflowFailedCEData build(WorkflowFailedEvent ev) { + return new WorkflowFailedCEData(ev); + } + + @Override + public WorkflowResumedCEData build(WorkflowResumedEvent ev) { + return new WorkflowResumedCEData(ev); + } + + @Override + public WorkflowStartedCEData build(WorkflowStartedEvent ev) { + return new WorkflowStartedCEData(ev); + } + + @Override + public WorkflowStatusCEDataEvent build(WorkflowStatusEvent ev) { + return new WorkflowStatusCEDataEvent(ev); + } + + @Override + public WorkflowSuspendedCEData build(WorkflowSuspendedEvent ev) { + return new WorkflowSuspendedCEData(ev); + } + + @Override + public WorkflowCompletedCEData build(WorkflowCompletedEvent ev) { + return new WorkflowCompletedCEData(ev); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/InputOutputLifeCycleCloudEventFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/InputOutputLifeCycleCloudEventFactory.java new file mode 100644 index 000000000..dbba242aa --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/InputOutputLifeCycleCloudEventFactory.java @@ -0,0 +1,44 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; + +public class InputOutputLifeCycleCloudEventFactory extends DefaultLifeCycleCloudEventFactory { + + @Override + public WorkflowCompletedCEData build(WorkflowCompletedEvent ev) { + return new WorkflowCompletedCEDataWithOutput(ev); + } + + @Override + public WorkflowStartedCEData build(WorkflowStartedEvent ev) { + return new WorkflowStartedCEDataWithInput(ev); + } + + @Override + public TaskCompletedCEData build(TaskCompletedEvent ev) { + return new TaskCompletedCEDataWithOutput(ev); + } + + @Override + public TaskStartedCEData build(TaskStartedEvent ev) { + return new TaskStartedCEDataWithInput(ev); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskCEData.java new file mode 100644 index 000000000..9af7499ad --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskCEData.java @@ -0,0 +1,85 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import static io.serverlessworkflow.impl.lifecycle.ce.WorkflowCEData.id; +import static io.serverlessworkflow.impl.lifecycle.ce.WorkflowDefinitionCEData.ref; + +import io.serverlessworkflow.impl.lifecycle.TaskEvent; +import java.util.Objects; + +public class TaskCEData { + + private String workflow; + private String task; + private WorkflowDefinitionCEData definition; + + protected TaskCEData(TaskEvent ev) { + this(id(ev), pos(ev), ref(ev)); + } + + protected TaskCEData() {} + + protected TaskCEData(String workflow, String task, WorkflowDefinitionCEData definition) { + this.workflow = workflow; + this.task = task; + this.definition = definition; + } + + public String getWorkflow() { + return workflow; + } + + public String getTask() { + return task; + } + + public WorkflowDefinitionCEData getDefinition() { + return definition; + } + + public String workflow() { + return workflow; + } + + public String task() { + return task; + } + + public WorkflowDefinitionCEData definition() { + return definition; + } + + protected static String pos(TaskEvent ev) { + return ev.taskContext().position().jsonPointer(); + } + + @Override + public int hashCode() { + return Objects.hash(definition, task, workflow); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (getClass() != obj.getClass()) return false; + TaskCEData other = (TaskCEData) obj; + return Objects.equals(definition, other.definition) + && Objects.equals(task, other.task) + && Objects.equals(workflow, other.workflow); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskCancelledCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskCancelledCEData.java index efa63c164..da5d4eae8 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskCancelledCEData.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskCancelledCEData.java @@ -15,10 +15,62 @@ */ package io.serverlessworkflow.impl.lifecycle.ce; +import io.serverlessworkflow.impl.lifecycle.TaskCancelledEvent; import java.time.OffsetDateTime; +import java.util.Objects; -public record TaskCancelledCEData( - String workflow, - String task, - WorkflowDefinitionCEData definition, - OffsetDateTime cancelledAt) {} +public class TaskCancelledCEData extends TaskCEData { + private OffsetDateTime cancelledAt; + + public TaskCancelledCEData(TaskCancelledEvent ev) { + super(ev); + this.cancelledAt = ev.eventDate(); + } + + public TaskCancelledCEData() {} + + public TaskCancelledCEData( + String workflow, String task, WorkflowDefinitionCEData definition, OffsetDateTime time) { + super(workflow, task, definition); + this.cancelledAt = time; + } + + public OffsetDateTime cancelledAt() { + return cancelledAt; + } + + public OffsetDateTime getCancelledAt() { + return cancelledAt; + } + + @Override + public String toString() { + return "TaskCancelledCEData [cancelledAt=" + + cancelledAt + + ", getWorkflow()=" + + workflow() + + ", getTask()=" + + task() + + ", getDefinition()=" + + definition() + + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + Objects.hash(cancelledAt); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (!super.equals(obj)) return false; + if (getClass() != obj.getClass()) return false; + TaskCancelledCEData other = (TaskCancelledCEData) obj; + return Objects.equals(cancelledAt, other.cancelledAt); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskCompletedCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskCompletedCEData.java index 1480aa846..1251df49c 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskCompletedCEData.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskCompletedCEData.java @@ -15,11 +15,63 @@ */ package io.serverlessworkflow.impl.lifecycle.ce; +import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; import java.time.OffsetDateTime; +import java.util.Objects; -public record TaskCompletedCEData( - String workflow, - String task, - WorkflowDefinitionCEData definition, - OffsetDateTime completedAt, - Object output) {} +public class TaskCompletedCEData extends TaskCEData { + + private OffsetDateTime completedAt; + + public TaskCompletedCEData(TaskCompletedEvent ev) { + super(ev); + this.completedAt = ev.eventDate(); + } + + public TaskCompletedCEData() {} + + public TaskCompletedCEData( + String workflow, String task, WorkflowDefinitionCEData definition, OffsetDateTime time) { + super(workflow, task, definition); + this.completedAt = time; + } + + public OffsetDateTime completedAt() { + return completedAt; + } + + public OffsetDateTime getCompletedAt() { + return completedAt; + } + + @Override + public String toString() { + return "TaskCompletedCEData [completedAt=" + + completedAt + + ", getWorkflow()=" + + workflow() + + ", getTask()=" + + task() + + ", getDefinition()=" + + definition() + + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + Objects.hash(completedAt); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (!super.equals(obj)) return false; + if (getClass() != obj.getClass()) return false; + TaskCompletedCEData other = (TaskCompletedCEData) obj; + return Objects.equals(completedAt, other.completedAt); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskCompletedCEDataWithOutput.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskCompletedCEDataWithOutput.java new file mode 100644 index 000000000..5ffd05fec --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskCompletedCEDataWithOutput.java @@ -0,0 +1,57 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; +import java.util.Objects; + +public class TaskCompletedCEDataWithOutput extends TaskCompletedCEData { + + private Object output; + + public TaskCompletedCEDataWithOutput(TaskCompletedEvent ev) { + super(ev); + this.output = ev.taskContext().output().asJavaObject(); + } + + public TaskCompletedCEDataWithOutput() {} + + public Object output() { + return output; + } + + public Object getOutput() { + return output; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + Objects.hash(output); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (!super.equals(obj)) return false; + if (getClass() != obj.getClass()) return false; + TaskCompletedCEDataWithOutput other = (TaskCompletedCEDataWithOutput) obj; + return Objects.equals(output, other.output); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskFailedCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskFailedCEData.java index eadaaf319..688fe7f85 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskFailedCEData.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskFailedCEData.java @@ -16,11 +16,79 @@ package io.serverlessworkflow.impl.lifecycle.ce; import io.serverlessworkflow.impl.WorkflowError; +import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent; import java.time.OffsetDateTime; +import java.util.Objects; -public record TaskFailedCEData( - String workflow, - String task, - WorkflowDefinitionCEData definition, - OffsetDateTime faultedAt, - WorkflowError error) {} +public class TaskFailedCEData extends TaskCEData { + private OffsetDateTime faultedAt; + private WorkflowError error; + + public TaskFailedCEData(TaskFailedEvent ev) { + super(ev); + this.faultedAt = ev.eventDate(); + this.error = WorkflowError.error(ev); + } + + public TaskFailedCEData( + String workflow, + String task, + WorkflowDefinitionCEData definition, + OffsetDateTime faultedAt, + WorkflowError error) { + super(workflow, task, definition); + this.faultedAt = faultedAt; + this.error = error; + } + + public TaskFailedCEData() {} + + public OffsetDateTime faultedAt() { + return faultedAt; + } + + public WorkflowError error() { + return error; + } + + public OffsetDateTime getFaultedAt() { + return faultedAt; + } + + public WorkflowError getError() { + return error; + } + + @Override + public String toString() { + return "TaskFailedCEData [faultedAt=" + + faultedAt + + ", error=" + + error + + ", getWorkflow()=" + + workflow() + + ", getTask()=" + + task() + + ", getDefinition()=" + + definition() + + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + Objects.hash(error, faultedAt); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (!super.equals(obj)) return false; + if (getClass() != obj.getClass()) return false; + TaskFailedCEData other = (TaskFailedCEData) obj; + return Objects.equals(error, other.error) && Objects.equals(faultedAt, other.faultedAt); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskResumedCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskResumedCEData.java index eef3606bc..73fcf059f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskResumedCEData.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskResumedCEData.java @@ -15,7 +15,62 @@ */ package io.serverlessworkflow.impl.lifecycle.ce; +import io.serverlessworkflow.impl.lifecycle.TaskResumedEvent; import java.time.OffsetDateTime; +import java.util.Objects; -public record TaskResumedCEData( - String workflow, String task, WorkflowDefinitionCEData definition, OffsetDateTime resumedAt) {} +public class TaskResumedCEData extends TaskCEData { + private OffsetDateTime resumedAt; + + public TaskResumedCEData(TaskResumedEvent ev) { + super(ev); + this.resumedAt = ev.eventDate(); + } + + public TaskResumedCEData() {} + + public TaskResumedCEData( + String workflow, String task, WorkflowDefinitionCEData definition, OffsetDateTime time) { + super(workflow, task, definition); + this.resumedAt = time; + } + + public OffsetDateTime resumedAt() { + return resumedAt; + } + + public OffsetDateTime getResumedAt() { + return resumedAt; + } + + @Override + public String toString() { + return "TaskResumedCEData [resumedAt=" + + resumedAt + + ", getWorkflow()=" + + workflow() + + ", getTask()=" + + task() + + ", getDefinition()=" + + definition() + + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + Objects.hash(resumedAt); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (!super.equals(obj)) return false; + if (getClass() != obj.getClass()) return false; + TaskResumedCEData other = (TaskResumedCEData) obj; + return Objects.equals(resumedAt, other.resumedAt); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskRetriedCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskRetriedCEData.java index 230d161ea..770ef4f5d 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskRetriedCEData.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskRetriedCEData.java @@ -15,7 +15,62 @@ */ package io.serverlessworkflow.impl.lifecycle.ce; +import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent; import java.time.OffsetDateTime; +import java.util.Objects; -public record TaskRetriedCEData( - String workflow, String task, WorkflowDefinitionCEData definition, OffsetDateTime retriedAt) {} +public class TaskRetriedCEData extends TaskCEData { + private OffsetDateTime retriedAt; + + public TaskRetriedCEData(TaskRetriedEvent ev) { + super(ev); + this.retriedAt = ev.eventDate(); + } + + public TaskRetriedCEData() {} + + public TaskRetriedCEData( + String workflow, String task, WorkflowDefinitionCEData definition, OffsetDateTime time) { + super(workflow, task, definition); + this.retriedAt = time; + } + + public OffsetDateTime retriedAt() { + return retriedAt; + } + + public OffsetDateTime getRetriedAt() { + return retriedAt; + } + + @Override + public String toString() { + return "TaskRetriedCEData [retriedAt=" + + retriedAt + + ", getWorkflow()=" + + workflow() + + ", getTask()=" + + task() + + ", getDefinition()=" + + definition() + + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + Objects.hash(retriedAt); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (!super.equals(obj)) return false; + if (getClass() != obj.getClass()) return false; + TaskRetriedCEData other = (TaskRetriedCEData) obj; + return Objects.equals(retriedAt, other.retriedAt); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskStartedCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskStartedCEData.java index b9d659fcb..875f01487 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskStartedCEData.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskStartedCEData.java @@ -15,7 +15,62 @@ */ package io.serverlessworkflow.impl.lifecycle.ce; +import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; import java.time.OffsetDateTime; +import java.util.Objects; -public record TaskStartedCEData( - String workflow, String task, WorkflowDefinitionCEData definition, OffsetDateTime startedAt) {} +public class TaskStartedCEData extends TaskCEData { + private OffsetDateTime startedAt; + + public TaskStartedCEData(TaskStartedEvent ev) { + super(ev); + this.startedAt = ev.eventDate(); + } + + public TaskStartedCEData( + String workflow, String task, WorkflowDefinitionCEData definition, OffsetDateTime time) { + super(workflow, task, definition); + this.startedAt = time; + } + + public TaskStartedCEData() {} + + public OffsetDateTime startedAt() { + return startedAt; + } + + public OffsetDateTime getStartedAt() { + return startedAt; + } + + @Override + public String toString() { + return "TaskStartedCEData [startedAt=" + + startedAt + + ", getWorkflow()=" + + workflow() + + ", getTask()=" + + task() + + ", getDefinition()=" + + definition() + + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + Objects.hash(startedAt); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (!super.equals(obj)) return false; + if (getClass() != obj.getClass()) return false; + TaskStartedCEData other = (TaskStartedCEData) obj; + return Objects.equals(startedAt, other.startedAt); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskStartedCEDataWithInput.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskStartedCEDataWithInput.java new file mode 100644 index 000000000..15cbb978c --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskStartedCEDataWithInput.java @@ -0,0 +1,56 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; +import java.util.Objects; + +public class TaskStartedCEDataWithInput extends TaskStartedCEData { + private Object input; + + public TaskStartedCEDataWithInput(TaskStartedEvent ev) { + super(ev); + this.input = ev.taskContext().input().asJavaObject(); + } + + public TaskStartedCEDataWithInput() {} + + public Object input() { + return input; + } + + public Object getInput() { + return input; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + Objects.hash(input); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (!super.equals(obj)) return false; + if (getClass() != obj.getClass()) return false; + TaskStartedCEDataWithInput other = (TaskStartedCEDataWithInput) obj; + return Objects.equals(input, other.input); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskSuspendedCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskSuspendedCEData.java index 4a376073e..13594583c 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskSuspendedCEData.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/TaskSuspendedCEData.java @@ -15,10 +15,63 @@ */ package io.serverlessworkflow.impl.lifecycle.ce; +import io.serverlessworkflow.impl.lifecycle.TaskSuspendedEvent; import java.time.OffsetDateTime; +import java.util.Objects; -public record TaskSuspendedCEData( - String workflow, - String task, - WorkflowDefinitionCEData definition, - OffsetDateTime suspendedAt) {} +public class TaskSuspendedCEData extends TaskCEData { + + private OffsetDateTime suspendedAt; + + public TaskSuspendedCEData(TaskSuspendedEvent ev) { + super(ev); + this.suspendedAt = ev.eventDate(); + } + + public TaskSuspendedCEData() {} + + public TaskSuspendedCEData( + String workflow, String task, WorkflowDefinitionCEData definition, OffsetDateTime time) { + super(workflow, task, definition); + this.suspendedAt = time; + } + + public OffsetDateTime suspendedAt() { + return suspendedAt; + } + + public OffsetDateTime getSuspendedAt() { + return suspendedAt; + } + + @Override + public String toString() { + return "TaskSuspendedCEData [suspendedAt=" + + suspendedAt + + ", getWorkflow()=" + + workflow() + + ", getTask()=" + + task() + + ", getDefinition()=" + + definition() + + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + Objects.hash(suspendedAt); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (!super.equals(obj)) return false; + if (getClass() != obj.getClass()) return false; + TaskSuspendedCEData other = (TaskSuspendedCEData) obj; + return Objects.equals(suspendedAt, other.suspendedAt); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowCEData.java new file mode 100644 index 000000000..ba2fa60d1 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowCEData.java @@ -0,0 +1,72 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import static io.serverlessworkflow.impl.lifecycle.ce.WorkflowDefinitionCEData.ref; + +import io.serverlessworkflow.impl.lifecycle.WorkflowEvent; +import java.util.Objects; + +public class WorkflowCEData { + + private String name; + private WorkflowDefinitionCEData definition; + + protected WorkflowCEData(WorkflowEvent ev) { + this(id(ev), ref(ev)); + } + + protected WorkflowCEData(String name, WorkflowDefinitionCEData definition) { + this.name = name; + this.definition = definition; + } + + protected WorkflowCEData() {} + + public String getName() { + return name; + } + + public WorkflowDefinitionCEData getDefinition() { + return definition; + } + + public String name() { + return name; + } + + public WorkflowDefinitionCEData definition() { + return definition; + } + + protected static String id(WorkflowEvent ev) { + return ev.workflowContext().instanceData().id(); + } + + @Override + public int hashCode() { + return Objects.hash(definition, name); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (getClass() != obj.getClass()) return false; + WorkflowCEData other = (WorkflowCEData) obj; + return Objects.equals(definition, other.definition) && Objects.equals(name, other.name); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowCancelledCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowCancelledCEData.java index 7eb9bd627..ff94c92fb 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowCancelledCEData.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowCancelledCEData.java @@ -15,7 +15,60 @@ */ package io.serverlessworkflow.impl.lifecycle.ce; +import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent; import java.time.OffsetDateTime; +import java.util.Objects; -public record WorkflowCancelledCEData( - String name, WorkflowDefinitionCEData definition, OffsetDateTime cancelledAt) {} +public class WorkflowCancelledCEData extends WorkflowCEData { + private OffsetDateTime cancelledAt; + + public WorkflowCancelledCEData(WorkflowCancelledEvent ev) { + super(ev); + this.cancelledAt = ev.eventDate(); + } + + public WorkflowCancelledCEData() {} + + public WorkflowCancelledCEData( + String name, WorkflowDefinitionCEData definition, OffsetDateTime time) { + super(name, definition); + this.cancelledAt = time; + } + + public OffsetDateTime cancelledAt() { + return cancelledAt; + } + + public OffsetDateTime getCancelledAt() { + return cancelledAt; + } + + @Override + public String toString() { + return "WorkflowCancelledCEData [cancelledAt=" + + cancelledAt + + ", getName()=" + + name() + + ", getDefinition()=" + + definition() + + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + Objects.hash(cancelledAt); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (!super.equals(obj)) return false; + if (getClass() != obj.getClass()) return false; + WorkflowCancelledCEData other = (WorkflowCancelledCEData) obj; + return Objects.equals(cancelledAt, other.cancelledAt); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowCompletedCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowCompletedCEData.java index f025f7a7b..8b8b740f6 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowCompletedCEData.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowCompletedCEData.java @@ -15,7 +15,60 @@ */ package io.serverlessworkflow.impl.lifecycle.ce; +import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent; import java.time.OffsetDateTime; +import java.util.Objects; -public record WorkflowCompletedCEData( - String name, WorkflowDefinitionCEData definition, OffsetDateTime completedAt, Object output) {} +public class WorkflowCompletedCEData extends WorkflowCEData { + public WorkflowCompletedCEData(WorkflowCompletedEvent ev) { + super(ev); + this.completedAt = ev.eventDate(); + } + + public WorkflowCompletedCEData( + String name, WorkflowDefinitionCEData definition, OffsetDateTime time) { + super(name, definition); + this.completedAt = time; + } + + public WorkflowCompletedCEData() {} + + private OffsetDateTime completedAt; + + public OffsetDateTime completedAt() { + return completedAt; + } + + public OffsetDateTime getCompletedAt() { + return completedAt; + } + + @Override + public String toString() { + return "WorkflowCompletedCEData [completedAt=" + + completedAt + + ", getName()=" + + name() + + ", getDefinition()=" + + definition() + + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + Objects.hash(completedAt); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (!super.equals(obj)) return false; + if (getClass() != obj.getClass()) return false; + WorkflowCompletedCEData other = (WorkflowCompletedCEData) obj; + return Objects.equals(completedAt, other.completedAt); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowCompletedCEDataWithOutput.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowCompletedCEDataWithOutput.java new file mode 100644 index 000000000..20040d747 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowCompletedCEDataWithOutput.java @@ -0,0 +1,57 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent; +import java.util.Objects; + +public class WorkflowCompletedCEDataWithOutput extends WorkflowCompletedCEData { + + private Object output; + + public Object output() { + return output; + } + + public Object getOutput() { + return output; + } + + public WorkflowCompletedCEDataWithOutput(WorkflowCompletedEvent ev) { + super(ev); + this.output = ev.output().asJavaObject(); + } + + public WorkflowCompletedCEDataWithOutput() {} + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + Objects.hash(output); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (!super.equals(obj)) return false; + if (getClass() != obj.getClass()) return false; + WorkflowCompletedCEDataWithOutput other = (WorkflowCompletedCEDataWithOutput) obj; + return Objects.equals(output, other.output); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowDefinitionCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowDefinitionCEData.java index ffdbce9db..62e7e3dfc 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowDefinitionCEData.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowDefinitionCEData.java @@ -15,14 +15,13 @@ */ package io.serverlessworkflow.impl.lifecycle.ce; -import io.serverlessworkflow.api.types.Document; +import io.serverlessworkflow.impl.WorkflowDefinitionId; import io.serverlessworkflow.impl.lifecycle.WorkflowEvent; public record WorkflowDefinitionCEData(String namespace, String name, String version) { public static WorkflowDefinitionCEData ref(WorkflowEvent ev) { - Document document = ev.workflowContext().definition().workflow().getDocument(); - return new WorkflowDefinitionCEData( - document.getNamespace(), document.getName(), document.getVersion()); + WorkflowDefinitionId id = ev.workflowContext().definition().id(); + return new WorkflowDefinitionCEData(id.namespace(), id.name(), id.version()); } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowFailedCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowFailedCEData.java index d53e9e561..1efc05c3f 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowFailedCEData.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowFailedCEData.java @@ -16,10 +16,74 @@ package io.serverlessworkflow.impl.lifecycle.ce; import io.serverlessworkflow.impl.WorkflowError; +import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent; import java.time.OffsetDateTime; +import java.util.Objects; -public record WorkflowFailedCEData( - String name, - WorkflowDefinitionCEData definition, - OffsetDateTime faultedAt, - WorkflowError error) {} +public class WorkflowFailedCEData extends WorkflowCEData { + + private OffsetDateTime faultedAt; + private WorkflowError error; + + public WorkflowFailedCEData(WorkflowFailedEvent ev) { + super(ev); + this.faultedAt = ev.eventDate(); + this.error = WorkflowError.error(ev); + } + + public WorkflowFailedCEData( + String name, WorkflowDefinitionCEData definition, OffsetDateTime time, WorkflowError error) { + super(name, definition); + this.faultedAt = time; + this.error = error; + } + + public WorkflowFailedCEData() {} + + public OffsetDateTime faultedAt() { + return faultedAt; + } + + public WorkflowError error() { + return error; + } + + public OffsetDateTime getFaultedAt() { + return faultedAt; + } + + public WorkflowError getError() { + return error; + } + + @Override + public String toString() { + return "WorkflowFailedCEData [faultedAt=" + + faultedAt + + ", error=" + + error + + ", getName()=" + + name() + + ", getDefinition()=" + + definition() + + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + Objects.hash(error, faultedAt); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (!super.equals(obj)) return false; + if (getClass() != obj.getClass()) return false; + WorkflowFailedCEData other = (WorkflowFailedCEData) obj; + return Objects.equals(error, other.error) && Objects.equals(faultedAt, other.faultedAt); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowLifeCycleCloudEventFactory.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowLifeCycleCloudEventFactory.java new file mode 100644 index 000000000..32d074022 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowLifeCycleCloudEventFactory.java @@ -0,0 +1,66 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.serverlessworkflow.impl.lifecycle.TaskCancelledEvent; +import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskResumedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskSuspendedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowStatusEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent; + +public interface WorkflowLifeCycleCloudEventFactory { + + CloudEvent build(CloudEventBuilder builder); + + TaskCompletedCEData build(TaskCompletedEvent ev); + + TaskFailedCEData build(TaskFailedEvent ev); + + TaskCancelledCEData build(TaskCancelledEvent ev); + + TaskResumedCEData build(TaskResumedEvent ev); + + TaskRetriedCEData build(TaskRetriedEvent ev); + + TaskStartedCEData build(TaskStartedEvent ev); + + TaskSuspendedCEData build(TaskSuspendedEvent ev); + + WorkflowCancelledCEData build(WorkflowCancelledEvent ev); + + WorkflowFailedCEData build(WorkflowFailedEvent ev); + + WorkflowResumedCEData build(WorkflowResumedEvent ev); + + WorkflowStartedCEData build(WorkflowStartedEvent ev); + + WorkflowStatusCEDataEvent build(WorkflowStatusEvent ev); + + WorkflowSuspendedCEData build(WorkflowSuspendedEvent ev); + + WorkflowCompletedCEData build(WorkflowCompletedEvent event); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowResumedCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowResumedCEData.java index eb040d06a..cab8d0308 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowResumedCEData.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowResumedCEData.java @@ -15,7 +15,60 @@ */ package io.serverlessworkflow.impl.lifecycle.ce; +import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent; import java.time.OffsetDateTime; +import java.util.Objects; -public record WorkflowResumedCEData( - String name, WorkflowDefinitionCEData definition, OffsetDateTime resumedAt) {} +public class WorkflowResumedCEData extends WorkflowCEData { + private OffsetDateTime resumedAt; + + public WorkflowResumedCEData(WorkflowResumedEvent ev) { + super(ev); + this.resumedAt = ev.eventDate(); + } + + public WorkflowResumedCEData( + String name, WorkflowDefinitionCEData definition, OffsetDateTime time) { + super(name, definition); + this.resumedAt = time; + } + + public WorkflowResumedCEData() {} + + public OffsetDateTime resumedAt() { + return resumedAt; + } + + public OffsetDateTime getResumedAt() { + return resumedAt; + } + + @Override + public String toString() { + return "WorkflowResumedCEData [resumedAt=" + + resumedAt + + ", getName()=" + + name() + + ", getDefinition()=" + + definition() + + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + Objects.hash(resumedAt); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (!super.equals(obj)) return false; + if (getClass() != obj.getClass()) return false; + WorkflowResumedCEData other = (WorkflowResumedCEData) obj; + return Objects.equals(resumedAt, other.resumedAt); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowStartedCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowStartedCEData.java index 89ae2cda3..ee8c90c96 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowStartedCEData.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowStartedCEData.java @@ -15,7 +15,60 @@ */ package io.serverlessworkflow.impl.lifecycle.ce; +import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; import java.time.OffsetDateTime; +import java.util.Objects; -public record WorkflowStartedCEData( - String name, WorkflowDefinitionCEData definition, OffsetDateTime startedAt) {} +public class WorkflowStartedCEData extends WorkflowCEData { + private OffsetDateTime startedAt; + + public WorkflowStartedCEData(WorkflowStartedEvent ev) { + super(ev); + this.startedAt = ev.eventDate(); + } + + public WorkflowStartedCEData( + String name, WorkflowDefinitionCEData definition, OffsetDateTime time) { + super(name, definition); + this.startedAt = time; + } + + public WorkflowStartedCEData() {} + + public OffsetDateTime startedAt() { + return startedAt; + } + + public OffsetDateTime getStartedAt() { + return startedAt; + } + + @Override + public String toString() { + return "WorkflowStartedCEData [startedAt=" + + startedAt + + ", getName()=" + + name() + + ", getDefinition()=" + + definition() + + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + Objects.hash(startedAt); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (!super.equals(obj)) return false; + if (getClass() != obj.getClass()) return false; + WorkflowStartedCEData other = (WorkflowStartedCEData) obj; + return Objects.equals(startedAt, other.startedAt); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowStartedCEDataWithInput.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowStartedCEDataWithInput.java new file mode 100644 index 000000000..5eb228655 --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowStartedCEDataWithInput.java @@ -0,0 +1,57 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.lifecycle.ce; + +import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; +import java.util.Objects; + +public class WorkflowStartedCEDataWithInput extends WorkflowStartedCEData { + + private Object input; + + public Object input() { + return input; + } + + public Object getInput() { + return input; + } + + public WorkflowStartedCEDataWithInput(WorkflowStartedEvent ev) { + super(ev); + this.input = ev.workflowContext().instanceData().input().asJavaObject(); + } + + public WorkflowStartedCEDataWithInput() {} + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + Objects.hash(input); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (!super.equals(obj)) return false; + if (getClass() != obj.getClass()) return false; + WorkflowStartedCEDataWithInput other = (WorkflowStartedCEDataWithInput) obj; + return Objects.equals(input, other.input); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowStatusCEDataEvent.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowStatusCEDataEvent.java index e4d295181..f772d086d 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowStatusCEDataEvent.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowStatusCEDataEvent.java @@ -15,7 +15,77 @@ */ package io.serverlessworkflow.impl.lifecycle.ce; +import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.lifecycle.WorkflowStatusEvent; import java.time.OffsetDateTime; +import java.util.Objects; -public record WorkflowStatusCEDataEvent( - String name, WorkflowDefinitionCEData definition, OffsetDateTime updatedAt, String status) {} +public class WorkflowStatusCEDataEvent extends WorkflowCEData { + private OffsetDateTime updatedAt; + private String status; + + public WorkflowStatusCEDataEvent(WorkflowStatusEvent ev) { + super(ev); + this.updatedAt = ev.eventDate(); + this.status = ev.status().toString(); + } + + public WorkflowStatusCEDataEvent( + String name, + WorkflowDefinitionCEData definition, + OffsetDateTime time, + WorkflowStatus status) { + super(name, definition); + this.updatedAt = time; + this.status = status.toString(); + } + + public WorkflowStatusCEDataEvent() {} + + public OffsetDateTime updatedAt() { + return updatedAt; + } + + public String status() { + return status; + } + + public OffsetDateTime getUpdatedAt() { + return updatedAt; + } + + public String getStatus() { + return status; + } + + @Override + public String toString() { + return "WorkflowStatusCEDataEvent [updatedAt=" + + updatedAt + + ", status=" + + status + + ", getName()=" + + name() + + ", getDefinition()=" + + definition() + + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + Objects.hash(status, updatedAt); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (!super.equals(obj)) return false; + if (getClass() != obj.getClass()) return false; + WorkflowStatusCEDataEvent other = (WorkflowStatusCEDataEvent) obj; + return Objects.equals(status, other.status) && Objects.equals(updatedAt, other.updatedAt); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowSuspendedCEData.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowSuspendedCEData.java index 5b0918391..6186666db 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowSuspendedCEData.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/ce/WorkflowSuspendedCEData.java @@ -15,7 +15,60 @@ */ package io.serverlessworkflow.impl.lifecycle.ce; +import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent; import java.time.OffsetDateTime; +import java.util.Objects; -public record WorkflowSuspendedCEData( - String name, WorkflowDefinitionCEData definition, OffsetDateTime suspendedAt) {} +public class WorkflowSuspendedCEData extends WorkflowCEData { + private OffsetDateTime suspendedAt; + + public WorkflowSuspendedCEData(WorkflowSuspendedEvent ev) { + super(ev); + this.suspendedAt = ev.eventDate(); + } + + public WorkflowSuspendedCEData( + String name, WorkflowDefinitionCEData definition, OffsetDateTime time) { + super(name, definition); + this.suspendedAt = time; + } + + public WorkflowSuspendedCEData() {} + + public OffsetDateTime suspendedAt() { + return suspendedAt; + } + + public OffsetDateTime getSuspendedAt() { + return suspendedAt; + } + + @Override + public String toString() { + return "WorkflowSuspendedCEData [suspendedAt=" + + suspendedAt + + ", getName()=" + + name() + + ", getDefinition()=" + + definition() + + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + Objects.hash(suspendedAt); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (!super.equals(obj)) return false; + if (getClass() != obj.getClass()) return false; + WorkflowSuspendedCEData other = (WorkflowSuspendedCEData) obj; + return Objects.equals(suspendedAt, other.suspendedAt); + } +} diff --git a/impl/json-utils/pom.xml b/impl/json-utils/pom.xml index 1a3eb3138..c9dfc64f6 100644 --- a/impl/json-utils/pom.xml +++ b/impl/json-utils/pom.xml @@ -20,7 +20,10 @@ io.cloudevents cloudevents-json-jackson - + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + org.junit.jupiter junit-jupiter-engine diff --git a/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/ObjectMapperFactoryProvider.java b/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/ObjectMapperFactoryProvider.java index 4b4109661..ce9b518a4 100644 --- a/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/ObjectMapperFactoryProvider.java +++ b/impl/json-utils/src/main/java/io/serverlessworkflow/impl/jackson/ObjectMapperFactoryProvider.java @@ -17,7 +17,9 @@ import static io.serverlessworkflow.impl.WorkflowUtils.loadFirst; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; import io.cloudevents.jackson.JsonFormat; import java.util.Objects; import java.util.function.Supplier; @@ -60,7 +62,9 @@ private static class DefaultObjectMapperFactory implements ObjectMapperFactory { this.mapper = new ObjectMapper() .findAndRegisterModules() - .registerModule(JsonFormat.getCloudEventJacksonModule()); + .registerModule(JsonFormat.getCloudEventJacksonModule()) + .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS) + .disable(DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE); } @Override diff --git a/impl/lifecycleevent/pom.xml b/impl/lifecycleevent/pom.xml index 82ba06973..dafad537d 100644 --- a/impl/lifecycleevent/pom.xml +++ b/impl/lifecycleevent/pom.xml @@ -16,5 +16,26 @@ io.serverlessworkflow serverlessworkflow-impl-core + + io.serverlessworkflow + serverlessworkflow-impl-model + test + + + org.junit.jupiter + junit-jupiter-engine + + + org.assertj + assertj-core + + + org.mockito + mockito-core + + + org.junit.jupiter + junit-jupiter-params + \ No newline at end of file diff --git a/impl/lifecycleevent/src/test/java/io/serverlessworkflow/impl/jackson/events/JacksonLifeCyclePublisherTest.java b/impl/lifecycleevent/src/test/java/io/serverlessworkflow/impl/jackson/events/JacksonLifeCyclePublisherTest.java new file mode 100644 index 000000000..6f4ccda37 --- /dev/null +++ b/impl/lifecycleevent/src/test/java/io/serverlessworkflow/impl/jackson/events/JacksonLifeCyclePublisherTest.java @@ -0,0 +1,148 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.jackson.events; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.cloudevents.core.data.PojoCloudEventData; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowDefinitionId; +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.WorkflowModelFactory; +import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.jackson.JsonUtils; +import io.serverlessworkflow.impl.lifecycle.TaskCancelledEvent; +import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskResumedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; +import io.serverlessworkflow.impl.lifecycle.TaskSuspendedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowCancelledEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowStatusEvent; +import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent; +import io.serverlessworkflow.impl.lifecycle.ce.TaskCancelledCEData; +import io.serverlessworkflow.impl.lifecycle.ce.TaskCompletedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.TaskCompletedCEDataWithOutput; +import io.serverlessworkflow.impl.lifecycle.ce.TaskFailedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.TaskResumedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.TaskRetriedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.TaskStartedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.TaskStartedCEDataWithInput; +import io.serverlessworkflow.impl.lifecycle.ce.TaskSuspendedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.WorkflowCancelledCEData; +import io.serverlessworkflow.impl.lifecycle.ce.WorkflowCompletedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.WorkflowCompletedCEDataWithOutput; +import io.serverlessworkflow.impl.lifecycle.ce.WorkflowFailedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.WorkflowResumedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.WorkflowStartedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.WorkflowStartedCEDataWithInput; +import io.serverlessworkflow.impl.lifecycle.ce.WorkflowStatusCEDataEvent; +import io.serverlessworkflow.impl.lifecycle.ce.WorkflowSuspendedCEData; +import io.serverlessworkflow.impl.model.jackson.JacksonModelFactory; +import java.io.IOException; +import java.util.Map; +import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +public class JacksonLifeCyclePublisherTest { + + private static JacksonLifeCyclePublisher publisher; + private static WorkflowContext workflowContext; + private static TaskContext taskContext; + private static WorkflowModelFactory factory; + + @BeforeAll + static void setup() { + publisher = new JacksonLifeCyclePublisher(); + factory = new JacksonModelFactory(); + workflowContext = mock(WorkflowContext.class); + taskContext = mock(TaskContext.class); + WorkflowInstance instanceData = mock(WorkflowInstance.class); + WorkflowDefinition definition = mock(WorkflowDefinition.class); + when(workflowContext.instanceData()).thenReturn(instanceData); + when(instanceData.id()).thenReturn("1"); + when(instanceData.input()).thenReturn(factory.fromAny(Map.of("name", "sensei"))); + when(workflowContext.definition()).thenReturn(definition); + WorkflowPosition position = mock(WorkflowPosition.class); + when(definition.id()).thenReturn(new WorkflowDefinitionId("test", "events", "1_0")); + when(taskContext.position()).thenReturn(position); + when(taskContext.output()).thenReturn(factory.fromAny(Map.of("name", "Fulanito"))); + when(taskContext.input()).thenReturn(factory.fromAny(Map.of("name", "Menganito"))); + when(position.jsonPointer()).thenReturn("do/0/set/javi"); + } + + @ParameterizedTest + @MethodSource("provideParameters") + void testCloudEventSerialization(Object pojo) throws IOException { + PojoCloudEventData source = PojoCloudEventData.wrap(pojo, publisher::convertToBytes); + PojoCloudEventData target = + PojoCloudEventData.wrap( + JsonUtils.mapper().readValue(source.toBytes(), pojo.getClass()), + publisher::convertToBytes); + assertThat(source).isEqualTo(target); + } + + private static Stream provideParameters() { + return Stream.of( + Arguments.of(new TaskCompletedCEData(new TaskCompletedEvent(workflowContext, taskContext))), + Arguments.of( + new TaskCompletedCEDataWithOutput( + new TaskCompletedEvent(workflowContext, taskContext))), + Arguments.of(new TaskStartedCEData(new TaskStartedEvent(workflowContext, taskContext))), + Arguments.of( + new TaskStartedCEDataWithInput(new TaskStartedEvent(workflowContext, taskContext))), + Arguments.of(new TaskCancelledCEData(new TaskCancelledEvent(workflowContext, taskContext))), + Arguments.of(new TaskResumedCEData(new TaskResumedEvent(workflowContext, taskContext))), + Arguments.of(new TaskRetriedCEData(new TaskRetriedEvent(workflowContext, taskContext))), + Arguments.of(new TaskSuspendedCEData(new TaskSuspendedEvent(workflowContext, taskContext))), + Arguments.of( + new TaskFailedCEData( + new TaskFailedEvent( + workflowContext, taskContext, new IllegalArgumentException("NOOOO!!!!")))), + Arguments.of(new WorkflowStartedCEData(new WorkflowStartedEvent(workflowContext))), + Arguments.of(new WorkflowStartedCEDataWithInput(new WorkflowStartedEvent(workflowContext))), + Arguments.of( + new WorkflowCompletedCEData(new WorkflowCompletedEvent(workflowContext, null))), + Arguments.of( + new WorkflowCompletedCEDataWithOutput( + new WorkflowCompletedEvent( + workflowContext, factory.fromAny(Map.of("name", "Javierito"))))), + Arguments.of(new WorkflowCancelledCEData(new WorkflowCancelledEvent(workflowContext))), + Arguments.of( + new WorkflowFailedCEData( + new WorkflowFailedEvent( + workflowContext, new IllegalArgumentException("NOOO!!!!!")))), + Arguments.of(new WorkflowResumedCEData(new WorkflowResumedEvent(workflowContext))), + Arguments.of(new WorkflowSuspendedCEData(new WorkflowSuspendedEvent(workflowContext))), + Arguments.of( + new WorkflowStatusCEDataEvent( + new WorkflowStatusEvent( + workflowContext, WorkflowStatus.RUNNING, WorkflowStatus.WAITING)))); + } +} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java index 058a6c6ca..9a841abc7 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/LifeCycleEventsTest.java @@ -34,11 +34,12 @@ import io.serverlessworkflow.impl.WorkflowStatus; import io.serverlessworkflow.impl.events.InMemoryEvents; import io.serverlessworkflow.impl.lifecycle.ce.AbstractLifeCyclePublisher; +import io.serverlessworkflow.impl.lifecycle.ce.InputOutputLifeCycleCloudEventFactory; import io.serverlessworkflow.impl.lifecycle.ce.TaskCancelledCEData; -import io.serverlessworkflow.impl.lifecycle.ce.TaskCompletedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.TaskCompletedCEDataWithOutput; import io.serverlessworkflow.impl.lifecycle.ce.TaskStartedCEData; import io.serverlessworkflow.impl.lifecycle.ce.WorkflowCancelledCEData; -import io.serverlessworkflow.impl.lifecycle.ce.WorkflowCompletedCEData; +import io.serverlessworkflow.impl.lifecycle.ce.WorkflowCompletedCEDataWithOutput; import io.serverlessworkflow.impl.lifecycle.ce.WorkflowFailedCEData; import io.serverlessworkflow.impl.lifecycle.ce.WorkflowResumedCEData; import io.serverlessworkflow.impl.lifecycle.ce.WorkflowStartedCEData; @@ -77,6 +78,7 @@ static void init() { } appl = WorkflowApplication.builder() + .withLifeCycleCloudEventFactory(new InputOutputLifeCycleCloudEventFactory()) .withEventConsumer(eventBroker) .withEventPublisher(eventBroker) .build(); @@ -107,11 +109,12 @@ void simpleWorkflow() throws IOException { assertPojoInCE("io.serverlessworkflow.workflow.started.v1", WorkflowStartedCEData.class); TaskStartedCEData taskStartedEvent = assertPojoInCE("io.serverlessworkflow.task.started.v1", TaskStartedCEData.class); - TaskCompletedCEData taskCompletedEvent = - assertPojoInCE("io.serverlessworkflow.task.completed.v1", TaskCompletedCEData.class); - WorkflowCompletedCEData workflowCompletedEvent = + TaskCompletedCEDataWithOutput taskCompletedEvent = assertPojoInCE( - "io.serverlessworkflow.workflow.completed.v1", WorkflowCompletedCEData.class); + "io.serverlessworkflow.task.completed.v1", TaskCompletedCEDataWithOutput.class); + WorkflowCompletedCEDataWithOutput workflowCompletedEvent = + assertPojoInCE( + "io.serverlessworkflow.workflow.completed.v1", WorkflowCompletedCEDataWithOutput.class); assertThat(workflowCompletedEvent.output()).isEqualTo(model.asJavaObject()); assertThat(workflowStartedEvent.startedAt()).isBefore(workflowCompletedEvent.completedAt()); assertThat(taskCompletedEvent.output()).isEqualTo(model.asJavaObject());