Skip to content

Commit

Permalink
fix(broker-core): Convert numeric correlation key to string
Browse files Browse the repository at this point in the history
  • Loading branch information
Miguel Pires committed Dec 7, 2018
1 parent 6a22030 commit 953ec7b
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 17 deletions.
2 changes: 1 addition & 1 deletion broker-core/src/main/java/io/zeebe/broker/workflow/processor/CatchEventOutput.java
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ private DirectBuffer extractCorrelationKey(
}

if (result.isLong()) {
return result.getLongAsBuffer();
return result.getLongAsString();
}

errorMessage = "the value must be either a string or a number";
Expand Down
18 changes: 18 additions & 0 deletions broker-core/src/test/java/io/zeebe/broker/workflow/message/MessageCorrelationTest.java
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,24 @@ public void shouldCorrelateMessageIfPublishedBefore() {
assertWorkflowInstancePayload(event, "{'key':'order-123', 'foo':'bar'}");
}

@Test
public void shouldCorrelateMessageIfCorrelationKeyIsANumber() {
// given
testClient.deploy(SINGLE_MESSAGE_WORKFLOW);

testClient.publishMessage("message", "123", asMsgPack("foo", "bar"));

// when
final long workflowInstanceKey =
testClient.createWorkflowInstance(PROCESS_ID, asMsgPack("key", 123));

// then
final Record<WorkflowInstanceRecordValue> event =
testClient.receiveFirstWorkflowInstanceEvent(WorkflowInstanceIntent.ELEMENT_COMPLETED);
assertWorkflowInstanceRecord(workflowInstanceKey, "receive-message", event);
assertWorkflowInstancePayload(event, "{'key':123, 'foo':'bar'}");
}

@Test
public void shouldCorrelateFirstPublishedMessage() {
// given
Expand Down
6 changes: 3 additions & 3 deletions docs/src/bpmn-workflows/message-events.md
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Message Events

Message events are events which reference a message. They can be used to wait until a proper message is received.
Message events are events which reference a message. They can be used to wait until a proper message is received.

> Currently, messages can be published only externally using one of the Zeebe clients.
Expand All @@ -11,7 +11,7 @@ A message can be referenced by one or more message events. It holds the informat
* the name of the message
* the correlation key

The correlation key is specified as JSON Path expression. It is evaluated when the message event is entered and extracts the value from the workflow instance payload. The value must be a string. If the correlation key can't be resolved or it is not a string then an incident is created.
The correlation key is specified as [JSON Path](reference/json-conditions.html) expression. It is evaluated when the message event is entered and extracts the value from the workflow instance payload. The value must be either a string or a number. If the correlation key can't be resolved or it is neither a string nor a number then an incident is created.

XML representation:

Expand Down Expand Up @@ -60,4 +60,4 @@ XML representation:
* [Message Correlation](reference/message-correlation.html)
* [JSON Path](reference/json-conditions.html)
* [Input/Output Mappings](/bpmn-workflows/data-flow.html#inputoutput-mappings)
* [Incidents](/reference/incidents.html)
* [Incidents](/reference/incidents.html)
4 changes: 2 additions & 2 deletions docs/src/bpmn-workflows/receive-tasks.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Receive Tasks

Receive tasks are tasks which references a message. They can be used to wait until a proper message is received.
Receive tasks are tasks which references a message. They can be used to wait until a proper message is received.

## Messages

Expand All @@ -9,7 +9,7 @@ A message can be referenced by one or more receive tasks. It holds the informati
* the name of the message
* the correlation key

The correlation key is specified as [JSON Path](reference/json-conditions.html) expression. It is evaluated when the receive task is entered and extracts the value from the workflow instance payload. The value must be a string.
The correlation key is specified as [JSON Path](reference/json-conditions.html) expression. It is evaluated when the receive task is entered and extracts the value from the workflow instance payload. The value must be either a string or a number.

XML representation:

Expand Down
10 changes: 6 additions & 4 deletions json-path/src/main/java/io/zeebe/msgpack/query/MsgPackQueryProcessor.java
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.zeebe.msgpack.spec.MsgPackReader;
import io.zeebe.msgpack.spec.MsgPackToken;
import io.zeebe.msgpack.spec.MsgPackType;
import org.agrona.BitUtil;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;

Expand Down Expand Up @@ -78,7 +77,7 @@ private MsgPackToken readToken(int index) {

public class QueryResult {

private final UnsafeBuffer longResultBuffer = new UnsafeBuffer(new byte[BitUtil.SIZE_OF_LONG]);
private final UnsafeBuffer longResultBuffer = new UnsafeBuffer();

private MsgPackToken token;

Expand All @@ -102,11 +101,14 @@ public DirectBuffer getString() {
return token.getValueBuffer();
}

public DirectBuffer getLongAsBuffer() {
public DirectBuffer getLongAsString() {
if (!isLong()) {
throw new RuntimeException(String.format("expected Long but found '%s'", token.getType()));
}
longResultBuffer.putLong(0, token.getIntegerValue());

final long key = token.getIntegerValue();
final String converted = String.valueOf(key);
longResultBuffer.wrap(converted.getBytes());
return longResultBuffer;
}
}
Expand Down
10 changes: 3 additions & 7 deletions json-path/src/test/java/io/zeebe/msgpack/query/MsgPackQueryProcessorTest.java
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
import io.zeebe.msgpack.jsonpath.JsonPathQueryCompiler;
import io.zeebe.msgpack.query.MsgPackQueryProcessor.QueryResult;
import io.zeebe.msgpack.query.MsgPackQueryProcessor.QueryResults;
import org.agrona.BitUtil;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.Test;

public class MsgPackQueryProcessorTest {
Expand Down Expand Up @@ -62,7 +60,7 @@ public void shouldGetSingleResultString() {
}

@Test
public void shouldGetSingleResultLongAsBuffer() {
public void shouldGetSingleResultLongAsString() {
final QueryResults results =
processor.process(
path("$.foo"),
Expand All @@ -78,9 +76,7 @@ public void shouldGetSingleResultLongAsBuffer() {
assertThat(result).isNotNull();
assertThat(result.isLong()).isTrue();

final UnsafeBuffer buffer = new UnsafeBuffer(new byte[BitUtil.SIZE_OF_LONG]);
buffer.putLong(0, 1L);
assertThat(result.getLongAsBuffer()).isEqualTo(buffer);
assertThat(result.getLongAsString()).isEqualTo(wrapString(String.valueOf(1L)));
}

@Test
Expand Down Expand Up @@ -140,7 +136,7 @@ public void shouldThrowExceptionIfNotLong() {

assertThat(results.size()).isEqualTo(1);

assertThatThrownBy(() -> results.getSingleResult().getLongAsBuffer())
assertThatThrownBy(() -> results.getSingleResult().getLongAsString())
.isInstanceOf(RuntimeException.class)
.hasMessage("expected Long but found 'BOOLEAN'");
}
Expand Down

0 comments on commit 953ec7b

Please sign in to comment.