Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial commit for the S3 Sink #1048 #2585

Merged
merged 8 commits into from
Apr 28, 2023

Conversation

deepaksahu562
Copy link
Contributor

@deepaksahu562 deepaksahu562 commented Apr 24, 2023

Description

  • Created s3-sink related configurations.

Issues Resolved

Check List

  • New functionality includes s3-sink plugin configurations.
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Deepak Sahu <deepak.sahu562@gmail.com>
classpath = sourceSets.integrationTest.runtimeClasspath
systemProperty 'tests.s3source.bucket', System.getProperty('tests.s3source.bucket')
systemProperty 'tests.s3source.region', System.getProperty('tests.s3source.region')
systemProperty 'tests.s3source.queue.url', System.getProperty('tests.s3source.queue.url')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not require an SQS queue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved.

useJUnitPlatform()

classpath = sourceSets.integrationTest.runtimeClasspath
systemProperty 'tests.s3source.bucket', System.getProperty('tests.s3source.bucket')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update these two properties to tests.s3sink.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved.

}

@Test
void getAwsRegion_returns_null_when_region_is_null() throws NoSuchFieldException, IllegalAccessException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have a test that validates a non-null value as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added test case for non-null value.

reflectivelySetField(awsAuthenticationOptions, "awsRegion", null);
assertThat(awsAuthenticationOptions.getAwsRegion(), nullValue());
}
private void reflectivelySetField(final AwsAuthenticationOptions awsAuthenticationOptions, final String fieldName,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is available as a utility. Please use that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved.

* SPDX-License-Identifier: Apache-2.0
*/

plugins {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove these boilerplate sections: plugins, repositories.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved.

import software.amazon.awssdk.services.sts.StsClientBuilder;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;

class AwsAuthenticationOptionsTest {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test suite should have a unit test for authenticateAwsConfiguration as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added test case for authenticateAwsConfiguration.


## Configuration

- `aws_region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html). Defaults to `none`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These configurations are misleading, since the parameters are under aws, and not aws_region. Please follow the documentation format from the s3 source that breaks the aws options out (https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/s3-source#aws-configuration)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved.

.build();

AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder()
.roleSessionName("S3-Source-" + UUID.randomUUID())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"S3-sink" as the session name

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved.


This is the Data Prepper S3 sink plugin that sends records to an S3 bucket via S3Client.

The S3 sink plugin supports OpenSearch 2.0.0 and greater.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QUES: Not following this statement. How is S3 Sink related to OpenSearch engine?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved.


## Configuration

- `aws_region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html). Defaults to `none`.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the context of the PR, we are only supporting AWS Configuration in S3 sink instead of aws_ prefix. We should align the configuration parameter names with what we actually support to avoid confusion. Same applies to aws_sts_role_arn and aws_sts_header_overrides below.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also by Defaults to none, do you mean Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html).? If yes please also reword that

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved.


- `aws_region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html). Defaults to `none`.

- `aws_sts_role_arn` (Optional) : The AWS STS role to assume for requests to SQS and S3. Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html). Defaults to `none`.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defaults to none should be deleted

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved.


- `aws_sts_role_arn` (Optional) : The AWS STS role to assume for requests to SQS and S3. Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html). Defaults to `none`.

- `aws_sts_header_overrides` (Optional) : An optional map of header overrides to make when assuming the IAM role for the sink plugin. Defaults to `none`.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Defaults to null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved.


- `path_prefix` (Optional) : path_prefix nothing but directory structure inside bucket in-order to store objects. Defaults to `none`.

- `event_count` (Required) : An integer value indicates the maximum number of events required to ingest into s3-bucket as part of threshold.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest making a threshold configuration section for the parameters under threshold: similar as https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/opensearch#aws-configuration

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved.


- `maximum_size` (Optional) : A String representing the count or size of bytes required to ingest into s3-bucket as part of threshold. Defaults to `50mb`.

- `event_collect` (Required) : A String representing how long events should be collected before ingest into s3-bucket as part of threshold. All Duration values are a string that represents a duration. They support ISO_8601 notation string ("PT20.345S", "PT15M", etc.) as well as simple notation Strings for seconds ("60s") and milliseconds ("1500ms").
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: suggest rename into event_collect_timeout or collect_timeout.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree - this would be a clearer name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified the name as event_collect_timeout.

pipeline:
...
sink:
- s3:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also include buffer_type in this example?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added buffer_type in the yaml configuration example.

import jakarta.validation.constraints.NotNull;

/**
* An implementation class of s3 sink configuration
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Given the class name, this java doc description seems a little redundant. But I am OK to leave it there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified the description.

Copy link
Contributor

@ashoktelukuntla ashoktelukuntla left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good

Copy link
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @deepaksahu562 !

@dlvenable dlvenable merged commit 8a66e21 into opensearch-project:main Apr 28, 2023
24 checks passed
@dlvenable dlvenable changed the title Github-issue#1048 Initial commit for the S3 Sink #1048 Apr 28, 2023
@dlvenable dlvenable mentioned this pull request Jun 5, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants