Skip to content

Commit

Permalink
feat(broker): embedded subprocess
Browse files Browse the repository at this point in the history
- sub process instances have the activity lifecycle, including input and
  output mappings
- worfkflow instance records now reference their scope instance
- adds validation for sub process start events
- does not yet support cancellation when subprocess is active => #1119
- does not test/address payload updates => #1131
  • Loading branch information
ThorbenLindhauer authored and Zelldon committed Aug 8, 2018
1 parent db117d3 commit e54c707
Show file tree
Hide file tree
Showing 23 changed files with 727 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
import io.zeebe.model.bpmn.instance.MultiInstanceLoopCharacteristics;
import io.zeebe.model.bpmn.instance.bpmndi.BpmnShape;
import io.zeebe.model.bpmn.instance.dc.Bounds;
import io.zeebe.model.bpmn.instance.zeebe.ZeebeInput;
import io.zeebe.model.bpmn.instance.zeebe.ZeebeIoMapping;
import io.zeebe.model.bpmn.instance.zeebe.ZeebeOutput;
import io.zeebe.model.bpmn.instance.zeebe.ZeebeOutputBehavior;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
Expand Down Expand Up @@ -122,4 +126,29 @@ protected void setBoundaryEventCoordinates(BpmnShape bpmnShape) {
boundaryBounds.setX(x);
boundaryBounds.setY(y);
}

public B zeebeOutputBehavior(ZeebeOutputBehavior outputBehavior) {
final ZeebeIoMapping ioMapping = getCreateSingleExtensionElement(ZeebeIoMapping.class);
ioMapping.setOutputBehavhior(outputBehavior);

return myself;
}

public B zeebeInput(String source, String target) {
final ZeebeIoMapping ioMapping = getCreateSingleExtensionElement(ZeebeIoMapping.class);
final ZeebeInput input = createChild(ioMapping, ZeebeInput.class);
input.setSource(source);
input.setTarget(target);

return myself;
}

public B zeebeOutput(String source, String target) {
final ZeebeIoMapping ioMapping = getCreateSingleExtensionElement(ZeebeIoMapping.class);
final ZeebeOutput input = createChild(ioMapping, ZeebeOutput.class);
input.setSource(source);
input.setTarget(target);

return myself;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,13 @@ public SubProcessBuilder subProcess(String id) {
return createTarget(SubProcess.class, id).builder();
}

public SubProcessBuilder subProcess(String id, Consumer<SubProcessBuilder> consumer) {

final SubProcessBuilder builder = createTarget(SubProcess.class, id).builder();
consumer.accept(builder);
return builder;
}

public TransactionBuilder transaction() {
final Transaction transaction = createTarget(Transaction.class);
return new TransactionBuilder(modelInstance, transaction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@
import io.zeebe.model.bpmn.BpmnModelInstance;
import io.zeebe.model.bpmn.instance.ServiceTask;
import io.zeebe.model.bpmn.instance.zeebe.ZeebeHeader;
import io.zeebe.model.bpmn.instance.zeebe.ZeebeInput;
import io.zeebe.model.bpmn.instance.zeebe.ZeebeIoMapping;
import io.zeebe.model.bpmn.instance.zeebe.ZeebeOutput;
import io.zeebe.model.bpmn.instance.zeebe.ZeebeOutputBehavior;
import io.zeebe.model.bpmn.instance.zeebe.ZeebeTaskDefinition;
import io.zeebe.model.bpmn.instance.zeebe.ZeebeTaskHeaders;

Expand Down Expand Up @@ -68,29 +64,4 @@ public B zeebeTaskHeader(String key, String value) {

return myself;
}

public B zeebeOutputBehavior(ZeebeOutputBehavior outputBehavior) {
final ZeebeIoMapping ioMapping = getCreateSingleExtensionElement(ZeebeIoMapping.class);
ioMapping.setOutputBehavhior(outputBehavior);

return myself;
}

public B zeebeInput(String source, String target) {
final ZeebeIoMapping ioMapping = getCreateSingleExtensionElement(ZeebeIoMapping.class);
final ZeebeInput input = createChild(ioMapping, ZeebeInput.class);
input.setSource(source);
input.setTarget(target);

return myself;
}

public B zeebeOutput(String source, String target) {
final ZeebeIoMapping ioMapping = getCreateSingleExtensionElement(ZeebeIoMapping.class);
final ZeebeOutput input = createChild(ioMapping, ZeebeOutput.class);
input.setSource(source);
input.setTarget(target);

return myself;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright © 2017 camunda services GmbH (info@camunda.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.zeebe.model.bpmn.validation.zeebe;

import io.zeebe.model.bpmn.instance.StartEvent;
import io.zeebe.model.bpmn.instance.SubProcess;
import java.util.Collection;
import org.camunda.bpm.model.xml.validation.ModelElementValidator;
import org.camunda.bpm.model.xml.validation.ValidationResultCollector;

public class SubProcessValidator implements ModelElementValidator<SubProcess> {

@Override
public Class<SubProcess> getElementType() {
return SubProcess.class;
}

@Override
public void validate(SubProcess element, ValidationResultCollector validationResultCollector) {
final Collection<StartEvent> startEvents = element.getChildElementsByType(StartEvent.class);

if (startEvents.size() != 1) {
validationResultCollector.addError(0, "Must have exactly one start event");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ public class ZeebeDesignTimeValidators {
VALIDATORS.add(new FlowElementValidator());
VALIDATORS.add(new MessageValidator());
VALIDATORS.add(new ProcessValidator());
VALIDATORS.add(new ServiceTaskValidator());
VALIDATORS.add(new SequenceFlowValidator());
VALIDATORS.add(new ServiceTaskValidator());
VALIDATORS.add(new StartEventValidator());
VALIDATORS.add(new SubProcessValidator());
VALIDATORS.add(new ZeebeTaskDefinitionValidator());
VALIDATORS.add(new ZeebeIoMappingValidator());
VALIDATORS.add(new ZeebeSubscriptionValidator());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,23 @@ public static final Object[][] parameters() {
.done(),
Arrays.asList(
expect(TimerEventDefinition.class, "Event definition of this type is not supported"))
},
{
Bpmn.createExecutableProcess("process")
.startEvent()
.subProcess("subProcess")
.embeddedSubProcess()
.startEvent("subProcessStart")
.message(b -> b.name("message").zeebeCorrelationKey("correlationKey"))
.endEvent()
.subProcessDone()
.endEvent()
.done(),
Arrays.asList(expect("subProcessStart", "Must be a none start event"))
},
{
"no-start-event-sub-process.bpmn",
Arrays.asList(expect("subProcess", "Must have exactly one start event"))
}
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" id="Definitions_1" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="1.11.2">
<bpmn:process id="Process_1" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>SequenceFlow_0uj31r2</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="SequenceFlow_0uj31r2" sourceRef="StartEvent_1" targetRef="subProcess" />
<bpmn:subProcess id="subProcess">
<bpmn:incoming>SequenceFlow_0uj31r2</bpmn:incoming>
<bpmn:outgoing>SequenceFlow_0alofse</bpmn:outgoing>
</bpmn:subProcess>
<bpmn:endEvent id="EndEvent_1dmb61w">
<bpmn:incoming>SequenceFlow_0alofse</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="SequenceFlow_0alofse" sourceRef="subProcess" targetRef="EndEvent_1dmb61w" />
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Process_1">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="173" y="102" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="SequenceFlow_0uj31r2_di" bpmnElement="SequenceFlow_0uj31r2">
<di:waypoint xsi:type="dc:Point" x="209" y="120" />
<di:waypoint xsi:type="dc:Point" x="230" y="120" />
<di:waypoint xsi:type="dc:Point" x="230" y="120" />
<di:waypoint xsi:type="dc:Point" x="258" y="120" />
<bpmndi:BPMNLabel>
<dc:Bounds x="245" y="114" width="0" height="12" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNEdge>
<bpmndi:BPMNShape id="SubProcess_1cctepv_di" bpmnElement="subProcess" isExpanded="true">
<dc:Bounds x="258" y="45" width="211" height="150" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="EndEvent_1dmb61w_di" bpmnElement="EndEvent_1dmb61w">
<dc:Bounds x="512" y="102" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="530" y="142" width="0" height="12" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="SequenceFlow_0alofse_di" bpmnElement="SequenceFlow_0alofse">
<di:waypoint xsi:type="dc:Point" x="469" y="120" />
<di:waypoint xsi:type="dc:Point" x="512" y="120" />
<bpmndi:BPMNLabel>
<dc:Bounds x="490.5" y="99" width="0" height="12" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,16 @@ public class WorkflowInstanceRecord extends UnpackedObject {

private final DocumentProperty payloadProp = new DocumentProperty(PROP_WORKFLOW_PAYLOAD);

private final LongProperty scopeInstanceKey = new LongProperty("scopeInstanceKey", -1L);

public WorkflowInstanceRecord() {
this.declareProperty(bpmnProcessIdProp)
.declareProperty(versionProp)
.declareProperty(workflowKeyProp)
.declareProperty(workflowInstanceKeyProp)
.declareProperty(activityIdProp)
.declareProperty(payloadProp);
.declareProperty(payloadProp)
.declareProperty(scopeInstanceKey);
}

public DirectBuffer getBpmnProcessId() {
Expand Down Expand Up @@ -98,6 +101,15 @@ public WorkflowInstanceRecord setWorkflowInstanceKey(long workflowInstanceKey) {
return this;
}

public long getScopeInstanceKey() {
return scopeInstanceKey.getValue();
}

public WorkflowInstanceRecord setScopeInstanceKey(long scopeInstanceKey) {
this.scopeInstanceKey.setValue(scopeInstanceKey);
return this;
}

public int getVersion() {
return versionProp.getValue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ public class ActivityInstanceMap implements AutoCloseable {
private static final int ID_MAX_LENGTH = 255;

private static final int SIZE_OF_ACTIVITY_ID = ID_MAX_LENGTH * SIZE_OF_CHAR;
private static final int INDEX_VALUE_SIZE = SIZE_OF_LONG + SIZE_OF_INT + SIZE_OF_ACTIVITY_ID;
private static final int INDEX_VALUE_SIZE =
SIZE_OF_LONG + SIZE_OF_INT + SIZE_OF_ACTIVITY_ID + SIZE_OF_LONG;

private static final int JOB_KEY_OFFSET = 0;
private static final int ACTIVITY_ID_LENGTH_OFFSET = JOB_KEY_OFFSET + SIZE_OF_LONG;
private static final int ACTIVITY_ID_OFFSET = ACTIVITY_ID_LENGTH_OFFSET + SIZE_OF_INT;
private static final int SCOPE_INSTANCE_KEY_OFFSET = ACTIVITY_ID_OFFSET + SIZE_OF_ACTIVITY_ID;

private static final ByteOrder BYTE_ORDER = ByteOrder.LITTLE_ENDIAN;

Expand Down Expand Up @@ -95,6 +97,10 @@ public DirectBuffer getActivityId() {
return activityIdBuffer;
}

public long getScopeInstanceKey() {
return buffer.getLong(SCOPE_INSTANCE_KEY_OFFSET, BYTE_ORDER);
}

public ActivityInstanceMap newActivityInstance(long activityInstanceKey) {
key = activityInstanceKey;
isRead = true;
Expand All @@ -113,6 +119,12 @@ public ActivityInstanceMap setActivityId(DirectBuffer activityId) {
return this;
}

public ActivityInstanceMap setScopeInstanceKey(long scopeInstanceKey) {
ensureRead();
buffer.putLong(SCOPE_INSTANCE_KEY_OFFSET, scopeInstanceKey, BYTE_ORDER);
return this;
}

public ActivityInstanceMap setJobKey(long jobKey) {
ensureRead();
buffer.putLong(JOB_KEY_OFFSET, jobKey, BYTE_ORDER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,6 @@ public enum BpmnStep {

START_ACTIVITY,

TRIGGER_END_EVENT
TRIGGER_END_EVENT,
TRIGGER_START_EVENT
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Zeebe Broker Core
* Copyright © 2017 camunda services GmbH (info@camunda.com)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.zeebe.broker.workflow.model;

public class ExecutableSubProcess extends ExecutableFlowNode {

private ExecutableFlowNode startEvent;

public ExecutableSubProcess(String id) {
super(id);
}

public ExecutableFlowNode getStartEvent() {
return startEvent;
}

public void setStartEvent(ExecutableFlowNode startEvent) {
this.startEvent = startEvent;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
/** Executable* prefix in order to avoid confusion with model API classes. */
public class ExecutableWorkflow {

private DirectBuffer id;
private Map<DirectBuffer, ExecutableFlowElement> flowElements = new HashMap<>();
private final DirectBuffer id;
private final Map<DirectBuffer, ExecutableFlowElement> flowElements = new HashMap<>();

private ExecutableFlowNode startEvent;

public ExecutableWorkflow(String id) {
Expand All @@ -45,6 +46,14 @@ public ExecutableFlowElement getElementById(DirectBuffer id) {
return flowElements.get(id);
}

public ExecutableFlowNode getStartEvent() {
return startEvent;
}

public void setStartEvent(ExecutableFlowNode startEvent) {
this.startEvent = startEvent;
}

/** convenience function for transformation */
public <T extends ExecutableFlowElement> T getElementById(String id, Class<T> expectedType) {
final DirectBuffer buffer = BufferUtil.wrapString(id);
Expand All @@ -59,12 +68,4 @@ public <T extends ExecutableFlowElement> T getElementById(String id, Class<T> ex
throw new RuntimeException("Element is not an instance of " + expectedType.getSimpleName());
}
}

public ExecutableFlowNode getStartEvent() {
return startEvent;
}

public void setStartEvent(ExecutableFlowNode startEvent) {
this.startEvent = startEvent;
}
}
Loading

0 comments on commit e54c707

Please sign in to comment.