Skip to content

Commit

Permalink
fix(protocol): flatten RecordMetadata into Record
Browse files Browse the repository at this point in the history
BREAKING CHANGE:
- Removed metadata field from Record
- Added intent, partitionId, recordType, rejectionType, rejectionReason
and valueType fields to Record
  • Loading branch information
Miguel Pires committed Jun 21, 2019
1 parent 05fe865 commit 6593979
Show file tree
Hide file tree
Showing 76 changed files with 532 additions and 584 deletions.
Expand Up @@ -26,7 +26,6 @@
import io.zeebe.exporter.api.context.Context;
import io.zeebe.exporter.api.context.Controller;
import io.zeebe.protocol.record.Record;
import io.zeebe.protocol.record.RecordMetadata;
import io.zeebe.protocol.record.ValueType;
import io.zeebe.protocol.record.intent.JobIntent;
import io.zeebe.test.broker.protocol.commandapi.CommandApiRule;
Expand Down Expand Up @@ -105,8 +104,7 @@ public void close() {

@Override
public void export(final Record record) {
final RecordMetadata metadata = record.getMetadata();
if (metadata.getValueType() == ValueType.JOB && metadata.getIntent() == JobIntent.CREATED) {
if (record.getValueType() == ValueType.JOB && record.getIntent() == JobIntent.CREATED) {
exportLatch.countDown();
}

Expand Down
Expand Up @@ -115,9 +115,7 @@ public void shouldRemoveExporterFromState() {
private boolean isDeploymentExported(long deploymentKey1) {
return TestExporter.records.stream()
.anyMatch(
r ->
r.getKey() == deploymentKey1
&& r.getMetadata().getIntent() == DeploymentIntent.DISTRIBUTED);
r -> r.getKey() == deploymentKey1 && r.getIntent() == DeploymentIntent.DISTRIBUTED);
}

public static class TestExporter extends DebugLogExporter {
Expand Down
Expand Up @@ -56,7 +56,7 @@ public void processRecord(
entityKey = command.getKey();
wrappedProcessor.onCommand(command, this, streamWriter);

final boolean respond = command.getMetadata().hasRequestMetadata();
final boolean respond = command.hasRequestMetadata();

if (isAccepted) {
streamWriter.appendFollowUpEvent(entityKey, newState, updatedValue);
Expand Down
Expand Up @@ -324,7 +324,7 @@ private void writeRejectionOnCommand(Throwable exception) {
String.format(PROCESSING_ERROR_MESSAGE, typedEvent, exception.getMessage());
LOG.error(errorMessage, exception);

if (typedEvent.getMetadata().getRecordType() == RecordType.COMMAND) {
if (typedEvent.getRecordType() == RecordType.COMMAND) {
logStreamWriter.appendRejection(typedEvent, RejectionType.PROCESSING_ERROR, errorMessage);
responseWriter.writeRejectionOnCommand(
typedEvent, RejectionType.PROCESSING_ERROR, errorMessage);
Expand Down
47 changes: 43 additions & 4 deletions engine/src/main/java/io/zeebe/engine/processor/TypedEventImpl.java
Expand Up @@ -20,6 +20,10 @@
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.zeebe.protocol.record.RecordType;
import io.zeebe.protocol.record.RejectionType;
import io.zeebe.protocol.record.ValueType;
import io.zeebe.protocol.record.intent.Intent;
import java.time.Instant;

@SuppressWarnings({"rawtypes"})
Expand Down Expand Up @@ -48,20 +52,55 @@ public long getKey() {
}

@Override
public RecordMetadata getMetadata() {
return metadata;
public UnifiedRecordValue getValue() {
return value;
}

@Override
public UnifiedRecordValue getValue() {
return value;
public int getRequestStreamId() {
return metadata.getRequestStreamId();
}

@Override
public long getRequestId() {
return metadata.getRequestId();
}

@Override
public String toString() {
return "TypedEventImpl{" + "metadata=" + metadata + ", value=" + value + '}';
}

@Override
public Intent getIntent() {
return metadata.getIntent();
}

@Override
public int getPartitionId() {
return metadata.getPartitionId();
}

@Override
public RecordType getRecordType() {
return metadata.getRecordType();
}

@Override
public RejectionType getRejectionType() {
return metadata.getRejectionType();
}

@Override
public String getRejectionReason() {
return metadata.getRejectionReason();
}

@Override
public ValueType getValueType() {
return metadata.getValueType();
}

@Override
public long getSourceRecordPosition() {
return rawEvent.getSourceEventPosition();
Expand Down
13 changes: 10 additions & 3 deletions engine/src/main/java/io/zeebe/engine/processor/TypedRecord.java
Expand Up @@ -17,18 +17,25 @@
*/
package io.zeebe.engine.processor;

import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.zeebe.protocol.record.Record;
import io.zeebe.protocol.record.RecordMetadataEncoder;

public interface TypedRecord<T extends UnifiedRecordValue> extends Record<T> {

long getKey();

RecordMetadata getMetadata();

T getValue();

int getRequestStreamId();

long getRequestId();

default boolean hasRequestMetadata() {
return getRequestId() != RecordMetadataEncoder.requestIdNullValue()
&& getRequestStreamId() != RecordMetadataEncoder.requestStreamIdNullValue();
}

default int getMaxValueLength() {
throw new UnsupportedOperationException();
}
Expand Down
Expand Up @@ -18,9 +18,9 @@
package io.zeebe.engine.processor;

import io.zeebe.msgpack.UnpackedObject;
import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.protocol.record.RecordType;
import io.zeebe.protocol.record.RejectionType;
import io.zeebe.protocol.record.ValueType;
import io.zeebe.protocol.record.intent.Intent;
import java.nio.charset.StandardCharsets;
import org.agrona.DirectBuffer;
Expand Down Expand Up @@ -49,11 +49,13 @@ public void writeRejectionOnCommand(TypedRecord<?> command, RejectionType type,

stage(
RecordType.COMMAND_REJECTION,
command.getMetadata().getIntent(),
command.getIntent(),
command.getKey(),
type,
stringWrapper,
command.getMetadata(),
command.getValueType(),
command.getRequestId(),
command.getRequestStreamId(),
command.getValue());
}

Expand All @@ -63,11 +65,13 @@ public void writeEvent(TypedRecord<?> event) {

stage(
RecordType.EVENT,
event.getMetadata().getIntent(),
event.getIntent(),
event.getKey(),
RejectionType.NULL_VAL,
stringWrapper,
event.getMetadata(),
event.getValueType(),
event.getRequestId(),
event.getRequestStreamId(),
event.getValue());
}

Expand All @@ -82,7 +86,9 @@ public void writeEventOnCommand(
eventKey,
RejectionType.NULL_VAL,
stringWrapper,
command.getMetadata(),
command.getValueType(),
command.getRequestId(),
command.getRequestStreamId(),
eventValue);
}

Expand All @@ -92,20 +98,22 @@ private void stage(
long key,
RejectionType rejectionType,
DirectBuffer rejectionReason,
RecordMetadata metadata,
ValueType valueType,
long requestId,
int requestStreamId,
UnpackedObject value) {
writer
.partitionId(partitionId)
.key(key)
.intent(intent)
.recordType(type)
.valueType(metadata.getValueType())
.valueType(valueType)
.rejectionType(rejectionType)
.rejectionReason(rejectionReason)
.valueWriter(value);

this.requestId = metadata.getRequestId();
this.requestStreamId = metadata.getRequestStreamId();
this.requestId = requestId;
this.requestStreamId = requestStreamId;
isResponseStaged = true;
}

Expand Down
Expand Up @@ -58,7 +58,7 @@ public void appendRejection(
appendRecord(
command.getKey(),
RecordType.COMMAND_REJECTION,
command.getMetadata().getIntent(),
command.getIntent(),
rejectionType,
reason,
command.getValue(),
Expand All @@ -74,7 +74,7 @@ public void appendRejection(
appendRecord(
command.getKey(),
RecordType.COMMAND_REJECTION,
command.getMetadata().getIntent(),
command.getIntent(),
rejectionType,
reason,
command.getValue(),
Expand Down
Expand Up @@ -65,7 +65,7 @@ public void processRecord(
processRecordValue(
record.getKey(),
record.getValue(),
(WorkflowInstanceIntent) record.getMetadata().getIntent(),
(WorkflowInstanceIntent) record.getIntent(),
streamWriter,
sideEffect);
}
Expand Down
Expand Up @@ -38,7 +38,7 @@ public WorkflowInstanceCommandContext(final EventOutput eventOutput) {
}

public WorkflowInstanceIntent getCommand() {
return (WorkflowInstanceIntent) record.getMetadata().getIntent();
return (WorkflowInstanceIntent) record.getIntent();
}

public TypedRecord<WorkflowInstanceRecord> getRecord() {
Expand Down
2 changes: 1 addition & 1 deletion engine/src/main/java/io/zeebe/engine/state/ZeebeState.java
Expand Up @@ -137,7 +137,7 @@ public boolean isOnBlacklist(TypedRecord record) {
}

public boolean tryToBlacklist(TypedRecord<?> typedRecord, Consumer<Long> onBlacklistingInstance) {
final Intent intent = typedRecord.getMetadata().getIntent();
final Intent intent = typedRecord.getIntent();
if (shouldBeBlacklisted(intent)) {
final UnpackedObject value = typedRecord.getValue();
if (value instanceof WorkflowInstanceRelated) {
Expand Down
Expand Up @@ -195,8 +195,7 @@ public void shouldSkipFailingEvent() {
.getFirst();

assertThat(deploymentRejection.getKey()).isEqualTo(failingKey);
assertThat(deploymentRejection.getMetadata().getRejectionType())
.isEqualTo(RejectionType.PROCESSING_ERROR);
assertThat(deploymentRejection.getRejectionType()).isEqualTo(RejectionType.PROCESSING_ERROR);
}

protected DeploymentRecord deployment(final String name, final ResourceType resourceType) {
Expand Down
Expand Up @@ -69,7 +69,7 @@ public WorkflowInstanceAssert doesNotEvaluateFlowAfterTerminatingElement(String
actual.stream()
.filter(
r ->
r.getMetadata().getIntent() == WorkflowInstanceIntent.ELEMENT_TERMINATING
r.getIntent() == WorkflowInstanceIntent.ELEMENT_TERMINATING
&& elementIdBuffer.equals(r.getValue().getElementIdBuffer()))
.findFirst();

Expand All @@ -96,7 +96,7 @@ public WorkflowInstanceAssert doesNotEvaluateFlowAfterTerminatingElement(String
.filter(r -> r.getSourceRecordPosition() > terminatingRecord.getPosition())
.map(r -> recordsByPosition.get(r.getSourceRecordPosition()))
.filter(r -> r.getValue().getFlowScopeKey() == instanceKey)
.filter(r -> isFlowEvaluatingState(r.getMetadata().getIntent()))
.filter(r -> isFlowEvaluatingState(r.getIntent()))
.findFirst();

if (firstViolatingRecord.isPresent()) {
Expand Down
Expand Up @@ -164,7 +164,7 @@ public Record<WorkflowInstanceRecord> createAndReceiveWorkflowInstance(

return awaitAndGetFirstWorkflowInstanceRecord(
r ->
r.getMetadata().getIntent() == WorkflowInstanceIntent.ELEMENT_ACTIVATING
r.getIntent() == WorkflowInstanceIntent.ELEMENT_ACTIVATING
&& r.getKey() == createdRecord.getValue().getWorkflowInstanceKey());
}

Expand All @@ -179,7 +179,7 @@ public Record<WorkflowInstanceCreationRecord> createWorkflowInstance(
ValueType.WORKFLOW_INSTANCE_CREATION,
(e) ->
e.getSourceRecordPosition() == position
&& e.getMetadata().getIntent() == WorkflowInstanceCreationIntent.CREATED,
&& e.getIntent() == WorkflowInstanceCreationIntent.CREATED,
new WorkflowInstanceCreationRecord());
}

Expand Down

0 comments on commit 6593979

Please sign in to comment.