
Creation of s3-sink plugin #2324

Closed

Conversation

deepaksahu562 (Contributor)

Description

Created "s3-sink" plugin.
Configurations for the bucket name, key path and key pattern.
The key pattern support timestamps such as logs-${YYYY.mm}-${uniqueId}.
Collection of objects from Buffer and store it in RAM/Local file.
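
To make the key-pattern behavior concrete, here is a minimal sketch of how such a pattern could be resolved; this helper is illustrative only, not the PR's actual implementation:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.UUID;

public class KeyPatternExample {
    // Resolves a pattern such as "logs-${YYYY.mm}-${uniqueId}" into a concrete S3 object key.
    static String resolveKey(final String pattern) {
        final String datePart = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy.MM"));
        return pattern
                .replace("${YYYY.mm}", datePart)
                .replace("${uniqueId}", UUID.randomUUID().toString());
    }

    public static void main(final String[] args) {
        // Prints something like: logs-2023.03-3f8a1c52-....
        System.out.println(resolveKey("logs-${YYYY.mm}-${uniqueId}"));
    }
}
```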

Issues Resolved

GitHub issue: #1048

Check List

  • New functionality: s3-sink plugin.
  • New functionality has been documented.
    • New functionality has Javadoc added.
  • Commits are signed with a real name per the DCO.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on the Developer Certificate of Origin and signing off your commits, please see https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md.

DE20436406 and others added 5 commits February 28, 2023 19:56
dlvenable (Member) left a comment:

Hey @deepaksahu562, thank you for the contribution! I left some feedback on the user configuration. Since this is a draft, I didn't look too closely at the implementation yet.

import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;

/**
An implementation class AWS Authentication configuration
dlvenable (Member):

nit: Javadocs should have an * on each line. They should all be in the same column.
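
For reference, the conventional Javadoc layout looks like this (the class name is assumed from the snippet's context):

```java
/**
 * An implementation class of AWS Authentication configuration.
 */
public class AwsAuthenticationOptions {
    // ...
}
```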

deepaksahu562 (Contributor Author):

Thanks, David, for your review. Replaced with a standard Javadoc comment.

*/
public class ThresholdOptions {
static final int DEFAULT_EVENT_COUNT = 200;
private static final long DEFAULT_BYTE_CAPACITY = 2500;
dlvenable (Member):

This value seems rather low.

deepaksahu562 (Contributor Author):

Thanks, David, for your review. Modified to use higher values.


@JsonProperty("byte_capacity")
@NotNull
private long byteCapacity = DEFAULT_BYTE_CAPACITY;
dlvenable (Member):

Data Prepper has a built-in data type for byte counts which could help here.

https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/types/ByteCount.java

With this, the user could configure the byte_capacity with any of the following:

byte_capacity: 512
byte_capacity: 512kb
byte_capacity: 50mb
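
A minimal sketch of what the suggested change could look like, assuming the ByteCount.parse(String) factory from the linked class; the 50mb default here is a placeholder, not the value the PR ultimately chose:

```java
import com.fasterxml.jackson.annotation.JsonProperty;
import org.opensearch.dataprepper.model.types.ByteCount;

public class ThresholdOptions {
    // Placeholder default; the PR's actual default may differ.
    private static final String DEFAULT_BYTE_CAPACITY = "50mb";

    @JsonProperty("byte_capacity")
    private ByteCount byteCapacity = ByteCount.parse(DEFAULT_BYTE_CAPACITY);

    public ByteCount getByteCapacity() {
        return byteCapacity;
    }
}
```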

deepaksahu562 (Contributor Author):

Thanks, David, for your review. Incorporated the built-in ByteCount.

private long byteCapacity = DEFAULT_BYTE_CAPACITY;

@JsonProperty("event_collection_duration")
private long eventCollectionDuration = DEFAULT_TIMEOUT;
dlvenable (Member):

Please use a Duration here. If you specify Duration as the type, Data Prepper supports a few string formats that users can provide.

event_collection_duration: 20s

or

event_collection_duration: 5050ms

or

event_collection_duration: PT2M
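
A rough sketch of the field declared as a java.time.Duration (the 20-second default is a placeholder); Data Prepper core then handles parsing strings like 20s, 5050ms, or PT2M:

```java
import java.time.Duration;
import com.fasterxml.jackson.annotation.JsonProperty;

public class ThresholdOptions {
    // Placeholder default; the PR's actual default may differ.
    private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(20);

    @JsonProperty("event_collection_duration")
    private Duration eventCollectionDuration = DEFAULT_TIMEOUT;

    public Duration getEventCollectionDuration() {
        // No manual parsing needed; the framework deserializes the string.
        return eventCollectionDuration;
    }
}
```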

deepaksahu562 (Contributor Author):

Thanks, David, for your review. Incorporated the Duration class.

/*
Read the event count configuration
*/
public int getEeventCount() {
dlvenable (Member):

Typo: getEeventCount

deepaksahu562 (Contributor Author):

Thanks, David, for your review. Addressed; please review.

public void run() {
    try {
        while (!S3Sink.isStopRequested()) {
            if (s3SinkConfig.getTemporaryStorage().equalsIgnoreCase(IN_MEMORY)) {
dlvenable (Member):

The configuration values will not change while running, so you do not need to run this check on each iteration. You can move this code into the initialization.
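
As an illustration of moving the check out of the loop, here is a self-contained sketch; the Accumulator type, constant value, and stop flag are assumptions, not the PR's actual classes:

```java
interface Accumulator {
    void doAccumulate();
}

public class S3SinkWorker implements Runnable {
    private static final String IN_MEMORY = "in_memory"; // assumed constant value
    private final Accumulator accumulator;
    private volatile boolean stopRequested = false;

    public S3SinkWorker(final String temporaryStorage) {
        // The storage type cannot change while running, so resolve it once here.
        if (temporaryStorage.equalsIgnoreCase(IN_MEMORY)) {
            accumulator = () -> { /* accumulate events in RAM */ };
        } else {
            accumulator = () -> { /* accumulate events in a local file */ };
        }
    }

    @Override
    public void run() {
        while (!stopRequested) {
            accumulator.doAccumulate(); // no per-iteration configuration check
        }
    }

    public void stop() {
        stopRequested = true;
    }
}
```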

deepaksahu562 (Contributor Author):

Thanks, David, for your review. Moved this code to the constructor.

            accumulator.doAccumulate();
        }
    } catch (Exception e) {
        e.printStackTrace();
dlvenable (Member):

Please do not call e.printStackTrace().
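
For reference, the idiomatic SLF4J replacement (the surrounding method is illustrative):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3SinkWorker {
    private static final Logger LOG = LoggerFactory.getLogger(S3SinkWorker.class);

    public void doWork() {
        try {
            // ... accumulate and upload ...
        } catch (final Exception e) {
            // Passing the exception as the final argument logs the full stack trace.
            LOG.error("Exception while running S3SinkWorker", e);
        }
    }
}
```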

deepaksahu562 (Contributor Author):

Thanks, David for your review. Replaced with "LOG.error("Exception while running S3SinkWorkerRunner : ", e);"

        }
    } catch (Exception e) {
        e.printStackTrace();
        LOG.error("Exception in S3Sink : \n Error message {} \n Exception cause {}", e.getMessage(),
dlvenable (Member):

I wonder if we should have newlines in the logs. I'm not sure it is very common, except in the stack trace. I'm not against it, but I am wondering whether it is a good pattern here.

deepaksahu562 (Contributor Author):

Thanks, David, for your review. Removed newlines from the logs.

*/
public class S3SinkConfig {

static final String DEFAULT_BUCKET_NAME = "dataprepper";
dlvenable (Member):

The S3 bucket should not have a default. Only one AWS user in the entire world can have a bucket named dataprepper.
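
A sketch of making the bucket required instead of defaulted; the property name and the Jakarta Bean Validation annotation are assumptions:

```java
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotEmpty;

public class S3SinkConfig {
    // No default: S3 bucket names are globally unique, so the user must supply one.
    @JsonProperty("bucket")
    @NotEmpty
    private String bucketName;

    public String getBucketName() {
        return bucketName;
    }
}
```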

deepaksahu562 (Contributor Author):

Thanks, David, for your review. Addressed.

public class S3SinkConfig {

static final String DEFAULT_BUCKET_NAME = "dataprepper";
static final String DEFAULT_PATH_PREFIX = "logdata";
dlvenable (Member):

I'm not sure there is much value in giving a default value for the key prefix. If not specified, it can go into the root of the bucket.
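
A sketch of an optional key prefix with no default; the property name key_path_prefix is an assumption:

```java
import java.util.Optional;
import com.fasterxml.jackson.annotation.JsonProperty;

public class S3SinkConfig {
    // No default: when omitted, objects are written to the root of the bucket.
    @JsonProperty("key_path_prefix")
    private String keyPathPrefix;

    public Optional<String> getKeyPathPrefix() {
        return Optional.ofNullable(keyPathPrefix);
    }
}
```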

deepaksahu562 (Contributor Author):

Thanks, David, for your review. Incorporated the changes.

deepaksahu562 added a commit to deepaksahu562/data-prepper that referenced this pull request Mar 3, 2023
deepaksahu562 (Contributor Author):

@dlvenable, thanks for your review and comments.
I have incorporated all the review comments and pushed the changes to this same PR.
Please review and provide suggestions, if any.

private String byteCapacity = DEFAULT_BYTE_CAPACITY;

@JsonProperty("event_collection_duration")
private String eventCollectionDuration = DEFAULT_TIMEOUT;
dlvenable (Member):

Please change this type to a Duration. Once you do that, Data Prepper core will parse this using a standard approach. You will then be able to remove the code you have here for parsing it.

*/
public Duration getEventCollectionDuration() {

Duration duration;
dlvenable (Member):

If you make the change I suggested above, you can change this whole method to:

return eventCollectionDuration;

deepaksahu562 (Contributor Author):

Draft version; hence closing it.

deepaksahu562 deleted the development branch April 10, 2023 09:57