Added SQS interactions for S3 source #1431

Merged
merged 11 commits from the sqs-interactions branch into opensearch-project:main on Jun 7, 2022

Conversation

@asifsmohammed (Collaborator) commented May 26, 2022

Description

  • Added SQS interactions to get messages from SQS
  • Converted each Message into an S3EventNotificationRecord
  • Added exponential backoff for reading messages from SQS

Used AWS SDK v1 for deserializing the SQS message into S3EventNotificationRecord, since this functionality has not yet been ported to v2. For more info see: aws/aws-sdk-java-v2#1197

Issues Resolved

Resolves #1425

Check List

  • New functionality includes testing.
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@asifsmohammed asifsmohammed requested a review from a team as a code owner May 26, 2022 19:22
@asifsmohammed asifsmohammed marked this pull request as draft May 26, 2022 19:22
@@ -13,10 +13,13 @@ repositories {

dependencies {
implementation project(':data-prepper-api')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation project(':data-prepper-plugins:blocking-buffer')

Member:
Why did you add blocking-buffer here? Data Prepper API provides the Buffer interface.

Collaborator (Author):
I added this for testing S3Source using buffer.

Member:
For testing could this import be added as testImplementation project(':data-prepper-plugins:blocking-buffer')?

Member:
@sbayer55 's suggestion is the right approach for a testing dependency.

messages.addAll(sqsClient.receiveMessage(receiveMessageRequest).messages());
} catch (SqsException e) {
LOG.error(e.awsErrorDetails().errorMessage());
System.exit(1);

Member:
This will exit all of Data Prepper, which is far too severe a consequence for an error communicating with SQS. I think we want to log the exception, wait some time, and then try again. It could be appropriate to perform an exponential backoff.
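
A rough sketch of what that could look like in the polling loop (the constant and field names here are only illustrative, not from this PR):

try {
    messages.addAll(sqsClient.receiveMessage(receiveMessageRequest).messages());
    failureCount = 0; // successful poll, reset the backoff
} catch (final SqsException e) {
    failureCount++;
    LOG.error("Error reading from SQS: {}", e.getMessage());
    try {
        // exponential backoff, capped at a maximum delay
        final long delayMillis = Math.min(MAX_DELAY_MILLIS, INITIAL_DELAY_MILLIS * (1L << failureCount));
        Thread.sleep(delayMillis);
    } catch (final InterruptedException ie) {
        Thread.currentThread().interrupt();
        break; // stop polling if the worker is being shut down
    }
}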

} while (true);
}

private S3EventNotification.S3EventNotificationRecord getS3EventMessages(Message message) {

Member:
I recommend renaming this so that it has a different verb than "get." Perhaps "convert" or "transform"? "Get" makes it seem that you are using a getter.

public class SqsOptions {
private final int DEFAULT_MAXIMUM_MESSAGES = 10;
private final int DEFAULT_VISIBILITY_TIMEOUT_SECONDS = 30;
private final int DEFAULT_WAIT_TIME_SECONDS = 0;

Member:
I think this default should be 20 seconds (the current maximum).

import java.time.Duration;

public class SqsOptions {
private final int DEFAULT_MAXIMUM_MESSAGES = 10;

Member:
These should be private static final (They are missing the static).


public class SqsOptions {
private final int DEFAULT_MAXIMUM_MESSAGES = 10;
private final int DEFAULT_VISIBILITY_TIMEOUT_SECONDS = 30;

Member:
Your choice here: You could make these defaults actual Duration objects. Since they will be static, you won't need to re-create the Duration in each field. I also think it is cleaner since the code only deals with Duration.

e.g.

private static final Duration DEFAULT_VISIBILITY_TIMEOUT = Duration.ofSeconds(20);

Collaborator (Author):
I like this suggestion.

private final S3Client s3Client;

private AwsCredentialsProvider createCredentialsProvider() {
public AwsCredentialsProvider createCredentialsProvider() {

Member:
Why is this public and not private?


for(int i = 0; i < s3SourceConfig.getSqsOptions().getThreadCount(); i++) {
Thread sqsWorkerThread = new Thread(new SqsWorker(sqsClient, s3Client, s3SourceConfig));

Member:
I think we will want to have a List<Thread> as a field. You can keep all the threads there. You will need to stop them when the stop() method for S3Source is called.
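
For illustration, that could look roughly like the following (names are only suggestions, and the SqsWorker.stop() flag assumed here is discussed in a later comment):

private final List<SqsWorker> sqsWorkers = new ArrayList<>();
private final List<Thread> sqsWorkerThreads = new ArrayList<>();

@Override
public void start(final Buffer<Record<Event>> buffer) {
    for (int i = 0; i < s3SourceConfig.getSqsOptions().getThreadCount(); i++) {
        final SqsWorker sqsWorker = new SqsWorker(sqsClient, s3Client, s3SourceConfig);
        final Thread sqsWorkerThread = new Thread(sqsWorker);
        sqsWorkers.add(sqsWorker);
        sqsWorkerThreads.add(sqsWorkerThread);
        sqsWorkerThread.start();
    }
}

@Override
public void stop() {
    // signal each worker to finish its current iteration, then wait for the threads to exit
    sqsWorkers.forEach(SqsWorker::stop);
    for (final Thread thread : sqsWorkerThreads) {
        try {
            thread.join();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}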


messages.addAll(sqsClient.receiveMessage(receiveMessageRequest).messages());
} catch (SqsException e) {
LOG.error(e.awsErrorDetails().errorMessage());

Member:
In general you should avoid logging just an error message from some external source. It is better to have:

LOG.error("Error reading from SQS: {}", e.awsErrorDetails().errorMessage());

Relatedly - is awsErrorDetails() guaranteed to be non-null?

Collaborator (Author):
I'm not sure if awsErrorDetails() will be non-null. I'll check this one

try {
Thread.sleep(s3SourceConfig.getSqsOptions().getPollDelay().toMillis());
} catch (InterruptedException e) {
e.printStackTrace();

Member:
We should log this, not call e.printStackTrace().
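
For example, something like:

} catch (final InterruptedException e) {
    LOG.error("Interrupted while waiting between SQS polls", e);
    // restore the interrupt status so the surrounding loop can shut down cleanly
    Thread.currentThread().interrupt();
}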

@asifsmohammed asifsmohammed marked this pull request as ready for review May 27, 2022 22:27
@asifsmohammed asifsmohammed force-pushed the sqs-interactions branch 2 times, most recently from 3816ace to f65d28f Compare May 31, 2022 06:50

@sbayer55 (Member):
As a general comment S3Source and SqsWorker have low test coverage. Are you planning to add functionality & test coverage to these classes in a future PR?


import java.util.Random;

public class BackoffUtils {

Member:
In place of "Utils", could the BackoffUtils class have a more descriptive name? Consider something like RetryState or BackoffManager. Without looking at the class file, I expected BackoffUtils to contain helper functions for parsing or transforming data.


if (shouldRetry()) {
waitUntilNextTry();
timeToWait += random.nextInt(1000);

Member:
Is there a benefit to a random wait as opposed to a fixed time or an exponentially growing time?

Collaborator (Author):
Yeah, it's to break the synchronization across clients, thereby avoiding collisions.
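
For reference, jitter and exponential growth are often combined; a minimal sketch (the constants and the "full jitter" choice here are illustrative, not what this PR must use, and it relies on java.util.concurrent.ThreadLocalRandom):

// grow an exponential ceiling, then pick a random delay under it ("full jitter")
final long ceilingMillis = Math.min(MAX_BACKOFF_MILLIS, BASE_BACKOFF_MILLIS * (1L << attempt));
final long delayMillis = ThreadLocalRandom.current().nextLong(ceilingMillis + 1);
Thread.sleep(delayMillis);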


public S3Client createS3Client(final StsClient stsClient) {

return software.amazon.awssdk.services.s3.S3Client.builder()

Member:
Can the fully qualified name be shortened from software.amazon.awssdk.services.s3.S3Client to S3Client? It looks like you already have the import.


import java.util.Random;

public class BackoffUtils {

Contributor:
Why are we opting to build our own exponential back off strategy? Is there a reason we cannot use the built-in AWS SDK retry functionality?

Member:
We do need a different mechanism in place for the entire thread. As the SQS processing runs in its own thread, it must not end until the source is shut down.

I think we should use both:

  • AWS SDK retry to help with transient errors
  • Catch exceptions in the thread, suppress, and wait. This goes beyond AWS SDK errors. Say, some message in the SQS queue is not actually JSON. We don't want the thread to fail for a single bad message.

That being said, perhaps this will not need any sophisticated back-off. Perhaps a straightforward Thread.sleep() is enough.
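
A sketch of how the two layers could fit together (the retry count and the processSqsMessages() / isStopped names are illustrative assumptions; the builder classes are ClientOverrideConfiguration and RetryPolicy from SDK v2 core):

// Layer 1: let the SDK retry transient errors on each receiveMessage call
final SqsClient sqsClient = SqsClient.builder()
        .overrideConfiguration(ClientOverrideConfiguration.builder()
                .retryPolicy(RetryPolicy.builder().numRetries(5).build())
                .build())
        .build();

// Layer 2: keep the worker thread alive across any other failure (e.g. a non-JSON message)
while (!isStopped) {
    try {
        processSqsMessages();
    } catch (final Exception e) {
        LOG.error("Unable to process SQS messages, waiting before retrying", e);
        try {
            Thread.sleep(RETRY_DELAY_MILLIS);
        } catch (final InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
}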

}

@Override
public void run() {

Contributor:
This function is getting very large in this partial state. I would encourage you to break it apart to improve maintainability.

Contributor:
This will also make it easier to unit test and can remove variable reassignment/mutation in the code below.

.region(Region.of(s3SourceConfig.getAWSAuthentication().getAwsRegion()))
.credentialsProvider(awsCredentialsProvider)
.build();
Thread.currentThread().interrupt();

Member:
Data Prepper core is calling the stop() method. Thus, this current code is going to interrupt the Data Prepper core thread.

Also, rather than use interrupt(), I'd like to have a running/shutdown boolean flag in the worker thread. You then can use while(running) { ... } instead of while(true) { ... }.
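
Roughly (field and method names here are just a sketch):

private volatile boolean isStopped = false;

@Override
public void run() {
    while (!isStopped) {
        // receive messages, convert to S3EventNotificationRecord, hand off for S3 processing ...
    }
}

void stop() {
    // called from S3Source.stop(); the loop exits after the current iteration completes
    isStopped = true;
}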

@codecov-commenter commented Jun 3, 2022

Codecov Report

Merging #1431 (3498a85) into main (a72618f) will not change coverage.
The diff coverage is n/a.

@@            Coverage Diff            @@
##               main    #1431   +/-   ##
=========================================
  Coverage     94.25%   94.25%           
  Complexity     1179     1179           
=========================================
  Files           165      165           
  Lines          3377     3377           
  Branches        276      276           
=========================================
  Hits           3183     3183           
  Misses          138      138           
  Partials         56       56           


assertThrows(IllegalStateException.class, () -> s3Source.start(testBuffer));
}

private BlockingBuffer<Record<Event>> getBuffer() {

Member:
Is there a reason you can't use a mock here?

There may be value in creating some integration tests in other PRs which use the blocking buffer. But for a unit test, we should use mocks if at all possible.
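
For example, with Mockito the buffer could likely just be mocked (sketch only, reusing the existing testBuffer name):

@SuppressWarnings("unchecked")
final Buffer<Record<Event>> testBuffer = (Buffer<Record<Event>>) mock(Buffer.class);
assertThrows(IllegalStateException.class, () -> s3Source.start(testBuffer));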

.build();
}

private S3EventNotification.S3EventNotificationRecord convertS3EventMessages(final Message message) {

Member:
I think we may want to make this part a separate class which is injected. One reason is that the SQS message may actually be in a different format in some cases. If the SQS queue is connected to SNS, then this becomes a string rather than JSON. So we might need to make this configurable by pipeline authors. I'm fine to keep it here for now if that helps merge it in though.

Collaborator (Author):
I think we can move this to a separate class once we decide on how pipeline authors can configure this.
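
If it does become a separate, injected class, even a small interface would be enough (purely illustrative, names not decided):

interface SqsMessageParser {
    // implementations could handle raw S3 event JSON, SNS-wrapped messages, etc.
    List<S3EventNotification.S3EventNotificationRecord> parse(Message message);
}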

// build s3ObjectPointer from S3EventNotificationRecord if event name starts with ObjectCreated
List<S3ObjectReference> addedObjects = new ArrayList<>();
for (Map.Entry<Message, S3EventNotification.S3EventNotificationRecord> entry: s3EventNotificationRecords.entrySet()) {
if (entry.getValue().getEventName().startsWith("ObjectCreated")) {

Member:
I'd like to create an S3EventFilter structure. There are couple motivations I have in this:

  1. We are also going to add an optional user-configuration to support filtering by bucketName or accountId.
  2. If we add SNS as an option in future iterations, this filter should be the same code.

Here is a possible design:

public interface S3EventFilter {
  Optional<S3EventNotification.S3EventNotificationRecord> filter(S3EventNotification.S3EventNotificationRecord notification);
}

class ObjectCreatedFilter implements S3EventFilter {
  @Override
  public Optional<S3EventNotification.S3EventNotificationRecord> filter(S3EventNotification.S3EventNotificationRecord notification) {
    if (notification.getEventName().startsWith("ObjectCreated"))
      return Optional.of(notification);
    else
      return Optional.empty();
  }
}

We can build more complicated filters for the bucketName and accountId in another PR.

Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com>

import java.util.Objects;

public class S3ObjectReference {

Member:
You might get a merge conflict on this file since I also added this into main. Please use the one I provided as it has toString() and is unit tested.

dlvenable
dlvenable previously approved these changes Jun 3, 2022
Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com>
dlvenable
dlvenable previously approved these changes Jun 6, 2022
}

@Override
public void start(Buffer<Record<Event>> buffer) {
if (buffer == null) {

Contributor:
The buffer is never used by the S3Service or the SqsService. Who is going to be responsible for putting the items in the buffer (S3Service or SqsService)?

Member:
The S3ObjectWorker handles this.

Also, @asifsmohammed and I are coordinating on the work here. He created this PR before the S3ObjectWorker was present in main, so some portions like this are currently unused in his branch and work.

Collaborator (Author):
I'll update the logic with the buffer here.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class SqsWorkerTest {

Contributor:
The run method is not tested. We are missing out on some core test coverage and should add test cases.

private static final Logger LOG = LoggerFactory.getLogger(SqsWorker.class);
private static final ObjectMapper objectMapper = new ObjectMapper();

private S3SourceConfig s3SourceConfig;

Contributor:
These private variables should be final to prevent re-assignment

Comment on lines +27 to +29
S3ObjectReference addS3Object(final S3ObjectReference s3ObjectReference) {
// TODO: should return message id and receipt handle if successfully converted to event
return null;

Contributor:
I have a few concerns over this method and subsequent workflow. I have some ideas for improvements but I want to understand a few things first.

What are we adding an S3Object to? I find this method name confusing and was really confused by the usage in the run() method.

Why are the parameters and return object the same object (S3ObjectReference)? Are we extending S3ObjectReference and expecting this class to mutate the variable with a messageId and Receipt? (I want to see if we can avoid mutation and nullable variables)

How is the receipt used by the calling function? Do we need it?

Member:
Why are the parameters and return object the same object (S3ObjectReference)? Are we extending S3ObjectReference and expecting this class to mutate the variable with a messageId and Receipt? (I want to see if we can avoid mutation and nullable variables)

@asifsmohammed and I discussed a couple approaches here:

  1. We can make this templated and add something like T getNotificationMetadata() which in this case returns SQS metadata. It could be extended for SNS metadata if that were ever added.
  2. We could extend this class with a sub-class.

In either case, the SQS metadata would include the ReceiptHandle and MessageId.

I'm fine with either approach. But, the inheritance approach would have the benefit of completely abstracting this metadata from the S3 code.

Collaborator (Author):
My initial thought was to add ReceiptHandle and MessageId to S3ObjectReference, which will be used to get the S3 object. If the S3 object is successfully processed, we return the same S3ObjectReference to delete the message from the queue.

I'll create a subclass which contains ReceiptHandle and MessageId.
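
A rough sketch of that subclass (this assumes S3ObjectReference exposes a (bucketName, key) constructor, which may differ from the actual class in main):

class S3ObjectReferenceFromSqs extends S3ObjectReference {
    private final String messageId;
    private final String receiptHandle;

    S3ObjectReferenceFromSqs(final String bucketName, final String key,
                             final String messageId, final String receiptHandle) {
        super(bucketName, key);
        this.messageId = messageId;
        this.receiptHandle = receiptHandle;
    }

    String getMessageId() {
        return messageId;
    }

    // used to delete the message from the queue once the S3 object has been processed
    String getReceiptHandle() {
        return receiptHandle;
    }
}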

Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com>

@cmanning09 (Contributor) left a comment:
Thanks for making those changes. Let's address the remaining in the next PR.

@sbayer55 (Member) left a comment:
Thanks for the consistent updates @asifsmohammed !

@asifsmohammed asifsmohammed merged commit 8d1454c into opensearch-project:main Jun 7, 2022
finnroblin pushed a commit to finnroblin/data-prepper that referenced this pull request Jul 11, 2022
* Added sqs configuration and basic sqs interactions

Signed-off-by: Asif Sohail Mohammed <nsifmoh@amazon.com>
Signed-off-by: Finn Roblin <finnrobl@amazon.com>

Successfully merging this pull request may close these issues.

Add basic SQS interactions
5 participants