Skip to content

Commit

Permalink
Add support for setting the idempotency key on a BusMessage (#208)
Browse files Browse the repository at this point in the history
* Added x-idempotency-key header and the ability for client code to set it

#106

* Set the value of BusMessage.ID as the default value of BusMessage.IdempotencyKey
  • Loading branch information
Guy Baron committed Oct 15, 2019
1 parent 46cabb4 commit d605440
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 0 deletions.
10 changes: 10 additions & 0 deletions gbus/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gbus
import (
"errors"
"fmt"
"strings"

"github.com/opentracing/opentracing-go/log"
"github.com/rs/xid"
Expand All @@ -12,6 +13,7 @@ import (
//BusMessage the structure that gets sent to the underlying transport
type BusMessage struct {
ID string
IdempotencyKey string
CorrelationID string
SagaID string
SagaCorrelationID string
Expand All @@ -26,6 +28,7 @@ func NewBusMessage(payload Message) *BusMessage {
bm := &BusMessage{
ID: xid.New().String(),
}
bm.SetIdempotencyKey(bm.ID)
bm.SetPayload(payload)
return bm
}
Expand Down Expand Up @@ -57,6 +60,7 @@ func GetMessageName(delivery amqp.Delivery) string {
//GetAMQPHeaders convert to AMQP headers Table everything but a payload
func (bm *BusMessage) GetAMQPHeaders() (headers amqp.Table) {
headers = amqp.Table{}
headers["x-idempotency-key"] = bm.IdempotencyKey
headers["x-msg-saga-id"] = bm.SagaID
headers["x-msg-saga-correlation-id"] = bm.SagaCorrelationID
headers["x-grabbit-msg-rpc-id"] = bm.RPCID
Expand All @@ -68,6 +72,7 @@ func (bm *BusMessage) GetAMQPHeaders() (headers amqp.Table) {
//SetFromAMQPHeaders convert from AMQP headers Table everything but a payload
func (bm *BusMessage) SetFromAMQPHeaders(delivery amqp.Delivery) {
headers := delivery.Headers
bm.IdempotencyKey = castToString(headers["x-idempotency-key"])
bm.SagaID = castToString(headers["x-msg-saga-id"])
bm.SagaCorrelationID = castToString(headers["x-msg-saga-correlation-id"])
bm.RPCID = castToString(headers["x-grabbit-msg-rpc-id"])
Expand All @@ -81,6 +86,10 @@ func (bm *BusMessage) SetPayload(payload Message) {
bm.Payload = payload
}

func (bm *BusMessage) SetIdempotencyKey(idempotencyKey string) {
bm.IdempotencyKey = strings.TrimSpace(idempotencyKey)
}

//TargetSaga allows sending the message to a specific Saga instance
func (bm *BusMessage) TargetSaga(sagaID string) {
bm.SagaCorrelationID = sagaID
Expand All @@ -91,6 +100,7 @@ func (bm *BusMessage) GetTraceLog() (fields []log.Field) {
return []log.Field{
log.String("message", bm.PayloadFQN),
log.String("ID", bm.ID),
log.String("IdempotencyKey", bm.IdempotencyKey),
log.String("SagaID", bm.SagaID),
log.String("CorrelationID", bm.CorrelationID),
log.String("SagaCorrelationID", bm.SagaCorrelationID),
Expand Down
35 changes: 35 additions & 0 deletions tests/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"reflect"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -760,6 +761,40 @@ func TestSanitizingSvcName(t *testing.T) {
fmt.Println("succeeded sanitizing service name")
}

func TestIdempotencyKeyHeaders(t *testing.T) {
var wg sync.WaitGroup
wg.Add(2)
keys := make([]string, 0)
handler := func(invocation gbus.Invocation, message *gbus.BusMessage) error {
keys = append(keys, message.IdempotencyKey)
wg.Done()
return nil
}

bus := createNamedBusForTest(testSvc1)

bus.HandleMessage(Command1{}, handler)
bus.Start()
defer bus.Shutdown()

cmd1 := gbus.NewBusMessage(Command1{})
cmd1.SetIdempotencyKey("some-unique-key")

cmd2 := gbus.NewBusMessage(Command1{})
cmd2.SetIdempotencyKey("some-unique-key")

//send two commands with the same IdempotencyKey to test that the same IdempotencyKey is propogated
bus.Send(context.Background(), testSvc1, cmd1)
bus.Send(context.Background(), testSvc1, cmd2)

wg.Wait()

if keys[0] != keys[1] && keys[0] != "" {
t.Errorf("expected same IdempotencyKey. actual key1:%s, key2%s", keys[0], keys[1])
}

}

func amqpDeliveryToPublishing(del *amqp.Delivery) (pub amqp.Publishing) {
pub.Headers = del.Headers
pub.ContentType = del.ContentType
Expand Down

0 comments on commit d605440

Please sign in to comment.