Skip to content

Commit

Permalink
Adds support for end-to-end acknowledgements in the S3 Sink. Resolves #…
Browse files Browse the repository at this point in the history
…2732 (#2755)

Signed-off-by: David Venable <dlv@amazon.com>
  • Loading branch information
dlvenable committed May 25, 2023
1 parent 5655383 commit 1d2af56
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 10 deletions.
1 change: 0 additions & 1 deletion data-prepper-plugins/s3-sink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,3 @@ The following command runs the integration tests:
```
./gradlew :data-prepper-plugins:s3-sink:integrationTest -Dtests.s3sink.region=<your-aws-region> -Dtests.s3sink.bucket=<your-bucket>
```

Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.time.Duration;
import java.util.Arrays;
Expand All @@ -27,6 +28,7 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
Expand Down Expand Up @@ -187,6 +189,7 @@ private Collection<Record<Event>> setEventQueue() {
/**
 * Builds one record wrapping a log event that carries a mocked {@link EventHandle}.
 *
 * @return a new record whose event data comes from {@code generateJson()}
 */
private static Record<Event> createRecord() {
    final Map<String, Object> eventData = generateJson();
    final JacksonEvent logEvent = JacksonLog.builder()
            .withData(eventData)
            .build();

    // Attach a mock handle so code that reads the event's handle gets a non-null object.
    final EventHandle mockedHandle = mock(EventHandle.class);
    logEvent.setEventHandle(mockedHandle);

    return new Record<>(logEvent);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.micrometer.core.instrument.DistributionSummary;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.sink.accumulator.Buffer;
Expand All @@ -25,6 +26,7 @@

import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

Expand All @@ -43,6 +45,7 @@ public class S3SinkService {
private final S3SinkConfig s3SinkConfig;
private final Lock reentrantLock;
private final BufferFactory bufferFactory;
private final Collection<EventHandle> bufferedEventHandles;
private final Codec codec;
private Buffer currentBuffer;
private final int maxEvents;
Expand All @@ -69,6 +72,8 @@ public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory buffer
this.codec = codec;
reentrantLock = new ReentrantLock();

bufferedEventHandles = new LinkedList<>();

maxEvents = s3SinkConfig.getThresholdOptions().getEventCount();
maxBytes = s3SinkConfig.getThresholdOptions().getMaximumSize();
maxCollectionDuration = s3SinkConfig.getThresholdOptions().getEventCollectTimeOut().getSeconds();
Expand Down Expand Up @@ -100,6 +105,7 @@ void output(Collection<Record<Event>> records) {
final byte[] encodedBytes = encodedEvent.getBytes();

currentBuffer.writeEvent(encodedBytes);
bufferedEventHandles.add(event.getEventHandle());
if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration)) {
final String s3Key = generateKey();
LOG.info("Writing {} to S3 with {} events and size of {} bytes.",
Expand All @@ -110,10 +116,12 @@ void output(Collection<Record<Event>> records) {
numberOfRecordsSuccessCounter.increment(currentBuffer.getEventCount());
objectsSucceededCounter.increment();
s3ObjectSizeSummary.record(currentBuffer.getSize());
releaseEventHandles(true);
} else {
LOG.error("Failed to save {} to S3.", s3Key);
numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount());
objectsFailedCounter.increment();
releaseEventHandles(false);
}
currentBuffer = bufferFactory.getBuffer();
}
Expand All @@ -125,6 +133,14 @@ void output(Collection<Record<Event>> records) {
reentrantLock.unlock();
}

/**
 * Releases every event handle collected since the last flush, then empties the collection.
 *
 * @param result the value forwarded to each handle's {@code release} call; {@code output}
 *               passes {@code true} after a successful S3 write and {@code false} after a
 *               failed one
 */
private void releaseEventHandles(final boolean result) {
    for (final EventHandle eventHandle : bufferedEventHandles) {
        // output() stores event.getEventHandle() unconditionally and the backing LinkedList
        // accepts null, so an event without a handle would otherwise abort this loop with a
        // NullPointerException and leave the remaining handles unreleased.
        if (eventHandle != null) {
            eventHandle.release(result);
        }
    }

    bufferedEventHandles.clear();
}

/**
* perform retry in-case any issue occurred, based on max_upload_retries configuration.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
Expand Down Expand Up @@ -41,9 +42,11 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.TimeZone;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -321,19 +324,124 @@ void test_retryFlushToS3_negative() throws InterruptedException, IOException {
assertFalse(isUploadedToS3);
}

private Collection<Record<Event>> generateRandomStringEventRecord() {
Collection<Record<Event>> records = new ArrayList<>();
for (int i = 0; i < 50; i++) {
final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString());
records.add(new Record<>(event));

/**
 * Sends one batch from {@code generateRandomStringEventRecord()} through {@code output} with
 * a mocked buffer and verifies that each record's event handle was released with {@code true}.
 * Presumably the configured thresholds cause the batch to be flushed; confirm against the
 * test setup.
 */
@Test
void output_will_release_all_handles_since_a_flush() throws IOException {
    // Replace the shared factory with a mock that always returns the same mocked buffer.
    final Buffer mockedBuffer = mock(Buffer.class);
    bufferFactory = mock(BufferFactory.class);
    when(bufferFactory.getBuffer()).thenReturn(mockedBuffer);

    final long reportedSize = 10_000 + random.nextInt(1_000_000);
    when(mockedBuffer.getSize()).thenReturn(reportedSize);
    when(codec.parse(any())).thenReturn(UUID.randomUUID().toString());

    final S3SinkService objectUnderTest = createObjectUnderTest();
    final Collection<Record<Event>> records = generateRandomStringEventRecord();
    objectUnderTest.output(records);

    records.stream()
            .map(Record::getData)
            .map(Event::getEventHandle)
            .forEach(handle -> verify(handle).release(true));
}

/**
 * Stubs the mocked buffer's flushToS3 to throw AwsServiceException, sends one record through
 * output, and verifies that the record's event handle was released with false.
 */
@Test
void output_will_release_all_handles_since_a_flush_when_S3_fails() throws IOException {
// Replace the shared bufferFactory with a mock that returns a mocked Buffer.
bufferFactory = mock(BufferFactory.class);
final Buffer buffer = mock(Buffer.class);
when(bufferFactory.getBuffer()).thenReturn(buffer);

// Every flush attempt on the mocked buffer throws.
doThrow(AwsServiceException.class).when(buffer).flushToS3(any(), anyString(), anyString());

final long objectSize = random.nextInt(1_000_000) + 10_000;
when(buffer.getSize()).thenReturn(objectSize);

when(codec.parse(any())).thenReturn(UUID.randomUUID().toString());
final S3SinkService s3SinkService = createObjectUnderTest();
// Only one record is sent; presumably the configured thresholds trigger a flush for it - TODO confirm.
final List<Record<Event>> records = generateEventRecords(1);
s3SinkService.output(records);

final List<EventHandle> eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList());

for (EventHandle eventHandle : eventHandles) {
verify(eventHandle).release(false);
}
// NOTE(review): this returns a value from a void test method; it looks like a leftover line
// from the removed generateRandomStringEventRecord body in this diff rendering - confirm.
return records;
}

/**
 * Sends two separately generated batches through {@code output} on the same service and
 * verifies, for both batches, that each event handle was released with {@code true}.
 * Mockito's default verification mode expects exactly one matching call per handle.
 */
@Test
void output_will_release_only_new_handles_since_a_flush() throws IOException {
    // Replace the shared factory with a mock that always returns the same mocked buffer.
    final Buffer mockedBuffer = mock(Buffer.class);
    bufferFactory = mock(BufferFactory.class);
    when(bufferFactory.getBuffer()).thenReturn(mockedBuffer);

    final long reportedSize = 10_000 + random.nextInt(1_000_000);
    when(mockedBuffer.getSize()).thenReturn(reportedSize);
    when(codec.parse(any())).thenReturn(UUID.randomUUID().toString());

    final S3SinkService objectUnderTest = createObjectUnderTest();

    final Collection<Record<Event>> firstBatch = generateRandomStringEventRecord();
    objectUnderTest.output(firstBatch);
    final Collection<Record<Event>> secondBatch = generateRandomStringEventRecord();
    objectUnderTest.output(secondBatch);

    for (final Record<Event> record : firstBatch) {
        verify(record.getData().getEventHandle()).release(true);
    }

    for (final Record<Event> record : secondBatch) {
        verify(record.getData().getEventHandle()).release(true);
    }
}

/**
 * Stubs the mocked buffer's {@code flushToS3} to throw {@code AwsServiceException}, sends two
 * single-record batches through {@code output}, and verifies, for both batches, that each
 * event handle was released with {@code false}. Mockito's default verification mode expects
 * exactly one matching call per handle.
 */
@Test
void output_will_release_only_new_handles_since_a_flush_when_S3_fails() throws IOException {
    // Replace the shared factory with a mock that always returns the same mocked buffer.
    final Buffer mockedBuffer = mock(Buffer.class);
    bufferFactory = mock(BufferFactory.class);
    when(bufferFactory.getBuffer()).thenReturn(mockedBuffer);

    // Every flush attempt on the mocked buffer throws.
    doThrow(AwsServiceException.class).when(mockedBuffer).flushToS3(any(), anyString(), anyString());

    final long reportedSize = 10_000 + random.nextInt(1_000_000);
    when(mockedBuffer.getSize()).thenReturn(reportedSize);
    when(codec.parse(any())).thenReturn(UUID.randomUUID().toString());

    final S3SinkService objectUnderTest = createObjectUnderTest();

    final List<Record<Event>> firstBatch = generateEventRecords(1);
    objectUnderTest.output(firstBatch);
    final List<Record<Event>> secondBatch = generateEventRecords(1);
    objectUnderTest.output(secondBatch);

    for (final Record<Event> record : firstBatch) {
        verify(record.getData().getEventHandle()).release(false);
    }

    for (final Record<Event> record : secondBatch) {
        verify(record.getData().getEventHandle()).release(false);
    }
}

/**
 * Produces the default-sized batch of test records by delegating to
 * {@code generateEventRecords}.
 *
 * @return a collection of 50 records
 */
private Collection<Record<Event>> generateRandomStringEventRecord() {
    final int recordCount = 50;
    return generateEventRecords(recordCount);
}

private Collection<Record<Event>> generateLessRandomStringEventRecord() {
Collection<Record<Event>> records = new ArrayList<>();
for (int i = 0; i < 5; i++) {
final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString());
return generateEventRecords(5);
}

private List<Record<Event>> generateEventRecords(final int numberOfRecords) {
List<Record<Event>> records = new ArrayList<>();
for (int i = 0; i < numberOfRecords; i++) {
final JacksonEvent event = (JacksonEvent) JacksonEvent.fromMessage(UUID.randomUUID().toString());
final EventHandle eventHandle = mock(EventHandle.class);
event.setEventHandle(eventHandle);
records.add(new Record<>(event));
}
return records;
Expand Down

0 comments on commit 1d2af56

Please sign in to comment.