Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for end-to-end acknowledgements in the S3 Sink #2755

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion data-prepper-plugins/s3-sink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,3 @@ The following command runs the integration tests:
```
./gradlew :data-prepper-plugins:s3-sink:integrationTest -Dtests.s3sink.region=<your-aws-region> -Dtests.s3sink.bucket=<your-bucket>
```

Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.time.Duration;
import java.util.Arrays;
Expand All @@ -27,6 +28,7 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.log.JacksonLog;
import org.opensearch.dataprepper.model.record.Record;
Expand Down Expand Up @@ -187,6 +189,7 @@ private Collection<Record<Event>> setEventQueue() {
// Builds a single test record: a JSON-backed log event carrying a mocked
// EventHandle so acknowledgement release paths can be verified by the tests.
private static Record<Event> createRecord() {
    final Map<String, Object> data = generateJson();
    final JacksonEvent jacksonEvent = JacksonLog.builder().withData(data).build();
    final EventHandle mockedHandle = mock(EventHandle.class);
    jacksonEvent.setEventHandle(mockedHandle);
    return new Record<>(jacksonEvent);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.micrometer.core.instrument.DistributionSummary;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.types.ByteCount;
import org.opensearch.dataprepper.plugins.sink.accumulator.Buffer;
Expand All @@ -25,6 +26,7 @@

import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

Expand All @@ -43,6 +45,7 @@ public class S3SinkService {
private final S3SinkConfig s3SinkConfig;
private final Lock reentrantLock;
private final BufferFactory bufferFactory;
private final Collection<EventHandle> bufferedEventHandles;
private final Codec codec;
private Buffer currentBuffer;
private final int maxEvents;
Expand All @@ -69,6 +72,8 @@ public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory buffer
this.codec = codec;
reentrantLock = new ReentrantLock();

bufferedEventHandles = new LinkedList<>();

maxEvents = s3SinkConfig.getThresholdOptions().getEventCount();
maxBytes = s3SinkConfig.getThresholdOptions().getMaximumSize();
maxCollectionDuration = s3SinkConfig.getThresholdOptions().getEventCollectTimeOut().getSeconds();
Expand Down Expand Up @@ -100,6 +105,7 @@ void output(Collection<Record<Event>> records) {
final byte[] encodedBytes = encodedEvent.getBytes();

currentBuffer.writeEvent(encodedBytes);
bufferedEventHandles.add(event.getEventHandle());
if (ThresholdCheck.checkThresholdExceed(currentBuffer, maxEvents, maxBytes, maxCollectionDuration)) {
final String s3Key = generateKey();
LOG.info("Writing {} to S3 with {} events and size of {} bytes.",
Expand All @@ -110,10 +116,12 @@ void output(Collection<Record<Event>> records) {
numberOfRecordsSuccessCounter.increment(currentBuffer.getEventCount());
objectsSucceededCounter.increment();
s3ObjectSizeSummary.record(currentBuffer.getSize());
releaseEventHandles(true);
} else {
LOG.error("Failed to save {} to S3.", s3Key);
numberOfRecordsFailedCounter.increment(currentBuffer.getEventCount());
objectsFailedCounter.increment();
releaseEventHandles(false);
}
currentBuffer = bufferFactory.getBuffer();
}
Expand All @@ -125,6 +133,14 @@ void output(Collection<Record<Event>> records) {
reentrantLock.unlock();
}

/**
 * Releases every event handle buffered since the last flush, signaling the
 * outcome of the S3 upload to the acknowledgement framework, then empties
 * the accumulator so the next buffer starts from a clean slate.
 *
 * @param result {@code true} when the buffered events were successfully written to S3
 */
private void releaseEventHandles(final boolean result) {
    bufferedEventHandles.forEach(eventHandle -> eventHandle.release(result));
    bufferedEventHandles.clear();
}

/**
* perform retry in-case any issue occurred, based on max_upload_retries configuration.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
Expand Down Expand Up @@ -41,9 +42,11 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.TimeZone;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -321,19 +324,124 @@ void test_retryFlushToS3_negative() throws InterruptedException, IOException {
assertFalse(isUploadedToS3);
}

private Collection<Record<Event>> generateRandomStringEventRecord() {
Collection<Record<Event>> records = new ArrayList<>();
for (int i = 0; i < 50; i++) {
final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString());
records.add(new Record<>(event));

@Test
void output_will_release_all_handles_since_a_flush() throws IOException {
    // Swap in a mocked buffer so no real S3 interaction takes place.
    bufferFactory = mock(BufferFactory.class);
    final Buffer mockBuffer = mock(Buffer.class);
    when(bufferFactory.getBuffer()).thenReturn(mockBuffer);

    // Report a large buffer size so the size threshold triggers a flush.
    final long reportedSize = random.nextInt(1_000_000) + 10_000;
    when(mockBuffer.getSize()).thenReturn(reportedSize);

    when(codec.parse(any())).thenReturn(UUID.randomUUID().toString());
    final S3SinkService objectUnderTest = createObjectUnderTest();
    final Collection<Record<Event>> inputRecords = generateRandomStringEventRecord();
    objectUnderTest.output(inputRecords);

    // After a successful flush, every record's handle must be released positively.
    inputRecords.stream()
            .map(Record::getData)
            .map(Event::getEventHandle)
            .forEach(eventHandle -> verify(eventHandle).release(true));
}

// Verifies the failure path: when the S3 flush throws, every buffered
// event handle must be released with a negative (false) result.
// Fix: removed a stray "return records;" statement (a diff-merge artifact)
// that was illegal in this void test method and broke compilation.
@Test
void output_will_release_all_handles_since_a_flush_when_S3_fails() throws IOException {
    // Mocked buffer keeps the test free of real S3 calls.
    bufferFactory = mock(BufferFactory.class);
    final Buffer buffer = mock(Buffer.class);
    when(bufferFactory.getBuffer()).thenReturn(buffer);

    // Simulate an S3 failure on flush so the error branch executes.
    doThrow(AwsServiceException.class).when(buffer).flushToS3(any(), anyString(), anyString());

    // Large reported size forces the threshold check to trigger a flush.
    final long objectSize = random.nextInt(1_000_000) + 10_000;
    when(buffer.getSize()).thenReturn(objectSize);

    when(codec.parse(any())).thenReturn(UUID.randomUUID().toString());
    final S3SinkService s3SinkService = createObjectUnderTest();
    final List<Record<Event>> records = generateEventRecords(1);
    s3SinkService.output(records);

    final List<EventHandle> eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList());

    // Each handle must be released exactly once with failure.
    for (EventHandle eventHandle : eventHandles) {
        verify(eventHandle).release(false);
    }
}

@Test
void output_will_release_only_new_handles_since_a_flush() throws IOException {
    // Replace the real buffer with a mock so no S3 interaction happens.
    bufferFactory = mock(BufferFactory.class);
    final Buffer mockBuffer = mock(Buffer.class);
    when(bufferFactory.getBuffer()).thenReturn(mockBuffer);

    // A large reported size forces a flush on each output() call.
    final long reportedSize = random.nextInt(1_000_000) + 10_000;
    when(mockBuffer.getSize()).thenReturn(reportedSize);

    when(codec.parse(any())).thenReturn(UUID.randomUUID().toString());
    final S3SinkService objectUnderTest = createObjectUnderTest();

    // Flush two independent batches, one after the other.
    final Collection<Record<Event>> firstBatch = generateRandomStringEventRecord();
    objectUnderTest.output(firstBatch);
    final Collection<Record<Event>> secondBatch = generateRandomStringEventRecord();
    objectUnderTest.output(secondBatch);

    // Handles from both batches are each released exactly once with success;
    // verify(...) without times() would fail if any handle were released twice.
    firstBatch.stream()
            .map(Record::getData)
            .map(Event::getEventHandle)
            .forEach(eventHandle -> verify(eventHandle).release(true));
    secondBatch.stream()
            .map(Record::getData)
            .map(Event::getEventHandle)
            .forEach(eventHandle -> verify(eventHandle).release(true));
}

// Failure-path counterpart of the test above: two batches are flushed while
// every S3 upload throws, and each batch's handles must be released with
// failure (false) exactly once — i.e. a second flush must not re-release
// handles that belonged to the first batch.
@Test
void output_will_release_only_new_handles_since_a_flush_when_S3_fails() throws IOException {
// Mocked buffer avoids any real S3 interaction.
bufferFactory = mock(BufferFactory.class);
final Buffer buffer = mock(Buffer.class);
when(bufferFactory.getBuffer()).thenReturn(buffer);

// Every flush attempt fails, driving the negative-release branch.
doThrow(AwsServiceException.class).when(buffer).flushToS3(any(), anyString(), anyString());

// Large reported size forces the threshold check to trigger a flush.
final long objectSize = random.nextInt(1_000_000) + 10_000;
when(buffer.getSize()).thenReturn(objectSize);

when(codec.parse(any())).thenReturn(UUID.randomUUID().toString());
final S3SinkService s3SinkService = createObjectUnderTest();
final List<Record<Event>> records = generateEventRecords(1);
s3SinkService.output(records);
final List<Record<Event>> records2 = generateEventRecords(1);
s3SinkService.output(records2);

final List<EventHandle> eventHandles = records.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList());

// First batch: each handle released once with failure.
for (EventHandle eventHandle : eventHandles) {
verify(eventHandle).release(false);
}
final List<EventHandle> eventHandles2 = records2.stream().map(Record::getData).map(Event::getEventHandle).collect(Collectors.toList());

// Second batch: likewise released once with failure (verify would fail on
// a double release, proving handles are not re-released across flushes).
for (EventHandle eventHandle : eventHandles2) {
verify(eventHandle).release(false);
}
}

// Convenience fixture: 50 records, each carrying a mocked EventHandle.
private Collection<Record<Event>> generateRandomStringEventRecord() {
return generateEventRecords(50);
}

private Collection<Record<Event>> generateLessRandomStringEventRecord() {
Collection<Record<Event>> records = new ArrayList<>();
for (int i = 0; i < 5; i++) {
final Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString());
return generateEventRecords(5);
}

private List<Record<Event>> generateEventRecords(final int numberOfRecords) {
List<Record<Event>> records = new ArrayList<>();
for (int i = 0; i < numberOfRecords; i++) {
final JacksonEvent event = (JacksonEvent) JacksonEvent.fromMessage(UUID.randomUUID().toString());
final EventHandle eventHandle = mock(EventHandle.class);
event.setEventHandle(eventHandle);
records.add(new Record<>(event));
}
return records;
Expand Down
Loading