Fix queue url generator and logging #9

Merged 1 commit on Nov 27, 2023
2 changes: 1 addition & 1 deletion Dockerfile
@@ -10,7 +10,7 @@ COPY dist/aws-sqs-sink /bin/aws-sqs-sink
RUN chmod +x /bin/aws-sqs-sink

####################################################################################################
# redis
# sqs-sink
####################################################################################################
FROM scratch as sqs-sink
ARG ARCH
75 changes: 66 additions & 9 deletions README.md
@@ -1,29 +1,86 @@
# AWS SQS SINK
# AWS SQS Sink for Numaflow

AWS SQS Sink for Numaflow implemented in Golang, which will push the vertex data to AWS SQS.
The AWS SQS Sink is a custom user-defined sink for [Numaflow](https://numaflow.numaproj.io/) that enables the integration of Amazon Simple Queue Service (SQS) as a sink within your Numaflow pipelines.

### Environment Variables
## Quick Start
This quick start guide will walk you through setting up an AWS SQS sink in a Numaflow pipeline.

Specify the environment variables based on the supported envs specified here https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/#specifying-credentials
### Prerequisites
* [Install Numaflow on your Kubernetes cluster](https://numaflow.numaproj.io/quick-start/)
* [AWS CLI configured with access to AWS SQS](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-quickstart.html)

### Step-by-step Guide

#### 1. Create an AWS SQS Queue

Using the AWS CLI or the AWS Management Console, [create a new SQS queue](https://docs.aws.amazon.com/cli/latest/reference/sqs/create-queue.html).

#### 2. Deploy a Numaflow Pipeline with AWS SQS Sink

- Save the following Kubernetes manifest to a file (e.g., `sqs-sink-pipeline.yaml`).
- Modify the AWS region and queue name to match your setup.
- Specify the AWS credentials using any approach supported by the AWS SDK: https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/#specifying-credentials

### Example Pipeline Configuration

```yaml
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: simple-pipeline
spec:
  vertices:
    - name: in
      source:
        generator:
          # How many messages to generate in the duration.
          rpu: 1
          duration: 5s
          # Optional, size of each generated message, defaults to 10.
          msgSize: 10
    - name: aws-sink
      scale:
        min: 1
      sink:
        udsink:
          container:
            image: quay.io/numaio/numaflow-sink/aws-sqs-sink:v0.0.1
            image: quay.io/numaio/numaproj-contrib/aws-sqs-sink-go:v0.0.1
            env:
              - name: AWS_SQS_QUEUE_NAME
                value: "test"
                value: "test-queue"
              - name: AWS_REGION
                value: "us-east-1"
              - name: AWS_ACCESS_KEY_ID
                value: "testing" ## This can be passed as k8s secret as well
              - name: AWS_SECRET_ACCESS_KEY
                value: "testing" # ## This can be passed as k8s secret as well
                value: "testing" ## This can be passed as k8s secret as well
  edges:
    - from: in
      to: aws-sink
```

Then apply it to your cluster:
```bash
kubectl apply -f sqs-sink-pipeline.yaml
```

#### 3. Verify the SQS Sink

Verify that messages arrive by polling the queue with the AWS CLI; see https://docs.aws.amazon.com/cli/latest/reference/sqs/receive-message.html

#### 4. Clean up

To delete the Numaflow pipeline:
```bash
kubectl delete -f sqs-sink-pipeline.yaml
```


To delete the SQS queue:
```bash
aws sqs delete-queue --queue-url <YourQueueUrl>
```

Congratulations!!! You have successfully set up an AWS SQS sink in a Numaflow pipeline.

## Additional Resources

For more information on Numaflow and how to use it to process data in a Kubernetes-native way, visit the [Numaflow Documentation](https://numaflow.numaproj.io/). For AWS SQS specific configuration, refer to the [AWS SQS Documentation](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html).
4 changes: 2 additions & 2 deletions go.mod
@@ -7,10 +7,8 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.19.1
github.com/aws/aws-sdk-go-v2/service/sqs v1.24.7
github.com/numaproj-contrib/numaflow-utils-go v0.0.4
github.com/numaproj/numaflow v0.11.0
github.com/numaproj/numaflow-go v0.5.2
github.com/stretchr/testify v1.8.4
go.uber.org/zap v1.26.0
)

require (
@@ -75,6 +73,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/numaproj/numaflow v0.11.0 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
@@ -96,6 +95,7 @@ require (
github.com/yudai/gojsondiff v1.0.0 // indirect
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // indirect
go.uber.org/multierr v1.10.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/oauth2 v0.8.0 // indirect
51 changes: 26 additions & 25 deletions main.go
@@ -17,37 +17,34 @@ package main

import (
"context"
"fmt"
"log"
"os"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
sqsTypes "github.com/aws/aws-sdk-go-v2/service/sqs/types"
sinksdk "github.com/numaproj/numaflow-go/pkg/sinker"
"github.com/numaproj/numaflow/pkg/shared/logging"
"go.uber.org/zap"
)

const (
sqsQueueName = "AWS_SQS_QUEUE_NAME"
awsEndpointURL = "AWS_ENDPOINT_URL"
)

type awsSQSSink struct {
logger *zap.SugaredLogger
type sqsSinkConfig struct {
sqsClient *sqs.Client
queueURL *string
}

// awsSQSClient will generate the aws sqs client with default aws config.
func awsSQSClient(ctx context.Context) *awsSQSSink {
// initialize the logger
logger := logging.NewLogger().Named("aws-sqs-sink")

// newSQSSinkConfig generates the sqs client and queue url using the default config supported by aws.
func newSQSSinkConfig(ctx context.Context) (*sqsSinkConfig, error) {
// Load default configs for aws based on env variable provided based on
// https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/#specifying-credentials
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
logger.Fatalln("failed loading aws config, err: ", err)
return nil, fmt.Errorf("failed loading aws config, err: %v", err)
}

// generate the sqs client based on default values or if AWS_ENDPOINT_URL is passed as env.
@@ -62,22 +59,22 @@ func awsSQSClient(ctx context.Context) *awsSQSSink {
client = sqs.NewFromConfig(cfg)
}

return &awsSQSSink{
logger: logger,
sqsClient: client,
// generate the queue url to publish data to queue via queue name.
queueURL, err := client.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{QueueName: aws.String(os.Getenv(sqsQueueName))})
if err != nil {
return nil, fmt.Errorf("failed to generate SQS Queue url, err: %v", err)
}

return &sqsSinkConfig{
sqsClient: client,
queueURL: queueURL.QueueUrl,
}, nil
}

// Sink will publish the vertex data to aws sqs sink
func (s *awsSQSSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk.Datum) sinksdk.Responses {
func (s *sqsSinkConfig) Sink(ctx context.Context, datumStreamCh <-chan sinksdk.Datum) sinksdk.Responses {
responses := sinksdk.ResponsesBuilder()

// generate the queue url to publish data to queue via queue name.
queueURL, err := s.sqsClient.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{QueueName: aws.String(os.Getenv(sqsQueueName))})
if err != nil {
s.logger.Fatalln("failed to generate SQS Queue url, err: ", err)
}

// generate message request entries for processing message in a batch
var messageRequests []sqsTypes.SendMessageBatchRequestEntry
for datum := range datumStreamCh {
@@ -90,10 +87,10 @@ func (s *awsSQSSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk.Datu
// send batch message to aws queue
response, err := s.sqsClient.SendMessageBatch(ctx, &sqs.SendMessageBatchInput{
Entries: messageRequests,
QueueUrl: queueURL.QueueUrl,
QueueUrl: s.queueURL,
})
if err != nil {
s.logger.Errorf("failed to push message %v", err)
log.Printf("failed to push batch message %v", err)
}

// append the failure response to responses object
@@ -110,13 +107,17 @@ func (s *awsSQSSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk.Datu
}

func main() {
ctx := context.Background()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// generate aws sqs queue client based on provided config.
configs := awsSQSClient(ctx)
configs, err := newSQSSinkConfig(ctx)
if err != nil {
log.Fatal(err)
}

// start a new sink server which will push data to aws sqs queue.
if err := sinksdk.NewServer(configs).Start(ctx); err != nil {
configs.logger.Fatalln("failed to start aws sqs sink server, err: %v", err)
log.Panicf("failed to start aws sqs sink server, err: %v", err)
}
}
6 changes: 4 additions & 2 deletions main_test.go
@@ -12,9 +12,11 @@ func Test_awsSQSSink(t *testing.T) {
ctx := context.TODO()

t.Setenv("AWS_REGION", "us-east-1")
sqsClient := awsSQSClient(ctx)
t.Setenv("AWS_SQS_QUEUE_NAME", "unit_test")
sqsClient, err := newSQSSinkConfig(ctx)
assert.NoError(t, err)

assert.NotEmpty(t, sqsClient.logger)
assert.NotEmpty(t, sqsClient.sqsClient)
assert.NotEmpty(t, sqsClient.queueURL)
})
}