Skip to content

Commit

Permalink
Initial commit for the S3 Sink #1048 (#2585)
Browse files Browse the repository at this point in the history
Initial commit for the S3 Sink #1048

Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>

---------

Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
  • Loading branch information
deepaksahu562 committed Apr 28, 2023
1 parent 4794615 commit 8a66e21
Show file tree
Hide file tree
Showing 13 changed files with 660 additions and 0 deletions.
66 changes: 66 additions & 0 deletions data-prepper-plugins/s3-sink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# S3 Sink

This is the Data Prepper S3 sink plugin that sends records to an S3 bucket via S3Client.

## Usages

The s3 sink should be configured as part of Data Prepper pipeline yaml file.

## Configuration Options

```
pipeline:
...
sink:
- s3:
aws:
region: us-east-1
sts_role_arn: arn:aws:iam::123456789012:role/Data-Prepper
sts_header_overrides:
max_retries: 5
bucket:
name: bucket_name
object_key:
path_prefix: my-elb/%{yyyy}/%{MM}/%{dd}/
threshold:
event_count: 2000
maximum_size: 50mb
event_collect_timeout: 15s
codec:
ndjson:
buffer_type: in_memory
```

## AWS Configuration

- `region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).

- `sts_role_arn` (Optional) : The AWS STS role to assume for requests to S3. which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html).

- `max_retries` (Optional) : An integer value indicates the maximum number of times that single request should be retired in-order to ingest data to amazon s3. Defaults to `5`.

- `bucket` (Required) : Object storage built to store and retrieve any amount of data from anywhere, User must provide bucket name.

- `object_key` (Optional) : It contains `path_prefix` and `file_pattern`. Defaults to s3 object `events-%{yyyy-MM-dd'T'hh-mm-ss}` inside bucket root directory.

- `path_prefix` (Optional) : path_prefix nothing but directory structure inside bucket in-order to store objects. Defaults to `none`.

## Threshold Configuration

- `event_count` (Required) : An integer value indicates the maximum number of events required to ingest into s3-bucket as part of threshold.

- `maximum_size` (Optional) : A String representing the count or size of bytes required to ingest into s3-bucket as part of threshold. Defaults to `50mb`.

- `event_collect_timeout` (Required) : A String representing how long events should be collected before ingest into s3-bucket as part of threshold. All Duration values are a string that represents a duration. They support ISO_8601 notation string ("PT20.345S", "PT15M", etc.) as well as simple notation Strings for seconds ("60s") and milliseconds ("1500ms").

## Buffer Type Configuration

- `buffer_type` (Optional) : Records stored temporary before flushing into s3 bucket. Possible values are `local_file` and `in_memory`. Defaults to `in_memory`.


## Developer Guide

This plugin is compatible with Java 8. See

- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)
57 changes: 57 additions & 0 deletions data-prepper-plugins/s3-sink/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

dependencies {
implementation project(':data-prepper-api')
implementation project(path: ':data-prepper-plugins:common')
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'org.apache.commons:commons-compress:1.21'
implementation 'joda-time:joda-time:2.11.1'
implementation 'org.hibernate.validator:hibernate-validator:7.0.5.Final'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:sts'
implementation 'org.jetbrains.kotlin:kotlin-stdlib:1.7.10'
implementation 'org.jetbrains.kotlin:kotlin-stdlib-common:1.7.10'
implementation 'org.apache.commons:commons-lang3:3.12.0'
testImplementation project(':data-prepper-test-common')
}

test {
useJUnitPlatform()
}

sourceSets {
integrationTest {
java {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/integrationTest/java')
}
resources.srcDir file('src/integrationTest/resources')
}
}

configurations {
integrationTestImplementation.extendsFrom testImplementation
integrationTestRuntime.extendsFrom testRuntime
}

task integrationTest(type: Test) {
group = 'verification'
testClassesDirs = sourceSets.integrationTest.output.classesDirs

useJUnitPlatform()

classpath = sourceSets.integrationTest.runtimeClasspath
systemProperty 'tests.s3sink.bucket', System.getProperty('tests.s3sink.bucket')
systemProperty 'tests.s3ink.region', System.getProperty('tests.s3sink.region')

filter {
includeTestsMatching '*IT'
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink;

import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.plugins.sink.accumulator.BufferTypeOptions;
import org.opensearch.dataprepper.plugins.sink.configuration.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.sink.configuration.BucketOptions;
import org.opensearch.dataprepper.plugins.sink.configuration.ThresholdOptions;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;

/**
* s3 sink configuration class contains properties, used to read yaml configuration.
*/
public class S3SinkConfig {

private static final int DEFAULT_CONNECTION_RETRIES = 5;
private static final int DEFAULT_UPLOAD_RETRIES = 5;

@JsonProperty("aws")
@NotNull
@Valid
private AwsAuthenticationOptions awsAuthenticationOptions;

@JsonProperty("bucket")
@NotNull
@Valid
private BucketOptions bucketOptions;

@JsonProperty("threshold")
@NotNull
private ThresholdOptions thresholdOptions;

@JsonProperty("codec")
@NotNull
private PluginModel codec;

@JsonProperty("buffer_type")
private BufferTypeOptions bufferType = BufferTypeOptions.INMEMORY;

private int maxConnectionRetries = DEFAULT_CONNECTION_RETRIES;

@JsonProperty("max_retries")
private int maxUploadRetries = DEFAULT_UPLOAD_RETRIES;

/**
* Aws Authentication configuration Options
*/
public AwsAuthenticationOptions getAwsAuthenticationOptions() {
return awsAuthenticationOptions;
}

/**
* Threshold configuration Options
*/
public ThresholdOptions getThresholdOptions() {
return thresholdOptions;
}

/**
* S3 bucket configuration Options
*/
public BucketOptions getBucketOptions() {
return bucketOptions;
}

/**
* Sink codec configuration Options
*/
public PluginModel getCodec() {
return codec;
}

/**
* Buffer type configuration Options
*/
public BufferTypeOptions getBufferType() {
return bufferType;
}

/**
* S3 client connection retries configuration Options
*/
public int getMaxConnectionRetries() {
return maxConnectionRetries;
}

/**
* S3 object upload retries configuration Options
*/
public int getMaxUploadRetries() {
return maxUploadRetries;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.accumulator;

/**
* Defines all the buffer types enumerations.
*/
public enum BufferTypeOptions {

INMEMORY,
LOCALFILE

//TODO add buffer type options and remaining functionality
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Size;
import software.amazon.awssdk.arns.Arn;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;

import java.util.Map;
import java.util.Optional;
import java.util.UUID;

public class AwsAuthenticationOptions {
private static final String AWS_IAM_ROLE = "role";
private static final String AWS_IAM = "iam";

@JsonProperty("region")
@Size(min = 1, message = "Region cannot be empty string")
private String awsRegion;

@JsonProperty("sts_role_arn")
@Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters")
private String awsStsRoleArn;

@JsonProperty("sts_header_overrides")
@Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override")
private Map<String, String> awsStsHeaderOverrides;

private void validateStsRoleArn() {
final Arn arn = getArn();
if (!AWS_IAM.equals(arn.service())) {
throw new IllegalArgumentException("sts_role_arn must be an IAM Role");
}
final Optional<String> resourceType = arn.resource().resourceType();
if (resourceType.isEmpty() || !resourceType.get().equals(AWS_IAM_ROLE)) {
throw new IllegalArgumentException("sts_role_arn must be an IAM Role");
}
}

private Arn getArn() {
try {
return Arn.fromString(awsStsRoleArn);
} catch (final Exception e) {
throw new IllegalArgumentException(String.format("Invalid ARN format for awsStsRoleArn. Check the format of %s", awsStsRoleArn));
}
}

public Region getAwsRegion() {
return awsRegion != null ? Region.of(awsRegion) : null;
}

public AwsCredentialsProvider authenticateAwsConfiguration() {

final AwsCredentialsProvider awsCredentialsProvider;
if (awsStsRoleArn != null && !awsStsRoleArn.isEmpty()) {

validateStsRoleArn();

final StsClient stsClient = StsClient.builder()
.region(getAwsRegion())
.build();

AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder()
.roleSessionName("S3-Sink-" + UUID.randomUUID())
.roleArn(awsStsRoleArn);
if(awsStsHeaderOverrides != null && !awsStsHeaderOverrides.isEmpty()) {
assumeRoleRequestBuilder = assumeRoleRequestBuilder
.overrideConfiguration(configuration -> awsStsHeaderOverrides.forEach(configuration::putHeader));
}

awsCredentialsProvider = StsAssumeRoleCredentialsProvider.builder()
.stsClient(stsClient)
.refreshRequest(assumeRoleRequestBuilder.build())
.build();

} else {
// use default credential provider
awsCredentialsProvider = DefaultCredentialsProvider.create();
}

return awsCredentialsProvider;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.sink.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotNull;

/**
* An implementation class of bucket name and {@link ObjectKeyOptions} configuration Options
*/
public class BucketOptions {

@JsonProperty("name")
@NotNull
private String bucketName;

@JsonProperty("object_key")
private ObjectKeyOptions objectKeyOptions;

/**
* Read s3 bucket name configuration.
*/
public String getBucketName() {
return bucketName;
}

/**
* S3 {@link ObjectKeyOptions} configuration Options.
*/
public ObjectKeyOptions getObjectKeyOptions() {
if (objectKeyOptions == null) {
objectKeyOptions = new ObjectKeyOptions();
}
return objectKeyOptions;
}
}

0 comments on commit 8a66e21

Please sign in to comment.