Skip to content

Commit

Permalink
feat(broker): support timer start event
Browse files Browse the repository at this point in the history
  • Loading branch information
Miguel Pires committed Jan 2, 2019
1 parent dc6d06c commit f28c318
Show file tree
Hide file tree
Showing 17 changed files with 469 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.zeebe.broker.workflow.deployment.distribute.processor.DeploymentCreatedProcessor;
import io.zeebe.broker.workflow.deployment.distribute.processor.DeploymentDistributeProcessor;
import io.zeebe.broker.workflow.processor.BpmnStepProcessor;
import io.zeebe.broker.workflow.processor.CatchEventBehavior;
import io.zeebe.broker.workflow.processor.WorkflowEventProcessors;
import io.zeebe.broker.workflow.processor.deployment.DeploymentEventProcessors;
import io.zeebe.broker.workflow.processor.timer.DueDateTimerChecker;
Expand Down Expand Up @@ -195,7 +196,10 @@ public void addDeploymentRelatedProcessorAndServices(
workflowState,
partitionServiceName));
DeploymentEventProcessors.addTransformingDeploymentProcessor(
typedProcessorBuilder, zeebeState);
typedProcessorBuilder,
zeebeState,
new CatchEventBehavior(
zeebeState, new SubscriptionCommandSender(clusterCfg, managementApi)));
} else {
DeploymentEventProcessors.addDeploymentCreateProcessor(typedProcessorBuilder, workflowState);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ public class TimerRecord extends UnpackedObject {
private final LongProperty dueDateProp = new LongProperty("dueDate");
private final StringProperty handlerNodeId = new StringProperty("handlerNodeId");
private final IntegerProperty repetitionsProp = new IntegerProperty("repetitions");
private final StringProperty bpmnProcessIdProp = new StringProperty("bpmnProcessId");
private final LongProperty workflowKeyProp = new LongProperty("workflowKey");

public TimerRecord() {
this.declareProperty(elementInstanceKeyProp)
.declareProperty(dueDateProp)
.declareProperty(handlerNodeId)
.declareProperty(repetitionsProp)
.declareProperty(bpmnProcessIdProp);
.declareProperty(workflowKeyProp);
}

public long getElementInstanceKey() {
Expand Down Expand Up @@ -75,12 +75,12 @@ public TimerRecord setRepetitions(int repetitions) {
return this;
}

public DirectBuffer getBpmnId() {
return bpmnProcessIdProp.getValue();
public long getWorkflowKey() {
return workflowKeyProp.getValue();
}

public TimerRecord setBpmnId(DirectBuffer bpmnId) {
this.bpmnProcessIdProp.setValue(bpmnId);
public TimerRecord setWorkflowKey(long workflowKey) {
this.workflowKeyProp.setValue(workflowKey);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public void processRecord(
final TypedResponseWriter responseWriter,
final TypedStreamWriter streamWriter) {
final DeploymentRecord deploymentEvent = event.getValue();

streamWriter.appendFollowUpCommand(
event.getKey(), DeploymentIntent.DISTRIBUTE, deploymentEvent);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@
*/
public class ExecutableFlowElementContainer extends ExecutableActivity {

private ExecutableFlowNode startEvent;
private ExecutableCatchEventElement startEvent;

public ExecutableFlowElementContainer(String id) {
super(id);
}

public ExecutableFlowNode getStartEvent() {
public ExecutableCatchEventElement getStartEvent() {
return startEvent;
}

public void setStartEvent(ExecutableFlowNode startEvent) {
public void setStartEvent(ExecutableCatchEventElement startEvent) {
this.startEvent = startEvent;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package io.zeebe.broker.workflow.model.transformation.transformer;

import io.zeebe.broker.workflow.model.BpmnStep;
import io.zeebe.broker.workflow.model.element.ExecutableCatchEventElement;
import io.zeebe.broker.workflow.model.element.ExecutableFlowElementContainer;
import io.zeebe.broker.workflow.model.element.ExecutableFlowNode;
import io.zeebe.broker.workflow.model.element.ExecutableWorkflow;
import io.zeebe.broker.workflow.model.transformation.ModelElementTransformer;
import io.zeebe.broker.workflow.model.transformation.TransformContext;
Expand All @@ -37,8 +37,8 @@ public Class<StartEvent> getType() {
@Override
public void transform(StartEvent element, TransformContext context) {
final ExecutableWorkflow workflow = context.getCurrentWorkflow();
final ExecutableFlowNode startEvent =
workflow.getElementById(element.getId(), ExecutableFlowNode.class);
final ExecutableCatchEventElement startEvent =
workflow.getElementById(element.getId(), ExecutableCatchEventElement.class);

if (element.getScope() instanceof FlowNode) {
final FlowNode scope = (FlowNode) element.getScope();
Expand All @@ -54,7 +54,8 @@ public void transform(StartEvent element, TransformContext context) {
bindLifecycle(context, startEvent);
}

private void bindLifecycle(TransformContext context, final ExecutableFlowNode startEvent) {
private void bindLifecycle(
TransformContext context, final ExecutableCatchEventElement startEvent) {
startEvent.bindLifecycleState(WorkflowInstanceIntent.EVENT_TRIGGERING, BpmnStep.APPLY_EVENT);
startEvent.bindLifecycleState(
WorkflowInstanceIntent.EVENT_TRIGGERED, context.getCurrentFlowNodeOutgoingStep());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import io.zeebe.msgpack.query.MsgPackQueryProcessor;
import io.zeebe.msgpack.query.MsgPackQueryProcessor.QueryResult;
import io.zeebe.msgpack.query.MsgPackQueryProcessor.QueryResults;
import io.zeebe.msgpack.value.DocumentValue;
import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceRecord;
import io.zeebe.protocol.intent.TimerIntent;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
Expand Down Expand Up @@ -87,8 +86,8 @@ public void subscribeToEvents(
if (event.isTimer()) {
subscribeToTimerEvent(
context.getRecord().getKey(),
context.getRecord().getValue().getWorkflowKey(),
event.getId(),
context.getRecord().getValue().getBpmnProcessId(),
event.getTimer(),
context.getOutput().getStreamWriter());
} else if (event.isMessage()) {
Expand All @@ -105,7 +104,6 @@ private void validateEventSubscription(BpmnStepContext<?> context, ExecutableCat

public boolean occurEventForElement(
long elementInstanceKey,
DirectBuffer bpmnId,
DirectBuffer eventHandlerId,
DirectBuffer eventPayload,
TypedStreamWriter streamWriter) {
Expand Down Expand Up @@ -146,28 +144,24 @@ public boolean occurEventForElement(

return true;

} else if (isStartEvent(elementInstanceKey, bpmnId)) {
// if the event is a timer start event

final DeployedWorkflow latestWorkflow =
state.getWorkflowState().getLatestWorkflowVersionByProcessId(bpmnId);

if (latestWorkflow != null) {
workflowInstanceRecord.reset();
workflowInstanceRecord.setVersion(latestWorkflow.getVersion());
workflowInstanceRecord.setWorkflowKey(latestWorkflow.getKey());
workflowInstanceRecord.setBpmnProcessId(bpmnId);
workflowInstanceRecord.setPayload(eventPayload);
streamWriter.appendNewCommand(WorkflowInstanceIntent.CREATE, workflowInstanceRecord);
return true;
}
return false;
} else {
// ignore the event if the element is left
return false;
}
}

public void occurStartEvent(
long workflowKey, DirectBuffer eventPayload, TypedStreamWriter streamWriter) {
final DeployedWorkflow workflow = state.getWorkflowState().getWorkflowByKey(workflowKey);

workflowInstanceRecord.reset();
workflowInstanceRecord.setVersion(workflow.getVersion());
workflowInstanceRecord.setWorkflowKey(workflow.getKey());
workflowInstanceRecord.setBpmnProcessId(workflow.getBpmnProcessId());
workflowInstanceRecord.setPayload(eventPayload);
streamWriter.appendNewCommand(WorkflowInstanceIntent.CREATE, workflowInstanceRecord);
}

public void deferEvent(BpmnStepContext<?> context) {
if (context.getState() != WorkflowInstanceIntent.EVENT_OCCURRED) {
throw new IllegalStateException(
Expand Down Expand Up @@ -199,8 +193,8 @@ public void triggerDeferredEvent(BpmnStepContext<?> context) {

public void subscribeToTimerEvent(
long elementInstanceKey,
long workflowKey,
DirectBuffer handlerNodeId,
DirectBuffer bpmnId,
RepeatingInterval timer,
TypedStreamWriter writer) {
final long nowMs = ActorClock.currentTimeMillis();
Expand All @@ -211,7 +205,7 @@ public void subscribeToTimerEvent(
.setDueDate(dueDate)
.setElementInstanceKey(elementInstanceKey)
.setHandlerNodeId(handlerNodeId)
.setBpmnId(bpmnId);
.setWorkflowKey(workflowKey);
writer.appendNewCommand(TimerIntent.CREATE, timerRecord);
}

Expand All @@ -223,12 +217,12 @@ private void unsubscribeFromTimerEvents(long elementInstanceKey, TypedStreamWrit
elementInstanceKey, t -> unsubscribeFromTimerEvent(t, writer));
}

private void unsubscribeFromTimerEvent(TimerInstance timer, TypedStreamWriter writer) {
public void unsubscribeFromTimerEvent(TimerInstance timer, TypedStreamWriter writer) {
timerRecord
.setElementInstanceKey(timer.getElementInstanceKey())
.setDueDate(timer.getDueDate())
.setHandlerNodeId(timer.getHandlerNodeId())
.setBpmnId(timer.getBpmnId());
.setWorkflowKey(timer.getWorkflowKey());

writer.appendFollowUpCommand(timer.getKey(), TimerIntent.CANCEL, timerRecord);
}
Expand Down Expand Up @@ -348,10 +342,4 @@ public MessageCorrelationKeyException(String message) {
super(message);
}
}

private boolean isStartEvent(long elementInstanceKey, DirectBuffer bpmnId) {
return elementInstanceKey == -1
&& bpmnId != null
&& !bpmnId.equals(DocumentValue.EMPTY_DOCUMENT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import io.zeebe.broker.logstreams.processor.TypedEventStreamProcessorBuilder;
import io.zeebe.broker.logstreams.state.ZeebeState;
import io.zeebe.broker.workflow.processor.CatchEventBehavior;
import io.zeebe.broker.workflow.state.WorkflowState;
import io.zeebe.protocol.clientapi.ValueType;

Expand All @@ -33,8 +34,13 @@ public static void addDeploymentCreateProcessor(
}

public static void addTransformingDeploymentProcessor(
TypedEventStreamProcessorBuilder processorBuilder, ZeebeState zeebeState) {
TypedEventStreamProcessorBuilder processorBuilder,
ZeebeState zeebeState,
CatchEventBehavior catchEventBehavior) {

processorBuilder.onCommand(
ValueType.DEPLOYMENT, CREATE, new TransformingDeploymentCreateProcessor(zeebeState));
ValueType.DEPLOYMENT,
CREATE,
new TransformingDeploymentCreateProcessor(zeebeState, catchEventBehavior));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,37 @@
*/
package io.zeebe.broker.workflow.processor.deployment;

import io.zeebe.broker.logstreams.processor.*;
import static io.zeebe.broker.workflow.state.TimerInstance.NO_ELEMENT_INSTANCE;

import io.zeebe.broker.logstreams.processor.SideEffectProducer;
import io.zeebe.broker.logstreams.processor.TypedRecord;
import io.zeebe.broker.logstreams.processor.TypedRecordProcessor;
import io.zeebe.broker.logstreams.processor.TypedResponseWriter;
import io.zeebe.broker.logstreams.processor.TypedStreamWriter;
import io.zeebe.broker.logstreams.state.ZeebeState;
import io.zeebe.broker.workflow.deployment.transform.DeploymentTransformer;
import io.zeebe.broker.workflow.model.element.ExecutableCatchEventElement;
import io.zeebe.broker.workflow.processor.CatchEventBehavior;
import io.zeebe.broker.workflow.state.WorkflowState;
import io.zeebe.protocol.clientapi.RejectionType;
import io.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.zeebe.protocol.impl.record.value.deployment.Workflow;
import io.zeebe.protocol.intent.DeploymentIntent;
import java.util.function.Consumer;
import org.agrona.DirectBuffer;

public class TransformingDeploymentCreateProcessor
implements TypedRecordProcessor<DeploymentRecord> {

private final DeploymentTransformer deploymentTransformer;
private final WorkflowState workflowState;
private final CatchEventBehavior catchEventBehavior;

public TransformingDeploymentCreateProcessor(final ZeebeState zeebeState) {
public TransformingDeploymentCreateProcessor(
final ZeebeState zeebeState, CatchEventBehavior catchEventBehavior) {
this.workflowState = zeebeState.getWorkflowState();
this.deploymentTransformer = new DeploymentTransformer(zeebeState);
this.catchEventBehavior = catchEventBehavior;
}

@Override
Expand All @@ -51,6 +64,8 @@ public void processRecord(
if (workflowState.putDeployment(key, deploymentEvent)) {
responseWriter.writeEventOnCommand(key, DeploymentIntent.CREATED, deploymentEvent, command);
streamWriter.appendFollowUpEvent(key, DeploymentIntent.CREATED, deploymentEvent);

createTimerIfTimerStartEvent(command, streamWriter);
} else {
// should not be possible
responseWriter.writeRejectionOnCommand(
Expand All @@ -69,4 +84,34 @@ public void processRecord(
deploymentTransformer.getRejectionReason());
}
}

private void createTimerIfTimerStartEvent(
TypedRecord<DeploymentRecord> record, TypedStreamWriter streamWriter) {
for (Workflow workflow : record.getValue().workflows()) {
final ExecutableCatchEventElement startEvent =
workflowState.getWorkflowByKey(workflow.getKey()).getWorkflow().getStartEvent();

workflowState
.getTimerState()
.forEachTimerForElementInstance(
NO_ELEMENT_INSTANCE,
timer -> {
final DirectBuffer timerBpmnId =
workflowState.getWorkflowByKey(timer.getWorkflowKey()).getBpmnProcessId();

if (timerBpmnId.equals(workflow.getBpmnProcessId())) {
catchEventBehavior.unsubscribeFromTimerEvent(timer, streamWriter);
}
});

if (startEvent.isTimer()) {
catchEventBehavior.subscribeToTimerEvent(
NO_ELEMENT_INSTANCE,
workflow.getKey(),
startEvent.getId(),
startEvent.getTimer(),
streamWriter);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ public void processRecord(
final boolean isOccurred =
catchEventBehavior.occurEventForElement(
elementInstanceKey,
null,
subscription.getHandlerNodeId(),
record.getValue().getPayload(),
streamWriter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void processRecord(
timerInstance.setKey(timerKey);
timerInstance.setHandlerNodeId(timer.getHandlerNodeId());
timerInstance.setRepetitions(timer.getRepetitions());
timerInstance.setBpmnId(timer.getBpmnId());
timerInstance.setWorkflowKey(timer.getWorkflowKey());

sideEffect.accept(this::scheduleTimer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private boolean triggerTimer(TimerInstance timer) {
.setDueDate(timer.getDueDate())
.setHandlerNodeId(timer.getHandlerNodeId())
.setRepetitions(timer.getRepetitions())
.setBpmnId(timer.getBpmnId());
.setWorkflowKey(timer.getWorkflowKey());

streamWriter.appendFollowUpCommand(timer.getKey(), TimerIntent.TRIGGER, timerRecord);

Expand Down
Loading

0 comments on commit f28c318

Please sign in to comment.