Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moves the S3 sink bucket configuration to the root configuration #2759

Merged
merged 2 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,6 @@

package org.opensearch.dataprepper.plugins.sink;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import org.hamcrest.CoreMatchers;
Expand All @@ -38,7 +25,6 @@
import org.opensearch.dataprepper.plugins.sink.accumulator.ObjectKey;
import org.opensearch.dataprepper.plugins.sink.codec.JsonCodec;
import org.opensearch.dataprepper.plugins.sink.configuration.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.sink.configuration.BucketOptions;
import org.opensearch.dataprepper.plugins.sink.configuration.ObjectKeyOptions;
import org.opensearch.dataprepper.plugins.sink.configuration.ThresholdOptions;
import software.amazon.awssdk.core.ResponseBytes;
Expand All @@ -50,6 +36,21 @@
import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
import software.amazon.awssdk.services.s3.model.S3Object;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class S3SinkServiceIT {

Expand All @@ -67,8 +68,6 @@ class S3SinkServiceIT {
@Mock
private ObjectKeyOptions objectKeyOptions;
@Mock
private BucketOptions bucketOptions;
@Mock
private PluginMetrics pluginMetrics;
@Mock
private Counter snapshotSuccessCounter;
Expand All @@ -94,13 +93,12 @@ public void setUp() {

when(objectKeyOptions.getNamePattern()).thenReturn("elb-log-%{yyyy-MM-dd'T'hh-mm-ss}");
when(objectKeyOptions.getPathPrefix()).thenReturn(PATH_PREFIX);
when(bucketOptions.getBucketName()).thenReturn(bucketName);
when(bucketOptions.getObjectKeyOptions()).thenReturn(objectKeyOptions);
when(s3SinkConfig.getBucketName()).thenReturn(bucketName);
when(s3SinkConfig.getObjectKeyOptions()).thenReturn(objectKeyOptions);
when(thresholdOptions.getEventCount()).thenReturn(2);
when(thresholdOptions.getMaximumSize()).thenReturn(ByteCount.parse("2mb"));
when(thresholdOptions.getEventCollectTimeOut()).thenReturn(Duration.parse("PT3M"));
when(s3SinkConfig.getThresholdOptions()).thenReturn(thresholdOptions);
when(s3SinkConfig.getBucketOptions()).thenReturn(bucketOptions);
when(s3SinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions);
when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.of(s3region));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@

package org.opensearch.dataprepper.plugins.sink;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.plugins.sink.accumulator.BufferTypeOptions;
import org.opensearch.dataprepper.plugins.sink.configuration.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.sink.configuration.BucketOptions;
import org.opensearch.dataprepper.plugins.sink.configuration.ObjectKeyOptions;
import org.opensearch.dataprepper.plugins.sink.configuration.ThresholdOptions;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;

/**
* s3 sink configuration class contains properties, used to read yaml configuration.
Expand All @@ -29,8 +30,11 @@ public class S3SinkConfig {

@JsonProperty("bucket")
@NotNull
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a min validation here to prevent against empty strings?

@Valid
private BucketOptions bucketOptions;
@NotEmpty
private String bucketName;

@JsonProperty("object_key")
private ObjectKeyOptions objectKeyOptions;

@JsonProperty("threshold")
@NotNull
Expand Down Expand Up @@ -65,11 +69,22 @@ public ThresholdOptions getThresholdOptions() {
}

/**
* S3 bucket configuration Options.
* @return bucket option object.
* Read s3 bucket name configuration.
* @return bucket name.
*/
public String getBucketName() {
return bucketName;
}

/**
* S3 {@link ObjectKeyOptions} configuration Options.
* @return object key options.
*/
public BucketOptions getBucketOptions() {
return bucketOptions;
public ObjectKeyOptions getObjectKeyOptions() {
if (objectKeyOptions == null) {
objectKeyOptions = new ObjectKeyOptions();
}
return objectKeyOptions;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public S3SinkService(final S3SinkConfig s3SinkConfig, final BufferFactory buffer
maxBytes = s3SinkConfig.getThresholdOptions().getMaximumSize();
maxCollectionDuration = s3SinkConfig.getThresholdOptions().getEventCollectTimeOut().getSeconds();

bucket = s3SinkConfig.getBucketOptions().getBucketName();
bucket = s3SinkConfig.getBucketName();
maxRetries = s3SinkConfig.getMaxUploadRetries();

objectsSucceededCounter = pluginMetrics.counter(OBJECTS_SUCCEEDED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ private ObjectKey(){}
* @return s3 object path
*/
public static String buildingPathPrefix(final S3SinkConfig s3SinkConfig) {
String pathPrefix = s3SinkConfig.getBucketOptions().getObjectKeyOptions().getPathPrefix();
String pathPrefix = s3SinkConfig.getObjectKeyOptions().getPathPrefix();
StringBuilder s3ObjectPath = new StringBuilder();
if (pathPrefix != null && !pathPrefix.isEmpty()) {
String[] pathPrefixList = pathPrefix.split("\\/");
Expand All @@ -53,7 +53,7 @@ public static String buildingPathPrefix(final S3SinkConfig s3SinkConfig) {
* @return s3 object name with prefix
*/
public static String objectFileName(S3SinkConfig s3SinkConfig) {
String configNamePattern = s3SinkConfig.getBucketOptions().getObjectKeyOptions().getNamePattern();
String configNamePattern = s3SinkConfig.getObjectKeyOptions().getNamePattern();
int extensionIndex = configNamePattern.lastIndexOf('.');
if (extensionIndex > 0) {
return S3ObjectIndexUtility.getObjectNameWithDateTimeId(configNamePattern.substring(0, extensionIndex)) + "."
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@

package org.opensearch.dataprepper.plugins.sink;

import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.plugins.sink.accumulator.BufferTypeOptions;
import org.opensearch.dataprepper.plugins.sink.configuration.ObjectKeyOptions;
import org.opensearch.dataprepper.test.helper.ReflectivelySetField;

import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNull;

import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.plugins.sink.accumulator.BufferTypeOptions;

class S3SinkConfigTest {

private static final int MAX_CONNECTION_RETRIES = 5;
Expand All @@ -23,20 +28,30 @@ void default_buffer_type_option_test() {
}

@Test
void default_max_connection_retries_test() throws NoSuchFieldException, IllegalAccessException {
void default_max_connection_retries_test() {
assertThat(new S3SinkConfig().getMaxConnectionRetries(), equalTo(MAX_CONNECTION_RETRIES));
}

@Test
void default_max_upload_retries_test() throws NoSuchFieldException, IllegalAccessException {
void default_max_upload_retries_test() {
assertThat(new S3SinkConfig().getMaxUploadRetries(), equalTo(MAX_UPLOAD_RETRIES));
}

@Test
void get_bucket_option_test() {
assertThat(new S3SinkConfig().getBucketOptions(), equalTo(null));
void get_bucket_name_test() throws NoSuchFieldException, IllegalAccessException {
final String bucketName = UUID.randomUUID().toString();
final S3SinkConfig objectUnderTest = new S3SinkConfig();
ReflectivelySetField.setField(S3SinkConfig.class, objectUnderTest, "bucketName", bucketName);
assertThat(objectUnderTest.getBucketName(), equalTo(bucketName));
}

@Test
void get_object_key_test() {
assertThat("Object key is not an instance of ObjectKeyOptions",
new S3SinkConfig().getObjectKeyOptions(), instanceOf(ObjectKeyOptions.class));
}


@Test
void get_threshold_option_test() {
assertThat(new S3SinkConfig().getThresholdOptions(), equalTo(null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.opensearch.dataprepper.plugins.sink.codec.Codec;
import org.opensearch.dataprepper.plugins.sink.codec.JsonCodec;
import org.opensearch.dataprepper.plugins.sink.configuration.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.sink.configuration.BucketOptions;
import org.opensearch.dataprepper.plugins.sink.configuration.ObjectKeyOptions;
import org.opensearch.dataprepper.plugins.sink.configuration.ThresholdOptions;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
Expand Down Expand Up @@ -89,7 +88,6 @@ void setUp() throws Exception {
random = new Random();
s3SinkConfig = mock(S3SinkConfig.class);
ThresholdOptions thresholdOptions = mock(ThresholdOptions.class);
BucketOptions bucketOptions = mock(BucketOptions.class);
ObjectKeyOptions objectKeyOptions = mock(ObjectKeyOptions.class);
AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class);
PluginSetting pluginSetting = mock(PluginSetting.class);
Expand All @@ -112,10 +110,9 @@ void setUp() throws Exception {
when(s3SinkConfig.getThresholdOptions().getMaximumSize()).thenReturn(ByteCount.parse(MAXIMUM_SIZE));
when(s3SinkConfig.getThresholdOptions().getEventCollectTimeOut()).thenReturn(Duration.ofSeconds(5));
when(s3SinkConfig.getBufferType()).thenReturn(BufferTypeOptions.INMEMORY);
when(s3SinkConfig.getBucketOptions()).thenReturn(bucketOptions);
when(s3SinkConfig.getBucketOptions().getObjectKeyOptions()).thenReturn(objectKeyOptions);
when(s3SinkConfig.getBucketOptions().getBucketName()).thenReturn(BUCKET_NAME);
when(s3SinkConfig.getBucketOptions().getObjectKeyOptions().getPathPrefix()).thenReturn(PATH_PREFIX);
when(s3SinkConfig.getObjectKeyOptions()).thenReturn(objectKeyOptions);
when(s3SinkConfig.getBucketName()).thenReturn(BUCKET_NAME);
when(s3SinkConfig.getObjectKeyOptions().getPathPrefix()).thenReturn(PATH_PREFIX);
when(s3SinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions);
when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.of(S3_REGION));
when(s3SinkConfig.getCodec()).thenReturn(pluginModel);
Expand Down Expand Up @@ -153,7 +150,7 @@ void test_s3Client_notNull() {
@Test
void test_generateKey_with_general_prefix() {
String pathPrefix = "events/";
when(s3SinkConfig.getBucketOptions().getObjectKeyOptions().getPathPrefix()).thenReturn(pathPrefix);
when(s3SinkConfig.getObjectKeyOptions().getPathPrefix()).thenReturn(pathPrefix);
S3SinkService s3SinkService = createObjectUnderTest();
String key = s3SinkService.generateKey();
assertNotNull(key);
Expand All @@ -171,7 +168,7 @@ void test_generateKey_with_date_prefix() {
.withZoneSameInstant(ZoneId.of(TimeZone.getTimeZone("UTC").getID()));
String dateString = fomatter.format(zdt);

when(s3SinkConfig.getBucketOptions().getObjectKeyOptions()
when(s3SinkConfig.getObjectKeyOptions()
.getPathPrefix()).thenReturn(pathPrefix + datePattern);
S3SinkService s3SinkService = createObjectUnderTest();
String key = s3SinkService.generateKey();
Expand Down Expand Up @@ -263,7 +260,7 @@ void test_output_with_uploadedToS3_success_records_byte_count() throws IOExcepti

@Test
void test_output_with_uploadedToS3_failed() throws IOException {
when(s3SinkConfig.getBucketOptions().getBucketName()).thenReturn(null);
when(s3SinkConfig.getBucketName()).thenReturn(UUID.randomUUID().toString());
when(s3SinkConfig.getMaxUploadRetries()).thenReturn(3);
when(codec.parse(any())).thenReturn("{\"message\":\"31824252-adba-4c47-a2ac-05d16c5b8140\"}");
S3SinkService s3SinkService = createObjectUnderTest();
Expand Down Expand Up @@ -313,7 +310,7 @@ void test_retryFlushToS3_positive() throws InterruptedException, IOException {

@Test
void test_retryFlushToS3_negative() throws InterruptedException, IOException {
when(s3SinkConfig.getBucketOptions().getBucketName()).thenReturn("");
when(s3SinkConfig.getBucketName()).thenReturn("");
S3SinkService s3SinkService = createObjectUnderTest();
assertNotNull(s3SinkService);
Buffer buffer = bufferFactory.getBuffer();
Expand Down