Skip to content

Commit

Permalink
feat(test): adds a slew of Exporter test utilities
Browse files Browse the repository at this point in the history
- mock implementations of most io.zeebe.exporter.* interfaces
- provides a utility to run all scheduled tasks with optional delay
- provides ExporterTestHarness to simulate different points of the
exporter lifecycle
- provides a utility to stream records to an exporter
  • Loading branch information
npepinpe committed Dec 19, 2018
1 parent ddeb46d commit 899221e
Show file tree
Hide file tree
Showing 12 changed files with 1,142 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,32 @@
package io.zeebe.exporter;

import static io.zeebe.exporter.ElasticsearchExporter.ZEEBE_RECORD_TEMPLATE_JSON;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.zeebe.exporter.context.Configuration;
import io.zeebe.exporter.context.Context;
import io.zeebe.exporter.context.Controller;
import io.zeebe.exporter.record.Record;
import io.zeebe.exporter.record.RecordMetadata;
import io.zeebe.protocol.clientapi.RecordType;
import io.zeebe.protocol.clientapi.ValueType;
import io.zeebe.util.ZbLogger;
import io.zeebe.test.exporter.ExporterTestHarness;
import java.time.Duration;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.slf4j.Logger;

public class ElasticsearchExporterTest {

private ExporterTestHarness testHarness;
private ElasticsearchExporterConfiguration config;
private ElasticsearchClient esClient;
private Controller controller;

@Before
public void setUp() {
config = new ElasticsearchExporterConfiguration();
esClient = mockElasticsearchClient();
controller = mock(Controller.class);
}

@Test
Expand All @@ -68,7 +60,7 @@ public void shouldCreateIndexTemplates() {
config.index.workflowInstanceSubscription = true;

// when
createExporter(config);
createAndOpenExporter();

// then
verify(esClient).putIndexTemplate("foo-bar", ZEEBE_RECORD_TEMPLATE_JSON, "-");
Expand Down Expand Up @@ -98,7 +90,7 @@ public void shouldExportEnabledValueTypes() {
config.index.workflowInstance = true;
config.index.workflowInstanceSubscription = true;

final ElasticsearchExporter exporter = createExporter(config);
createAndOpenExporter();

final ValueType[] valueTypes =
new ValueType[] {
Expand All @@ -115,8 +107,9 @@ public void shouldExportEnabledValueTypes() {

// when - then
for (ValueType valueType : valueTypes) {
final Record record = mockRecord(valueType, RecordType.EVENT);
exporter.export(record);
final Record record =
testHarness.export(
r -> r.getMetadata().setValueType(valueType).setRecordType(RecordType.EVENT));
verify(esClient).index(record);
}
}
Expand All @@ -135,7 +128,7 @@ public void shouldNotExportDisabledValueTypes() {
config.index.workflowInstance = false;
config.index.workflowInstanceSubscription = false;

final ElasticsearchExporter exporter = createExporter(config);
createAndOpenExporter();

final ValueType[] valueTypes =
new ValueType[] {
Expand All @@ -152,8 +145,9 @@ public void shouldNotExportDisabledValueTypes() {

// when - then
for (ValueType valueType : valueTypes) {
final Record record = mockRecord(valueType, RecordType.EVENT);
exporter.export(record);
final Record record =
testHarness.export(
r -> r.getMetadata().setValueType(valueType).setRecordType(RecordType.EVENT));
verify(esClient, never()).index(record);
}
}
Expand All @@ -162,11 +156,15 @@ public void shouldNotExportDisabledValueTypes() {
public void shouldIgnoreUnknownValueType() {
// given
config.index.event = true;
final ElasticsearchExporter exporter = createExporter(config);
final Record record = mockRecord(ValueType.SBE_UNKNOWN, RecordType.EVENT);
createAndOpenExporter();

// when
exporter.export(record);
final Record record =
testHarness.export(
r ->
r.getMetadata()
.setValueType(ValueType.SBE_UNKNOWN)
.setRecordType(RecordType.EVENT));

// then
verify(esClient, never()).index(record);
Expand All @@ -180,15 +178,16 @@ public void shouldExportEnabledRecordTypes() {
config.index.rejection = true;
config.index.deployment = true;

final ElasticsearchExporter exporter = createExporter(config);
createAndOpenExporter();

final RecordType[] recordTypes =
new RecordType[] {RecordType.COMMAND, RecordType.EVENT, RecordType.COMMAND_REJECTION};

// when - then
for (RecordType recordType : recordTypes) {
final Record record = mockRecord(ValueType.DEPLOYMENT, recordType);
exporter.export(record);
final Record record =
testHarness.export(
r -> r.getMetadata().setValueType(ValueType.DEPLOYMENT).setRecordType(recordType));
verify(esClient).index(record);
}
}
Expand All @@ -201,15 +200,16 @@ public void shouldNotExportDisabledRecordTypes() {
config.index.rejection = false;
config.index.deployment = true;

final ElasticsearchExporter exporter = createExporter(config);
createAndOpenExporter();

final RecordType[] recordTypes =
new RecordType[] {RecordType.COMMAND, RecordType.EVENT, RecordType.COMMAND_REJECTION};

// when - then
for (RecordType recordType : recordTypes) {
final Record record = mockRecord(ValueType.DEPLOYMENT, recordType);
exporter.export(record);
final Record record =
testHarness.export(
r -> r.getMetadata().setValueType(ValueType.DEPLOYMENT).setRecordType(recordType));
verify(esClient, never()).index(record);
}
}
Expand All @@ -218,11 +218,16 @@ public void shouldNotExportDisabledRecordTypes() {
public void shouldIgnoreUnknownRecordType() {
// given
config.index.deployment = true;
final ElasticsearchExporter exporter = createExporter(config);
final Record record = mockRecord(ValueType.DEPLOYMENT, RecordType.SBE_UNKNOWN);
final ElasticsearchExporter exporter = createAndOpenExporter();

// when
exporter.export(record);
createAndOpenExporter();
final Record record =
testHarness.export(
r ->
r.getMetadata()
.setValueType(ValueType.DEPLOYMENT)
.setRecordType(RecordType.SBE_UNKNOWN));

// then
verify(esClient, never()).index(record);
Expand All @@ -231,27 +236,28 @@ public void shouldIgnoreUnknownRecordType() {
@Test
public void shouldUpdateLastPositionOnFlush() {
// given
final ElasticsearchExporter exporter = createExporter(config);
when(esClient.shouldFlush()).thenReturn(true);

final long position = 1234L;
final Record record = mockRecord(ValueType.WORKFLOW_INSTANCE, RecordType.EVENT);
when(record.getPosition()).thenReturn(position);

// when
exporter.export(record);
createAndOpenExporter();
final Record record =
testHarness.export(
r ->
r.getMetadata()
.setValueType(ValueType.WORKFLOW_INSTANCE)
.setRecordType(RecordType.EVENT));

// then
verify(controller).updateLastExportedRecordPosition(position);
assertThat(testHarness.getController().getPosition()).isEqualTo(record.getPosition());
}

@Test
public void shouldFlushOnClose() {
// given
final ElasticsearchExporter exporter = createExporter(config);
createAndOpenExporter();

// when
exporter.close();
testHarness.close();

// then
verify(esClient).flush();
Expand All @@ -267,21 +273,22 @@ public void shouldFlushAfterDelay() {
// scenario: bulk size is not reached still we want to flush
config.bulk.size = Integer.MAX_VALUE;
when(esClient.shouldFlush()).thenReturn(false);

final ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);

final ElasticsearchExporter exporter = createExporter(config);
createAndOpenExporter();

// when
exporter.export(mockRecord(ValueType.WORKFLOW_INSTANCE, RecordType.EVENT));
testHarness.export(
r ->
r.getMetadata()
.setValueType(ValueType.WORKFLOW_INSTANCE)
.setRecordType(RecordType.EVENT));

// then
verify(controller).scheduleTask(eq(Duration.ofSeconds(10)), captor.capture());
assertThat(testHarness.getController().getScheduledTasks()).hasSize(1);
assertThat(testHarness.getController().getScheduledTasks().get(0).getDelay())
.isEqualTo(Duration.ofSeconds(config.bulk.delay));

// when
captor.getValue().run();

// then
// and
testHarness.getController().runScheduledTasks(Duration.ofSeconds(config.bulk.delay));
verify(esClient).flush();
}

Expand All @@ -291,70 +298,36 @@ public void shouldUpdatePositionAfterDelayEvenIfNoRecordsAreExported() {
// scenario: events are not exported but still their position should be recorded
config.index.event = false;
config.index.workflowInstance = false;

final ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);

final ElasticsearchExporter exporter = createExporter(config);

final Record record = mockRecord(ValueType.WORKFLOW_INSTANCE, RecordType.EVENT);
when(record.getPosition()).thenReturn(1L, 2L, 3L);

exporter.export(record);
exporter.export(record);
exporter.export(record);

verify(controller).scheduleTask(any(), captor.capture());
createAndOpenExporter();

// when
captor.getValue().run();
testHarness
.stream(
r ->
r.getMetadata()
.setValueType(ValueType.WORKFLOW_INSTANCE)
.setRecordType(RecordType.EVENT))
.export(4);
testHarness.getController().runScheduledTasks(Duration.ofSeconds(config.bulk.delay));

// then no record was indexed but the exporter record position was updated
verify(esClient, never()).index(any());
verify(controller).updateLastExportedRecordPosition(3L);
assertThat(testHarness.getController().getPosition()).isEqualTo(testHarness.getPosition());
}

private ElasticsearchExporter createExporter(
final ElasticsearchExporterConfiguration configuration) {
private ElasticsearchExporter createAndOpenExporter() {
final ElasticsearchExporter exporter =
new ElasticsearchExporter() {
@Override
protected ElasticsearchClient createClient() {
return esClient;
}
};
exporter.configure(createContext(configuration));
exporter.open(controller);
return exporter;
}

private Context createContext(final ElasticsearchExporterConfiguration configuration) {
return new Context() {
@Override
public Logger getLogger() {
return new ZbLogger("io.zeebe.exporter.elasticsearch");
}
testHarness = new ExporterTestHarness(exporter);
testHarness.configure("elasticsearch", config);
testHarness.open();

@Override
public Configuration getConfiguration() {
return new Configuration() {
@Override
public String getId() {
return "elasticsearch";
}

@Override
public Map<String, Object> getArguments() {
throw new UnsupportedOperationException("not supported in test case");
}

@Override
@SuppressWarnings("unchecked")
public <T> T instantiate(Class<T> configClass) {
return (T) configuration;
}
};
}
};
return exporter;
}

private ElasticsearchClient mockElasticsearchClient() {
Expand All @@ -364,15 +337,4 @@ private ElasticsearchClient mockElasticsearchClient() {
when(client.putIndexTemplate(anyString(), anyString(), anyString())).thenReturn(true);
return client;
}

private Record mockRecord(final ValueType valueType, final RecordType recordType) {
final RecordMetadata metadata = mock(RecordMetadata.class);
when(metadata.getValueType()).thenReturn(valueType);
when(metadata.getRecordType()).thenReturn(recordType);

final Record record = mock(Record.class);
when(record.getMetadata()).thenReturn(metadata);

return record;
}
}
Loading

0 comments on commit 899221e

Please sign in to comment.