Skip to content

Commit

Permalink
feat(broker): export variable records to elasticsearch
Browse files Browse the repository at this point in the history
- add template for variable records
- export variable records by default
- move helper methods to exporter configuration to check which records should
  be exported
  • Loading branch information
menski committed Jan 17, 2019
1 parent 00288bb commit f96aa8f
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
import io.zeebe.exporter.context.Context;
import io.zeebe.exporter.context.Controller;
import io.zeebe.exporter.record.Record;
import io.zeebe.exporter.record.RecordMetadata;
import io.zeebe.exporter.spi.Exporter;
import io.zeebe.protocol.clientapi.RecordType;
import io.zeebe.protocol.clientapi.ValueType;
import java.time.Duration;
import org.slf4j.Logger;
Expand Down Expand Up @@ -74,7 +72,7 @@ public void close() {

@Override
public void export(Record record) {
if (shouldIndexRecord(record)) {
if (configuration.shouldIndexRecord(record)) {
client.index(record);
}

Expand Down Expand Up @@ -129,6 +127,9 @@ private void createIndexTemplates() {
if (index.raft) {
createValueIndexTemplate(ValueType.RAFT);
}
if (index.variable) {
createValueIndexTemplate(ValueType.VARIABLE);
}
if (index.workflowInstance) {
createValueIndexTemplate(ValueType.WORKFLOW_INSTANCE);
}
Expand All @@ -151,48 +152,4 @@ private void createValueIndexTemplate(final ValueType valueType) {
log.warn("Put index template for value type {} was not acknowledged", valueType);
}
}

private boolean shouldIndexRecord(Record<?> record) {
final RecordMetadata metadata = record.getMetadata();
return shouldIndexRecordType(metadata.getRecordType())
&& shouldIndexValueType(metadata.getValueType());
}

private boolean shouldIndexValueType(ValueType valueType) {
switch (valueType) {
case DEPLOYMENT:
return configuration.index.deployment;
case INCIDENT:
return configuration.index.incident;
case JOB:
return configuration.index.job;
case JOB_BATCH:
return configuration.index.jobBatch;
case MESSAGE:
return configuration.index.message;
case MESSAGE_SUBSCRIPTION:
return configuration.index.messageSubscription;
case RAFT:
return configuration.index.raft;
case WORKFLOW_INSTANCE:
return configuration.index.workflowInstance;
case WORKFLOW_INSTANCE_SUBSCRIPTION:
return configuration.index.workflowInstanceSubscription;
default:
return false;
}
}

private boolean shouldIndexRecordType(RecordType recordType) {
switch (recordType) {
case EVENT:
return configuration.index.event;
case COMMAND:
return configuration.index.command;
case COMMAND_REJECTION:
return configuration.index.rejection;
default:
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
*/
package io.zeebe.exporter;

import io.zeebe.exporter.record.Record;
import io.zeebe.exporter.record.RecordMetadata;
import io.zeebe.protocol.clientapi.RecordType;
import io.zeebe.protocol.clientapi.ValueType;

public class ElasticsearchExporterConfiguration {

// elasticsearch http url
Expand Down Expand Up @@ -56,6 +61,7 @@ public static class IndexConfiguration {
public boolean message = false;
public boolean messageSubscription = false;
public boolean raft = false;
public boolean variable = true;
public boolean workflowInstance = true;
public boolean workflowInstanceSubscription = false;

Expand Down Expand Up @@ -85,6 +91,8 @@ public String toString() {
+ messageSubscription
+ ", raft="
+ raft
+ ", variable="
+ variable
+ ", workflowInstance="
+ workflowInstance
+ ", workflowInstanceSubscription="
Expand All @@ -104,4 +112,50 @@ public String toString() {
return "BulkConfiguration{" + "delay=" + delay + ", size=" + size + '}';
}
}

public boolean shouldIndexRecord(Record<?> record) {
final RecordMetadata metadata = record.getMetadata();
return shouldIndexRecordType(metadata.getRecordType())
&& shouldIndexValueType(metadata.getValueType());
}

private boolean shouldIndexValueType(ValueType valueType) {
switch (valueType) {
case DEPLOYMENT:
return index.deployment;
case INCIDENT:
return index.incident;
case JOB:
return index.job;
case JOB_BATCH:
return index.jobBatch;
case MESSAGE:
return index.message;
case MESSAGE_SUBSCRIPTION:
return index.messageSubscription;
case RAFT:
return index.raft;
case VARIABLE:
return index.variable;
case WORKFLOW_INSTANCE:
return index.workflowInstance;
case WORKFLOW_INSTANCE_SUBSCRIPTION:
return index.workflowInstanceSubscription;
default:
return false;
}
}

private boolean shouldIndexRecordType(RecordType recordType) {
switch (recordType) {
case EVENT:
return index.event;
case COMMAND:
return index.command;
case COMMAND_REJECTION:
return index.rejection;
default:
return false;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"index_patterns": [
"zeebe-record-variable_*"
],
"order": 20,
"aliases": {
"zeebe-record-variable": {}
},
"mappings": {
"_doc": {
"properties": {
"value": {
"dynamic": "strict",
"properties": {
"name": {
"type": "keyword"
},
"value": {
"type": "text"
},
"scopeInstanceKey": {
"type": "long"
}
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.zeebe.exporter.record.Record;
import io.zeebe.exporter.record.RecordMetadata;
import io.zeebe.protocol.clientapi.RecordType;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.test.exporter.ExporterIntegrationRule;
import io.zeebe.util.ZbLogger;
import java.io.IOException;
Expand All @@ -47,14 +44,15 @@ public class ElasticsearchExporterIT {

@Rule public final ExporterIntegrationRule exporterBrokerRule = new ExporterIntegrationRule();

private ElasticsearchExporterConfiguration configuration;
private ElasticsearchTestClient esClient;

@Before
public void setUp() {
esClient =
createElasticsearchClient(
exporterBrokerRule.getExporterConfiguration(
"elasticsearch", ElasticsearchExporterConfiguration.class));
configuration =
exporterBrokerRule.getExporterConfiguration(
"elasticsearch", ElasticsearchExporterConfiguration.class);
esClient = createElasticsearchClient(configuration);
}

@After
Expand All @@ -78,22 +76,12 @@ public void shouldExportRecords() {
// assert all records which where recorded during the tests where exported
exporterBrokerRule.visitExportedRecords(
r -> {
if (shouldBeExported(r)) {
if (configuration.shouldIndexRecord(r)) {
assertRecordExported(r);
}
});
}

private boolean shouldBeExported(Record<?> r) {
final RecordMetadata metadata = r.getMetadata();
final ValueType valueType = metadata.getValueType();
return metadata.getRecordType() == RecordType.EVENT
&& (valueType == ValueType.DEPLOYMENT
|| valueType == ValueType.WORKFLOW_INSTANCE
|| valueType == ValueType.JOB
|| valueType == ValueType.INCIDENT);
}

private void assertIndexSettings() {
final ImmutableOpenMap<String, Settings> settingsForIndices = esClient.getSettingsForIndices();
for (ObjectCursor<String> key : settingsForIndices.keys()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public void shouldCreateIndexTemplates() {
config.index.message = true;
config.index.messageSubscription = true;
config.index.raft = true;
config.index.variable = true;
config.index.workflowInstance = true;
config.index.workflowInstanceSubscription = true;

Expand All @@ -73,6 +74,7 @@ public void shouldCreateIndexTemplates() {
verify(esClient).putIndexTemplate(ValueType.MESSAGE);
verify(esClient).putIndexTemplate(ValueType.MESSAGE_SUBSCRIPTION);
verify(esClient).putIndexTemplate(ValueType.RAFT);
verify(esClient).putIndexTemplate(ValueType.VARIABLE);
verify(esClient).putIndexTemplate(ValueType.WORKFLOW_INSTANCE);
verify(esClient).putIndexTemplate(ValueType.WORKFLOW_INSTANCE_SUBSCRIPTION);
}
Expand All @@ -88,6 +90,7 @@ public void shouldExportEnabledValueTypes() {
config.index.message = true;
config.index.messageSubscription = true;
config.index.raft = true;
config.index.variable = true;
config.index.workflowInstance = true;
config.index.workflowInstanceSubscription = true;

Expand All @@ -102,6 +105,7 @@ public void shouldExportEnabledValueTypes() {
ValueType.MESSAGE,
ValueType.MESSAGE_SUBSCRIPTION,
ValueType.RAFT,
ValueType.VARIABLE,
ValueType.WORKFLOW_INSTANCE,
ValueType.WORKFLOW_INSTANCE_SUBSCRIPTION
};
Expand All @@ -126,6 +130,7 @@ public void shouldNotExportDisabledValueTypes() {
config.index.message = false;
config.index.messageSubscription = false;
config.index.raft = false;
config.index.variable = false;
config.index.workflowInstance = false;
config.index.workflowInstanceSubscription = false;

Expand All @@ -140,6 +145,7 @@ public void shouldNotExportDisabledValueTypes() {
ValueType.MESSAGE,
ValueType.MESSAGE_SUBSCRIPTION,
ValueType.RAFT,
ValueType.VARIABLE,
ValueType.WORKFLOW_INSTANCE,
ValueType.WORKFLOW_INSTANCE_SUBSCRIPTION
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ className = "io.zeebe.exporter.ElasticsearchExporter"
message = true
messageSubscription = true
raft = true
variable = true
workflowInstance = true
workflowInstanceSubscription = true

0 comments on commit f96aa8f

Please sign in to comment.