Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add AWS SQS Sink functionality #1

Merged
merged 9 commits into from
Oct 30, 2023

Conversation

chandankumar4
Copy link
Contributor

Add the functionality to sink pipeline vertex data to aws queue. Please check README for example pipeline

Fixes: numaproj/numaflow#1093

Signed-off-by: chandankumar4 <chandan.kr404@gmail.com>
Signed-off-by: chandankumar4 <chandan.kr404@gmail.com>
Signed-off-by: chandankumar4 <chandan.kr404@gmail.com>
- name: "Step 6: Load image in cluster"
run: |
docker load --input /tmp/aws-sqs-queue.tar
kind load docker-image "quay.io/numaio/numaproj-contrib/aws-sqs-sink-go:sqs-e2e"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this working? kind?

main.go Outdated
if !ok {
logger.Fatalln("AWS_REGION not found")
}
accessKey, ok := os.LookupEnv("AWS_ACCESS_KEY")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The credentials should not be mandatory, ppl might use things like IAM Role for service account to access.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link

@yhl25 yhl25 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add unit tests.

main.go Outdated
Comment on lines 33 to 37
queueName string
region string
awsAccessKey string
awsAccessSecret string
awsBaseEndpoint string
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider using a config object?

main.go Outdated
if !ok {
logger.Fatalln("AWS_REGION not found")
}
accessKey, ok := os.LookupEnv("AWS_ACCESS_KEY")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

main.go Outdated
Comment on lines 99 to 104
if _, err = s.sqsClient.SendMessage(ctx, &sqs.SendMessageInput{
MessageBody: &msgBody,
QueueUrl: queueURL.QueueUrl,
}); err != nil {
s.logger.Errorf("failed to push message %v", err)
continue
Copy link

@yhl25 yhl25 Oct 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

invoking send for each message might be expensive.. Consider writing in batches?

Signed-off-by: chandankumar4 <chandan.kr404@gmail.com>
Signed-off-by: chandankumar4 <chandan.kr404@gmail.com>
Signed-off-by: chandankumar4 <chandan.kr404@gmail.com>
.github/workflows/ci.yaml Show resolved Hide resolved
README.md Show resolved Hide resolved
main.go Outdated Show resolved Hide resolved
main.go Outdated Show resolved Hide resolved
limitations under the License.
*/

package fixtures
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see an opportunity of abstraction. Should we create a util repository to hold the E2E testing fixtures such that all UDFs/UDSource/Sink etc. can share? cc: @vigith @whynowy

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should extract one out for contrib org.

test/sqs-e2e/sqs_sink_test.go Outdated Show resolved Hide resolved
test/sqs-e2e/testdata/aws-sqs-queue.yaml Outdated Show resolved Hide resolved
Signed-off-by: chandankumar4 <chandan.kr404@gmail.com>
main.go Outdated Show resolved Hide resolved
Signed-off-by: chandankumar4 <chandan.kr404@gmail.com>
main.go Outdated Show resolved Hide resolved
Signed-off-by: chandankumar4 <chandan.kr404@gmail.com>
@KeranYang
Copy link
Contributor

Sry one more comment, are we suppose to see CI workflow succeeds? I don't see checks running for this PR.

@chandankumar4
Copy link
Contributor Author

Sry one more comment, are we suppose to see CI workflow succeeds? I don't see checks running for this PR.

yaa, I think it will trigger after first PR merge to main branch with workflow yaml but not sure (that's what I have observed in previous repo) maybe someone need to enable it manually?

@chandankumar4
Copy link
Contributor Author

Also unit test case is pending, I'm working on that. probably we merge it also and I'll raise another PR for it?

@KeranYang
Copy link
Contributor

Also unit test case is pending, I'm working on that. probably we merge it also and I'll raise another PR for it?

Sounds good. I am approving this PR. Feel free to merge it and use another PR to trigger CI.

@chandankumar4
Copy link
Contributor Author

I don't have access to merge ;)

@KeranYang KeranYang merged commit 6b7f268 into numaproj-contrib:main Oct 30, 2023
@KeranYang
Copy link
Contributor

I don't have access to merge ;)

Done.

@chandankumar4 chandankumar4 deleted the aws-sqs-sink branch October 30, 2023 18:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

AWS SQS Sink
5 participants