Skip to content

Commit

Permalink
Enable returning a message back from the dead to the queue (#112)
Browse files Browse the repository at this point in the history
* enable sending raw messages

* enable sending raw messages

* enable sending raw messages

* enable sending raw messages

* enable sending raw messages

* enable sending raw messages

* enable sending raw messages

* enable sending raw messages

* enable sending raw messages

* enable sending raw messages

* enable sending raw messages

* enable sending raw messages

* enable sending raw messages

* return to q

* return to q

* return to q

* return to q

* return dead to q

* allow no retries

* test - resend dead to queue

* test - resend dead to queue

* test - resend dead to queue

* test - resend dead to queue - fixes after cr

* test - resend dead to queue - fixes after cr

* test - resend dead to queue - fixes after cr
  • Loading branch information
adiweiss authored and Guy Baron committed Aug 7, 2019
1 parent d84bc71 commit 858d962
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 43 deletions.
5 changes: 3 additions & 2 deletions gbus/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type BusConfiguration struct {
//Bus interface provides the majority of functionality to Send, Reply and Publish messages to the Bus
type Bus interface {
HandlerRegister
RegisterDeadletterHandler
Deadlettering
BusSwitch
Messaging
SagaRegister
Expand Down Expand Up @@ -128,8 +128,9 @@ type Saga interface {
}

//RegisterDeadletterHandler provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue
type RegisterDeadletterHandler interface {
type Deadlettering interface {
HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error)
ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error
}

//RequestSagaTimeout is the interface a saga needs to implement to get timeout servicess
Expand Down
6 changes: 3 additions & 3 deletions gbus/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,9 @@ func (builder *defaultBuilder) ConfigureHealthCheck(timeoutInSeconds time.Durati
}

func (builder *defaultBuilder) WithConfiguration(config gbus.BusConfiguration) gbus.Builder {
if config.MaxRetryCount > 0 {
gbus.MaxRetryCount = config.MaxRetryCount
}

gbus.MaxRetryCount = config.MaxRetryCount

if config.BaseRetryDuration > 0 {
gbus.BaseRetryDuration = time.Millisecond * time.Duration(config.BaseRetryDuration)
}
Expand Down
84 changes: 56 additions & 28 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,25 @@ func (b *DefaultBus) sendWithTx(ctx context.Context, ambientTx *sql.Tx, toServic
return b.withTx(send, ambientTx)
}

func (b *DefaultBus) returnDeadToQueue(ctx context.Context, ambientTx *sql.Tx, publishing *amqp.Publishing) error {
if !b.started {
return errors.New("bus not strated or already shutdown, make sure you call bus.Start() before sending messages")
}
//publishing.Headers.
exchange := fmt.Sprintf("%v", publishing.Headers["x-first-death-exchange"])
routingKey := fmt.Sprintf("%v", publishing.Headers["x-first-death-queue"])

delete(publishing.Headers, "x-death")
delete(publishing.Headers, "x-first-death-queue")
delete(publishing.Headers, "x-first-death-reason")
delete(publishing.Headers, "x-first-death-exchange")

send := func(tx *sql.Tx) error {
return b.publish(tx, exchange, routingKey, publishing)
}
return b.withTx(send, ambientTx)
}

//Publish implements GBus.Publish(topic, message)
func (b *DefaultBus) Publish(ctx context.Context, exchange, topic string, message *BusMessage, policies ...MessagePolicy) error {
return b.publishWithTx(ctx, nil, exchange, topic, message, policies...)
Expand Down Expand Up @@ -557,6 +576,11 @@ func (b *DefaultBus) HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Deli
b.deadletterHandler = handler
}

//ReturnDeadToQueue returns a message to its original destination
func (b *DefaultBus) ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error {
return b.returnDeadToQueue(ctx, nil, publishing)
}

//RegisterSaga impements GBus.RegisterSaga
func (b *DefaultBus) RegisterSaga(saga Saga, conf ...SagaConfFn) error {
if b.Glue == nil {
Expand Down Expand Up @@ -598,6 +622,37 @@ func (b *DefaultBus) monitorAMQPErrors() {
}
}

func (b *DefaultBus) publish(tx *sql.Tx, exchange, routingKey string, msg *amqp.Publishing) error {
publish := func() error {
//send to the transactional outbox if the bus is transactional
//otherwise send directly to amqp
if b.IsTxnl && tx != nil {
b.Log().WithField("message_id", msg.MessageId).Debug("sending message to outbox")
saveErr := b.Outbox.Save(tx, exchange, routingKey, *msg)
if saveErr != nil {
b.Log().WithError(saveErr).Error("failed to save to transactional outbox")
}
return saveErr
}
//do not attempt to contact the borker if backpressure is being applied
if b.backpressure {
return errors.New("can't send message due to backpressure from amqp broker")
}
_, outgoingErr := b.Outgoing.Post(exchange, routingKey, *msg)
return outgoingErr
}
//currently only one thread can publish at a time
//TODO:add a publishing workers

err := b.SafeWithRetries(publish, MaxRetryCount)

if err != nil {
b.Log().Printf("failed publishing message.\n error:%v", err)
return err
}
return err
}

func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, replyTo, exchange, topic string, message *BusMessage, policies ...MessagePolicy) (er error) {
b.SenderLock.Lock()
defer b.SenderLock.Unlock()
Expand Down Expand Up @@ -649,34 +704,7 @@ func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, reply
key = topic
}

publish := func() error {
//send to the transactional outbox if the bus is transactional
//otherwise send directly to amqp
if b.IsTxnl && tx != nil {
b.Log().WithField("message_id", msg.MessageId).Debug("sending message to outbox")
saveErr := b.Outbox.Save(tx, exchange, key, msg)
if saveErr != nil {
b.Log().WithError(saveErr).Error("failed to save to transactional outbox")
}
return saveErr
}
//do not attempt to contact the borker if backpressure is being applied
if b.backpressure {
return errors.New("can't send message due to backpressure from amqp broker")
}
_, outgoingErr := b.Outgoing.Post(exchange, key, msg)
return outgoingErr
}
//currently only one thread can publish at a time
//TODO:add a publishing workers

err = b.SafeWithRetries(publish, MaxRetryCount)

if err != nil {
b.Log().Printf("failed publishing message.\n error:%v", err)
return err
}
return err
return b.publish(tx, exchange, key, &msg)
}

func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Message, handler MessageHandler) error {
Expand Down
70 changes: 67 additions & 3 deletions tests/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,9 @@ func TestSubscribingOnTopic(t *testing.T) {

var (
handlerRetryProceed = make(chan bool)
attempts = 0
attempts = 0
)

func TestHandlerRetry(t *testing.T) {

c1 := Command1{}
Expand Down Expand Up @@ -231,8 +232,8 @@ func TestDeadlettering(t *testing.T) {
var waitgroup sync.WaitGroup
waitgroup.Add(2)
poision := gbus.NewBusMessage(PoisionMessage{})
service1 := createBusWithOptions(testSvc1, "grabbit-dead", true, true)
deadletterSvc := createBusWithOptions("deadletterSvc", "grabbit-dead", true, true)
service1 := createNamedBusForTest(testSvc1)
deadletterSvc := createNamedBusForTest("deadletterSvc")

deadMessageHandler := func(tx *sql.Tx, poision amqp.Delivery) error {
waitgroup.Done()
Expand Down Expand Up @@ -261,6 +262,51 @@ func TestDeadlettering(t *testing.T) {
}
}

func TestReturnDeadToQueue(t *testing.T) {

var visited bool
proceed := make(chan bool, 0)
poision := gbus.NewBusMessage(Command1{})

service1 := createBusWithConfig(testSvc1, "grabbit-dead", true, true,
gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0})

deadletterSvc := createBusWithConfig("deadletterSvc", "grabbit-dead", true, true,
gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0})

deadMessageHandler := func(tx *sql.Tx, poision amqp.Delivery) error {
pub := amqpDeliveryToPublishing(poision)
deadletterSvc.ReturnDeadToQueue(context.Background(), &pub)
return nil
}

faultyHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) error {
if visited {
proceed <- true
return nil
}
visited = true
return errors.New("fail")
}

deadletterSvc.HandleDeadletter(deadMessageHandler)
service1.HandleMessage(Command1{}, faultyHandler)

deadletterSvc.Start()
defer deadletterSvc.Shutdown()
service1.Start()
defer service1.Shutdown()

service1.Send(context.Background(), testSvc1, poision)

select {
case <-proceed:
fmt.Println("success")
case <-time.After(2 * time.Second):
t.Fatal("timeout, failed to resend dead message to queue")
}
}

func TestRegistrationAfterBusStarts(t *testing.T) {
event := Event1{}
b := createBusForTest()
Expand Down Expand Up @@ -374,6 +420,24 @@ func noopTraceContext() context.Context {
// return ctx
}

func amqpDeliveryToPublishing(del amqp.Delivery) (pub amqp.Publishing) {
pub.Headers = del.Headers
pub.ContentType = del.ContentType
pub.ContentEncoding = del.ContentEncoding
pub.DeliveryMode = del.DeliveryMode
pub.Priority = del.Priority
pub.CorrelationId = del.CorrelationId
pub.ReplyTo = del.ReplyTo
pub.Expiration = del.Expiration
pub.MessageId = del.MessageId
pub.Timestamp = del.Timestamp
pub.Type = del.Type
pub.UserId = del.UserId
pub.AppId = del.AppId
pub.Body = del.Body
return
}

type panicPolicy struct {
}

Expand Down
14 changes: 7 additions & 7 deletions tests/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,14 @@ func init() {
testSvc3 = "testSvc3"
}

func createBusForTest() gbus.Bus {
return createNamedBusForTest(testSvc1)
}

func createBusWithOptions(svcName string, deadletter string, txnl, pos bool) gbus.Bus {
func createBusWithConfig(svcName string, deadletter string, txnl, pos bool, conf gbus.BusConfiguration) gbus.Bus {
busBuilder := builder.
New().
Bus(connStr).
WithPolicies(&policy.Durable{}, &policy.TTL{Duration: time.Second * 3600}).
WorkerNum(3, 1).
WithConfirms().
WithConfiguration(gbus.BusConfiguration{MaxRetryCount: 4, BaseRetryDuration: 15})
WithConfiguration(conf)

if txnl {
busBuilder = busBuilder.Txnl("mysql", "rhinof:rhinof@/rhinof")
Expand All @@ -46,6 +42,10 @@ func createBusWithOptions(svcName string, deadletter string, txnl, pos bool) gbu
return busBuilder.Build(svcName)
}

func createBusForTest() gbus.Bus {
return createNamedBusForTest(testSvc1)
}

func createNamedBusForTest(svcName string) gbus.Bus {
return createBusWithOptions(svcName, "dead-grabbit", true, true)
return createBusWithConfig(svcName, "dead-grabbit", true, true, gbus.BusConfiguration{MaxRetryCount: 4, BaseRetryDuration: 15})
}

0 comments on commit 858d962

Please sign in to comment.