From 07a7d3cd8de198824f1f8d730ec64adee5b8d211 Mon Sep 17 00:00:00 2001 From: Tanabe Ken-ichi Date: Sat, 20 Apr 2024 22:50:51 +0900 Subject: [PATCH] feat!: add aws-sdk-go-v2 support (#79) * Migrate to aws-sdk-go-v2 * Use the original sqsiface package name. * build: prep for v4 * Remove -WithContext method and require context by default. * fix(queue): initialize the map * fix(test-multiqueue): initialize the SQS client properly * fix: go mod tidy * fix: rewrite the real SQS test without suite.Suite * docs: update README --------- Co-authored-by: Nurahmadie --- README.md | 26 +++-- example/test-multiqueue/main.go | 79 ++++++++----- go.mod | 26 ++++- go.sum | 84 +++++-------- internal/sqsiface/api.go | 26 +++++ multiqueue/mq.go | 4 +- multiqueue/mq_test.go | 10 +- queue/example_test.go | 24 ++-- queue/option/option.go | 41 ++++--- queue/queue.go | 142 ++++++++-------------- queue/queue_test.go | 201 ++++++++++++++++---------------- 11 files changed, 333 insertions(+), 330 deletions(-) create mode 100644 internal/sqsiface/api.go diff --git a/README.md b/README.md index 5935b07..fcf00c3 100644 --- a/README.md +++ b/README.md @@ -4,27 +4,23 @@ [![PkgGoDev](https://pkg.go.dev/badge/github.com/nabeken/aws-go-sqs/v3)](https://pkg.go.dev/github.com/nabeken/aws-go-sqs/v3) [![MIT License](http://img.shields.io/badge/license-MIT-blue.svg)](LICENSE) -`aws-go-sqs` is a SQS wrapper library for [aws/aws-sdk-go](https://github.com/aws/aws-sdk-go). +`aws-go-sqs` is a SQS wrapper library for [aws/aws-sdk-go-v2](https://github.com/aws/aws-sdk-go-v2). # Usage -`v3` and later require Go Modules support to import this package. +The current version is `v4`. + ```go -import "github.com/nabeken/aws-go-sqs/v3/queue" +import "github.com/nabeken/aws-go-sqs/v4/queue" ``` -*Note*: v3 is still under-development as it doesn't have any stable release. - -From v3 train (and `master` branch), we no longer use `gopkg.in`. - -- We have [v1 branch](https://github.com/nabeken/aws-go-sqs/tree/v1) so you can import it from `gopkg.in/nabeken/aws-go-sqs.v1`. -- We have [v2 branch](https://github.com/nabeken/aws-go-sqs/tree/v2) so you can import it from `gopkg.in/nabeken/aws-go-sqs.v2`. +If you're looking for the aws-sdk-go (v1) support, please use v2 or v3. # Multi-queue implementation -v3 has [multiqueue](multiqueue/) package to address multi-queue (region) deployment of SQS. SQS is a crucial messaging component but it's still possible to become unavailable for several hours. We experienced that incident at [2020-04-20](https://status.aws.amazon.com/rss/sqs-ap-northeast-1.rss). +v3 and later has the [multiqueue](multiqueue/) package to address multi-queue (region) deployment of SQS. SQS is a crucial messaging component but it's still possible to become unavailable for several hours. -Since SQS is just a message bus between components, deploying SQS to the multiple regions for availability works even the system isn't fully deployed to the multiple regions. +Since SQS is just a message bus between components, deploying SQS to the multiple regions for availability still works even if an entire system isn't fully deployed to the multiple regions. ## How `multiqueue` works @@ -40,7 +36,13 @@ When there is no queue in the "available" slice, the dispatcher returns a queue Example code: ```go // Create SQS instance -s := sqs.New(session.Must(session.NewSession())) +cfg, err := config.LoadDefaultConfig(context.TODO()) +if err != nil { + log.Fatalf("loading AWS config: %s", err.Error()) +} + +s1 := sqs.NewFromConfig(cfg) +s2 := sqs.NewFromConfig(cfg) // Create Queue instances q1 := queue.MustNew(s, "example-queue1") diff --git a/example/test-multiqueue/main.go b/example/test-multiqueue/main.go index 11a8afc..e893fe0 100644 --- a/example/test-multiqueue/main.go +++ b/example/test-multiqueue/main.go @@ -3,6 +3,7 @@ package main import ( "context" "encoding/json" + "errors" "flag" "fmt" "log" @@ -15,15 +16,16 @@ import ( "sync" "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/sqs" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/mercari/go-circuitbreaker" - "github.com/nabeken/aws-go-sqs/v3/multiqueue" - "github.com/nabeken/aws-go-sqs/v3/queue" - "github.com/nabeken/aws-go-sqs/v3/queue/option" + "github.com/nabeken/aws-go-sqs/v4/multiqueue" + "github.com/nabeken/aws-go-sqs/v4/queue" + "github.com/nabeken/aws-go-sqs/v4/queue/option" ) +var errGotSignal = errors.New("got a signal") + func main() { var queueName1 = flag.String("queue1", "", "specify SQS queue name 1") var region1 = flag.String("region1", "ap-northeast-1", "specify a region for queue1") @@ -39,7 +41,8 @@ func main() { flag.Parse() - rand.Seed(time.Now().UnixNano()) + randSource := rand.NewSource(time.Now().UnixNano()) + rng := rand.New(randSource) if *queueName1 == "" || *queueName2 == "" { log.Fatal("Please specify queue name") @@ -48,11 +51,11 @@ func main() { c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancelCause(context.Background()) go func() { sig := <-c log.Println("got signal:", sig) - cancel() + cancel(errGotSignal) }() tr := &http.Transport{ @@ -79,18 +82,22 @@ func main() { } // Create SQS instance for region1 - s1 := sqs.New(session.Must(session.NewSession(&aws.Config{ - HTTPClient: httpClient, - Region: region1, - }))) - s2 := sqs.New(session.Must(session.NewSession(&aws.Config{ - HTTPClient: httpClient, - Region: region2, - }))) + cfg, err := config.LoadDefaultConfig(ctx, config.WithHTTPClient(httpClient)) + if err != nil { + log.Fatalf("loading AWS config: %s", err.Error()) + } + + s1 := sqs.NewFromConfig(cfg, func(opts *sqs.Options) { + opts.Region = *region1 + }) + + s2 := sqs.NewFromConfig(cfg, func(opts *sqs.Options) { + opts.Region = *region2 + }) // Create Queue instance - q1 := multiqueue.NewQueue(queue.MustNew(s1, *queueName1)).Weight(*weight1) - q2 := multiqueue.NewQueue(queue.MustNew(s2, *queueName2)).Weight(*weight2) + q1 := multiqueue.NewQueue(queue.MustNew(ctx, s1, *queueName1)).Weight(*weight1) + q2 := multiqueue.NewQueue(queue.MustNew(ctx, s2, *queueName2)).Weight(*weight2) // if we do not set OpenTimeout nor OpenBackOff, the default value of OpenBackOff will be used. cbOpts := []circuitbreaker.BreakerOption{ @@ -108,10 +115,11 @@ func main() { {URL: *q1.URL}, {URL: *q2.URL}, }, + rng: rng, } go func() { log.Print("starting failure injection HTTP server...") - http.ListenAndServe("127.0.0.1:9003", fss) + _ = http.ListenAndServe("127.0.0.1:9003", fss) }() var wg sync.WaitGroup @@ -171,7 +179,6 @@ func send(ctx context.Context, count, concurrency int, d *multiqueue.Dispatcher, sem := make(chan struct{}, concurrency) -LOOP: for i := 0; i < count; i++ { sem <- struct{}{} cnt := i + 1 @@ -185,10 +192,15 @@ LOOP: return nil, err } - return exec.SendMessage(fmt.Sprintf("MESSAGE BODY FROM MULTI-QUEUE %d", cnt), option.MessageAttributes(attrs)) + return exec.SendMessage(ctx, fmt.Sprintf("MESSAGE BODY FROM MULTI-QUEUE %d", cnt), option.MessageAttributes(attrs)) }) if err != nil { + if ctxGotSignal(ctx) { + log.Printf("got a signal") + return + } + log.Printf("%s: unable to send the message. will retry: %s", *exec.Queue.URL, err) if err == circuitbreaker.ErrOpen { time.Sleep(time.Second) @@ -202,7 +214,7 @@ LOOP: select { case <-ctx.Done(): - break LOOP + return default: } } @@ -214,15 +226,17 @@ func recv(ctx context.Context, exec *multiqueue.Executor) []string { log.Printf("%s: starting receiver...", *exec.URL) var messages []string + for { select { case <-ctx.Done(): log.Printf("shutting down receiver... count:%d", len(messages)) - return messages + goto END default: } resp, err := exec.ReceiveMessage( + ctx, option.MaxNumberOfMessages(10), ) if err != nil { @@ -230,13 +244,15 @@ func recv(ctx context.Context, exec *multiqueue.Executor) []string { } for _, m := range resp { - if err := exec.DeleteMessage(m.ReceiptHandle); err != nil { + if err := exec.DeleteMessage(ctx, m.ReceiptHandle); err != nil { log.Printf("unable to delete message: %s", err) } messages = append(messages, *m.Body) } } +END: + return messages } @@ -249,6 +265,7 @@ type failureScenario struct { type failureScenarioServer struct { mu sync.Mutex scenario []failureScenario + rng *rand.Rand } func (s *failureScenarioServer) findScenario(q *multiqueue.Queue) (failureScenario, bool) { @@ -267,7 +284,7 @@ func (s *failureScenarioServer) failureScenario(q *multiqueue.Queue) error { } if time.Now().Before(sc.Until) { - if rand.Float64() > sc.ErrRate { + if s.rng.Float64() > sc.ErrRate { return nil } return fmt.Errorf("this is a failure scenario until %s", sc.Until.Format(time.RFC3339)) @@ -303,11 +320,17 @@ func (s *failureScenarioServer) ServeHTTP(rw http.ResponseWriter, req *http.Requ s.mu.Lock() defer s.mu.Unlock() if index > int64(len(s.scenario))-1 { - http.Error(rw, err.Error(), http.StatusBadRequest) + http.Error(rw, "index out of bound", http.StatusBadRequest) return } s.scenario[index].Until = time.Now().Add(dur) s.scenario[index].ErrRate = errRate - json.NewEncoder(rw).Encode(s.scenario) + _ = json.NewEncoder(rw).Encode(s.scenario) +} + +func ctxGotSignal(ctx context.Context) bool { + err := context.Cause(ctx) + + return err != nil && errors.Is(err, errGotSignal) } diff --git a/go.mod b/go.mod index 70667e8..183abc2 100644 --- a/go.mod +++ b/go.mod @@ -1,12 +1,32 @@ -module github.com/nabeken/aws-go-sqs/v3 +module github.com/nabeken/aws-go-sqs/v4 -go 1.15 +go 1.21 require ( - github.com/aws/aws-sdk-go v1.51.25 + github.com/aws/aws-sdk-go-v2 v1.26.1 + github.com/aws/aws-sdk-go-v2/config v1.27.11 + github.com/aws/aws-sdk-go-v2/service/sqs v1.31.2 github.com/benbjohnson/clock v1.3.5 // indirect github.com/cenkalti/backoff/v3 v3.2.2 // indirect github.com/hashicorp/go-multierror v1.1.1 github.com/mercari/go-circuitbreaker v0.0.2 github.com/stretchr/testify v1.9.0 ) + +require ( + github.com/aws/aws-sdk-go-v2/credentials v1.17.11 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.20.5 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 // indirect + github.com/aws/smithy-go v1.20.2 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum index 416e0f8..13e4841 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,31 @@ -github.com/aws/aws-sdk-go v1.51.25 h1:DjTT8mtmsachhV6yrXR8+yhnG6120dazr720nopRsls= -github.com/aws/aws-sdk-go v1.51.25/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= +github.com/aws/aws-sdk-go-v2 v1.26.1 h1:5554eUqIYVWpU0YmeeYZ0wU64H2VLBs8TlhRB2L+EkA= +github.com/aws/aws-sdk-go-v2 v1.26.1/go.mod h1:ffIFB97e2yNsv4aTSGkqtHnppsIJzw7G7BReUZ3jCXM= +github.com/aws/aws-sdk-go-v2/config v1.27.11 h1:f47rANd2LQEYHda2ddSCKYId18/8BhSRM4BULGmfgNA= +github.com/aws/aws-sdk-go-v2/config v1.27.11/go.mod h1:SMsV78RIOYdve1vf36z8LmnszlRWkwMQtomCAI0/mIE= +github.com/aws/aws-sdk-go-v2/credentials v1.17.11 h1:YuIB1dJNf1Re822rriUOTxopaHHvIq0l/pX3fwO+Tzs= +github.com/aws/aws-sdk-go-v2/credentials v1.17.11/go.mod h1:AQtFPsDH9bI2O+71anW6EKL+NcD7LG3dpKGMV4SShgo= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 h1:FVJ0r5XTHSmIHJV6KuDmdYhEpvlHpiSd38RQWhut5J4= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1/go.mod h1:zusuAeqezXzAB24LGuzuekqMAEgWkVYukBec3kr3jUg= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 h1:aw39xVGeRWlWx9EzGVnhOR4yOjQDHPQ6o6NmBlscyQg= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5/go.mod h1:FSaRudD0dXiMPK2UjknVwwTYyZMRsHv3TtkabsZih5I= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 h1:PG1F3OD1szkuQPzDw3CIQsRIrtTlUC3lP84taWzHlq0= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5/go.mod h1:jU1li6RFryMz+so64PpKtudI+QzbKoIEivqdf6LNpOc= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 h1:hT8rVHwugYE2lEfdFE0QWVo81lF7jMrYJVDWI+f+VxU= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0/go.mod h1:8tu/lYfQfFe6IGnaOdrpVgEL2IrrDOf6/m9RQum4NkY= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 h1:Ji0DY1xUsUr3I8cHps0G+XM3WWU16lP6yG8qu1GAZAs= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2/go.mod h1:5CsjAbs3NlGQyZNFACh+zztPDI7fU6eW9QsxjfnuBKg= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 h1:ogRAwT1/gxJBcSWDMZlgyFUM962F51A5CRhDLbxLdmo= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7/go.mod h1:YCsIZhXfRPLFFCl5xxY+1T9RKzOKjCut+28JSX2DnAk= +github.com/aws/aws-sdk-go-v2/service/sqs v1.31.2 h1:A9ihuyTKpS8Z1ou/D4ETfOEFMyokA6JjRsgXWTiHvCk= +github.com/aws/aws-sdk-go-v2/service/sqs v1.31.2/go.mod h1:J3XhTE+VsY1jDsdDY+ACFAppZj/gpvygzC5JE0bTLbQ= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.5 h1:vN8hEbpRnL7+Hopy9dzmRle1xmDc7o8tmY0klsr175w= +github.com/aws/aws-sdk-go-v2/service/sso v1.20.5/go.mod h1:qGzynb/msuZIE8I75DVRCUXw3o3ZyBmUvMwQ2t/BrGM= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 h1:Jux+gDDyi1Lruk+KHF91tK2KCuY61kzoCpvtvJJBtOE= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4/go.mod h1:mUYPBhaF2lGiukDEjJX2BLRRKTmoUSitGDUgM4tRxak= +github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 h1:cwIxeBttqPN3qkaAjcEcsh8NYr8n2HZPkcKgPAi1phU= +github.com/aws/aws-sdk-go-v2/service/sts v1.28.6/go.mod h1:FZf1/nKNEkHdGGJP/cI2MoIMquumuRK6ol3QQJNDxmw= +github.com/aws/smithy-go v1.20.2 h1:tbp628ireGtzcHDDmLT/6ADHidqnwgF57XOXZe6tp4Q= +github.com/aws/smithy-go v1.20.2/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= @@ -13,70 +39,16 @@ github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/U github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= -github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= -github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= -github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= -github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/mercari/go-circuitbreaker v0.0.2 h1:o4hEUhXQ5n1CqVYpLLk6dyBUF4GDfgCf+5Fk8UWOFfw= github.com/mercari/go-circuitbreaker v0.0.2/go.mod h1:0jxDKIpe1ktz1HaqQW8bJ9NwT/rxOn5A/92CZVgbJRs= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= -golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= -golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= -golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= -gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/sqsiface/api.go b/internal/sqsiface/api.go new file mode 100644 index 0000000..d62e477 --- /dev/null +++ b/internal/sqsiface/api.go @@ -0,0 +1,26 @@ +package sqsiface + +import ( + "context" + + "github.com/aws/aws-sdk-go-v2/service/sqs" +) + +type SQSAPI interface { + ChangeMessageVisibility(context.Context, *sqs.ChangeMessageVisibilityInput, ...func(*sqs.Options)) (*sqs.ChangeMessageVisibilityOutput, error) + ChangeMessageVisibilityBatch(context.Context, *sqs.ChangeMessageVisibilityBatchInput, ...func(*sqs.Options)) (*sqs.ChangeMessageVisibilityBatchOutput, error) + + SendMessage(context.Context, *sqs.SendMessageInput, ...func(*sqs.Options)) (*sqs.SendMessageOutput, error) + SendMessageBatch(context.Context, *sqs.SendMessageBatchInput, ...func(*sqs.Options)) (*sqs.SendMessageBatchOutput, error) + + ReceiveMessage(context.Context, *sqs.ReceiveMessageInput, ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) + + DeleteMessage(context.Context, *sqs.DeleteMessageInput, ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error) + DeleteMessageBatch(context.Context, *sqs.DeleteMessageBatchInput, ...func(*sqs.Options)) (*sqs.DeleteMessageBatchOutput, error) + + DeleteQueue(context.Context, *sqs.DeleteQueueInput, ...func(*sqs.Options)) (*sqs.DeleteQueueOutput, error) + + PurgeQueue(context.Context, *sqs.PurgeQueueInput, ...func(*sqs.Options)) (*sqs.PurgeQueueOutput, error) + + GetQueueUrl(context.Context, *sqs.GetQueueUrlInput, ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error) +} diff --git a/multiqueue/mq.go b/multiqueue/mq.go index 5f6a8d4..358e138 100644 --- a/multiqueue/mq.go +++ b/multiqueue/mq.go @@ -8,7 +8,7 @@ import ( "time" "github.com/mercari/go-circuitbreaker" - "github.com/nabeken/aws-go-sqs/v3/queue" + "github.com/nabeken/aws-go-sqs/v4/queue" ) type Queue struct { @@ -252,11 +252,13 @@ AGAIN: return wq.q[n] } } + if wq.nextIndex >= len(wq.q) { wq.round++ wq.nextIndex = 0 } } + if wq.round >= wq.maxWeight { wq.round = 0 } diff --git a/multiqueue/mq_test.go b/multiqueue/mq_test.go index 7b8d58e..b0ce679 100644 --- a/multiqueue/mq_test.go +++ b/multiqueue/mq_test.go @@ -6,9 +6,9 @@ import ( "testing" "time" - "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/mercari/go-circuitbreaker" - "github.com/nabeken/aws-go-sqs/v3/queue" + "github.com/nabeken/aws-go-sqs/v4/queue" "github.com/stretchr/testify/assert" ) @@ -111,9 +111,9 @@ func TestMaxWeight(t *testing.T) { func TestWeightedQueues(t *testing.T) { wq := NewWeightedQueues([]*Queue{ - &Queue{w: 1, Queue: testDummyQueue("dummy1")}, - &Queue{w: 5, Queue: testDummyQueue("dummy2")}, - &Queue{w: 2, Queue: testDummyQueue("dummy3")}, + {w: 1, Queue: testDummyQueue("dummy1")}, + {w: 5, Queue: testDummyQueue("dummy2")}, + {w: 2, Queue: testDummyQueue("dummy3")}, }) for i := 0; i < 3; i++ { diff --git a/queue/example_test.go b/queue/example_test.go index 26053d8..ff88af3 100644 --- a/queue/example_test.go +++ b/queue/example_test.go @@ -1,20 +1,20 @@ package queue_test import ( + "context" "log" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/nabeken/aws-go-sqs/v3/queue" - "github.com/nabeken/aws-go-sqs/v3/queue/option" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/nabeken/aws-go-sqs/v4/queue" + "github.com/nabeken/aws-go-sqs/v4/queue/option" ) func ExampleQueue_SendMessage() { // Create SQS instance - s := sqs.New(session.Must(session.NewSession())) + s := sqs.New(sqs.Options{}) // Create Queue instance - q, err := queue.New(s, "example-queue-name") + q, err := queue.New(context.Background(), s, "example-queue-name") if err != nil { log.Fatal(err) } @@ -25,7 +25,7 @@ func ExampleQueue_SendMessage() { "ATTR2": 12345, } - if _, err := q.SendMessage("MESSAGE BODY", option.MessageAttributes(attrs)); err != nil { + if _, err := q.SendMessage(context.Background(), "MESSAGE BODY", option.MessageAttributes(attrs)); err != nil { log.Fatal(err) } @@ -34,10 +34,10 @@ func ExampleQueue_SendMessage() { func ExampleQueue_SendMessageBatch() { // Create SQS instance - s := sqs.New(session.Must(session.NewSession())) + s := sqs.New(sqs.Options{}) // Create Queue instance - q, err := queue.New(s, "example-queue-name") + q, err := queue.New(context.Background(), s, "example-queue-name") if err != nil { log.Fatal(err) } @@ -49,16 +49,16 @@ func ExampleQueue_SendMessageBatch() { // Create messages for batch operation batchMessages := []queue.BatchMessage{ - queue.BatchMessage{ + { Body: "success", }, - queue.BatchMessage{ + { Body: "failed", Options: []option.SendMessageInput{option.MessageAttributes(attrs)}, }, } - err = q.SendMessageBatch(batchMessages...) + err = q.SendMessageBatch(context.Background(), batchMessages...) if err != nil { batchErrors, ok := queue.IsBatchError(err) if !ok { diff --git a/queue/option/option.go b/queue/option/option.go index 16f50fb..d932f5e 100644 --- a/queue/option/option.go +++ b/queue/option/option.go @@ -4,8 +4,9 @@ package option import ( "strconv" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/sqs" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" ) // The DataType is a type of data used in Attributes and Message Attributes. @@ -20,25 +21,25 @@ const ( type ReceiveMessageInput func(req *sqs.ReceiveMessageInput) // VisibilityTimeout returns a ReceiveMessageInput that changes a message visibility timeout. -func VisibilityTimeout(timeout int64) ReceiveMessageInput { +func VisibilityTimeout(timeout int32) ReceiveMessageInput { return func(req *sqs.ReceiveMessageInput) { - req.VisibilityTimeout = aws.Int64(timeout) + req.VisibilityTimeout = timeout } } // MaxNumberOfMessages returns a ReceiveMessageInput that // changes a max number of messages to receive to n. -func MaxNumberOfMessages(n int64) ReceiveMessageInput { +func MaxNumberOfMessages(n int32) ReceiveMessageInput { return func(req *sqs.ReceiveMessageInput) { - req.MaxNumberOfMessages = aws.Int64(n) + req.MaxNumberOfMessages = n } } // WaitTimeSeconds returns a ReceiveMessageInput that // changes WaitTimeSeconds parameter. -func WaitTimeSeconds(n int64) ReceiveMessageInput { +func WaitTimeSeconds(n int32) ReceiveMessageInput { return func(req *sqs.ReceiveMessageInput) { - req.WaitTimeSeconds = aws.Int64(n) + req.WaitTimeSeconds = n } } @@ -52,8 +53,10 @@ func UseAllAttribute() ReceiveMessageInput { // changes AttributeNames and MessageAttributeNames to attr. func UseAttributes(attr ...string) ReceiveMessageInput { return func(req *sqs.ReceiveMessageInput) { - req.AttributeNames = aws.StringSlice(attr) - req.MessageAttributeNames = aws.StringSlice(attr) + for i := range attr { + req.AttributeNames = append(req.AttributeNames, types.QueueAttributeName(attr[i])) + } + req.MessageAttributeNames = attr } } @@ -62,9 +65,9 @@ func UseAttributes(attr ...string) ReceiveMessageInput { type SendMessageInput func(req *sqs.SendMessageInput) // DelaySeconds returns a SendMessageInput that changes DelaySeconds to delay in seconds. -func DelaySeconds(delay int64) SendMessageInput { +func DelaySeconds(delay int32) SendMessageInput { return func(req *sqs.SendMessageInput) { - req.DelaySeconds = aws.Int64(delay) + req.DelaySeconds = delay } } @@ -78,35 +81,37 @@ func MessageAttributes(attrs map[string]interface{}) SendMessageInput { return } - ret := make(map[string]*sqs.MessageAttributeValue) + ret := make(map[string]types.MessageAttributeValue) + for n, v := range attrs { ret[n] = MessageAttributeValue(v) } + req.MessageAttributes = ret } } // MessageAttributeValue returns a appropriate sqs.MessageAttributeValue by type assersion of v. // Types except string, []byte, int64 and int cause panicking. -func MessageAttributeValue(v interface{}) *sqs.MessageAttributeValue { +func MessageAttributeValue(v interface{}) types.MessageAttributeValue { switch vv := v.(type) { case string: - return &sqs.MessageAttributeValue{ + return types.MessageAttributeValue{ DataType: aws.String(DataTypeString), StringValue: aws.String(vv), } case []byte: - return &sqs.MessageAttributeValue{ + return types.MessageAttributeValue{ DataType: aws.String(DataTypeBinary), BinaryValue: vv, } case int64: - return &sqs.MessageAttributeValue{ + return types.MessageAttributeValue{ DataType: aws.String(DataTypeNumber), StringValue: aws.String(strconv.FormatInt(vv, 10)), } case int: - return &sqs.MessageAttributeValue{ + return types.MessageAttributeValue{ DataType: aws.String(DataTypeNumber), StringValue: aws.String(strconv.FormatInt(int64(vv), 10)), } diff --git a/queue/queue.go b/queue/queue.go index 0748306..e53176f 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -4,11 +4,12 @@ import ( "context" "fmt" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/aws/aws-sdk-go/service/sqs/sqsiface" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" multierror "github.com/hashicorp/go-multierror" - "github.com/nabeken/aws-go-sqs/v3/queue/option" + "github.com/nabeken/aws-go-sqs/v4/internal/sqsiface" + "github.com/nabeken/aws-go-sqs/v4/queue/option" ) // A Queue is an SQS queue which holds queue url in URL. @@ -19,8 +20,8 @@ type Queue struct { } // New initializes Queue with name. -func New(s sqsiface.SQSAPI, name string) (*Queue, error) { - u, err := GetQueueURL(s, name) +func New(ctx context.Context, s sqsiface.SQSAPI, name string) (*Queue, error) { + u, err := GetQueueURL(ctx, s, name) if err != nil { return nil, err } @@ -33,27 +34,22 @@ func New(s sqsiface.SQSAPI, name string) (*Queue, error) { // MustNew initializes Queue with name. // It will panic when it fails to initialize a queue. -func MustNew(s sqsiface.SQSAPI, name string) *Queue { - q, err := New(s, name) +func MustNew(ctx context.Context, s sqsiface.SQSAPI, name string) *Queue { + q, err := New(ctx, s, name) if err != nil { panic(err) } return q } -// ChangeMessageVisibility wraps ChangeMessageVisibilityWithContext using context.Background. -func (q *Queue) ChangeMessageVisibility(receiptHandle *string, visibilityTimeout int64) error { - return q.ChangeMessageVisibilityWithContext(context.Background(), receiptHandle, visibilityTimeout) -} - -// ChangeMessageVisibilityWithContext changes a message visibiliy timeout. -func (q *Queue) ChangeMessageVisibilityWithContext(ctx context.Context, receiptHandle *string, visibilityTimeout int64) error { +// ChangeMessageVisibility changes a message visibiliy timeout. +func (q *Queue) ChangeMessageVisibility(ctx context.Context, receiptHandle *string, visibilityTimeout int32) error { req := &sqs.ChangeMessageVisibilityInput{ ReceiptHandle: receiptHandle, - VisibilityTimeout: aws.Int64(visibilityTimeout), + VisibilityTimeout: visibilityTimeout, QueueUrl: q.URL, } - _, err := q.SQS.ChangeMessageVisibilityWithContext(ctx, req) + _, err := q.SQS.ChangeMessageVisibility(ctx, req) return err } @@ -61,24 +57,19 @@ func (q *Queue) ChangeMessageVisibilityWithContext(ctx context.Context, receiptH // change a visibility timeout. type BatchChangeMessageVisibility struct { ReceiptHandle *string - VisibilityTimeout int64 -} - -// ChangeMessageVisibilityBatch wraps ChangeMessageVisibilityBatchWithContext using context.Background. -func (q *Queue) ChangeMessageVisibilityBatch(opts ...BatchChangeMessageVisibility) error { - return q.ChangeMessageVisibilityBatchWithContext(context.Background(), opts...) + VisibilityTimeout int32 } -// ChangeMessageVisibilityBatchWithContext changes a visibility timeout for each message in opts. -func (q *Queue) ChangeMessageVisibilityBatchWithContext(ctx context.Context, opts ...BatchChangeMessageVisibility) error { - entries := make([]*sqs.ChangeMessageVisibilityBatchRequestEntry, len(opts)) +// ChangeMessageVisibilityBatch changes a visibility timeout for each message in opts. +func (q *Queue) ChangeMessageVisibilityBatch(ctx context.Context, opts ...BatchChangeMessageVisibility) error { + entries := make([]types.ChangeMessageVisibilityBatchRequestEntry, len(opts)) id2index := make(map[string]int) for i, b := range opts { id := aws.String(fmt.Sprintf("msg-%d", i)) - entries[i] = &sqs.ChangeMessageVisibilityBatchRequestEntry{ + entries[i] = types.ChangeMessageVisibilityBatchRequestEntry{ Id: id, ReceiptHandle: b.ReceiptHandle, - VisibilityTimeout: aws.Int64(b.VisibilityTimeout), + VisibilityTimeout: b.VisibilityTimeout, } id2index[*id] = i } @@ -88,20 +79,16 @@ func (q *Queue) ChangeMessageVisibilityBatchWithContext(ctx context.Context, opt QueueUrl: q.URL, } - resp, err := q.SQS.ChangeMessageVisibilityBatchWithContext(ctx, req) + resp, err := q.SQS.ChangeMessageVisibilityBatch(ctx, req) if err != nil { return err } - return NewBatchError(id2index, resp.Failed) -} -// SendMessage wraps SendMessageWithContext using context.Background. -func (q *Queue) SendMessage(body string, opts ...option.SendMessageInput) (*sqs.SendMessageOutput, error) { - return q.SendMessageWithContext(context.Background(), body, opts...) + return NewBatchError(id2index, resp.Failed) } -// SendMessageWithContext sends a message to an SQS queue. opts are used to change parameters for a message. -func (q *Queue) SendMessageWithContext(ctx context.Context, body string, opts ...option.SendMessageInput) (*sqs.SendMessageOutput, error) { +// SendMessage sends a message to an SQS queue. opts are used to change parameters for a message. +func (q *Queue) SendMessage(ctx context.Context, body string, opts ...option.SendMessageInput) (*sqs.SendMessageOutput, error) { req := &sqs.SendMessageInput{ MessageBody: aws.String(body), QueueUrl: q.URL, @@ -111,7 +98,7 @@ func (q *Queue) SendMessageWithContext(ctx context.Context, body string, opts .. f(req) } - return q.SQS.SendMessageWithContext(ctx, req) + return q.SQS.SendMessage(ctx, req) } // A BatchMessage represents each request to send a message. @@ -132,14 +119,14 @@ type BatchError struct { } // NewBatchError composes an error from errors if available. -func NewBatchError(id2index map[string]int, errors []*sqs.BatchResultErrorEntry) error { +func NewBatchError(id2index map[string]int, errors []types.BatchResultErrorEntry) error { var result error for _, entry := range errors { err := &BatchError{ Index: id2index[*entry.Id], Code: *entry.Code, Message: *entry.Message, - SenderFault: *entry.SenderFault, + SenderFault: entry.SenderFault, } result = multierror.Append(result, err) } @@ -173,13 +160,8 @@ func IsBatchError(err error) (errors []*BatchError, ok bool) { return errors, len(errors) > 0 } -// SendMessageBatch wraps SendMessageBatchWithContext using context.Background. -func (q *Queue) SendMessageBatch(messages ...BatchMessage) error { - return q.SendMessageBatchWithContext(context.Background(), messages...) -} - // SendMessageBatch sends messages to SQS queue. -func (q *Queue) SendMessageBatchWithContext(ctx context.Context, messages ...BatchMessage) error { +func (q *Queue) SendMessageBatch(ctx context.Context, messages ...BatchMessage) error { entries, id2index := BuildBatchRequestEntry(messages...) req := &sqs.SendMessageBatchInput{ @@ -187,21 +169,16 @@ func (q *Queue) SendMessageBatchWithContext(ctx context.Context, messages ...Bat QueueUrl: q.URL, } - resp, err := q.SQS.SendMessageBatchWithContext(ctx, req) + resp, err := q.SQS.SendMessageBatch(ctx, req) if err != nil { return err } return NewBatchError(id2index, resp.Failed) } -// ReceiveMessage wraps ReceiveMessageWithContext using context.Background. -func (q *Queue) ReceiveMessage(opts ...option.ReceiveMessageInput) ([]*sqs.Message, error) { - return q.ReceiveMessageWithContext(context.Background(), opts...) -} - // ReceiveMessage receives messages from SQS queue. // opts are used to change parameters for a request. -func (q *Queue) ReceiveMessageWithContext(ctx context.Context, opts ...option.ReceiveMessageInput) ([]*sqs.Message, error) { +func (q *Queue) ReceiveMessage(ctx context.Context, opts ...option.ReceiveMessageInput) ([]types.Message, error) { req := &sqs.ReceiveMessageInput{ QueueUrl: q.URL, } @@ -210,39 +187,29 @@ func (q *Queue) ReceiveMessageWithContext(ctx context.Context, opts ...option.Re f(req) } - resp, err := q.SQS.ReceiveMessageWithContext(ctx, req) + resp, err := q.SQS.ReceiveMessage(ctx, req) if err != nil { return nil, err } return resp.Messages, nil } -// DeleteMessage wraps DeleteMessageWithContext using context.Background. -func (q *Queue) DeleteMessage(receiptHandle *string) error { - return q.DeleteMessageWithContext(context.Background(), receiptHandle) -} - // DeleteMessage deletes a message from SQS queue. -func (q *Queue) DeleteMessageWithContext(ctx context.Context, receiptHandle *string) error { - _, err := q.SQS.DeleteMessageWithContext(ctx, &sqs.DeleteMessageInput{ +func (q *Queue) DeleteMessage(ctx context.Context, receiptHandle *string) error { + _, err := q.SQS.DeleteMessage(ctx, &sqs.DeleteMessageInput{ QueueUrl: q.URL, ReceiptHandle: receiptHandle, }) return err } -// DeleteMessageBatch wraps DeleteMessageBatchWithContext using context.Background. -func (q *Queue) DeleteMessageBatch(receiptHandles ...*string) error { - return q.DeleteMessageBatchWithContext(context.Background(), receiptHandles...) -} - -// DeleteMessageBatchWithContext deletes messages from SQS queue. -func (q *Queue) DeleteMessageBatchWithContext(ctx context.Context, receiptHandles ...*string) error { - entries := make([]*sqs.DeleteMessageBatchRequestEntry, len(receiptHandles)) +// DeleteMessageBatch deletes messages from SQS queue. +func (q *Queue) DeleteMessageBatch(ctx context.Context, receiptHandles ...*string) error { + entries := make([]types.DeleteMessageBatchRequestEntry, len(receiptHandles)) id2index := make(map[string]int) for i, rh := range receiptHandles { id := aws.String(fmt.Sprintf("msg-%d", i)) - entries[i] = &sqs.DeleteMessageBatchRequestEntry{ + entries[i] = types.DeleteMessageBatchRequestEntry{ Id: id, ReceiptHandle: rh, } @@ -254,52 +221,37 @@ func (q *Queue) DeleteMessageBatchWithContext(ctx context.Context, receiptHandle QueueUrl: q.URL, } - resp, err := q.SQS.DeleteMessageBatchWithContext(ctx, req) + resp, err := q.SQS.DeleteMessageBatch(ctx, req) if err != nil { return err } return NewBatchError(id2index, resp.Failed) } -// DeleteQueue wraps DeleteQueueWithContext using context.Background. -func (q *Queue) DeleteQueue() error { - return q.DeleteQueueWithContext(context.Background()) -} - // DeleteQueue deletes a queue in SQS. -func (q *Queue) DeleteQueueWithContext(ctx context.Context) error { - _, err := q.SQS.DeleteQueueWithContext(ctx, &sqs.DeleteQueueInput{ +func (q *Queue) DeleteQueue(ctx context.Context) error { + _, err := q.SQS.DeleteQueue(ctx, &sqs.DeleteQueueInput{ QueueUrl: q.URL, }) return err } -// PurgeQueue wraps PurgeQueueWithContext using context.Background. -func (q *Queue) PurgeQueue() error { - return q.PurgeQueueWithContext(context.Background()) -} - // PurgeQueue purges messages in SQS queue. // It deletes all messages in SQS queue. -func (q *Queue) PurgeQueueWithContext(ctx context.Context) error { - _, err := q.SQS.PurgeQueueWithContext(ctx, &sqs.PurgeQueueInput{ +func (q *Queue) PurgeQueue(ctx context.Context) error { + _, err := q.SQS.PurgeQueue(ctx, &sqs.PurgeQueueInput{ QueueUrl: q.URL, }) return err } -// GetQueueURL wraps GetQueueURLWithContext using context.Background. -func GetQueueURL(s sqsiface.SQSAPI, name string) (*string, error) { - return GetQueueURLWithContext(context.Background(), s, name) -} - -// GetQueueURLWithContext returns a URL for the given queue name. -func GetQueueURLWithContext(ctx context.Context, s sqsiface.SQSAPI, name string) (*string, error) { +// GetQueueURL returns a URL for the given queue name. +func GetQueueURL(ctx context.Context, s sqsiface.SQSAPI, name string) (*string, error) { req := &sqs.GetQueueUrlInput{ QueueName: aws.String(name), } - resp, err := s.GetQueueUrlWithContext(ctx, req) + resp, err := s.GetQueueUrl(ctx, req) if err != nil { return nil, err } @@ -307,8 +259,8 @@ func GetQueueURLWithContext(ctx context.Context, s sqsiface.SQSAPI, name string) } // BuildBatchRequestEntry builds batch entries and id2index map. -func BuildBatchRequestEntry(messages ...BatchMessage) ([]*sqs.SendMessageBatchRequestEntry, map[string]int) { - entries := make([]*sqs.SendMessageBatchRequestEntry, len(messages)) +func BuildBatchRequestEntry(messages ...BatchMessage) ([]types.SendMessageBatchRequestEntry, map[string]int) { + entries := make([]types.SendMessageBatchRequestEntry, len(messages)) id2index := make(map[string]int) for i, bm := range messages { req := &sqs.SendMessageInput{} @@ -317,7 +269,7 @@ func BuildBatchRequestEntry(messages ...BatchMessage) ([]*sqs.SendMessageBatchRe } id := aws.String(fmt.Sprintf("msg-%d", i)) - entries[i] = &sqs.SendMessageBatchRequestEntry{ + entries[i] = types.SendMessageBatchRequestEntry{ DelaySeconds: req.DelaySeconds, MessageAttributes: req.MessageAttributes, MessageBody: aws.String(bm.Body), diff --git a/queue/queue_test.go b/queue/queue_test.go index 606c6c7..61a409f 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -1,137 +1,138 @@ -package queue +package queue_test import ( + "context" "os" "testing" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/sqs" - "github.com/nabeken/aws-go-sqs/v3/queue/option" - "github.com/stretchr/testify/suite" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/nabeken/aws-go-sqs/v4/queue" + "github.com/nabeken/aws-go-sqs/v4/queue/option" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) -func testSQSQueue(name string) (*Queue, error) { - return New(sqs.New(session.New()), name) +type realSQSTestEnv struct { + queue *queue.Queue } -type SendMessageBatchSuite struct { - suite.Suite - - queue *Queue -} - -func (s *SendMessageBatchSuite) SetupSuite() { +func setupRealSQSTestenv(t *testing.T) *realSQSTestEnv { name := os.Getenv("TEST_SQS_QUEUE_NAME") if len(name) == 0 { - s.T().Skip("TEST_SQS_QUEUE_NAME must be set") + t.Skip("TEST_SQS_QUEUE_NAME must be set") } - q, err := testSQSQueue(name) + cfg, err := config.LoadDefaultConfig(context.TODO()) if err != nil { - s.T().Fatal(err) + t.Fatalf("loading AWS config: %s", err.Error()) } - s.queue = q -} + q, err := queue.New(context.TODO(), sqs.NewFromConfig(cfg), name) + if err != nil { + t.Fatal(err) + } -func (s *SendMessageBatchSuite) SetupTest() { -} + env := &realSQSTestEnv{ + queue: q, + } -func (s *SendMessageBatchSuite) TearDownTest() { -} + t.Cleanup(func() { + t.Log("purging the queue...") -func (s *SendMessageBatchSuite) TearDownSuite() { - // don't care of the result but logs it - if err := s.queue.PurgeQueue(); err != nil { - s.T().Log(err) - } + if err := env.queue.PurgeQueue(context.TODO()); err != nil { + t.Log(err) + } + }) + + return env } -func (s *SendMessageBatchSuite) TestSendMessageBatch() { - attrs := map[string]interface{}{ - "ATTR1": "STRING!!", - "ATTR2": 12345, +func TestSendMessageBatch(t *testing.T) { + if testing.Short() { + t.Skip("skipping test") } - batchMessages := []BatchMessage{ - BatchMessage{ - Body: "body1", - Options: []option.SendMessageInput{option.MessageAttributes(attrs)}, - }, - BatchMessage{ - Body: "body2", - Options: []option.SendMessageInput{option.MessageAttributes(attrs)}, - }, - } + env := setupRealSQSTestenv(t) - if err := s.queue.SendMessageBatch(batchMessages...); !s.NoError(err) { - return - } + t.Run("OK", func(t *testing.T) { + attrs := map[string]interface{}{ + "ATTR1": "STRING!!", + "ATTR2": 12345, + } - messages, err := s.queue.ReceiveMessage( - option.MaxNumberOfMessages(5), - option.UseAllAttribute(), - ) - if !s.NoError(err) { - return - } + batchMessages := []queue.BatchMessage{ + { + Body: "body1", + Options: []option.SendMessageInput{option.MessageAttributes(attrs)}, + }, + { + Body: "body2", + Options: []option.SendMessageInput{option.MessageAttributes(attrs)}, + }, + } - s.Len(messages, 2) + err := env.queue.SendMessageBatch(context.Background(), batchMessages...) + require.NoError(t, err) + + messages, err := env.queue.ReceiveMessage( + context.Background(), + option.MaxNumberOfMessages(5), + option.UseAllAttribute(), + ) + + require.NoError(t, err) + assert.Len(t, messages, 2) + + for i, m := range messages { + assert.Len(t, m.MessageAttributes, 2) + for k, a := range m.MessageAttributes { + mav := option.MessageAttributeValue(attrs[k]) + assert.Equal(t, mav.StringValue, a.StringValue) + } + assert.Equal(t, batchMessages[i].Body, *m.Body) + env.queue.DeleteMessage(context.TODO(), m.ReceiptHandle) + } + }) - for i, m := range messages { - s.Len(m.MessageAttributes, 2) - for k, a := range m.MessageAttributes { - mav := option.MessageAttributeValue(attrs[k]) - s.Equal(mav.StringValue, a.StringValue) + t.Run("Error", func(t *testing.T) { + attrs := map[string]interface{}{ + "error": "", } - s.Equal(batchMessages[i].Body, *m.Body) - s.queue.DeleteMessage(m.ReceiptHandle) - } -} -func (s *SendMessageBatchSuite) TestSendMessageBatchError() { - attrs := map[string]interface{}{ - "error": "", - } + batchMessages := []queue.BatchMessage{ + { + Body: "success", + }, + { + Body: "failed", + Options: []option.SendMessageInput{option.MessageAttributes(attrs)}, + }, + } - batchMessages := []BatchMessage{ - BatchMessage{ - Body: "success", - }, - BatchMessage{ - Body: "failed", - Options: []option.SendMessageInput{option.MessageAttributes(attrs)}, - }, - } + err := env.queue.SendMessageBatch(context.Background(), batchMessages...) + require.Error(t, err) - if err := s.queue.SendMessageBatch(batchMessages...); s.Error(err) { - if berrs, ok := IsBatchError(err); s.True(ok, "error must contain *BatchError") { - s.Len(berrs, 1) + berrs, ok := queue.IsBatchError(err) - s.Equal(1, berrs[0].Index, "batchMessages[1] must be error") - s.Equal("InvalidParameterValue", berrs[0].Code) - s.Equal(true, berrs[0].SenderFault) - } - } + require.True(t, ok, "error must contain *BatchError") - messages, err := s.queue.ReceiveMessage( - option.MaxNumberOfMessages(5), - option.UseAllAttribute(), - ) - if !s.NoError(err) { - return - } + assert.Len(t, berrs, 1) + assert.Equal(t, 1, berrs[0].Index, "batchMessages[1] must be error") + assert.Equal(t, "InvalidParameterValue", berrs[0].Code) + assert.Equal(t, true, berrs[0].SenderFault) - s.Len(messages, 1) - for _, m := range messages { - s.queue.DeleteMessage(m.ReceiptHandle) - } -} + messages, err := env.queue.ReceiveMessage( + context.Background(), + option.MaxNumberOfMessages(5), + option.UseAllAttribute(), + ) -func TestSendMessageBatchSuite(t *testing.T) { - if testing.Short() { - t.Skip("skipping test") - } + require.NoError(t, err) - suite.Run(t, new(SendMessageBatchSuite)) + assert.Len(t, messages, 1) + for _, m := range messages { + env.queue.DeleteMessage(context.TODO(), m.ReceiptHandle) + } + }) }