Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable returning a message back from the dead to the queue #112

Merged
merged 25 commits into from
Aug 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions gbus/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type BusConfiguration struct {
//Bus interface provides the majority of functionality to Send, Reply and Publish messages to the Bus
type Bus interface {
HandlerRegister
RegisterDeadletterHandler
Deadlettering
BusSwitch
Messaging
SagaRegister
Expand Down Expand Up @@ -124,8 +124,9 @@ type Saga interface {
}

//RegisterDeadletterHandler provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue
type RegisterDeadletterHandler interface {
type Deadlettering interface {
HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error)
ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error
}

//RequestSagaTimeout is the interface a saga needs to implement to get timeout servicess
Expand Down
6 changes: 3 additions & 3 deletions gbus/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,9 @@ func (builder *defaultBuilder) ConfigureHealthCheck(timeoutInSeconds time.Durati
}

func (builder *defaultBuilder) WithConfiguration(config gbus.BusConfiguration) gbus.Builder {
if config.MaxRetryCount > 0 {
gbus.MaxRetryCount = config.MaxRetryCount
}

gbus.MaxRetryCount = config.MaxRetryCount

if config.BaseRetryDuration > 0 {
gbus.BaseRetryDuration = time.Millisecond * time.Duration(config.BaseRetryDuration)
}
Expand Down
84 changes: 56 additions & 28 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,25 @@ func (b *DefaultBus) sendWithTx(ctx context.Context, ambientTx *sql.Tx, toServic
return b.withTx(send, ambientTx)
}

func (b *DefaultBus) returnDeadToQueue(ctx context.Context, ambientTx *sql.Tx, publishing *amqp.Publishing) error {
if !b.started {
return errors.New("bus not strated or already shutdown, make sure you call bus.Start() before sending messages")
}
//publishing.Headers.
exchange := fmt.Sprintf("%v", publishing.Headers["x-first-death-exchange"])
routingKey := fmt.Sprintf("%v", publishing.Headers["x-first-death-queue"])

delete(publishing.Headers, "x-death")
delete(publishing.Headers, "x-first-death-queue")
delete(publishing.Headers, "x-first-death-reason")
delete(publishing.Headers, "x-first-death-exchange")

send := func(tx *sql.Tx) error {
return b.publish(tx, exchange, routingKey, publishing)
}
return b.withTx(send, ambientTx)
}

//Publish implements GBus.Publish(topic, message)
func (b *DefaultBus) Publish(ctx context.Context, exchange, topic string, message *BusMessage, policies ...MessagePolicy) error {
return b.publishWithTx(ctx, nil, exchange, topic, message, policies...)
Expand Down Expand Up @@ -548,6 +567,11 @@ func (b *DefaultBus) HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Deli
b.deadletterHandler = handler
}

//ReturnDeadToQueue returns a message to its original destination
func (b *DefaultBus) ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error {
return b.returnDeadToQueue(ctx, nil, publishing)
}

//RegisterSaga impements GBus.RegisterSaga
func (b *DefaultBus) RegisterSaga(saga Saga, conf ...SagaConfFn) error {
if b.Glue == nil {
Expand Down Expand Up @@ -589,6 +613,37 @@ func (b *DefaultBus) monitorAMQPErrors() {
}
}

func (b *DefaultBus) publish(tx *sql.Tx, exchange, routingKey string, msg *amqp.Publishing) error {
publish := func() error {
//send to the transactional outbox if the bus is transactional
//otherwise send directly to amqp
if b.IsTxnl && tx != nil {
b.Log().WithField("message_id", msg.MessageId).Debug("sending message to outbox")
saveErr := b.Outbox.Save(tx, exchange, routingKey, *msg)
if saveErr != nil {
b.Log().WithError(saveErr).Error("failed to save to transactional outbox")
}
return saveErr
}
//do not attempt to contact the borker if backpressure is being applied
if b.backpressure {
return errors.New("can't send message due to backpressure from amqp broker")
}
_, outgoingErr := b.Outgoing.Post(exchange, routingKey, *msg)
return outgoingErr
}
//currently only one thread can publish at a time
//TODO:add a publishing workers

err := b.SafeWithRetries(publish, MaxRetryCount)

if err != nil {
b.Log().Printf("failed publishing message.\n error:%v", err)
return err
}
return err
}

func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, replyTo, exchange, topic string, message *BusMessage, policies ...MessagePolicy) (er error) {
b.SenderLock.Lock()
defer b.SenderLock.Unlock()
Expand Down Expand Up @@ -640,34 +695,7 @@ func (b *DefaultBus) sendImpl(sctx context.Context, tx *sql.Tx, toService, reply
key = topic
}

publish := func() error {
//send to the transactional outbox if the bus is transactional
//otherwise send directly to amqp
if b.IsTxnl && tx != nil {
b.Log().WithField("message_id", msg.MessageId).Debug("sending message to outbox")
saveErr := b.Outbox.Save(tx, exchange, key, msg)
if saveErr != nil {
b.Log().WithError(saveErr).Error("failed to save to transactional outbox")
}
return saveErr
}
//do not attempt to contact the borker if backpressure is being applied
if b.backpressure {
return errors.New("can't send message due to backpressure from amqp broker")
}
_, outgoingErr := b.Outgoing.Post(exchange, key, msg)
return outgoingErr
}
//currently only one thread can publish at a time
//TODO:add a publishing workers

err = b.SafeWithRetries(publish, MaxRetryCount)

if err != nil {
b.Log().Printf("failed publishing message.\n error:%v", err)
return err
}
return err
return b.publish(tx, exchange, key, &msg)
}

func (b *DefaultBus) registerHandlerImpl(exchange, routingKey string, msg Message, handler MessageHandler) error {
Expand Down
70 changes: 67 additions & 3 deletions tests/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,9 @@ func TestSubscribingOnTopic(t *testing.T) {

var (
handlerRetryProceed = make(chan bool)
attempts = 0
attempts = 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by default, integers are set to 0

)

func TestHandlerRetry(t *testing.T) {

c1 := Command1{}
Expand Down Expand Up @@ -231,8 +232,8 @@ func TestDeadlettering(t *testing.T) {
var waitgroup sync.WaitGroup
waitgroup.Add(2)
poision := gbus.NewBusMessage(PoisionMessage{})
service1 := createBusWithOptions(testSvc1, "grabbit-dead", true, true)
deadletterSvc := createBusWithOptions("deadletterSvc", "grabbit-dead", true, true)
service1 := createNamedBusForTest(testSvc1)
deadletterSvc := createNamedBusForTest("deadletterSvc")

deadMessageHandler := func(tx *sql.Tx, poision amqp.Delivery) error {
waitgroup.Done()
Expand Down Expand Up @@ -261,6 +262,51 @@ func TestDeadlettering(t *testing.T) {
}
}

func TestReturnDeadToQueue(t *testing.T) {

var visited bool
proceed := make(chan bool, 0)
poision := gbus.NewBusMessage(Command1{})

service1 := createBusWithConfig(testSvc1, "grabbit-dead", true, true,
gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0})

deadletterSvc := createBusWithConfig("deadletterSvc", "grabbit-dead", true, true,
gbus.BusConfiguration{MaxRetryCount: 0, BaseRetryDuration: 0})

deadMessageHandler := func(tx *sql.Tx, poision amqp.Delivery) error {
pub := amqpDeliveryToPublishing(poision)
deadletterSvc.ReturnDeadToQueue(context.Background(), &pub)
return nil
}

faultyHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) error {
if visited {
proceed <- true
return nil
}
visited = true
return errors.New("fail")
}

deadletterSvc.HandleDeadletter(deadMessageHandler)
service1.HandleMessage(Command1{}, faultyHandler)

deadletterSvc.Start()
defer deadletterSvc.Shutdown()
service1.Start()
defer service1.Shutdown()

service1.Send(context.Background(), testSvc1, poision)

select {
case <-proceed:
fmt.Println("success")
case <-time.After(2 * time.Second):
t.Fatal("timeout, failed to resend dead message to queue")
}
}

func TestRegistrationAfterBusStarts(t *testing.T) {
event := Event1{}
b := createBusForTest()
Expand Down Expand Up @@ -374,6 +420,24 @@ func noopTraceContext() context.Context {
// return ctx
}

func amqpDeliveryToPublishing(del amqp.Delivery) (pub amqp.Publishing) {
pub.Headers = del.Headers
pub.ContentType = del.ContentType
pub.ContentEncoding = del.ContentEncoding
pub.DeliveryMode = del.DeliveryMode
pub.Priority = del.Priority
pub.CorrelationId = del.CorrelationId
pub.ReplyTo = del.ReplyTo
pub.Expiration = del.Expiration
pub.MessageId = del.MessageId
pub.Timestamp = del.Timestamp
pub.Type = del.Type
pub.UserId = del.UserId
pub.AppId = del.AppId
pub.Body = del.Body
return
}

type panicPolicy struct {
}

Expand Down
14 changes: 7 additions & 7 deletions tests/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,14 @@ func init() {
testSvc3 = "testSvc3"
}

func createBusForTest() gbus.Bus {
return createNamedBusForTest(testSvc1)
}

func createBusWithOptions(svcName string, deadletter string, txnl, pos bool) gbus.Bus {
func createBusWithConfig(svcName string, deadletter string, txnl, pos bool, conf gbus.BusConfiguration) gbus.Bus {
busBuilder := builder.
New().
Bus(connStr).
WithPolicies(&policy.Durable{}, &policy.TTL{Duration: time.Second * 3600}).
WorkerNum(3, 1).
WithConfirms().
WithConfiguration(gbus.BusConfiguration{MaxRetryCount: 4, BaseRetryDuration: 15})
WithConfiguration(conf)

if txnl {
busBuilder = busBuilder.Txnl("mysql", "rhinof:rhinof@/rhinof")
Expand All @@ -46,6 +42,10 @@ func createBusWithOptions(svcName string, deadletter string, txnl, pos bool) gbu
return busBuilder.Build(svcName)
}

func createBusForTest() gbus.Bus {
return createNamedBusForTest(testSvc1)
}

func createNamedBusForTest(svcName string) gbus.Bus {
return createBusWithOptions(svcName, "dead-grabbit", true, true)
return createBusWithConfig(svcName, "dead-grabbit", true, true, gbus.BusConfiguration{MaxRetryCount: 4, BaseRetryDuration: 15})
}