From d605440cde01c1ba5aacf65660d3d12f17f7a366 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Tue, 15 Oct 2019 21:21:23 +0300 Subject: [PATCH] Add support for setting the idempotency key on a BusMessage (#208) * Added x-idempotency-key header and the ability for client code to set it https://github.com/wework/grabbit/issues/106 * Set the value of BusMessage.ID as the default value of BusMessage.IdempotencyKey --- gbus/messages.go | 10 ++++++++++ tests/bus_test.go | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/gbus/messages.go b/gbus/messages.go index d6c0bcb..eeaaa51 100644 --- a/gbus/messages.go +++ b/gbus/messages.go @@ -3,6 +3,7 @@ package gbus import ( "errors" "fmt" + "strings" "github.com/opentracing/opentracing-go/log" "github.com/rs/xid" @@ -12,6 +13,7 @@ import ( //BusMessage the structure that gets sent to the underlying transport type BusMessage struct { ID string + IdempotencyKey string CorrelationID string SagaID string SagaCorrelationID string @@ -26,6 +28,7 @@ func NewBusMessage(payload Message) *BusMessage { bm := &BusMessage{ ID: xid.New().String(), } + bm.SetIdempotencyKey(bm.ID) bm.SetPayload(payload) return bm } @@ -57,6 +60,7 @@ func GetMessageName(delivery amqp.Delivery) string { //GetAMQPHeaders convert to AMQP headers Table everything but a payload func (bm *BusMessage) GetAMQPHeaders() (headers amqp.Table) { headers = amqp.Table{} + headers["x-idempotency-key"] = bm.IdempotencyKey headers["x-msg-saga-id"] = bm.SagaID headers["x-msg-saga-correlation-id"] = bm.SagaCorrelationID headers["x-grabbit-msg-rpc-id"] = bm.RPCID @@ -68,6 +72,7 @@ func (bm *BusMessage) GetAMQPHeaders() (headers amqp.Table) { //SetFromAMQPHeaders convert from AMQP headers Table everything but a payload func (bm *BusMessage) SetFromAMQPHeaders(delivery amqp.Delivery) { headers := delivery.Headers + bm.IdempotencyKey = castToString(headers["x-idempotency-key"]) bm.SagaID = castToString(headers["x-msg-saga-id"]) bm.SagaCorrelationID = castToString(headers["x-msg-saga-correlation-id"]) bm.RPCID = castToString(headers["x-grabbit-msg-rpc-id"]) @@ -81,6 +86,10 @@ func (bm *BusMessage) SetPayload(payload Message) { bm.Payload = payload } +func (bm *BusMessage) SetIdempotencyKey(idempotencyKey string) { + bm.IdempotencyKey = strings.TrimSpace(idempotencyKey) +} + //TargetSaga allows sending the message to a specific Saga instance func (bm *BusMessage) TargetSaga(sagaID string) { bm.SagaCorrelationID = sagaID @@ -91,6 +100,7 @@ func (bm *BusMessage) GetTraceLog() (fields []log.Field) { return []log.Field{ log.String("message", bm.PayloadFQN), log.String("ID", bm.ID), + log.String("IdempotencyKey", bm.IdempotencyKey), log.String("SagaID", bm.SagaID), log.String("CorrelationID", bm.CorrelationID), log.String("SagaCorrelationID", bm.SagaCorrelationID), diff --git a/tests/bus_test.go b/tests/bus_test.go index d546ba4..c8cd443 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "reflect" + "sync" "testing" "time" @@ -760,6 +761,40 @@ func TestSanitizingSvcName(t *testing.T) { fmt.Println("succeeded sanitizing service name") } +func TestIdempotencyKeyHeaders(t *testing.T) { + var wg sync.WaitGroup + wg.Add(2) + keys := make([]string, 0) + handler := func(invocation gbus.Invocation, message *gbus.BusMessage) error { + keys = append(keys, message.IdempotencyKey) + wg.Done() + return nil + } + + bus := createNamedBusForTest(testSvc1) + + bus.HandleMessage(Command1{}, handler) + bus.Start() + defer bus.Shutdown() + + cmd1 := gbus.NewBusMessage(Command1{}) + cmd1.SetIdempotencyKey("some-unique-key") + + cmd2 := gbus.NewBusMessage(Command1{}) + cmd2.SetIdempotencyKey("some-unique-key") + + //send two commands with the same IdempotencyKey to test that the same IdempotencyKey is propogated + bus.Send(context.Background(), testSvc1, cmd1) + bus.Send(context.Background(), testSvc1, cmd2) + + wg.Wait() + + if keys[0] != keys[1] && keys[0] != "" { + t.Errorf("expected same IdempotencyKey. actual key1:%s, key2%s", keys[0], keys[1]) + } + +} + func amqpDeliveryToPublishing(del *amqp.Delivery) (pub amqp.Publishing) { pub.Headers = del.Headers pub.ContentType = del.ContentType