asyncsqs

asyncsqs wraps the SQS client from aws-sdk-go-v2 to provide an async buffered client that batches send-message and delete-message requests to optimise AWS costs.

Messages can be scheduled to be sent and deleted. Requests are dispatched when

  • either the batch becomes full
  • or the waiting period elapses (if configured)
  • or the total body size of the batch becomes greater than or equal to 256 KB. (addition)

...whichever occurs first.
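
The three dispatch conditions above can be illustrated with a minimal, standard-library-only sketch. This is not the asyncsqs implementation: the `batcher` type, its methods, and the constants are hypothetical names chosen for illustration, message bodies are plain strings, and the limit of 10 entries per batch is assumed from the SQS batch APIs.

```go
package main

import (
	"fmt"
	"time"
)

const (
	maxBatchEntries = 10         // assumed: SQS batch requests carry at most 10 entries
	maxBatchBytes   = 256 * 1024 // size threshold from the list above
)

// batcher is a hypothetical stand-in for a buffering loop; it is not asyncsqs code.
type batcher struct {
	in       chan string
	done     chan struct{}
	waitTime time.Duration        // zero disables the time-based flush
	flush    func(batch []string) // called once per dispatched batch
}

func newBatcher(waitTime time.Duration, flush func([]string)) *batcher {
	b := &batcher{
		in:       make(chan string),
		done:     make(chan struct{}),
		waitTime: waitTime,
		flush:    flush,
	}
	go b.run()
	return b
}

func (b *batcher) run() {
	defer close(b.done)
	var (
		batch []string
		size  int
		timer <-chan time.Time // a nil channel never fires in select
	)
	dispatch := func() {
		if len(batch) > 0 {
			b.flush(batch)
		}
		batch, size, timer = nil, 0, nil
	}
	for {
		select {
		case body, ok := <-b.in:
			if !ok {
				dispatch() // flush whatever is left when stopping
				return
			}
			batch = append(batch, body)
			size += len(body)
			if len(batch) == 1 && b.waitTime > 0 {
				timer = time.After(b.waitTime) // waiting period starts with the first message
			}
			if len(batch) >= maxBatchEntries || size >= maxBatchBytes {
				dispatch() // batch is full, or its total body size reached the threshold
			}
		case <-timer:
			dispatch() // waiting period elapsed
		}
	}
}

// add schedules one message body for batching.
func (b *batcher) add(body string) { b.in <- body }

// stop flushes buffered messages and waits for the loop to exit.
func (b *batcher) stop() {
	close(b.in)
	<-b.done
}

func main() {
	b := newBatcher(time.Second, func(batch []string) {
		fmt.Printf("dispatching %d message(s)\n", len(batch))
	})
	for i := 0; i < 25; i++ {
		b.add("hello world")
	}
	b.stop()
}
```

In this sketch, 25 small messages sent in quick succession should produce two full batches of 10, with the remaining 5 flushed by `stop()`. That final flush is the same reason the real client's `Stop()` should always be called.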

Getting started

Add dependency

asyncsqs requires a Go version with modules support. If you're starting a new project, make sure to initialise a Go module:

$ mkdir ~/hellosqs
$ cd ~/hellosqs
$ go mod init github.com/my/hellosqs

And then add asyncsqs as a dependency to your existing or new project:

$ go get github.com/prashanthpai/asyncsqs

Write Code

package main

import (
	"context"
	"log"
	"strconv"

	"github.com/prashanthpai/asyncsqs"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"
	"github.com/aws/aws-sdk-go-v2/service/sqs/types"
)

func main() {
	// Create an SQS client with appropriate credentials/IAM role, region etc.
	awsCfg, err := config.LoadDefaultConfig(context.Background())
	if err != nil {
		log.Fatalf("config.LoadDefaultConfig() failed: %v", err)
	}
	sqsClient := sqs.NewFromConfig(awsCfg)

	// Create an asyncsqs buffered client; you'd have one per SQS queue
	client, err := asyncsqs.NewBufferedClient(asyncsqs.Config{
		SQSClient:          sqsClient,
		QueueURL:           "https://sqs.us-east-1.amazonaws.com/xxxxxxxxxxxx/qqqqqqqqqqqq",
		OnSendMessageBatch: sendResponseHandler, // register callback function (recommended)
	})
	if err != nil {
		log.Fatalf("asyncsqs.NewBufferedClient() failed: %v", err)
	}
	// important! Stop() ensures that requests in memory are gracefully
	// flushed/dispatched and resources like goroutines are cleaned-up
	defer client.Stop()

	for i := 0; i < 100; i++ {
		_ = client.SendMessageAsync(types.SendMessageBatchRequestEntry{
			Id:          aws.String(strconv.Itoa(i)),
			MessageBody: aws.String("hello world"),
		})
	}
}

// sendResponseHandler is the callback registered as OnSendMessageBatch above.
func sendResponseHandler(output *sqs.SendMessageBatchOutput, err error) {
	if err != nil {
		log.Printf("send returned error: %v", err)
		return // output may be nil when the request itself failed
	}
	for _, s := range output.Successful {
		log.Printf("message send successful: msg id = %s", *s.Id)
	}
	for _, f := range output.Failed {
		log.Printf("message send failed: msg id = %s", *f.Id)
	}
}
