Skip to content
Permalink
Browse files

Merge pull request #19 from jfontan/fix/read-retries

Support int16 and int64 in retries header
  • Loading branch information...
jfontan committed Mar 7, 2019
2 parents a667403 + c01d65a commit dc171cdd582e12c07dd42a20e01913e5dea2fa2d
Showing with 109 additions and 8 deletions.
  1. +43 −7 amqp/amqp.go
  2. +66 −1 amqp/amqp_test.go
@@ -1,7 +1,8 @@
package queue
package amqp

import (
"fmt"
"math"
"os"
"strings"
"sync"
@@ -626,18 +627,53 @@ func fromDelivery(d *amqp.Delivery) (*queue.Job, error) {
j.Raw = d.Body

if retries, ok := d.Headers[DefaultConfiguration.RetriesHeader]; ok {
retries, ok := retries.(int32)
if !ok {
return nil, ErrRetrievingHeader.New(DefaultConfiguration.RetriesHeader, d.MessageId)
}
switch r := retries.(type) {
case int16:
j.Retries = int32(r)

case int32:
j.Retries = int32(r)

case int64:
if r <= math.MaxInt32 {
j.Retries = int32(r)
} else {
j.Retries = 0
}

j.Retries = retries
default:
err = d.Reject(false)
if err != nil {
return nil, ErrRetrievingHeader.Wrap(
err,
DefaultConfiguration.RetriesHeader,
d.MessageId,
)
}

return nil, ErrRetrievingHeader.New(
DefaultConfiguration.RetriesHeader,
d.MessageId,
)
}
}

if errorType, ok := d.Headers[DefaultConfiguration.ErrorHeader]; ok {
errorType, ok := errorType.(string)
if !ok {
return nil, ErrRetrievingHeader.New(DefaultConfiguration.ErrorHeader, d.MessageId)
err = d.Reject(false)
if err != nil {
return nil, ErrRetrievingHeader.Wrap(
err,
DefaultConfiguration.ErrorHeader,
d.MessageId,
)
}

return nil, ErrRetrievingHeader.New(
DefaultConfiguration.ErrorHeader,
d.MessageId,
)
}

j.ErrorType = errorType
@@ -1,4 +1,4 @@
package queue
package amqp

import (
"context"
@@ -11,6 +11,7 @@ import (
"gopkg.in/src-d/go-queue.v1"
"gopkg.in/src-d/go-queue.v1/test"

"github.com/streadway/amqp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
@@ -177,6 +178,70 @@ func TestAMQPHeaders(t *testing.T) {
}
}

func TestAMQPHeaderRetriesType(t *testing.T) {
broker, err := queue.NewBroker(testAMQPURI)
require.NoError(t, err)
defer func() { require.NoError(t, broker.Close()) }()

q, err := broker.Queue(test.NewName())
require.NoError(t, err)

qa, ok := q.(*Queue)
require.True(t, ok)

tests := []struct {
name string
retries interface{}
}{
{
name: "int16",
retries: int16(42),
},
{
name: "int32",
retries: int32(42),
},
{
name: "int64",
retries: int64(42),
},
}

for _, test := range tests {
headers := amqp.Table{}
headers[DefaultConfiguration.RetriesHeader] = test.retries
err := qa.conn.channel().Publish(
"", // exchange
qa.queue.Name, // routing key
false, // mandatory
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
MessageId: "id",
Priority: uint8(queue.PriorityNormal),
Timestamp: time.Now(),
ContentType: "application/msgpack",
Body: []byte("gaxSZXBvc2l0b3J5SUTEEAFmXSlGxxOsFGMLs/gl7Qw="),
Headers: headers,
},
)
require.NoError(t, err)
}

jobIter, err := q.Consume(len(tests))
require.NoError(t, err)

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
job, err := jobIter.Next()
require.NoError(t, err)
require.NotNil(t, job)

require.Equal(t, int32(42), job.Retries)
})
}
}

func TestAMQPRepublishBuried(t *testing.T) {
broker, err := queue.NewBroker(testAMQPURI)
require.NoError(t, err)

0 comments on commit dc171cd

Please sign in to comment.
You can’t perform that action at this time.