Skip to content

Commit

Permalink
fix(broker-core): ignore empty or invalid task headers
Browse files Browse the repository at this point in the history
  • Loading branch information
Miguel Pires committed Jan 25, 2019
1 parent 261d7ad commit 7cb055c
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 15 deletions.
Expand Up @@ -17,6 +17,7 @@
*/
package io.zeebe.broker.workflow.model.transformation.transformer;

import static io.zeebe.broker.Broker.LOG;
import static io.zeebe.util.buffer.BufferUtil.wrapString;

import io.zeebe.broker.workflow.model.BpmnStep;
Expand All @@ -30,7 +31,8 @@
import io.zeebe.model.bpmn.instance.zeebe.ZeebeTaskHeaders;
import io.zeebe.msgpack.spec.MsgPackWriter;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;
import org.agrona.MutableDirectBuffer;
Expand Down Expand Up @@ -80,34 +82,56 @@ private void transformTaskHeaders(ServiceTask element, final ExecutableServiceTa
final ZeebeTaskHeaders taskHeaders = element.getSingleExtensionElement(ZeebeTaskHeaders.class);

if (taskHeaders != null) {
final DirectBuffer encodedHeaders = encode(taskHeaders);
serviceTask.setEncodedHeaders(encodedHeaders);
final List<ZeebeHeader> validHeaders =
taskHeaders
.getHeaders()
.stream()
.filter(this::isValidHeader)
.collect(Collectors.toList());

if (validHeaders.size() < taskHeaders.getHeaders().size()) {
LOG.warn(
"Ignoring invalid headers for task '{}'. Must have non-empty key and value.",
element.getName());
}

if (!validHeaders.isEmpty()) {
final DirectBuffer encodedHeaders = encode(validHeaders);
serviceTask.setEncodedHeaders(encodedHeaders);
}
}
}

private DirectBuffer encode(ZeebeTaskHeaders taskHeaders) {
private DirectBuffer encode(List<ZeebeHeader> taskHeaders) {
final MutableDirectBuffer buffer = new UnsafeBuffer(0, 0);

final Collection<ZeebeHeader> headers = taskHeaders.getHeaders();
final ExpandableArrayBuffer expandableBuffer =
new ExpandableArrayBuffer(INITIAL_SIZE_KEY_VALUE_PAIR * taskHeaders.size());

if (!headers.isEmpty()) {
final ExpandableArrayBuffer expandableBuffer =
new ExpandableArrayBuffer(INITIAL_SIZE_KEY_VALUE_PAIR * headers.size());
msgPackWriter.wrap(expandableBuffer, 0);
msgPackWriter.writeMapHeader(headers.size());
msgPackWriter.wrap(expandableBuffer, 0);
msgPackWriter.writeMapHeader(taskHeaders.size());

headers.forEach(
h -> {
taskHeaders.forEach(
h -> {
if (isValidHeader(h)) {
final DirectBuffer key = wrapString(h.getKey());
msgPackWriter.writeString(key);

final DirectBuffer value = wrapString(h.getValue());
msgPackWriter.writeString(value);
});
}
});

buffer.wrap(expandableBuffer.byteArray(), 0, msgPackWriter.getOffset());
}
buffer.wrap(expandableBuffer.byteArray(), 0, msgPackWriter.getOffset());

return buffer;
}

private boolean isValidHeader(ZeebeHeader header) {
return header != null
&& header.getValue() != null
&& !header.getValue().isEmpty()
&& header.getKey() != null
&& !header.getKey().isEmpty();
}
}
Expand Up @@ -27,6 +27,7 @@
import io.zeebe.exporter.record.value.WorkflowInstanceRecordValue;
import io.zeebe.model.bpmn.Bpmn;
import io.zeebe.model.bpmn.BpmnModelInstance;
import io.zeebe.protocol.intent.DeploymentIntent;
import io.zeebe.protocol.intent.JobIntent;
import io.zeebe.protocol.intent.TimerIntent;
import io.zeebe.protocol.intent.WorkflowInstanceIntent;
Expand Down Expand Up @@ -167,6 +168,53 @@ public void shouldUnsubscribeFromBoundaryEventTriggersOnTerminating() {
WorkflowInstanceIntent.ELEMENT_TERMINATING, WorkflowInstanceIntent.ELEMENT_TERMINATED);
}

@Test
public void shouldIgnoreTaskHeadersIfEmpty() {
createWorkflowAndAssertIgnoredHeaders("");
}

@Test
public void shouldIgnoreTaskHeadersIfNull() {
createWorkflowAndAssertIgnoredHeaders(null);
}

private void createWorkflowAndAssertIgnoredHeaders(String testValue) {
// given
final BpmnModelInstance model =
Bpmn.createExecutableProcess("process")
.startEvent("start")
.serviceTask("task1", b -> b.zeebeTaskType("type1").zeebeTaskHeader("key", testValue))
.endEvent("end")
.moveToActivity("task1")
.serviceTask("task2", b -> b.zeebeTaskType("type2").zeebeTaskHeader(testValue, "value"))
.connectTo("end")
.moveToActivity("task1")
.serviceTask(
"task3", b -> b.zeebeTaskType("type3").zeebeTaskHeader(testValue, testValue))
.connectTo("end")
.done();

// when
final long deploymentKey = testClient.deploy(model);
testClient.receiveFirstDeploymentEvent(DeploymentIntent.CREATED, deploymentKey);
testClient.createWorkflowInstance("process");

// then
final JobRecordValue firstJob =
testClient.receiveJobs().withType("type1").getFirst().getValue();
assertThat(firstJob.getCustomHeaders()).isEmpty();
testClient.completeJobOfType("type1");

final JobRecordValue secondJob =
testClient.receiveJobs().withType("type2").getFirst().getValue();
assertThat(secondJob.getCustomHeaders()).isEmpty();
testClient.completeJobOfType("type2");

final JobRecordValue thirdJob =
testClient.receiveJobs().withType("type3").getFirst().getValue();
assertThat(thirdJob.getCustomHeaders()).isEmpty();
}

private void shouldUnsubscribeFromBoundaryEventTrigger(
WorkflowInstanceIntent leavingState, WorkflowInstanceIntent leftState) {
// given
Expand Down

0 comments on commit 7cb055c

Please sign in to comment.