From c875004cf9279c1407fabc5797d7b92490381c42 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Tue, 7 May 2019 18:40:50 +0300 Subject: [PATCH 01/17] added a test for health prob and fixed a minor race condition in worker initalization --- gbus/bus.go | 20 ++++++++++---------- gbus/worker.go | 3 ++- tests/bus_test.go | 15 +++++++++++++++ 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/gbus/bus.go b/gbus/bus.go index 54a2368..d8a3e01 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -57,8 +57,8 @@ type DefaultBus struct { Confirm bool healthChan chan error backpreasure bool - rabbitFailure bool DbPingTimeout time.Duration + amqpConnected bool } var ( @@ -257,7 +257,7 @@ func (b *DefaultBus) Start() error { //start monitoring on amqp related errors go b.monitorAMQPErrors() //start consuming messags from service queue - + b.amqpConnected = true return nil } @@ -293,12 +293,11 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) { serializer: b.Serializer, b: b, amqpErrors: b.amqpErrors} - go func() { - err := w.Start() - if err != nil { - log.WithError(err) - } - }() + + err := w.Start() + if err != nil { + log.WithError(err).Error("failed to start worker") + } workers = append(workers, w) } @@ -321,6 +320,7 @@ func (b *DefaultBus) Shutdown() (shutdwonErr error) { err := worker.Stop() if err != nil { b.log().WithError(err).Error("could not stop worker") + return err } } b.Outgoing.shutdown() @@ -359,7 +359,7 @@ func (b *DefaultBus) GetHealth() HealthCard { return HealthCard{ DbConnected: dbConnected, RabbitBackPressure: b.backpreasure, - RabbitConnected: !b.rabbitFailure, + RabbitConnected: b.amqpConnected, } } @@ -577,7 +577,7 @@ func (b *DefaultBus) monitorAMQPErrors() { } b.backpreasure = blocked.Active case amqpErr := <-b.amqpErrors: - b.rabbitFailure = true + b.amqpConnected = false b.log().WithField("amqp_error", amqpErr).Error("amqp error") if b.healthChan != nil { b.healthChan <- amqpErr diff --git a/gbus/worker.go b/gbus/worker.go index d38cc64..0f798c3 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -47,6 +47,7 @@ type worker struct { func (worker *worker) Start() error { worker.log().Info("starting worker") + worker.stop = make(chan bool) worker.channel.NotifyClose(worker.amqpErrors) var ( @@ -62,7 +63,7 @@ func (worker *worker) Start() error { } worker.messages = messages worker.rpcMessages = rpcmsgs - worker.stop = make(chan bool) + go worker.consumeMessages() return nil diff --git a/tests/bus_test.go b/tests/bus_test.go index ff0fb51..037dffb 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -323,6 +323,21 @@ func TestSendingPanic(t *testing.T) { } } +func TestHealthCheck(t *testing.T) { + svc1 := createNamedBusForTest(testSvc1) + err := svc1.Start() + if err != nil { + t.Error(err.Error()) + } + defer svc1.Shutdown() + health := svc1.GetHealth() + + fmt.Printf("%v", health) + if !health.DbConnected || !health.RabbitConnected || health.RabbitBackPressure { + t.Error("bus expected to be healthy but failed health check") + } +} + func noopTraceContext() context.Context { return context.Background() // tracer := opentracing.NoopTracer{} From 592287c930a4b399b38e674ce60e69aca8a05cd1 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Tue, 7 May 2019 19:53:47 +0300 Subject: [PATCH 02/17] fixed builder to set default bolicies when building the bus and changed tests to run with multiple workers --- gbus/builder/builder.go | 2 +- tests/consts.go | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/gbus/builder/builder.go b/gbus/builder/builder.go index b57ee3c..c09cfd7 100644 --- a/gbus/builder/builder.go +++ b/gbus/builder/builder.go @@ -49,7 +49,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { RPCHandlers: make(map[string]gbus.MessageHandler), Serializer: builder.serializer, DLX: builder.dlx, - DefaultPolicies: make([]gbus.MessagePolicy, 0), + DefaultPolicies: builder.defaultPolicies, DbPingTimeout: 3} gb.Confirm = builder.confirm diff --git a/tests/consts.go b/tests/consts.go index 3234a4a..365ef06 100644 --- a/tests/consts.go +++ b/tests/consts.go @@ -1,6 +1,8 @@ package tests import ( + "time" + "github.com/wework/grabbit/gbus" "github.com/wework/grabbit/gbus/builder" "github.com/wework/grabbit/gbus/policy" @@ -26,7 +28,8 @@ func createBusWithOptions(svcName string, deadletter string, txnl, pos bool) gbu busBuilder := builder. New(). Bus(connStr). - WithPolicies(&policy.Durable{}). + WithPolicies(&policy.Durable{}, &policy.TTL{Duration: time.Second * 3600}). + WorkerNum(3, 1). WithConfirms() if txnl { From a3069de2e5ad356e7a502f2b0c067a1a1346d900 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Tue, 7 May 2019 20:22:24 +0300 Subject: [PATCH 03/17] improved deadlettering test the test now validates that messages get rejected when handlers return an error and all rety attempts did not sccueed --- tests/bus_test.go | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/tests/bus_test.go b/tests/bus_test.go index 037dffb..eb21c5b 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -3,8 +3,10 @@ package tests import ( "context" "database/sql" + "errors" "fmt" "reflect" + "sync" "testing" "time" @@ -209,28 +211,34 @@ func TestRPC(t *testing.T) { } func TestDeadlettering(t *testing.T) { + + var waitgroup sync.WaitGroup + waitgroup.Add(2) poision := gbus.NewBusMessage(PoisionMessage{}) service1 := createBusWithOptions(testSvc1, "grabbit-dead", true, true) deadletterSvc := createBusWithOptions("deadletterSvc", "grabbit-dead", true, true) - proceed := make(chan bool) - handler := func(tx *sql.Tx, poision amqp.Delivery) error { - proceed <- true + + deadMessageHandler := func(tx *sql.Tx, poision amqp.Delivery) error { + waitgroup.Done() return nil } - deadletterSvc.HandleDeadletter(handler) + faultyHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) error { + return errors.New("fail") + } + + deadletterSvc.HandleDeadletter(deadMessageHandler) + service1.HandleMessage(Command1{}, faultyHandler) deadletterSvc.Start() defer deadletterSvc.Shutdown() service1.Start() defer service1.Shutdown() - e := service1.Send(context.Background(), testSvc1, poision) - if e != nil { - log.Printf("send error: %v", e) - } + service1.Send(context.Background(), testSvc1, poision) + service1.Send(context.Background(), testSvc1, gbus.NewBusMessage(Command1{})) - <-proceed + waitgroup.Wait() } func TestRegistrationAfterBusStarts(t *testing.T) { From 905f904464b40cf73ebf10eaa7abde0609986c41 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Thu, 9 May 2019 11:24:26 +0300 Subject: [PATCH 04/17] added tests for protobuf serlializer --- tests/protoMessages.pb.go | 86 ++++++++++++++++++++++++++++++++ tests/protoMessages.proto | 7 +++ tests/protoMessagesBase.go | 5 ++ tests/protoSerialization_test.go | 49 ++++++++++++++++++ 4 files changed, 147 insertions(+) create mode 100644 tests/protoMessages.pb.go create mode 100644 tests/protoMessages.proto create mode 100644 tests/protoMessagesBase.go create mode 100644 tests/protoSerialization_test.go diff --git a/tests/protoMessages.pb.go b/tests/protoMessages.pb.go new file mode 100644 index 0000000..37aedc5 --- /dev/null +++ b/tests/protoMessages.pb.go @@ -0,0 +1,86 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: protoMessages.proto + +package tests + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type ProtoCommand struct { + SomeNumber int64 `protobuf:"varint,1,opt,name=some_number,json=someNumber,proto3" json:"some_number,omitempty"` + SomeData string `protobuf:"bytes,2,opt,name=some_data,json=someData,proto3" json:"some_data,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ProtoCommand) Reset() { *m = ProtoCommand{} } +func (m *ProtoCommand) String() string { return proto.CompactTextString(m) } +func (*ProtoCommand) ProtoMessage() {} +func (*ProtoCommand) Descriptor() ([]byte, []int) { + return fileDescriptor_a1929c2ebf17cd0b, []int{0} +} + +func (m *ProtoCommand) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ProtoCommand.Unmarshal(m, b) +} +func (m *ProtoCommand) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ProtoCommand.Marshal(b, m, deterministic) +} +func (m *ProtoCommand) XXX_Merge(src proto.Message) { + xxx_messageInfo_ProtoCommand.Merge(m, src) +} +func (m *ProtoCommand) XXX_Size() int { + return xxx_messageInfo_ProtoCommand.Size(m) +} +func (m *ProtoCommand) XXX_DiscardUnknown() { + xxx_messageInfo_ProtoCommand.DiscardUnknown(m) +} + +var xxx_messageInfo_ProtoCommand proto.InternalMessageInfo + +func (m *ProtoCommand) GetSomeNumber() int64 { + if m != nil { + return m.SomeNumber + } + return 0 +} + +func (m *ProtoCommand) GetSomeData() string { + if m != nil { + return m.SomeData + } + return "" +} + +func init() { + proto.RegisterType((*ProtoCommand)(nil), "tests.ProtoCommand") +} + +func init() { proto.RegisterFile("protoMessages.proto", fileDescriptor_a1929c2ebf17cd0b) } + +var fileDescriptor_a1929c2ebf17cd0b = []byte{ + // 119 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2e, 0x28, 0xca, 0x2f, + 0xc9, 0xf7, 0x4d, 0x2d, 0x2e, 0x4e, 0x4c, 0x4f, 0x2d, 0xd6, 0x03, 0xf3, 0x84, 0x58, 0x4b, 0x52, + 0x8b, 0x4b, 0x8a, 0x95, 0x7c, 0xb8, 0x78, 0x02, 0x40, 0x7c, 0xe7, 0xfc, 0xdc, 0xdc, 0xc4, 0xbc, + 0x14, 0x21, 0x79, 0x2e, 0xee, 0xe2, 0xfc, 0xdc, 0xd4, 0xf8, 0xbc, 0xd2, 0xdc, 0xa4, 0xd4, 0x22, + 0x09, 0x46, 0x05, 0x46, 0x0d, 0xe6, 0x20, 0x2e, 0x90, 0x90, 0x1f, 0x58, 0x44, 0x48, 0x9a, 0x8b, + 0x13, 0xac, 0x20, 0x25, 0xb1, 0x24, 0x51, 0x82, 0x49, 0x81, 0x51, 0x83, 0x33, 0x88, 0x03, 0x24, + 0xe0, 0x92, 0x58, 0x92, 0x98, 0xc4, 0x06, 0x36, 0xdb, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0xe2, + 0x61, 0x85, 0xcd, 0x72, 0x00, 0x00, 0x00, +} diff --git a/tests/protoMessages.proto b/tests/protoMessages.proto new file mode 100644 index 0000000..bddad18 --- /dev/null +++ b/tests/protoMessages.proto @@ -0,0 +1,7 @@ +syntax = "proto3"; +package tests; + +message ProtoCommand { + int64 some_number = 1; + string some_data = 2; +} diff --git a/tests/protoMessagesBase.go b/tests/protoMessagesBase.go new file mode 100644 index 0000000..16462ed --- /dev/null +++ b/tests/protoMessagesBase.go @@ -0,0 +1,5 @@ +package tests + +func (*ProtoCommand) SchemaName() string { + return "ProtoCommand" +} diff --git a/tests/protoSerialization_test.go b/tests/protoSerialization_test.go new file mode 100644 index 0000000..9e14a9c --- /dev/null +++ b/tests/protoSerialization_test.go @@ -0,0 +1,49 @@ +package tests + +import ( + "reflect" + "testing" + + log "github.com/sirupsen/logrus" + "github.com/wework/grabbit/gbus/serialization" +) + +func TestProtoSerialization(t *testing.T) { + + logger := log.WithField("test", "proto_serialization") + serializer := serialization.NewProtoSerializer(logger) + cmd := &ProtoCommand{} + schemaName := cmd.SchemaName() + cmd.SomeNumber = 15 + cmd.SomeData = "rhinof" + encodedBytes, encErr := serializer.Encode(cmd) + + if encErr != nil { + t.Errorf("encoding returned an error: %v", encErr) + } + + //Calling Decode with out first registering the schema should fail and return an error + _, decErr := serializer.Decode(encodedBytes, schemaName) + if decErr == nil { + t.Error("decoding expected to fail but did not return an error") + } + + serializer.Register(cmd) + decodedMsg, noErr := serializer.Decode(encodedBytes, schemaName) + if noErr != nil { + t.Errorf("decoding of message failed with error:%v", noErr) + } + + cmdCopy, ok := decodedMsg.(*ProtoCommand) + + if !ok { + t.Errorf("decoded message was of wrong type. expected:%v actual:%v", reflect.TypeOf(cmd), reflect.TypeOf(decodedMsg)) + } + + if cmdCopy.SomeNumber != cmd.SomeNumber || cmdCopy.SomeData != cmd.SomeData { + log.Infof("expected:%v\n", cmd) + log.Infof("actual:%v\n", cmdCopy) + t.Errorf("decoded message has unexpected or missing data") + } + +} From 9e5407bb675d5070df1155d3ac7a32737bafdd56 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Thu, 9 May 2019 17:52:04 +0300 Subject: [PATCH 05/17] added tests for protobuf serializer error scenarios --- tests/protoSerialization_test.go | 57 +++++++++++++++++++++++++++----- 1 file changed, 48 insertions(+), 9 deletions(-) diff --git a/tests/protoSerialization_test.go b/tests/protoSerialization_test.go index 9e14a9c..f1e4b0b 100644 --- a/tests/protoSerialization_test.go +++ b/tests/protoSerialization_test.go @@ -8,34 +8,46 @@ import ( "github.com/wework/grabbit/gbus/serialization" ) -func TestProtoSerialization(t *testing.T) { +func getProtoCommand() *ProtoCommand { + return &ProtoCommand{ + SomeNumber: 15, + SomeData: "rhinof"} +} +func TestProtoSerialization(t *testing.T) { + // so the tests logs do not get littered with log entries from serlializer + log.SetLevel(log.PanicLevel) + defer log.SetLevel(log.InfoLevel) logger := log.WithField("test", "proto_serialization") + serializer := serialization.NewProtoSerializer(logger) - cmd := &ProtoCommand{} + + name := serializer.Name() + if name != "proto" { + t.Fatalf("incorrect serializer name. expected:proto actual:%s", name) + } + cmd := getProtoCommand() schemaName := cmd.SchemaName() - cmd.SomeNumber = 15 - cmd.SomeData = "rhinof" - encodedBytes, encErr := serializer.Encode(cmd) + encodedBytes, encErr := serializer.Encode(cmd) if encErr != nil { - t.Errorf("encoding returned an error: %v", encErr) + t.Fatalf("encoding returned an error: %v", encErr) } //Calling Decode with out first registering the schema should fail and return an error _, decErr := serializer.Decode(encodedBytes, schemaName) if decErr == nil { - t.Error("decoding expected to fail but did not return an error") + t.Fatalf("decoding expected to fail but did not return an error") } serializer.Register(cmd) + decodedMsg, noErr := serializer.Decode(encodedBytes, schemaName) if noErr != nil { - t.Errorf("decoding of message failed with error:%v", noErr) + t.Fatalf("decoding of message failed with error:%v", noErr) } cmdCopy, ok := decodedMsg.(*ProtoCommand) - if !ok { t.Errorf("decoded message was of wrong type. expected:%v actual:%v", reflect.TypeOf(cmd), reflect.TypeOf(decodedMsg)) } @@ -47,3 +59,30 @@ func TestProtoSerialization(t *testing.T) { } } + +func TestProtoSerializationErrors(t *testing.T) { + // so the tests logs do not get littered with log entries from serlializer + log.SetLevel(log.PanicLevel) + defer log.SetLevel(log.InfoLevel) + logger := log.WithField("test", "proto_serialization") + + serializer := serialization.NewProtoSerializer(logger) + + // test that encoding a non protobuf generated strcut fails and returns an error + _, encErr := serializer.Encode(Command1{}) + if encErr == nil { + t.Errorf("serializer expected to return an error for non proto generated messages but di not") + } + + cmd := getProtoCommand() + encodedBytes, encErr := serializer.Encode(cmd) + if encErr != nil { + t.Fatalf("encoding returned an error: %v", encErr) + } + + //decoding an unregistered schema fails and returns an error + if _, decErr := serializer.Decode(encodedBytes, "kong"); decErr == nil { + t.Errorf("decoding an unregistred schema is expected to return an error but did not") + } + +} From c1040a57f17cb9d8ecce65a3f35782c488064746 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Thu, 9 May 2019 18:01:07 +0300 Subject: [PATCH 06/17] added more error testing for proto serializer --- tests/protoSerialization_test.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/protoSerialization_test.go b/tests/protoSerialization_test.go index f1e4b0b..ef5c91e 100644 --- a/tests/protoSerialization_test.go +++ b/tests/protoSerialization_test.go @@ -1,6 +1,7 @@ package tests import ( + "crypto/rand" "reflect" "testing" @@ -85,4 +86,11 @@ func TestProtoSerializationErrors(t *testing.T) { t.Errorf("decoding an unregistred schema is expected to return an error but did not") } + //decoding junk fails and returns an error + junk := make([]byte, 16) + rand.Read(junk) + if _, decErr := serializer.Decode(junk, cmd.SchemaName()); decErr == nil { + t.Errorf("decoding junk is expected to return an error but did not") + } + } From 5490d3d5892f610bf84a4f9f2619a51dfd7ddd1b Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Thu, 9 May 2019 18:10:54 +0300 Subject: [PATCH 07/17] fixed validation in proto serialization error tests --- tests/protoSerialization_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/protoSerialization_test.go b/tests/protoSerialization_test.go index ef5c91e..efe4524 100644 --- a/tests/protoSerialization_test.go +++ b/tests/protoSerialization_test.go @@ -86,6 +86,7 @@ func TestProtoSerializationErrors(t *testing.T) { t.Errorf("decoding an unregistred schema is expected to return an error but did not") } + serializer.Register(cmd) //decoding junk fails and returns an error junk := make([]byte, 16) rand.Read(junk) From 6d1480db0bd49c36ae34a140a4226bb0a00adbb1 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Mon, 13 May 2019 09:55:00 +0300 Subject: [PATCH 08/17] fixing a bug where saga configuration functions were run only when saga was created. This fixes #74 --- gbus/saga/def.go | 10 +++- gbus/saga/glue.go | 4 +- gbus/saga/instance.go | 6 +-- tests/saga_test.go | 121 ++++++++++++++++++++++++++++++++++-------- 4 files changed, 113 insertions(+), 28 deletions(-) diff --git a/gbus/saga/def.go b/gbus/saga/def.go index b09721f..e390817 100644 --- a/gbus/saga/def.go +++ b/gbus/saga/def.go @@ -66,8 +66,16 @@ func getFunNameFromHandler(handler gbus.MessageHandler) string { } func (sd *Def) newInstance() *Instance { - return NewInstance(sd.sagaType, sd.msgToFunc, sd.sagaConfFns...) + instance := NewInstance(sd.sagaType, sd.msgToFunc) + return sd.configureSaga(instance) +} +func (sd *Def) configureSaga(instance *Instance) *Instance { + saga := instance.UnderlyingInstance + for _, conf := range sd.sagaConfFns { + saga = conf(saga) + } + return instance } func (sd *Def) shouldStartNewSaga(message *gbus.BusMessage) bool { diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index 6d68ff7..611cdd5 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -154,7 +154,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) e := fmt.Errorf("Warning:Failed message routed with SagaCorrelationID:%v but no saga instance with the same id found ", message.SagaCorrelationID) return e } - + def.configureSaga(instance) if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil { imsm.log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga") return invkErr @@ -176,7 +176,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) imsm.log().WithFields(log.Fields{"message": msgName, "instances_fetched": len(instances)}).Info("fetched saga instances") for _, instance := range instances { - + def.configureSaga(instance) if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil { imsm.log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga") return invkErr diff --git a/gbus/saga/instance.go b/gbus/saga/instance.go index 5cdf99d..f8303d9 100644 --- a/gbus/saga/instance.go +++ b/gbus/saga/instance.go @@ -79,7 +79,7 @@ func (si *Instance) requestsTimeout() (bool, time.Duration) { //NewInstance create a new instance of a Saga -func NewInstance(sagaType reflect.Type, msgToMethodMap []*MsgToFuncPair, confFns ...gbus.SagaConfFn) *Instance { +func NewInstance(sagaType reflect.Type, msgToMethodMap []*MsgToFuncPair) *Instance { var newSagaPtr interface{} if sagaType.Kind() == reflect.Ptr { @@ -91,9 +91,7 @@ func NewInstance(sagaType reflect.Type, msgToMethodMap []*MsgToFuncPair, confFns saga := newSagaPtr.(gbus.Saga) newSaga := saga.New() - for _, conf := range confFns { - newSaga = conf(newSaga) - } + //newSagaPtr := reflect.New(sagaType).Elem() newInstance := &Instance{ ID: xid.New().String(), diff --git a/tests/saga_test.go b/tests/saga_test.go index 3c86325..74cf2b8 100644 --- a/tests/saga_test.go +++ b/tests/saga_test.go @@ -257,6 +257,54 @@ func TestSagaSelfMessaging(t *testing.T) { } +func TestSagaConfFunctions(t *testing.T) { + proceed := make(chan bool) + fail := make(chan bool) + + b := createNamedBusForTest(testSvc1) + + handler := func(invocation gbus.Invocation, message *gbus.BusMessage) error { + + _, routingKey := invocation.Routing() + + if routingKey == "saga.config.functions.not.run" { + fail <- true + } else { + + proceed <- true + } + + return nil + } + + err := b.HandleEvent("test_exchange", "saga.config.functions.not.run", Event1{}, handler) + if err != nil { + t.Fatalf("Registering handler returned false, expected true with error: %s", err.Error()) + } + + err = b.HandleEvent("test_exchange", "saga.config.functions.executed", Event1{}, handler) + if err != nil { + t.Fatalf("Registering handler returned false, expected true with error: %s", err.Error()) + } + + b.RegisterSaga(&ConfigurableSaga{}, func(saga gbus.Saga) gbus.Saga { + sagaInstance := saga.(*ConfigurableSaga) + sagaInstance.nonPersistedSField = "rhinof" + return sagaInstance + }) + + b.Start() + defer b.Shutdown() + + b.Send(context.TODO(), testSvc1, gbus.NewBusMessage(Command1{})) + select { + case <-fail: + t.Fatalf("saga configurtion functions not executed") + case <-proceed: + } + +} + /*Test Sagas*/ type SagaA struct { @@ -279,39 +327,25 @@ func (s *SagaA) New() gbus.Saga { } func (s *SagaA) RegisterAllHandlers(register gbus.HandlerRegister) { - register.HandleMessage(Command1{ - Data: "SagaA.RegisterAllHandlers", - }, s.HandleCommand1) - register.HandleMessage(Command2{ - Data: "SagaA.RegisterAllHandlers", - }, s.HandleCommand2) - register.HandleEvent("test_exchange", "some.topic.1", Event1{ - Data: "SagaA.RegisterAllHandlers", - }, s.HandleEvent1) - register.HandleEvent("test_exchange", "some.topic.2", Event2{ - Data: "SagaA.RegisterAllHandlers", - }, s.HandleEvent1) + register.HandleMessage(Command1{}, s.HandleCommand1) + register.HandleMessage(Command2{}, s.HandleCommand2) + register.HandleEvent("test_exchange", "some.topic.1", Event1{}, s.HandleEvent1) + register.HandleEvent("test_exchange", "some.topic.2", Event2{}, s.HandleEvent1) } func (s *SagaA) HandleCommand1(invocation gbus.Invocation, message *gbus.BusMessage) error { - reply := gbus.NewBusMessage(Reply1{ - Data: "SagaA.HandleCommand1", - }) + reply := gbus.NewBusMessage(Reply1{}) return invocation.Reply(noopTraceContext(), reply) } func (s *SagaA) HandleCommand2(invocation gbus.Invocation, message *gbus.BusMessage) error { log.Println("command2 received") - reply := gbus.NewBusMessage(Reply2{ - Data: "SagaA.HandleCommand2", - }) + reply := gbus.NewBusMessage(Reply2{}) return invocation.Reply(noopTraceContext(), reply) } func (s *SagaA) HandleEvent1(invocation gbus.Invocation, message *gbus.BusMessage) error { - reply := gbus.NewBusMessage(Reply2{ - Data: "SagaA.HandleEvent1", - }) + reply := gbus.NewBusMessage(Reply2{}) log.Println("event1 received") return invocation.Reply(noopTraceContext(), reply) } @@ -440,3 +474,48 @@ func (s *SelfSendingSaga) HandleReply2(invocation gbus.Invocation, message *gbus evt1 := gbus.NewBusMessage(Event1{}) return invocation.Bus().Publish(invocation.Ctx(), "test_exchange", "test_topic", evt1) } + +type ConfigurableSaga struct { + //this field should be set via a saga configuration function + nonPersistedSField string + Complete bool +} + +func (*ConfigurableSaga) StartedBy() []gbus.Message { + starters := make([]gbus.Message, 0) + return append(starters, Command1{}) +} + +func (s *ConfigurableSaga) RegisterAllHandlers(register gbus.HandlerRegister) { + register.HandleMessage(Command1{}, s.HandleCommand1) + register.HandleMessage(Command2{}, s.HandleCommand2) +} + +func (s *ConfigurableSaga) HandleCommand1(invocation gbus.Invocation, message *gbus.BusMessage) error { + + if s.nonPersistedSField == "" { + invocation.Bus().Publish(invocation.Ctx(), "test_exchange", "saga.config.functions.not.run", gbus.NewBusMessage(Event1{})) + return nil + } + + _, selfService := invocation.Routing() + invocation.Bus().Send(invocation.Ctx(), selfService, gbus.NewBusMessage(Command2{})) + return nil +} + +func (s *ConfigurableSaga) HandleCommand2(invocation gbus.Invocation, message *gbus.BusMessage) error { + if s.nonPersistedSField == "" { + invocation.Bus().Publish(invocation.Ctx(), "test_exchange", "saga.config.functions.not.run", gbus.NewBusMessage(Event1{})) + return nil + } + invocation.Bus().Publish(invocation.Ctx(), "test_exchange", "saga.config.functions.executed", gbus.NewBusMessage(Event1{})) + return nil +} + +func (s *ConfigurableSaga) IsComplete() bool { + return s.Complete +} + +func (s *ConfigurableSaga) New() gbus.Saga { + return &ConfigurableSaga{} +} From fc06b816408bb013f3742e0e6f9a31d94d92dc0c Mon Sep 17 00:00:00 2001 From: adiweiss <46281796+adiweiss@users.noreply.github.com> Date: Wed, 15 May 2019 16:50:10 +0300 Subject: [PATCH 09/17] Update SAGA.md --- docs/SAGA.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/SAGA.md b/docs/SAGA.md index b53f062..1470038 100644 --- a/docs/SAGA.md +++ b/docs/SAGA.md @@ -186,7 +186,7 @@ So in order to fulfill our requerment we will need to add the following to our s ```go -func (s *BookVacationSaga) RequestTimeout() time.Duration { +func (s *BookVacationSaga) TimeoutDuration() time.Duration { //request to timeout if after 15 minutes the saga is not complete return time.Minute * 15 } From a019efbbcca012319d4b57a97e3939bfce7039e4 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Thu, 16 May 2019 12:59:04 +0300 Subject: [PATCH 10/17] increasing test timeouts to 60s --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 39e312f..687838d 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -35,7 +35,7 @@ jobs: command: go get -v -t -d ./... - run: name: Run tests - command: go test -v -covermode=count -coverprofile=coverage.out -coverpkg=./... -timeout 30s ./... + command: go test -v -covermode=count -coverprofile=coverage.out -coverpkg=./... -timeout 60s ./... - run: name: Report Coverage command: | From 212181d482333977432dbd7584c22b93588db081 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sun, 19 May 2019 07:27:43 +0300 Subject: [PATCH 11/17] first commit towards supporting persistent timeouts Timeouts now are not passed as bus messages to avoid the need to deal with serialization. Timeout interface changed so that handling a timeout now accepts a transaction and a bus instance instead of an invocation and a bus message --- gbus/abstractions.go | 2 +- gbus/builder/builder.go | 2 +- gbus/saga/glue.go | 30 ++++++++++++++++++------------ gbus/saga/instance.go | 13 ++++++++++++- gbus/saga/timeout.go | 34 ++++++++++++++++++++++++---------- tests/saga_test.go | 7 +++---- 6 files changed, 59 insertions(+), 29 deletions(-) diff --git a/gbus/abstractions.go b/gbus/abstractions.go index b0a6faa..5f95632 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -126,7 +126,7 @@ type RegisterDeadletterHandler interface { //RequestSagaTimeout is the interface a saga needs to implement to get timeout servicess type RequestSagaTimeout interface { TimeoutDuration() time.Duration - Timeout(invocation Invocation, message *BusMessage) error + Timeout(tx *sql.Tx, bus Messaging) error } //SagaConfFn is a function to allow configuration of a saga in the context of the gbus diff --git a/gbus/builder/builder.go b/gbus/builder/builder.go index c09cfd7..5711143 100644 --- a/gbus/builder/builder.go +++ b/gbus/builder/builder.go @@ -99,7 +99,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { panic(err) } } - gb.Glue = saga.NewGlue(gb, sagaStore, svcName) + gb.Glue = saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider) return gb } diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index 6d68ff7..6cd1628 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -75,15 +75,6 @@ func (imsm *Glue) RegisterSaga(saga gbus.Saga, conf ...gbus.SagaConfFn) error { WithFields(log.Fields{"saga_type": def.sagaType.String(), "handles_messages": len(msgNames)}). Info("registered saga with messages") - //register on timeout messages - timeoutEtfs, requestsTimeout := saga.(gbus.RequestSagaTimeout) - if requestsTimeout { - timeoutMessage := gbus.SagaTimeoutMessage{} - timeoutMsgName := timeoutMessage.SchemaName() - _ = def.HandleMessage(timeoutMessage, timeoutEtfs.Timeout) - imsm.addMsgNameToDef(timeoutMsgName, def) - - } return nil } @@ -236,20 +227,35 @@ func (imsm *Glue) registerEvent(exchange, topic string, event gbus.Message) erro return imsm.bus.HandleEvent(exchange, topic, event, imsm.handler) } +func (imsm *Glue) timeoutSaga(tx *sql.Tx, sagaID string) error { + + saga, err := imsm.sagaStore.GetSagaByID(tx, sagaID) + if err != nil { + return err + } + timeoutErr := saga.timeout(tx, imsm.bus) + if timeoutErr != nil { + imsm.log().WithError(timeoutErr).WithField("sagaID", sagaID).Error("failed to timeout saga") + return timeoutErr + } + return imsm.sagaStore.DeleteSaga(tx, saga) +} + func (imsm *Glue) log() *log.Entry { return log.WithField("_service", imsm.svcName) } //NewGlue creates a new Sagamanager -func NewGlue(bus gbus.Bus, sagaStore Store, svcName string) *Glue { - return &Glue{ +func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider) *Glue { + g := &Glue{ svcName: svcName, bus: bus, sagaDefs: make([]*Def, 0), lock: &sync.Mutex{}, alreadyRegistred: make(map[string]bool), msgToDefMap: make(map[string][]*Def), - timeoutManger: TimeoutManager{bus: bus}, sagaStore: sagaStore, } + g.timeoutManger = TimeoutManager{bus: bus, txp: txp, glue: g} + return g } diff --git a/gbus/saga/instance.go b/gbus/saga/instance.go index 5cdf99d..f40f5e8 100644 --- a/gbus/saga/instance.go +++ b/gbus/saga/instance.go @@ -1,6 +1,7 @@ package saga import ( + "database/sql" "fmt" "log" "reflect" @@ -16,6 +17,7 @@ type Instance struct { ConcurrencyCtrl int UnderlyingInstance gbus.Saga MsgToMethodMap []*MsgToFuncPair + timeoutFuncName string } func (si *Instance) invoke(exchange, routingKey string, invocation gbus.Invocation, message *gbus.BusMessage) error { @@ -77,8 +79,17 @@ func (si *Instance) requestsTimeout() (bool, time.Duration) { return canTimeout, timeoutDuration } -//NewInstance create a new instance of a Saga +func (si *Instance) timeout(tx *sql.Tx, bus gbus.Messaging) error { + + saga, canTimeout := si.UnderlyingInstance.(gbus.RequestSagaTimeout) + if !canTimeout { + return fmt.Errorf("saga instance does not support timeouts") + + } + return saga.Timeout(tx, bus) +} +//NewInstance create a new instance of a Saga func NewInstance(sagaType reflect.Type, msgToMethodMap []*MsgToFuncPair, confFns ...gbus.SagaConfFn) *Instance { var newSagaPtr interface{} diff --git a/gbus/saga/timeout.go b/gbus/saga/timeout.go index 4592c35..6f492db 100644 --- a/gbus/saga/timeout.go +++ b/gbus/saga/timeout.go @@ -1,17 +1,17 @@ package saga import ( - "context" "time" - "github.com/sirupsen/logrus" "github.com/wework/grabbit/gbus" ) //TimeoutManager manages timeouts for sagas //TODO:Make it persistent type TimeoutManager struct { - bus gbus.Bus + bus gbus.Bus + glue *Glue + txp gbus.TxProvider } //RequestTimeout requests a timeout from the timeout manager @@ -20,13 +20,27 @@ func (tm *TimeoutManager) RequestTimeout(svcName, sagaID string, duration time.D go func(svcName, sagaID string, tm *TimeoutManager) { c := time.After(duration) <-c - reuqestTimeout := gbus.SagaTimeoutMessage{ - SagaID: sagaID} - msg := gbus.NewBusMessage(reuqestTimeout) - msg.SagaCorrelationID = sagaID - if err := tm.bus.Send(context.Background(), svcName, msg); err != nil { - //TODO: add logger - logrus.WithError(err).Error("could not send timeout to bus") + if tm.txp == nil { + tm.glue.timeoutSaga(nil, sagaID) + return + } + tx, txe := tm.txp.New() + if txe != nil { + tm.glue.log().WithError(txe).Warn("timeout manager failed to create a transaction") + } else { + callErr := tm.glue.timeoutSaga(tx, sagaID) + if callErr != nil { + tm.glue.log().WithError(callErr).WithField("sagaID", sagaID).Error("timing out a saga failed") + rlbe := tx.Rollback() + if rlbe != nil { + tm.glue.log().WithError(rlbe).Warn("timeout manager failed to rollback transaction") + } + } else { + cmte := tx.Commit() + if cmte != nil { + tm.glue.log().WithError(cmte).Warn("timeout manager failed to rollback transaction") + } + } } }(svcName, sagaID, tm) diff --git a/tests/saga_test.go b/tests/saga_test.go index 3c86325..5d67671 100644 --- a/tests/saga_test.go +++ b/tests/saga_test.go @@ -2,6 +2,7 @@ package tests import ( "context" + "database/sql" "log" "reflect" "testing" @@ -397,11 +398,9 @@ func (s *TimingOutSaga) TimeoutDuration() time.Duration { return time.Second * 1 } -func (s *TimingOutSaga) Timeout(invocation gbus.Invocation, message *gbus.BusMessage) error { +func (s *TimingOutSaga) Timeout(tx *sql.Tx, bus gbus.Messaging) error { s.TimedOut = true - return invocation.Bus().Publish(noopTraceContext(), "test_exchange", "some.topic.1", gbus.NewBusMessage(Event1{ - Data: "TimingOutSaga.Timeout", - })) + return bus.Publish(context.Background(), "test_exchange", "some.topic.1", gbus.NewBusMessage(Event1{})) } type SelfSendingSaga struct { From e45a8a1f6ef9eda624a353cd6bc9f2e81fc1241e Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sun, 19 May 2019 07:33:58 +0300 Subject: [PATCH 12/17] a timed out saga only gets deleted if it is complete This allows saga instances ifthey choose to continue react to messages after they timeout --- gbus/saga/glue.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index 6cd1628..d9283eb 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -238,7 +238,10 @@ func (imsm *Glue) timeoutSaga(tx *sql.Tx, sagaID string) error { imsm.log().WithError(timeoutErr).WithField("sagaID", sagaID).Error("failed to timeout saga") return timeoutErr } - return imsm.sagaStore.DeleteSaga(tx, saga) + if saga.isComplete() { + return imsm.sagaStore.DeleteSaga(tx, saga) + } + return nil } func (imsm *Glue) log() *log.Entry { From dd62b4332c3c39e337141b91b5707708475eee53 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sun, 19 May 2019 07:50:01 +0300 Subject: [PATCH 13/17] deleteing the saga instance after timeout only if it is complete otherwise updating it In addition fixing some linting errors --- gbus/saga/glue.go | 15 +++++---------- gbus/saga/timeout.go | 6 +++++- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index d9283eb..0bc8104 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -151,7 +151,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) return invkErr } - return imsm.completeOrUpdateSaga(invocation.Tx(), instance, message) + return imsm.completeOrUpdateSaga(invocation.Tx(), instance) } else if message.Semantics == gbus.CMD { e := fmt.Errorf("Warning:Command or Reply message with no saga reference received. message will be dropped.\nmessage as of type:%v", reflect.TypeOf(message).Name()) @@ -172,7 +172,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) imsm.log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga") return invkErr } - e = imsm.completeOrUpdateSaga(invocation.Tx(), instance, message) + e = imsm.completeOrUpdateSaga(invocation.Tx(), instance) if e != nil { return e } @@ -196,11 +196,9 @@ func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocat return instance.invoke(exchange, routingKey, sginv, message) } -func (imsm *Glue) completeOrUpdateSaga(tx *sql.Tx, instance *Instance, lastMessage *gbus.BusMessage) error { +func (imsm *Glue) completeOrUpdateSaga(tx *sql.Tx, instance *Instance) error { - _, timedOut := lastMessage.Payload.(gbus.SagaTimeoutMessage) - - if instance.isComplete() || timedOut { + if instance.isComplete() { imsm.log().WithField("saga_id", instance.ID).Info("saga has completed and will be deleted") return imsm.sagaStore.DeleteSaga(tx, instance) @@ -238,10 +236,7 @@ func (imsm *Glue) timeoutSaga(tx *sql.Tx, sagaID string) error { imsm.log().WithError(timeoutErr).WithField("sagaID", sagaID).Error("failed to timeout saga") return timeoutErr } - if saga.isComplete() { - return imsm.sagaStore.DeleteSaga(tx, saga) - } - return nil + return imsm.completeOrUpdateSaga(tx, saga) } func (imsm *Glue) log() *log.Entry { diff --git a/gbus/saga/timeout.go b/gbus/saga/timeout.go index 6f492db..4012353 100644 --- a/gbus/saga/timeout.go +++ b/gbus/saga/timeout.go @@ -20,8 +20,12 @@ func (tm *TimeoutManager) RequestTimeout(svcName, sagaID string, duration time.D go func(svcName, sagaID string, tm *TimeoutManager) { c := time.After(duration) <-c + //TODO:if the bus is not transactional, moving forward we should not allow using sagas in a non transactional bus if tm.txp == nil { - tm.glue.timeoutSaga(nil, sagaID) + tme := tm.glue.timeoutSaga(nil, sagaID) + if tme != nil { + tm.glue.log().WithError(tme).WithField("sagaID", sagaID).Error("timing out a saga failed") + } return } tx, txe := tm.txp.New() From d05664fe1f465006e7226a5c736271e635c1b0a4 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sun, 19 May 2019 08:16:32 +0300 Subject: [PATCH 14/17] removing unused field in saga instance (linting error) --- gbus/saga/instance.go | 1 - 1 file changed, 1 deletion(-) diff --git a/gbus/saga/instance.go b/gbus/saga/instance.go index f40f5e8..2b3c889 100644 --- a/gbus/saga/instance.go +++ b/gbus/saga/instance.go @@ -17,7 +17,6 @@ type Instance struct { ConcurrencyCtrl int UnderlyingInstance gbus.Saga MsgToMethodMap []*MsgToFuncPair - timeoutFuncName string } func (si *Instance) invoke(exchange, routingKey string, invocation gbus.Invocation, message *gbus.BusMessage) error { From 80be1cc30e05f5b090d4ad6e971c0281f2adffd8 Mon Sep 17 00:00:00 2001 From: adiweiss <46281796+adiweiss@users.noreply.github.com> Date: Wed, 22 May 2019 14:45:44 +0300 Subject: [PATCH 15/17] Update SAGA.md update documentation after changes of timeout interface : https://github.com/wework/grabbit/pull/78/files --- docs/SAGA.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/SAGA.md b/docs/SAGA.md index b53f062..a3534b0 100644 --- a/docs/SAGA.md +++ b/docs/SAGA.md @@ -178,7 +178,7 @@ In order to define a timeout for the saga and have grabbit call the saga instanc ```go type RequestSagaTimeout interface { TimeoutDuration() time.Duration - Timeout(invocation Invocation, message *BusMessage) error + Timeout(tx *sql.Tx, bus Messaging) error } ``` @@ -192,7 +192,7 @@ func (s *BookVacationSaga) RequestTimeout() time.Duration { } func (s *BookVacationSaga) Timeout(invocation gbus.Invocation, message *gbus.BusMessage) error { - return invocation.Bus().Publish(invocation.Ctx(), "some_exchange", "some.topic.1", gbus.NewBusMessage(VacationBookingTimedOut{})) + return bus.Publish(context.Background(), "some_exchange", "some.topic.1", gbus.NewBusMessage(VacationBookingTimedOut{})) } ``` From bcb1974573e7b9caa6306e0e422c7f5f43b04a22 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Wed, 5 Jun 2019 06:15:40 +0300 Subject: [PATCH 16/17] fixing a bug in which all handler retires executed within the same transaction When a handler fails and gets retried each retry should be run in a new transaction --- gbus/worker.go | 91 ++++++++++++++++++++++---------------------------- 1 file changed, 40 insertions(+), 51 deletions(-) diff --git a/gbus/worker.go b/gbus/worker.go index 0f798c3..3b6f80c 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -315,61 +315,31 @@ func (worker *worker) processMessage(delivery amqp.Delivery, isRPCreply bool) { return } - var tx *sql.Tx - var txErr error - if worker.isTxnl { - tx, txErr = worker.txProvider.New() - if txErr != nil { - worker.log().WithError(txErr).Error("failed to create transaction") - worker.span.LogFields(slog.Error(txErr)) - //reject the message but requeue it so it gets redelivered until we can create transactions - _ = worker.reject(true, delivery) - return - } - } - err = worker.invokeHandlers(ctx, handlers, bm, &delivery, tx) - - // if all handlers executed with out errors then commit the transactional if the bus is transactional - // if the tranaction committed successfully then ack the message. - // if the bus is not transactional then just ack the message + err = worker.invokeHandlers(ctx, handlers, bm, &delivery) if err == nil { - if worker.isTxnl { - err = worker.SafeWithRetries(tx.Commit, MaxRetryCount) - if err == nil { - worker.log().Info("bus transaction committed successfully ") - //ack the message - _ = worker.ack(delivery) - } else { - worker.span.LogFields(slog.Error(err)) - worker.log().WithError(err).Error("failed committing transaction") - //if the commit failed we will reject the message - _ = worker.reject(false, delivery) - } - } else { /*if the bus in not transactional just try acking the message*/ - _ = worker.ack(delivery) - } - //else there was an error in the invokation then try rollingback the transaction and reject the message + _ = worker.ack(delivery) } else { - worker.span.LogFields(slog.Error(err)) - worker.log().WithError(err).WithFields(log.Fields{"message_name": bm.PayloadFQN, "semantics": bm.Semantics}).Error("Failed to consume message due to failure of one or more handlers.\n Message rejected as poison") - - if worker.isTxnl { - worker.log().Warn("rolling back transaction") - err = worker.SafeWithRetries(tx.Rollback, MaxRetryCount) - - if err != nil { - worker.span.LogFields(slog.Error(err)) - worker.log().WithError(err).Error("failed to rollback transaction") - } - } - _ = worker.reject(false, delivery) } } -func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHandler, message *BusMessage, delivery *amqp.Delivery, tx *sql.Tx) (err error) { +func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHandler, message *BusMessage, delivery *amqp.Delivery) (err error) { + + //this is the action that will get retried + // each retry shoukd run a new and sperate transaction which should end with a commit or rollback action := func(attempts uint) (actionErr error) { + var tx *sql.Tx + var txCreateErr error + if worker.isTxnl { + tx, txCreateErr = worker.txProvider.New() + if txCreateErr != nil { + worker.log().WithError(txCreateErr).Error("failed creating new tx") + worker.span.LogFields(slog.Error(txCreateErr)) + return txCreateErr + } + } + worker.span, sctx = opentracing.StartSpanFromContext(sctx, "invokeHandlers") worker.span.LogFields(slog.Uint64("attempt", uint64(attempts+1))) defer func() { @@ -377,6 +347,12 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan pncMsg := fmt.Sprintf("%v\n%s", p, debug.Stack()) worker.log().WithField("stack", pncMsg).Error("recovered from panic while invoking handler") actionErr = errors.New(pncMsg) + if worker.isTxnl { + rbkErr := tx.Rollback() + if rbkErr != nil { + worker.log().WithError(rbkErr).Error("failed rolling back transaction when recovering from handler panic") + } + } worker.span.LogFields(slog.Error(actionErr)) } worker.span.Finish() @@ -393,10 +369,23 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan ctx: hsctx, exchange: delivery.Exchange, routingKey: delivery.RoutingKey} - e := handler(ctx, message) - if e != nil { - hspan.LogFields(slog.Error(e)) - return e + handlerErr := handler(ctx, message) + if handlerErr != nil { + hspan.LogFields(slog.Error(handlerErr)) + if worker.isTxnl { + rbkErr := tx.Rollback() + if rbkErr != nil { + worker.log().WithError(rbkErr).Error("failed rolling back transaction when recovering from handler error") + } + } + return handlerErr + } + } + if worker.isTxnl { + cmtErr := tx.Commit() + if cmtErr != nil { + worker.log().WithError(cmtErr).Error("failed commiting transaction after invoking handlers") + return cmtErr } } return nil From bad2ebf84042b99a35471b4e937abe09d626561e Mon Sep 17 00:00:00 2001 From: "avigail.berger" Date: Wed, 5 Jun 2019 07:39:05 +0300 Subject: [PATCH 17/17] making the retry num configurable and more random apart --- gbus/abstractions.go | 3 +++ gbus/builder/builder.go | 5 ++++ gbus/bus.go | 5 ++-- gbus/worker.go | 11 ++++++-- go.mod | 15 +++++++---- go.sum | 57 +++++++++++++++-------------------------- tests/consts.go | 3 ++- 7 files changed, 52 insertions(+), 47 deletions(-) diff --git a/gbus/abstractions.go b/gbus/abstractions.go index 5f95632..55e911f 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -166,6 +166,9 @@ type Builder interface { //ConfigureHealthCheck defines the default timeout in seconds for the db ping check ConfigureHealthCheck(timeoutInSeconds time.Duration) Builder + //RetriesNum defines the number of retries upon error + RetriesNum(retries uint) Builder + //Build the bus Build(svcName string) Bus } diff --git a/gbus/builder/builder.go b/gbus/builder/builder.go index 5711143..48d3149 100644 --- a/gbus/builder/builder.go +++ b/gbus/builder/builder.go @@ -170,6 +170,11 @@ func (builder *defaultBuilder) ConfigureHealthCheck(timeoutInSeconds time.Durati return builder } +func (builder *defaultBuilder) RetriesNum(retries uint) gbus.Builder { + gbus.MaxRetryCount = retries + return builder +} + //New :) func New() Nu { return Nu{} diff --git a/gbus/bus.go b/gbus/bus.go index d8a3e01..cea18b3 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -62,9 +62,8 @@ type DefaultBus struct { } var ( - //TODO: Replace constants with configuration - - //MaxRetryCount defines the max times a retry can run + //MaxRetryCount defines the max times a retry can run. + //Default is 3 but it is configurable MaxRetryCount uint = 3 //RpcHeaderName used to define the header in grabbit for RPC RpcHeaderName = "x-grabbit-msg-rpc-id" diff --git a/gbus/worker.go b/gbus/worker.go index 0f798c3..b44cc14 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -5,6 +5,7 @@ import ( "database/sql" "errors" "fmt" + "math/rand" "reflect" "runtime" "runtime/debug" @@ -13,6 +14,7 @@ import ( "github.com/Rican7/retry" "github.com/Rican7/retry/backoff" + "github.com/Rican7/retry/jitter" "github.com/Rican7/retry/strategy" "github.com/opentracing-contrib/go-amqp/amqptracer" "github.com/opentracing/opentracing-go" @@ -402,10 +404,15 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan return nil } - //retry for MaxRetryCount, back off by a Fibonacci series 50, 50, 100, 150, 250 ms + //retry for MaxRetryCount, back off by a jittered stratergy + seed := time.Now().UnixNano() + random := rand.New(rand.NewSource(seed)) return retry.Retry(action, strategy.Limit(MaxRetryCount), - strategy.Backoff(backoff.Fibonacci(50*time.Millisecond))) + strategy.BackoffWithJitter( + backoff.BinaryExponential(10*time.Millisecond), + jitter.Deviation(random, 0.5), + )) } func (worker *worker) log() *log.Entry { diff --git a/go.mod b/go.mod index d143cbb..3841bbc 100644 --- a/go.mod +++ b/go.mod @@ -3,13 +3,14 @@ module github.com/wework/grabbit require ( github.com/DataDog/zstd v1.4.0 // indirect github.com/Rican7/retry v0.1.0 - github.com/Shopify/sarama v1.22.0 // indirect + github.com/Shopify/sarama v1.22.1 // indirect github.com/bsm/sarama-cluster v2.1.15+incompatible // indirect github.com/dangkaka/go-kafka-avro v0.0.0-20181108134201-d57aece51a15 github.com/go-sql-driver/mysql v1.4.1 github.com/golang/protobuf v1.3.1 github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect github.com/kr/pretty v0.1.0 // indirect + github.com/kr/pty v1.1.4 // indirect github.com/linkedin/goavro v2.1.0+incompatible github.com/onsi/ginkgo v1.8.0 // indirect github.com/onsi/gomega v1.5.0 // indirect @@ -17,11 +18,15 @@ require ( github.com/opentracing/opentracing-go v1.1.0 github.com/pierrec/lz4 v2.0.5+incompatible // indirect github.com/rs/xid v1.2.1 - github.com/sirupsen/logrus v1.4.1 + github.com/sirupsen/logrus v1.4.2 github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 - golang.org/x/net v0.0.0-20190420063019-afa5a82059c6 // indirect - golang.org/x/sys v0.0.0-20190419153524-e8e3143a4f4a // indirect - google.golang.org/appengine v1.5.0 // indirect + github.com/stretchr/objx v0.2.0 // indirect + golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5 // indirect + golang.org/x/net v0.0.0-20190603091049-60506f45cf65 // indirect + golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed // indirect + golang.org/x/text v0.3.2 // indirect + golang.org/x/tools v0.0.0-20190603231351-8aaa1484dc10 // indirect + google.golang.org/appengine v1.6.0 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect gopkg.in/yaml.v2 v2.2.2 // indirect diff --git a/go.sum b/go.sum index 6078870..5476080 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,10 @@ -github.com/DataDog/zstd v1.3.5 h1:DtpNbljikUepEPD16hD4LvIcmhnhdLTiW/5pHgbmp14= -github.com/DataDog/zstd v1.3.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= +github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/DataDog/zstd v1.4.0 h1:vhoV+DUHnRZdKW1i5UMjAk2G4JY8wN4ayRfYDNdEhwo= github.com/DataDog/zstd v1.4.0/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/Rican7/retry v0.1.0 h1:FqK94z34ly8Baa6K+G8Mmza9rYWTKOJk+yckIBB5qVk= github.com/Rican7/retry v0.1.0/go.mod h1:FgOROf8P5bebcC1DS0PdOQiqGUridaZvikzUmkFW6gg= -github.com/Shopify/sarama v1.22.0 h1:rtiODsvY4jW6nUV6n3K+0gx/8WlAwVt+Ixt6RIvpYyo= -github.com/Shopify/sarama v1.22.0/go.mod h1:lm3THZ8reqBDBQKQyb5HB3sY1lKp3grEbQ81aWSgPp4= -github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= +github.com/Shopify/sarama v1.22.1 h1:exyEsKLGyCsDiqpV5Lr4slFi8ev2KiM3cP1KZ6vnCQ0= +github.com/Shopify/sarama v1.22.1/go.mod h1:FRzlvRpMFO/639zY1SDxUxkqH97Y0ndM5CbGj6oG3As= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/bsm/sarama-cluster v2.1.15+incompatible h1:RkV6WiNRnqEEbp81druK8zYhmnIgdOjqSVi0+9Cnl2A= github.com/bsm/sarama-cluster v2.1.15+incompatible/go.mod h1:r7ao+4tTNXvWm+VRpRJchr2kQhqxgmAp2iEX5W96gMM= @@ -21,33 +19,25 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8 github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= -github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= -github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= -github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= -github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/pty v1.1.4/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/linkedin/goavro v2.1.0+incompatible h1:DV2aUlj2xZiuxQyvag8Dy7zjY69ENjS66bWkSfdpddY= github.com/linkedin/goavro v2.1.0+incompatible/go.mod h1:bBCwI2eGYpUI/4820s67MElg9tdeLbINjLjiM2xZFYM= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w= github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/opentracing-contrib/go-amqp v0.0.0-20171102191528-e26701f95620 h1:OhtbNVqXz6DuVXGvwPYXnNwQy1n2rI+2mID9CQOok9U= github.com/opentracing-contrib/go-amqp v0.0.0-20171102191528-e26701f95620/go.mod h1:UTAgTV5+tXpWiYqczgUb2kCslN9sqcshFQdmHSTyzlU= @@ -57,55 +47,50 @@ github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41/go.mod h1:3/3N9NVKO0je github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pkg/profile v1.3.0/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rs/xid v1.2.1 h1:mhH9Nq+C1fY2l1XIpgxIiUOfNpRBYH1kKcr+qfKgjRc= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= -github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k= -github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= +github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 h1:0ngsPmuP6XIjiFRNFYlvKwSr5zff2v+uPHaffZ6/M4k= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= +github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= -golang.org/x/net v0.0.0-20180724234803-3673e40ba225 h1:kNX+jCowfMYzvlSvJu5pQWEmyWFrBXJ3PBy10xKMXK8= +golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190420063019-afa5a82059c6 h1:HdqqaWmYAUI7/dmByKKEw+yxDksGSo+9GjkUc9Zp34E= -golang.org/x/net v0.0.0-20190420063019-afa5a82059c6/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= +golang.org/x/net v0.0.0-20190603091049-60506f45cf65 h1:+rhAzEzT3f4JtomfC371qB+0Ola2caSKcY69NUBZrRQ= +golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8= -golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190419153524-e8e3143a4f4a h1:XCr/YX7O0uxRkLq2k1ApNQMims9eCioF9UpzIPBDmuo= -golang.org/x/sys v0.0.0-20190419153524-e8e3143a4f4a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed h1:uPxWBzB3+mlnjy9W58qY1j/cjyFjutgw/Vhan2zLy/A= +golang.org/x/sys v0.0.0-20190602015325-4c4f7f33c9ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -google.golang.org/appengine v1.5.0 h1:KxkO13IPW4Lslp2bz+KHP2E3gtFlrIGNThxkZQ3g+4c= -google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190603231351-8aaa1484dc10/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +google.golang.org/appengine v1.6.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= -gopkg.in/linkedin/goavro.v1 v1.0.5 h1:BJa69CDh0awSsLUmZ9+BowBdokpduDZSM9Zk8oKHfN4= gopkg.in/linkedin/goavro.v1 v1.0.5/go.mod h1:Aw5GdAbizjOEl0kAMHV9iHmA8reZzW/OKuJAl4Hb9F0= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/tests/consts.go b/tests/consts.go index 365ef06..d79d560 100644 --- a/tests/consts.go +++ b/tests/consts.go @@ -30,7 +30,8 @@ func createBusWithOptions(svcName string, deadletter string, txnl, pos bool) gbu Bus(connStr). WithPolicies(&policy.Durable{}, &policy.TTL{Duration: time.Second * 3600}). WorkerNum(3, 1). - WithConfirms() + WithConfirms(). + RetriesNum(4) if txnl { busBuilder = busBuilder.Txnl("mysql", "rhinof:rhinof@/rhinof")