Skip to content

Commit

Permalink
feat(broker): add workflow key to incident record
Browse files Browse the repository at this point in the history
  • Loading branch information
menski committed May 7, 2019
1 parent fbb3b33 commit 73477e8
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 18 deletions.
Expand Up @@ -27,6 +27,7 @@ public class IncidentRecordValueImpl extends RecordValueImpl implements Incident
private final String errorMessage;
private final String bpmnProcessId;
private final String elementId;
private final long workflowKey;
private final long workflowInstanceKey;
private final long elementInstanceKey;
private final long jobKey;
Expand All @@ -38,6 +39,7 @@ public IncidentRecordValueImpl(
final String errorMessage,
final String bpmnProcessId,
final String elementId,
final long workflowKey,
final long workflowInstanceKey,
final long elementInstanceKey,
final long jobKey,
Expand All @@ -47,6 +49,7 @@ public IncidentRecordValueImpl(
this.errorMessage = errorMessage;
this.bpmnProcessId = bpmnProcessId;
this.elementId = elementId;
this.workflowKey = workflowKey;
this.workflowInstanceKey = workflowInstanceKey;
this.elementInstanceKey = elementInstanceKey;
this.jobKey = jobKey;
Expand All @@ -73,6 +76,11 @@ public String getElementId() {
return elementId;
}

@Override
public long getWorkflowKey() {
return workflowKey;
}

@Override
public long getWorkflowInstanceKey() {
return workflowInstanceKey;
Expand Down
Expand Up @@ -234,6 +234,7 @@ private IncidentRecordValue ofIncidentRecord(final LoggedEvent event) {
asString(record.getErrorMessage()),
asString(record.getBpmnProcessId()),
asString(record.getElementId()),
record.getWorkflowKey(),
record.getWorkflowInstanceKey(),
record.getElementInstanceKey(),
record.getJobKey(),
Expand Down
Expand Up @@ -56,6 +56,7 @@ public void processRecord(
.setErrorType(ErrorType.JOB_NO_RETRIES)
.setErrorMessage(incidentErrorMessage)
.setBpmnProcessId(jobHeaders.getBpmnProcessId())
.setWorkflowKey(jobHeaders.getWorkflowKey())
.setWorkflowInstanceKey(jobHeaders.getWorkflowInstanceKey())
.setElementId(jobHeaders.getElementId())
.setElementInstanceKey(jobHeaders.getElementInstanceKey())
Expand Down
Expand Up @@ -132,6 +132,7 @@ public void shouldExportDeploymentEvent() {
public void shouldExportIncidentRecord() {
// given
final long elementInstanceKey = 34;
final long workflowKey = 134;
final long workflowInstanceKey = 10;
final String elementId = "activity";
final String bpmnProcessId = "process";
Expand All @@ -142,6 +143,7 @@ public void shouldExportIncidentRecord() {
final IncidentRecord record =
new IncidentRecord()
.setElementInstanceKey(elementInstanceKey)
.setWorkflowKey(workflowKey)
.setWorkflowInstanceKey(workflowInstanceKey)
.setElementId(wrapString(elementId))
.setBpmnProcessId(wrapString(bpmnProcessId))
Expand All @@ -157,6 +159,7 @@ public void shouldExportIncidentRecord() {
errorMessage,
bpmnProcessId,
elementId,
workflowKey,
workflowInstanceKey,
elementInstanceKey,
jobKey,
Expand Down
Expand Up @@ -26,21 +26,24 @@
public class IncidentAssert {

public static void assertIOMappingIncidentWithNoData(
long workflowKey,
long workflowInstanceKey,
Record<WorkflowInstanceRecordValue> followUpEvent,
Record<IncidentRecordValue> incidentRecord) {
assertIOMappingIncidentWithNoData(
workflowInstanceKey, "failingTask", followUpEvent, incidentRecord);
workflowKey, workflowInstanceKey, "failingTask", followUpEvent, incidentRecord);
}

public static void assertIOMappingIncidentWithNoData(
long workflowKey,
long workflowInstanceKey,
String activityId,
Record<WorkflowInstanceRecordValue> followUpEvent,
Record<IncidentRecordValue> incidentRecord) {
assertIncidentRecordValue(
ErrorType.IO_MAPPING_ERROR.name(),
"No data found for query foo.",
workflowKey,
workflowInstanceKey,
activityId,
followUpEvent,
Expand All @@ -62,13 +65,15 @@ public static void assertIncidentRecordValue(
public static void assertIncidentRecordValue(
String errorType,
String errorMsg,
long workflowKey,
long workflowInstanceKey,
String activityId,
Record<WorkflowInstanceRecordValue> followUpEvent,
Record<IncidentRecordValue> incidentResolvedEvent) {
assertIncidentRecordValue(
errorType,
errorMsg,
workflowKey,
workflowInstanceKey,
activityId,
followUpEvent.getKey(),
Expand All @@ -78,13 +83,15 @@ public static void assertIncidentRecordValue(
public static void assertIncidentRecordValue(
String errorType,
String errorMsg,
long workflowKey,
long workflowInstanceKey,
String activityId,
long activityInstanceKey,
Record<IncidentRecordValue> incidentResolvedEvent) {
assertIncidentRecordValue(
errorType,
errorMsg,
workflowKey,
workflowInstanceKey,
activityId,
activityInstanceKey,
Expand All @@ -95,6 +102,7 @@ public static void assertIncidentRecordValue(
public static void assertIncidentRecordValue(
String errorType,
String errorMsg,
long workflowKey,
long workflowInstanceKey,
String elementId,
long elementInstanceKey,
Expand All @@ -105,6 +113,7 @@ public static void assertIncidentRecordValue(
.hasErrorType(errorType)
.hasErrorMessage(errorMsg)
.hasBpmnProcessId("process")
.hasWorkflowKey(workflowKey)
.hasWorkflowInstanceKey(workflowInstanceKey)
.hasElementId(elementId)
.hasElementInstanceKey(elementInstanceKey)
Expand Down
Expand Up @@ -83,7 +83,7 @@ public void init() {
@Test
public void shouldCreateIncidentIfJobHasNoRetriesLeft() {
// given
testClient.deploy(WORKFLOW_INPUT_MAPPING);
final long workflowKey = testClient.deployWorkflow(WORKFLOW_INPUT_MAPPING).getKey();

final long workflowInstanceKey =
testClient
Expand All @@ -107,6 +107,7 @@ public void shouldCreateIncidentIfJobHasNoRetriesLeft() {
assertIncidentRecordValue(
ErrorType.JOB_NO_RETRIES.name(),
"No more retries left.",
workflowKey,
workflowInstanceKey,
"failingTask",
activityEvent.getKey(),
Expand All @@ -117,7 +118,7 @@ public void shouldCreateIncidentIfJobHasNoRetriesLeft() {
@Test
public void shouldCreateIncidentWithJobErrorMessage() {
// given
testClient.deploy(WORKFLOW_INPUT_MAPPING);
final long workflowKey = testClient.deployWorkflow(WORKFLOW_INPUT_MAPPING).getKey();

final long workflowInstanceKey =
testClient
Expand All @@ -141,6 +142,7 @@ public void shouldCreateIncidentWithJobErrorMessage() {
assertIncidentRecordValue(
ErrorType.JOB_NO_RETRIES.name(),
"failed job",
workflowKey,
workflowInstanceKey,
"failingTask",
activityEvent.getKey(),
Expand All @@ -151,7 +153,7 @@ public void shouldCreateIncidentWithJobErrorMessage() {
@Test
public void shouldIncidentContainLastFailedJobErrorMessage() {
// given
testClient.deploy(WORKFLOW_INPUT_MAPPING);
final long workflowKey = testClient.deployWorkflow(WORKFLOW_INPUT_MAPPING).getKey();

final long workflowInstanceKey =
testClient
Expand All @@ -171,6 +173,7 @@ public void shouldIncidentContainLastFailedJobErrorMessage() {
assertIncidentRecordValue(
ErrorType.JOB_NO_RETRIES.name(),
"second message",
workflowKey,
workflowInstanceKey,
"failingTask",
activityEvent.getKey(),
Expand All @@ -181,7 +184,7 @@ public void shouldIncidentContainLastFailedJobErrorMessage() {
@Test
public void shouldResolveIncidentIfJobRetriesIncreased() {
// given
testClient.deploy(WORKFLOW_INPUT_MAPPING);
final long workflowKey = testClient.deployWorkflow(WORKFLOW_INPUT_MAPPING).getKey();

final long workflowInstanceKey =
testClient
Expand Down Expand Up @@ -211,6 +214,7 @@ public void shouldResolveIncidentIfJobRetriesIncreased() {
assertIncidentRecordValue(
ErrorType.JOB_NO_RETRIES.name(),
"No more retries left.",
workflowKey,
workflowInstanceKey,
"failingTask",
activityEvent.getKey(),
Expand Down Expand Up @@ -250,7 +254,7 @@ public void shouldResolveIncidentIfJobRetriesIncreased() {
@Test
public void shouldDeleteIncidentIfJobIsCanceled() {
// given
testClient.deploy(WORKFLOW_INPUT_MAPPING);
final long workflowKey = testClient.deployWorkflow(WORKFLOW_INPUT_MAPPING).getKey();

final long workflowInstanceKey =
testClient
Expand Down Expand Up @@ -280,6 +284,7 @@ public void shouldDeleteIncidentIfJobIsCanceled() {
assertIncidentRecordValue(
ErrorType.JOB_NO_RETRIES.name(),
"No more retries left.",
workflowKey,
workflowInstanceKey,
"failingTask",
resolvedIncidentEvent.getValue().getElementInstanceKey(),
Expand Down
Expand Up @@ -87,7 +87,7 @@ public void init() {
@Test
public void shouldCreateIncidentForInputMappingFailure() {
// given
testClient.deploy(WORKFLOW_INPUT_MAPPING);
final long workflowKey = testClient.deployWorkflow(WORKFLOW_INPUT_MAPPING).getKey();

// when
final long workflowInstanceKey =
Expand All @@ -107,7 +107,8 @@ public void shouldCreateIncidentForInputMappingFailure() {
.isEqualTo(createIncidentEvent.getPosition());
assertThat(incidentEvent.getValue().getVariableScopeKey()).isEqualTo(failureEvent.getKey());

assertIOMappingIncidentWithNoData(workflowInstanceKey, failureEvent, incidentEvent);
assertIOMappingIncidentWithNoData(
workflowKey, workflowInstanceKey, failureEvent, incidentEvent);
}

@Test
Expand Down Expand Up @@ -146,7 +147,7 @@ public void shouldCreateIncidentForNonMatchingAndMatchingValueOnInputMapping() {
@Test
public void shouldCreateIncidentForOutputMappingFailure() {
// given
testClient.deploy(WORKFLOW_OUTPUT_MAPPING);
final long workflowKey = testClient.deployWorkflow(WORKFLOW_OUTPUT_MAPPING).getKey();

// when
final long workflowInstanceKey =
Expand All @@ -168,13 +169,14 @@ public void shouldCreateIncidentForOutputMappingFailure() {
assertThat(incidentEvent.getSourceRecordPosition())
.isEqualTo(createIncidentEvent.getPosition());

assertIOMappingIncidentWithNoData(workflowInstanceKey, failureEvent, incidentEvent);
assertIOMappingIncidentWithNoData(
workflowKey, workflowInstanceKey, failureEvent, incidentEvent);
}

@Test
public void shouldResolveIncidentForInputMappingFailure() {
// given
testClient.deploy(WORKFLOW_INPUT_MAPPING);
final long workflowKey = testClient.deployWorkflow(WORKFLOW_INPUT_MAPPING).getKey();

final long workflowInstanceKey =
testClient.createWorkflowInstance(r -> r.setBpmnProcessId("process")).getInstanceKey();
Expand All @@ -200,13 +202,14 @@ public void shouldResolveIncidentForInputMappingFailure() {
assertThat(incidentResolveCommand.getPosition())
.isEqualTo(incidentResolvedEvent.getSourceRecordPosition());

assertIOMappingIncidentWithNoData(workflowInstanceKey, followUpEvent, incidentResolvedEvent);
assertIOMappingIncidentWithNoData(
workflowKey, workflowInstanceKey, followUpEvent, incidentResolvedEvent);
}

@Test
public void shouldResolveIncidentForOutputMappingFailure() {
// given
testClient.deploy(WORKFLOW_OUTPUT_MAPPING);
final long workflowKey = testClient.deployWorkflow(WORKFLOW_OUTPUT_MAPPING).getKey();

final long workflowInstanceKey =
testClient.createWorkflowInstance(r -> r.setBpmnProcessId("process")).getInstanceKey();
Expand Down Expand Up @@ -235,7 +238,8 @@ public void shouldResolveIncidentForOutputMappingFailure() {
assertThat(incidentResolveCommand.getPosition())
.isEqualTo(incidentResolvedEvent.getSourceRecordPosition());

assertIOMappingIncidentWithNoData(workflowInstanceKey, followUpEvent, incidentResolvedEvent);
assertIOMappingIncidentWithNoData(
workflowKey, workflowInstanceKey, followUpEvent, incidentResolvedEvent);
}

@Test
Expand All @@ -249,7 +253,7 @@ public void shouldCreateNewIncidentAfterResolvedFirstOne() {
t -> t.zeebeTaskType("external").zeebeInput("foo", "foo").zeebeInput("bar", "bar"))
.done();

testClient.deploy(modelInstance);
final long workflowKey = testClient.deployWorkflow(modelInstance).getKey();

final long workflowInstanceKey =
testClient.createWorkflowInstance(r -> r.setBpmnProcessId("process")).getInstanceKey();
Expand Down Expand Up @@ -279,6 +283,7 @@ public void shouldCreateNewIncidentAfterResolvedFirstOne() {
assertIncidentRecordValue(
ErrorType.IO_MAPPING_ERROR.name(),
"No data found for query foo.",
workflowKey,
workflowInstanceKey,
"failingTask",
failureEvent,
Expand All @@ -288,7 +293,7 @@ public void shouldCreateNewIncidentAfterResolvedFirstOne() {
@Test
public void shouldResolveIncidentAfterPreviousResolvingFailed() {
// given
testClient.deploy(WORKFLOW_INPUT_MAPPING);
final long workflowKey = testClient.deployWorkflow(WORKFLOW_INPUT_MAPPING).getKey();

final long workflowInstanceKey =
testClient.createWorkflowInstance(r -> r.setBpmnProcessId("process")).getInstanceKey();
Expand Down Expand Up @@ -323,6 +328,7 @@ public void shouldResolveIncidentAfterPreviousResolvingFailed() {
assertIncidentRecordValue(
ErrorType.IO_MAPPING_ERROR.name(),
"No data found for query foo.",
workflowKey,
workflowInstanceKey,
"failingTask",
failureEvent,
Expand Down Expand Up @@ -364,7 +370,7 @@ public void shouldResolveMultipleIncidents() {
@Test
public void shouldResolveIncidentIfActivityTerminated() {
// given
testClient.deploy(WORKFLOW_INPUT_MAPPING);
final long workflowKey = testClient.deployWorkflow(WORKFLOW_INPUT_MAPPING).getKey();

final long workflowInstanceKey =
testClient.createWorkflowInstance(r -> r.setBpmnProcessId("process")).getInstanceKey();
Expand All @@ -389,6 +395,7 @@ public void shouldResolveIncidentIfActivityTerminated() {
assertIncidentRecordValue(
ErrorType.IO_MAPPING_ERROR.name(),
"No data found for query foo.",
workflowKey,
workflowInstanceKey,
"failingTask",
incidentResolvedEvent.getValue().getElementInstanceKey(),
Expand All @@ -399,7 +406,7 @@ public void shouldResolveIncidentIfActivityTerminated() {
@Category(UnstableTest.class)
public void shouldProcessIncidentsAfterMultipleTerminations() {
// given
testClient.deploy(WORKFLOW_INPUT_MAPPING);
final long workflowKey = testClient.deployWorkflow(WORKFLOW_INPUT_MAPPING).getKey();

// create and cancel instance with incident
long workflowInstanceKey =
Expand Down Expand Up @@ -431,6 +438,7 @@ public void shouldProcessIncidentsAfterMultipleTerminations() {
assertIncidentRecordValue(
ErrorType.IO_MAPPING_ERROR.name(),
"No data found for query foo.",
workflowKey,
workflowInstanceKey,
"failingTask",
incidentEvent.getValue().getElementInstanceKey(),
Expand Down
Expand Up @@ -35,6 +35,12 @@ public interface IncidentRecordValue extends RecordValue {
*/
String getBpmnProcessId();

/**
* @return the key of the workflow this incident belongs to. Can be <code>null</code> if the
* incident belongs to no workflow.
*/
long getWorkflowKey();

/**
* @return the key of the workflow instance this incident belongs to. Can be <code>null</code> if
* the incident belongs to no workflow instance.
Expand Down
Expand Up @@ -21,6 +21,9 @@
"bpmnProcessId": {
"type": "keyword"
},
"workflowKey": {
"type": "long"
},
"workflowInstanceKey": {
"type": "long"
},
Expand Down

0 comments on commit 73477e8

Please sign in to comment.