From c875004cf9279c1407fabc5797d7b92490381c42 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Tue, 7 May 2019 18:40:50 +0300 Subject: [PATCH 01/15] added a test for health prob and fixed a minor race condition in worker initalization --- gbus/bus.go | 20 ++++++++++---------- gbus/worker.go | 3 ++- tests/bus_test.go | 15 +++++++++++++++ 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/gbus/bus.go b/gbus/bus.go index 54a2368..d8a3e01 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -57,8 +57,8 @@ type DefaultBus struct { Confirm bool healthChan chan error backpreasure bool - rabbitFailure bool DbPingTimeout time.Duration + amqpConnected bool } var ( @@ -257,7 +257,7 @@ func (b *DefaultBus) Start() error { //start monitoring on amqp related errors go b.monitorAMQPErrors() //start consuming messags from service queue - + b.amqpConnected = true return nil } @@ -293,12 +293,11 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) { serializer: b.Serializer, b: b, amqpErrors: b.amqpErrors} - go func() { - err := w.Start() - if err != nil { - log.WithError(err) - } - }() + + err := w.Start() + if err != nil { + log.WithError(err).Error("failed to start worker") + } workers = append(workers, w) } @@ -321,6 +320,7 @@ func (b *DefaultBus) Shutdown() (shutdwonErr error) { err := worker.Stop() if err != nil { b.log().WithError(err).Error("could not stop worker") + return err } } b.Outgoing.shutdown() @@ -359,7 +359,7 @@ func (b *DefaultBus) GetHealth() HealthCard { return HealthCard{ DbConnected: dbConnected, RabbitBackPressure: b.backpreasure, - RabbitConnected: !b.rabbitFailure, + RabbitConnected: b.amqpConnected, } } @@ -577,7 +577,7 @@ func (b *DefaultBus) monitorAMQPErrors() { } b.backpreasure = blocked.Active case amqpErr := <-b.amqpErrors: - b.rabbitFailure = true + b.amqpConnected = false b.log().WithField("amqp_error", amqpErr).Error("amqp error") if b.healthChan != nil { b.healthChan <- amqpErr diff --git a/gbus/worker.go 
b/gbus/worker.go index d38cc64..0f798c3 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -47,6 +47,7 @@ type worker struct { func (worker *worker) Start() error { worker.log().Info("starting worker") + worker.stop = make(chan bool) worker.channel.NotifyClose(worker.amqpErrors) var ( @@ -62,7 +63,7 @@ func (worker *worker) Start() error { } worker.messages = messages worker.rpcMessages = rpcmsgs - worker.stop = make(chan bool) + go worker.consumeMessages() return nil diff --git a/tests/bus_test.go b/tests/bus_test.go index ff0fb51..037dffb 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -323,6 +323,21 @@ func TestSendingPanic(t *testing.T) { } } +func TestHealthCheck(t *testing.T) { + svc1 := createNamedBusForTest(testSvc1) + err := svc1.Start() + if err != nil { + t.Error(err.Error()) + } + defer svc1.Shutdown() + health := svc1.GetHealth() + + fmt.Printf("%v", health) + if !health.DbConnected || !health.RabbitConnected || health.RabbitBackPressure { + t.Error("bus expected to be healthy but failed health check") + } +} + func noopTraceContext() context.Context { return context.Background() // tracer := opentracing.NoopTracer{} From 592287c930a4b399b38e674ce60e69aca8a05cd1 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Tue, 7 May 2019 19:53:47 +0300 Subject: [PATCH 02/15] fixed builder to set default bolicies when building the bus and changed tests to run with multiple workers --- gbus/builder/builder.go | 2 +- tests/consts.go | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/gbus/builder/builder.go b/gbus/builder/builder.go index b57ee3c..c09cfd7 100644 --- a/gbus/builder/builder.go +++ b/gbus/builder/builder.go @@ -49,7 +49,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { RPCHandlers: make(map[string]gbus.MessageHandler), Serializer: builder.serializer, DLX: builder.dlx, - DefaultPolicies: make([]gbus.MessagePolicy, 0), + DefaultPolicies: builder.defaultPolicies, DbPingTimeout: 3} gb.Confirm = builder.confirm 
diff --git a/tests/consts.go b/tests/consts.go index 3234a4a..365ef06 100644 --- a/tests/consts.go +++ b/tests/consts.go @@ -1,6 +1,8 @@ package tests import ( + "time" + "github.com/wework/grabbit/gbus" "github.com/wework/grabbit/gbus/builder" "github.com/wework/grabbit/gbus/policy" @@ -26,7 +28,8 @@ func createBusWithOptions(svcName string, deadletter string, txnl, pos bool) gbu busBuilder := builder. New(). Bus(connStr). - WithPolicies(&policy.Durable{}). + WithPolicies(&policy.Durable{}, &policy.TTL{Duration: time.Second * 3600}). + WorkerNum(3, 1). WithConfirms() if txnl { From a3069de2e5ad356e7a502f2b0c067a1a1346d900 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Tue, 7 May 2019 20:22:24 +0300 Subject: [PATCH 03/15] improved deadlettering test the test now validates that messages get rejected when handlers return an error and all rety attempts did not sccueed --- tests/bus_test.go | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/tests/bus_test.go b/tests/bus_test.go index 037dffb..eb21c5b 100644 --- a/tests/bus_test.go +++ b/tests/bus_test.go @@ -3,8 +3,10 @@ package tests import ( "context" "database/sql" + "errors" "fmt" "reflect" + "sync" "testing" "time" @@ -209,28 +211,34 @@ func TestRPC(t *testing.T) { } func TestDeadlettering(t *testing.T) { + + var waitgroup sync.WaitGroup + waitgroup.Add(2) poision := gbus.NewBusMessage(PoisionMessage{}) service1 := createBusWithOptions(testSvc1, "grabbit-dead", true, true) deadletterSvc := createBusWithOptions("deadletterSvc", "grabbit-dead", true, true) - proceed := make(chan bool) - handler := func(tx *sql.Tx, poision amqp.Delivery) error { - proceed <- true + + deadMessageHandler := func(tx *sql.Tx, poision amqp.Delivery) error { + waitgroup.Done() return nil } - deadletterSvc.HandleDeadletter(handler) + faultyHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) error { + return errors.New("fail") + } + + 
deadletterSvc.HandleDeadletter(deadMessageHandler) + service1.HandleMessage(Command1{}, faultyHandler) deadletterSvc.Start() defer deadletterSvc.Shutdown() service1.Start() defer service1.Shutdown() - e := service1.Send(context.Background(), testSvc1, poision) - if e != nil { - log.Printf("send error: %v", e) - } + service1.Send(context.Background(), testSvc1, poision) + service1.Send(context.Background(), testSvc1, gbus.NewBusMessage(Command1{})) - <-proceed + waitgroup.Wait() } func TestRegistrationAfterBusStarts(t *testing.T) { From 905f904464b40cf73ebf10eaa7abde0609986c41 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Thu, 9 May 2019 11:24:26 +0300 Subject: [PATCH 04/15] added tests for protobuf serlializer --- tests/protoMessages.pb.go | 86 ++++++++++++++++++++++++++++++++ tests/protoMessages.proto | 7 +++ tests/protoMessagesBase.go | 5 ++ tests/protoSerialization_test.go | 49 ++++++++++++++++++ 4 files changed, 147 insertions(+) create mode 100644 tests/protoMessages.pb.go create mode 100644 tests/protoMessages.proto create mode 100644 tests/protoMessagesBase.go create mode 100644 tests/protoSerialization_test.go diff --git a/tests/protoMessages.pb.go b/tests/protoMessages.pb.go new file mode 100644 index 0000000..37aedc5 --- /dev/null +++ b/tests/protoMessages.pb.go @@ -0,0 +1,86 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: protoMessages.proto + +package tests + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. 
+const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type ProtoCommand struct { + SomeNumber int64 `protobuf:"varint,1,opt,name=some_number,json=someNumber,proto3" json:"some_number,omitempty"` + SomeData string `protobuf:"bytes,2,opt,name=some_data,json=someData,proto3" json:"some_data,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ProtoCommand) Reset() { *m = ProtoCommand{} } +func (m *ProtoCommand) String() string { return proto.CompactTextString(m) } +func (*ProtoCommand) ProtoMessage() {} +func (*ProtoCommand) Descriptor() ([]byte, []int) { + return fileDescriptor_a1929c2ebf17cd0b, []int{0} +} + +func (m *ProtoCommand) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ProtoCommand.Unmarshal(m, b) +} +func (m *ProtoCommand) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ProtoCommand.Marshal(b, m, deterministic) +} +func (m *ProtoCommand) XXX_Merge(src proto.Message) { + xxx_messageInfo_ProtoCommand.Merge(m, src) +} +func (m *ProtoCommand) XXX_Size() int { + return xxx_messageInfo_ProtoCommand.Size(m) +} +func (m *ProtoCommand) XXX_DiscardUnknown() { + xxx_messageInfo_ProtoCommand.DiscardUnknown(m) +} + +var xxx_messageInfo_ProtoCommand proto.InternalMessageInfo + +func (m *ProtoCommand) GetSomeNumber() int64 { + if m != nil { + return m.SomeNumber + } + return 0 +} + +func (m *ProtoCommand) GetSomeData() string { + if m != nil { + return m.SomeData + } + return "" +} + +func init() { + proto.RegisterType((*ProtoCommand)(nil), "tests.ProtoCommand") +} + +func init() { proto.RegisterFile("protoMessages.proto", fileDescriptor_a1929c2ebf17cd0b) } + +var fileDescriptor_a1929c2ebf17cd0b = []byte{ + // 119 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2e, 0x28, 0xca, 0x2f, + 0xc9, 0xf7, 0x4d, 0x2d, 0x2e, 0x4e, 0x4c, 0x4f, 0x2d, 0xd6, 0x03, 
0xf3, 0x84, 0x58, 0x4b, 0x52, + 0x8b, 0x4b, 0x8a, 0x95, 0x7c, 0xb8, 0x78, 0x02, 0x40, 0x7c, 0xe7, 0xfc, 0xdc, 0xdc, 0xc4, 0xbc, + 0x14, 0x21, 0x79, 0x2e, 0xee, 0xe2, 0xfc, 0xdc, 0xd4, 0xf8, 0xbc, 0xd2, 0xdc, 0xa4, 0xd4, 0x22, + 0x09, 0x46, 0x05, 0x46, 0x0d, 0xe6, 0x20, 0x2e, 0x90, 0x90, 0x1f, 0x58, 0x44, 0x48, 0x9a, 0x8b, + 0x13, 0xac, 0x20, 0x25, 0xb1, 0x24, 0x51, 0x82, 0x49, 0x81, 0x51, 0x83, 0x33, 0x88, 0x03, 0x24, + 0xe0, 0x92, 0x58, 0x92, 0x98, 0xc4, 0x06, 0x36, 0xdb, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0xe2, + 0x61, 0x85, 0xcd, 0x72, 0x00, 0x00, 0x00, +} diff --git a/tests/protoMessages.proto b/tests/protoMessages.proto new file mode 100644 index 0000000..bddad18 --- /dev/null +++ b/tests/protoMessages.proto @@ -0,0 +1,7 @@ +syntax = "proto3"; +package tests; + +message ProtoCommand { + int64 some_number = 1; + string some_data = 2; +} diff --git a/tests/protoMessagesBase.go b/tests/protoMessagesBase.go new file mode 100644 index 0000000..16462ed --- /dev/null +++ b/tests/protoMessagesBase.go @@ -0,0 +1,5 @@ +package tests + +func (*ProtoCommand) SchemaName() string { + return "ProtoCommand" +} diff --git a/tests/protoSerialization_test.go b/tests/protoSerialization_test.go new file mode 100644 index 0000000..9e14a9c --- /dev/null +++ b/tests/protoSerialization_test.go @@ -0,0 +1,49 @@ +package tests + +import ( + "reflect" + "testing" + + log "github.com/sirupsen/logrus" + "github.com/wework/grabbit/gbus/serialization" +) + +func TestProtoSerialization(t *testing.T) { + + logger := log.WithField("test", "proto_serialization") + serializer := serialization.NewProtoSerializer(logger) + cmd := &ProtoCommand{} + schemaName := cmd.SchemaName() + cmd.SomeNumber = 15 + cmd.SomeData = "rhinof" + encodedBytes, encErr := serializer.Encode(cmd) + + if encErr != nil { + t.Errorf("encoding returned an error: %v", encErr) + } + + //Calling Decode with out first registering the schema should fail and return an error + _, decErr := serializer.Decode(encodedBytes, 
schemaName) + if decErr == nil { + t.Error("decoding expected to fail but did not return an error") + } + + serializer.Register(cmd) + decodedMsg, noErr := serializer.Decode(encodedBytes, schemaName) + if noErr != nil { + t.Errorf("decoding of message failed with error:%v", noErr) + } + + cmdCopy, ok := decodedMsg.(*ProtoCommand) + + if !ok { + t.Errorf("decoded message was of wrong type. expected:%v actual:%v", reflect.TypeOf(cmd), reflect.TypeOf(decodedMsg)) + } + + if cmdCopy.SomeNumber != cmd.SomeNumber || cmdCopy.SomeData != cmd.SomeData { + log.Infof("expected:%v\n", cmd) + log.Infof("actual:%v\n", cmdCopy) + t.Errorf("decoded message has unexpected or missing data") + } + +} From 9e5407bb675d5070df1155d3ac7a32737bafdd56 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Thu, 9 May 2019 17:52:04 +0300 Subject: [PATCH 05/15] added tests for protobuf serializer error scenarios --- tests/protoSerialization_test.go | 57 +++++++++++++++++++++++++++----- 1 file changed, 48 insertions(+), 9 deletions(-) diff --git a/tests/protoSerialization_test.go b/tests/protoSerialization_test.go index 9e14a9c..f1e4b0b 100644 --- a/tests/protoSerialization_test.go +++ b/tests/protoSerialization_test.go @@ -8,34 +8,46 @@ import ( "github.com/wework/grabbit/gbus/serialization" ) -func TestProtoSerialization(t *testing.T) { +func getProtoCommand() *ProtoCommand { + return &ProtoCommand{ + SomeNumber: 15, + SomeData: "rhinof"} +} +func TestProtoSerialization(t *testing.T) { + // so the tests logs do not get littered with log entries from serlializer + log.SetLevel(log.PanicLevel) + defer log.SetLevel(log.InfoLevel) logger := log.WithField("test", "proto_serialization") + serializer := serialization.NewProtoSerializer(logger) - cmd := &ProtoCommand{} + + name := serializer.Name() + if name != "proto" { + t.Fatalf("incorrect serializer name. 
expected:proto actual:%s", name) + } + cmd := getProtoCommand() schemaName := cmd.SchemaName() - cmd.SomeNumber = 15 - cmd.SomeData = "rhinof" - encodedBytes, encErr := serializer.Encode(cmd) + encodedBytes, encErr := serializer.Encode(cmd) if encErr != nil { - t.Errorf("encoding returned an error: %v", encErr) + t.Fatalf("encoding returned an error: %v", encErr) } //Calling Decode with out first registering the schema should fail and return an error _, decErr := serializer.Decode(encodedBytes, schemaName) if decErr == nil { - t.Error("decoding expected to fail but did not return an error") + t.Fatalf("decoding expected to fail but did not return an error") } serializer.Register(cmd) + decodedMsg, noErr := serializer.Decode(encodedBytes, schemaName) if noErr != nil { - t.Errorf("decoding of message failed with error:%v", noErr) + t.Fatalf("decoding of message failed with error:%v", noErr) } cmdCopy, ok := decodedMsg.(*ProtoCommand) - if !ok { t.Errorf("decoded message was of wrong type. 
expected:%v actual:%v", reflect.TypeOf(cmd), reflect.TypeOf(decodedMsg)) } @@ -47,3 +59,30 @@ func TestProtoSerialization(t *testing.T) { } } + +func TestProtoSerializationErrors(t *testing.T) { + // so the tests logs do not get littered with log entries from serlializer + log.SetLevel(log.PanicLevel) + defer log.SetLevel(log.InfoLevel) + logger := log.WithField("test", "proto_serialization") + + serializer := serialization.NewProtoSerializer(logger) + + // test that encoding a non protobuf generated strcut fails and returns an error + _, encErr := serializer.Encode(Command1{}) + if encErr == nil { + t.Errorf("serializer expected to return an error for non proto generated messages but di not") + } + + cmd := getProtoCommand() + encodedBytes, encErr := serializer.Encode(cmd) + if encErr != nil { + t.Fatalf("encoding returned an error: %v", encErr) + } + + //decoding an unregistered schema fails and returns an error + if _, decErr := serializer.Decode(encodedBytes, "kong"); decErr == nil { + t.Errorf("decoding an unregistred schema is expected to return an error but did not") + } + +} From c1040a57f17cb9d8ecce65a3f35782c488064746 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Thu, 9 May 2019 18:01:07 +0300 Subject: [PATCH 06/15] added more error testing for proto serializer --- tests/protoSerialization_test.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/protoSerialization_test.go b/tests/protoSerialization_test.go index f1e4b0b..ef5c91e 100644 --- a/tests/protoSerialization_test.go +++ b/tests/protoSerialization_test.go @@ -1,6 +1,7 @@ package tests import ( + "crypto/rand" "reflect" "testing" @@ -85,4 +86,11 @@ func TestProtoSerializationErrors(t *testing.T) { t.Errorf("decoding an unregistred schema is expected to return an error but did not") } + //decoding junk fails and returns an error + junk := make([]byte, 16) + rand.Read(junk) + if _, decErr := serializer.Decode(junk, cmd.SchemaName()); decErr == nil { + t.Errorf("decoding junk is 
expected to return an error but did not") + } + } From 5490d3d5892f610bf84a4f9f2619a51dfd7ddd1b Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Thu, 9 May 2019 18:10:54 +0300 Subject: [PATCH 07/15] fixed validation in proto serialization error tests --- tests/protoSerialization_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/protoSerialization_test.go b/tests/protoSerialization_test.go index ef5c91e..efe4524 100644 --- a/tests/protoSerialization_test.go +++ b/tests/protoSerialization_test.go @@ -86,6 +86,7 @@ func TestProtoSerializationErrors(t *testing.T) { t.Errorf("decoding an unregistred schema is expected to return an error but did not") } + serializer.Register(cmd) //decoding junk fails and returns an error junk := make([]byte, 16) rand.Read(junk) From 6d1480db0bd49c36ae34a140a4226bb0a00adbb1 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Mon, 13 May 2019 09:55:00 +0300 Subject: [PATCH 08/15] fixing a bug where saga configuration functions were run only when saga was created. This fixes #74 --- gbus/saga/def.go | 10 +++- gbus/saga/glue.go | 4 +- gbus/saga/instance.go | 6 +-- tests/saga_test.go | 121 ++++++++++++++++++++++++++++++++++-------- 4 files changed, 113 insertions(+), 28 deletions(-) diff --git a/gbus/saga/def.go b/gbus/saga/def.go index b09721f..e390817 100644 --- a/gbus/saga/def.go +++ b/gbus/saga/def.go @@ -66,8 +66,16 @@ func getFunNameFromHandler(handler gbus.MessageHandler) string { } func (sd *Def) newInstance() *Instance { - return NewInstance(sd.sagaType, sd.msgToFunc, sd.sagaConfFns...) 
+ instance := NewInstance(sd.sagaType, sd.msgToFunc) + return sd.configureSaga(instance) +} +func (sd *Def) configureSaga(instance *Instance) *Instance { + saga := instance.UnderlyingInstance + for _, conf := range sd.sagaConfFns { + saga = conf(saga) + } + return instance } func (sd *Def) shouldStartNewSaga(message *gbus.BusMessage) bool { diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index 6d68ff7..611cdd5 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -154,7 +154,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) e := fmt.Errorf("Warning:Failed message routed with SagaCorrelationID:%v but no saga instance with the same id found ", message.SagaCorrelationID) return e } - + def.configureSaga(instance) if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil { imsm.log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga") return invkErr @@ -176,7 +176,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) imsm.log().WithFields(log.Fields{"message": msgName, "instances_fetched": len(instances)}).Info("fetched saga instances") for _, instance := range instances { - + def.configureSaga(instance) if invkErr := imsm.invokeSagaInstance(instance, invocation, message); invkErr != nil { imsm.log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga") return invkErr diff --git a/gbus/saga/instance.go b/gbus/saga/instance.go index 5cdf99d..f8303d9 100644 --- a/gbus/saga/instance.go +++ b/gbus/saga/instance.go @@ -79,7 +79,7 @@ func (si *Instance) requestsTimeout() (bool, time.Duration) { //NewInstance create a new instance of a Saga -func NewInstance(sagaType reflect.Type, msgToMethodMap []*MsgToFuncPair, confFns ...gbus.SagaConfFn) *Instance { +func NewInstance(sagaType reflect.Type, msgToMethodMap []*MsgToFuncPair) *Instance { var newSagaPtr interface{} if sagaType.Kind() == reflect.Ptr { @@ -91,9 +91,7 @@ 
func NewInstance(sagaType reflect.Type, msgToMethodMap []*MsgToFuncPair, confFns saga := newSagaPtr.(gbus.Saga) newSaga := saga.New() - for _, conf := range confFns { - newSaga = conf(newSaga) - } + //newSagaPtr := reflect.New(sagaType).Elem() newInstance := &Instance{ ID: xid.New().String(), diff --git a/tests/saga_test.go b/tests/saga_test.go index 3c86325..74cf2b8 100644 --- a/tests/saga_test.go +++ b/tests/saga_test.go @@ -257,6 +257,54 @@ func TestSagaSelfMessaging(t *testing.T) { } +func TestSagaConfFunctions(t *testing.T) { + proceed := make(chan bool) + fail := make(chan bool) + + b := createNamedBusForTest(testSvc1) + + handler := func(invocation gbus.Invocation, message *gbus.BusMessage) error { + + _, routingKey := invocation.Routing() + + if routingKey == "saga.config.functions.not.run" { + fail <- true + } else { + + proceed <- true + } + + return nil + } + + err := b.HandleEvent("test_exchange", "saga.config.functions.not.run", Event1{}, handler) + if err != nil { + t.Fatalf("Registering handler returned false, expected true with error: %s", err.Error()) + } + + err = b.HandleEvent("test_exchange", "saga.config.functions.executed", Event1{}, handler) + if err != nil { + t.Fatalf("Registering handler returned false, expected true with error: %s", err.Error()) + } + + b.RegisterSaga(&ConfigurableSaga{}, func(saga gbus.Saga) gbus.Saga { + sagaInstance := saga.(*ConfigurableSaga) + sagaInstance.nonPersistedSField = "rhinof" + return sagaInstance + }) + + b.Start() + defer b.Shutdown() + + b.Send(context.TODO(), testSvc1, gbus.NewBusMessage(Command1{})) + select { + case <-fail: + t.Fatalf("saga configurtion functions not executed") + case <-proceed: + } + +} + /*Test Sagas*/ type SagaA struct { @@ -279,39 +327,25 @@ func (s *SagaA) New() gbus.Saga { } func (s *SagaA) RegisterAllHandlers(register gbus.HandlerRegister) { - register.HandleMessage(Command1{ - Data: "SagaA.RegisterAllHandlers", - }, s.HandleCommand1) - register.HandleMessage(Command2{ - Data: 
"SagaA.RegisterAllHandlers", - }, s.HandleCommand2) - register.HandleEvent("test_exchange", "some.topic.1", Event1{ - Data: "SagaA.RegisterAllHandlers", - }, s.HandleEvent1) - register.HandleEvent("test_exchange", "some.topic.2", Event2{ - Data: "SagaA.RegisterAllHandlers", - }, s.HandleEvent1) + register.HandleMessage(Command1{}, s.HandleCommand1) + register.HandleMessage(Command2{}, s.HandleCommand2) + register.HandleEvent("test_exchange", "some.topic.1", Event1{}, s.HandleEvent1) + register.HandleEvent("test_exchange", "some.topic.2", Event2{}, s.HandleEvent1) } func (s *SagaA) HandleCommand1(invocation gbus.Invocation, message *gbus.BusMessage) error { - reply := gbus.NewBusMessage(Reply1{ - Data: "SagaA.HandleCommand1", - }) + reply := gbus.NewBusMessage(Reply1{}) return invocation.Reply(noopTraceContext(), reply) } func (s *SagaA) HandleCommand2(invocation gbus.Invocation, message *gbus.BusMessage) error { log.Println("command2 received") - reply := gbus.NewBusMessage(Reply2{ - Data: "SagaA.HandleCommand2", - }) + reply := gbus.NewBusMessage(Reply2{}) return invocation.Reply(noopTraceContext(), reply) } func (s *SagaA) HandleEvent1(invocation gbus.Invocation, message *gbus.BusMessage) error { - reply := gbus.NewBusMessage(Reply2{ - Data: "SagaA.HandleEvent1", - }) + reply := gbus.NewBusMessage(Reply2{}) log.Println("event1 received") return invocation.Reply(noopTraceContext(), reply) } @@ -440,3 +474,48 @@ func (s *SelfSendingSaga) HandleReply2(invocation gbus.Invocation, message *gbus evt1 := gbus.NewBusMessage(Event1{}) return invocation.Bus().Publish(invocation.Ctx(), "test_exchange", "test_topic", evt1) } + +type ConfigurableSaga struct { + //this field should be set via a saga configuration function + nonPersistedSField string + Complete bool +} + +func (*ConfigurableSaga) StartedBy() []gbus.Message { + starters := make([]gbus.Message, 0) + return append(starters, Command1{}) +} + +func (s *ConfigurableSaga) RegisterAllHandlers(register 
gbus.HandlerRegister) { + register.HandleMessage(Command1{}, s.HandleCommand1) + register.HandleMessage(Command2{}, s.HandleCommand2) +} + +func (s *ConfigurableSaga) HandleCommand1(invocation gbus.Invocation, message *gbus.BusMessage) error { + + if s.nonPersistedSField == "" { + invocation.Bus().Publish(invocation.Ctx(), "test_exchange", "saga.config.functions.not.run", gbus.NewBusMessage(Event1{})) + return nil + } + + _, selfService := invocation.Routing() + invocation.Bus().Send(invocation.Ctx(), selfService, gbus.NewBusMessage(Command2{})) + return nil +} + +func (s *ConfigurableSaga) HandleCommand2(invocation gbus.Invocation, message *gbus.BusMessage) error { + if s.nonPersistedSField == "" { + invocation.Bus().Publish(invocation.Ctx(), "test_exchange", "saga.config.functions.not.run", gbus.NewBusMessage(Event1{})) + return nil + } + invocation.Bus().Publish(invocation.Ctx(), "test_exchange", "saga.config.functions.executed", gbus.NewBusMessage(Event1{})) + return nil +} + +func (s *ConfigurableSaga) IsComplete() bool { + return s.Complete +} + +func (s *ConfigurableSaga) New() gbus.Saga { + return &ConfigurableSaga{} +} From fc06b816408bb013f3742e0e6f9a31d94d92dc0c Mon Sep 17 00:00:00 2001 From: adiweiss <46281796+adiweiss@users.noreply.github.com> Date: Wed, 15 May 2019 16:50:10 +0300 Subject: [PATCH 09/15] Update SAGA.md --- docs/SAGA.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/SAGA.md b/docs/SAGA.md index b53f062..1470038 100644 --- a/docs/SAGA.md +++ b/docs/SAGA.md @@ -186,7 +186,7 @@ So in order to fulfill our requerment we will need to add the following to our s ```go -func (s *BookVacationSaga) RequestTimeout() time.Duration { +func (s *BookVacationSaga) TimeoutDuration() time.Duration { //request to timeout if after 15 minutes the saga is not complete return time.Minute * 15 } From a019efbbcca012319d4b57a97e3939bfce7039e4 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Thu, 16 May 2019 12:59:04 +0300 Subject: [PATCH 
10/15] increasing test timeouts to 60s --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 39e312f..687838d 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -35,7 +35,7 @@ jobs: command: go get -v -t -d ./... - run: name: Run tests - command: go test -v -covermode=count -coverprofile=coverage.out -coverpkg=./... -timeout 30s ./... + command: go test -v -covermode=count -coverprofile=coverage.out -coverpkg=./... -timeout 60s ./... - run: name: Report Coverage command: | From 212181d482333977432dbd7584c22b93588db081 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sun, 19 May 2019 07:27:43 +0300 Subject: [PATCH 11/15] first commit towards supporting persistent timeouts Timeouts now are not passed as bus messages to avoid the need to deal with serialization. Timeout interface changed so that handling a timeout now accepts a transaction and a bus instance instead of an invocation and a bus message --- gbus/abstractions.go | 2 +- gbus/builder/builder.go | 2 +- gbus/saga/glue.go | 30 ++++++++++++++++++------------ gbus/saga/instance.go | 13 ++++++++++++- gbus/saga/timeout.go | 34 ++++++++++++++++++++++++---------- tests/saga_test.go | 7 +++---- 6 files changed, 59 insertions(+), 29 deletions(-) diff --git a/gbus/abstractions.go b/gbus/abstractions.go index b0a6faa..5f95632 100644 --- a/gbus/abstractions.go +++ b/gbus/abstractions.go @@ -126,7 +126,7 @@ type RegisterDeadletterHandler interface { //RequestSagaTimeout is the interface a saga needs to implement to get timeout servicess type RequestSagaTimeout interface { TimeoutDuration() time.Duration - Timeout(invocation Invocation, message *BusMessage) error + Timeout(tx *sql.Tx, bus Messaging) error } //SagaConfFn is a function to allow configuration of a saga in the context of the gbus diff --git a/gbus/builder/builder.go b/gbus/builder/builder.go index c09cfd7..5711143 100644 --- a/gbus/builder/builder.go +++ 
b/gbus/builder/builder.go @@ -99,7 +99,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { panic(err) } } - gb.Glue = saga.NewGlue(gb, sagaStore, svcName) + gb.Glue = saga.NewGlue(gb, sagaStore, svcName, gb.TxProvider) return gb } diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index 6d68ff7..6cd1628 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -75,15 +75,6 @@ func (imsm *Glue) RegisterSaga(saga gbus.Saga, conf ...gbus.SagaConfFn) error { WithFields(log.Fields{"saga_type": def.sagaType.String(), "handles_messages": len(msgNames)}). Info("registered saga with messages") - //register on timeout messages - timeoutEtfs, requestsTimeout := saga.(gbus.RequestSagaTimeout) - if requestsTimeout { - timeoutMessage := gbus.SagaTimeoutMessage{} - timeoutMsgName := timeoutMessage.SchemaName() - _ = def.HandleMessage(timeoutMessage, timeoutEtfs.Timeout) - imsm.addMsgNameToDef(timeoutMsgName, def) - - } return nil } @@ -236,20 +227,35 @@ func (imsm *Glue) registerEvent(exchange, topic string, event gbus.Message) erro return imsm.bus.HandleEvent(exchange, topic, event, imsm.handler) } +func (imsm *Glue) timeoutSaga(tx *sql.Tx, sagaID string) error { + + saga, err := imsm.sagaStore.GetSagaByID(tx, sagaID) + if err != nil { + return err + } + timeoutErr := saga.timeout(tx, imsm.bus) + if timeoutErr != nil { + imsm.log().WithError(timeoutErr).WithField("sagaID", sagaID).Error("failed to timeout saga") + return timeoutErr + } + return imsm.sagaStore.DeleteSaga(tx, saga) +} + func (imsm *Glue) log() *log.Entry { return log.WithField("_service", imsm.svcName) } //NewGlue creates a new Sagamanager -func NewGlue(bus gbus.Bus, sagaStore Store, svcName string) *Glue { - return &Glue{ +func NewGlue(bus gbus.Bus, sagaStore Store, svcName string, txp gbus.TxProvider) *Glue { + g := &Glue{ svcName: svcName, bus: bus, sagaDefs: make([]*Def, 0), lock: &sync.Mutex{}, alreadyRegistred: make(map[string]bool), msgToDefMap: make(map[string][]*Def), - timeoutManger: 
TimeoutManager{bus: bus}, sagaStore: sagaStore, } + g.timeoutManger = TimeoutManager{bus: bus, txp: txp, glue: g} + return g } diff --git a/gbus/saga/instance.go b/gbus/saga/instance.go index 5cdf99d..f40f5e8 100644 --- a/gbus/saga/instance.go +++ b/gbus/saga/instance.go @@ -1,6 +1,7 @@ package saga import ( + "database/sql" "fmt" "log" "reflect" @@ -16,6 +17,7 @@ type Instance struct { ConcurrencyCtrl int UnderlyingInstance gbus.Saga MsgToMethodMap []*MsgToFuncPair + timeoutFuncName string } func (si *Instance) invoke(exchange, routingKey string, invocation gbus.Invocation, message *gbus.BusMessage) error { @@ -77,8 +79,17 @@ func (si *Instance) requestsTimeout() (bool, time.Duration) { return canTimeout, timeoutDuration } -//NewInstance create a new instance of a Saga +func (si *Instance) timeout(tx *sql.Tx, bus gbus.Messaging) error { + + saga, canTimeout := si.UnderlyingInstance.(gbus.RequestSagaTimeout) + if !canTimeout { + return fmt.Errorf("saga instance does not support timeouts") + + } + return saga.Timeout(tx, bus) +} +//NewInstance create a new instance of a Saga func NewInstance(sagaType reflect.Type, msgToMethodMap []*MsgToFuncPair, confFns ...gbus.SagaConfFn) *Instance { var newSagaPtr interface{} diff --git a/gbus/saga/timeout.go b/gbus/saga/timeout.go index 4592c35..6f492db 100644 --- a/gbus/saga/timeout.go +++ b/gbus/saga/timeout.go @@ -1,17 +1,17 @@ package saga import ( - "context" "time" - "github.com/sirupsen/logrus" "github.com/wework/grabbit/gbus" ) //TimeoutManager manages timeouts for sagas //TODO:Make it persistent type TimeoutManager struct { - bus gbus.Bus + bus gbus.Bus + glue *Glue + txp gbus.TxProvider } //RequestTimeout requests a timeout from the timeout manager @@ -20,13 +20,27 @@ func (tm *TimeoutManager) RequestTimeout(svcName, sagaID string, duration time.D go func(svcName, sagaID string, tm *TimeoutManager) { c := time.After(duration) <-c - reuqestTimeout := gbus.SagaTimeoutMessage{ - SagaID: sagaID} - msg := 
gbus.NewBusMessage(reuqestTimeout) - msg.SagaCorrelationID = sagaID - if err := tm.bus.Send(context.Background(), svcName, msg); err != nil { - //TODO: add logger - logrus.WithError(err).Error("could not send timeout to bus") + if tm.txp == nil { + tm.glue.timeoutSaga(nil, sagaID) + return + } + tx, txe := tm.txp.New() + if txe != nil { + tm.glue.log().WithError(txe).Warn("timeout manager failed to create a transaction") + } else { + callErr := tm.glue.timeoutSaga(tx, sagaID) + if callErr != nil { + tm.glue.log().WithError(callErr).WithField("sagaID", sagaID).Error("timing out a saga failed") + rlbe := tx.Rollback() + if rlbe != nil { + tm.glue.log().WithError(rlbe).Warn("timeout manager failed to rollback transaction") + } + } else { + cmte := tx.Commit() + if cmte != nil { + tm.glue.log().WithError(cmte).Warn("timeout manager failed to rollback transaction") + } + } } }(svcName, sagaID, tm) diff --git a/tests/saga_test.go b/tests/saga_test.go index 3c86325..5d67671 100644 --- a/tests/saga_test.go +++ b/tests/saga_test.go @@ -2,6 +2,7 @@ package tests import ( "context" + "database/sql" "log" "reflect" "testing" @@ -397,11 +398,9 @@ func (s *TimingOutSaga) TimeoutDuration() time.Duration { return time.Second * 1 } -func (s *TimingOutSaga) Timeout(invocation gbus.Invocation, message *gbus.BusMessage) error { +func (s *TimingOutSaga) Timeout(tx *sql.Tx, bus gbus.Messaging) error { s.TimedOut = true - return invocation.Bus().Publish(noopTraceContext(), "test_exchange", "some.topic.1", gbus.NewBusMessage(Event1{ - Data: "TimingOutSaga.Timeout", - })) + return bus.Publish(context.Background(), "test_exchange", "some.topic.1", gbus.NewBusMessage(Event1{})) } type SelfSendingSaga struct { From e45a8a1f6ef9eda624a353cd6bc9f2e81fc1241e Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sun, 19 May 2019 07:33:58 +0300 Subject: [PATCH 12/15] a timed out saga only gets deleted if it is complete This allows saga instances, if they choose, to continue reacting to messages after they 
timeout --- gbus/saga/glue.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index 6cd1628..d9283eb 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -238,7 +238,10 @@ func (imsm *Glue) timeoutSaga(tx *sql.Tx, sagaID string) error { imsm.log().WithError(timeoutErr).WithField("sagaID", sagaID).Error("failed to timeout saga") return timeoutErr } - return imsm.sagaStore.DeleteSaga(tx, saga) + if saga.isComplete() { + return imsm.sagaStore.DeleteSaga(tx, saga) + } + return nil } func (imsm *Glue) log() *log.Entry { From dd62b4332c3c39e337141b91b5707708475eee53 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sun, 19 May 2019 07:50:01 +0300 Subject: [PATCH 13/15] deleting the saga instance after timeout only if it is complete otherwise updating it In addition fixing some linting errors --- gbus/saga/glue.go | 15 +++++---------- gbus/saga/timeout.go | 6 +++++- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/gbus/saga/glue.go b/gbus/saga/glue.go index d9283eb..0bc8104 100644 --- a/gbus/saga/glue.go +++ b/gbus/saga/glue.go @@ -151,7 +151,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) return invkErr } - return imsm.completeOrUpdateSaga(invocation.Tx(), instance, message) + return imsm.completeOrUpdateSaga(invocation.Tx(), instance) } else if message.Semantics == gbus.CMD { e := fmt.Errorf("Warning:Command or Reply message with no saga reference received. 
message will be dropped.\nmessage as of type:%v", reflect.TypeOf(message).Name()) @@ -172,7 +172,7 @@ func (imsm *Glue) handler(invocation gbus.Invocation, message *gbus.BusMessage) imsm.log().WithError(invkErr).WithField("saga_id", instance.ID).Error("failed to invoke saga") return invkErr } - e = imsm.completeOrUpdateSaga(invocation.Tx(), instance, message) + e = imsm.completeOrUpdateSaga(invocation.Tx(), instance) if e != nil { return e } @@ -196,11 +196,9 @@ func (imsm *Glue) invokeSagaInstance(instance *Instance, invocation gbus.Invocat return instance.invoke(exchange, routingKey, sginv, message) } -func (imsm *Glue) completeOrUpdateSaga(tx *sql.Tx, instance *Instance, lastMessage *gbus.BusMessage) error { +func (imsm *Glue) completeOrUpdateSaga(tx *sql.Tx, instance *Instance) error { - _, timedOut := lastMessage.Payload.(gbus.SagaTimeoutMessage) - - if instance.isComplete() || timedOut { + if instance.isComplete() { imsm.log().WithField("saga_id", instance.ID).Info("saga has completed and will be deleted") return imsm.sagaStore.DeleteSaga(tx, instance) @@ -238,10 +236,7 @@ func (imsm *Glue) timeoutSaga(tx *sql.Tx, sagaID string) error { imsm.log().WithError(timeoutErr).WithField("sagaID", sagaID).Error("failed to timeout saga") return timeoutErr } - if saga.isComplete() { - return imsm.sagaStore.DeleteSaga(tx, saga) - } - return nil + return imsm.completeOrUpdateSaga(tx, saga) } func (imsm *Glue) log() *log.Entry { diff --git a/gbus/saga/timeout.go b/gbus/saga/timeout.go index 6f492db..4012353 100644 --- a/gbus/saga/timeout.go +++ b/gbus/saga/timeout.go @@ -20,8 +20,12 @@ func (tm *TimeoutManager) RequestTimeout(svcName, sagaID string, duration time.D go func(svcName, sagaID string, tm *TimeoutManager) { c := time.After(duration) <-c + //TODO:if the bus is not transactional, moving forward we should not allow using sagas in a non transactional bus if tm.txp == nil { - tm.glue.timeoutSaga(nil, sagaID) + tme := tm.glue.timeoutSaga(nil, sagaID) + if tme != nil { 
+ tm.glue.log().WithError(tme).WithField("sagaID", sagaID).Error("timing out a saga failed") + } return } tx, txe := tm.txp.New() From d05664fe1f465006e7226a5c736271e635c1b0a4 Mon Sep 17 00:00:00 2001 From: Guy Baron Date: Sun, 19 May 2019 08:16:32 +0300 Subject: [PATCH 14/15] removing unused field in saga instance (linting error) --- gbus/saga/instance.go | 1 - 1 file changed, 1 deletion(-) diff --git a/gbus/saga/instance.go b/gbus/saga/instance.go index f40f5e8..2b3c889 100644 --- a/gbus/saga/instance.go +++ b/gbus/saga/instance.go @@ -17,7 +17,6 @@ type Instance struct { ConcurrencyCtrl int UnderlyingInstance gbus.Saga MsgToMethodMap []*MsgToFuncPair - timeoutFuncName string } func (si *Instance) invoke(exchange, routingKey string, invocation gbus.Invocation, message *gbus.BusMessage) error { From 80be1cc30e05f5b090d4ad6e971c0281f2adffd8 Mon Sep 17 00:00:00 2001 From: adiweiss <46281796+adiweiss@users.noreply.github.com> Date: Wed, 22 May 2019 14:45:44 +0300 Subject: [PATCH 15/15] Update SAGA.md update documentation after changes of timeout interface : https://github.com/wework/grabbit/pull/78/files --- docs/SAGA.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/SAGA.md b/docs/SAGA.md index b53f062..a3534b0 100644 --- a/docs/SAGA.md +++ b/docs/SAGA.md @@ -178,7 +178,7 @@ In order to define a timeout for the saga and have grabbit call the saga instanc ```go type RequestSagaTimeout interface { TimeoutDuration() time.Duration - Timeout(invocation Invocation, message *BusMessage) error + Timeout(tx *sql.Tx, bus Messaging) error } ``` @@ -192,7 +192,7 @@ func (s *BookVacationSaga) RequestTimeout() time.Duration { } func (s *BookVacationSaga) Timeout(invocation gbus.Invocation, message *gbus.BusMessage) error { - return invocation.Bus().Publish(invocation.Ctx(), "some_exchange", "some.topic.1", gbus.NewBusMessage(VacationBookingTimedOut{})) + return bus.Publish(context.Background(), "some_exchange", "some.topic.1", 
gbus.NewBusMessage(VacationBookingTimedOut{})) } ```