diff --git a/gbus/builder/builder.go b/gbus/builder/builder.go index 332ec90..9b4c0d1 100644 --- a/gbus/builder/builder.go +++ b/gbus/builder/builder.go @@ -9,7 +9,6 @@ import ( "github.com/wework/grabbit/gbus" "github.com/wework/grabbit/gbus/saga" - "github.com/wework/grabbit/gbus/saga/stores" "github.com/wework/grabbit/gbus/serialization" "github.com/wework/grabbit/gbus/tx/mysql" ) @@ -37,9 +36,7 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { gb := &gbus.DefaultBus{ AmqpConnStr: builder.connStr, PrefetchCount: builder.PrefetchCount, - Outgoing: &gbus.AMQPOutbox{ - SvcName: svcName, - }, + SvcName: svcName, PurgeOnStartup: builder.purgeOnStartup, DelayedSubscriptions: [][]string{}, @@ -47,7 +44,6 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { RPCLock: &sync.Mutex{}, SenderLock: &sync.Mutex{}, ConsumerLock: &sync.Mutex{}, - IsTxnl: builder.txnl, Registrations: make([]*gbus.Registration, 0), RPCHandlers: make(map[string]gbus.MessageHandler), Serializer: builder.serializer, @@ -72,36 +68,30 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus { sagaStore saga.Store timeoutManager gbus.TimeoutManager ) - if builder.txnl { - gb.IsTxnl = true - switch builder.txnlProvider { - case "mysql": - mysqltx, err := mysql.NewTxProvider(builder.txConnStr) + switch builder.txnlProvider { + + case "mysql": + mysqltx, err := mysql.NewTxProvider(builder.txConnStr) + if err != nil { + panic(err) + } + gb.TxProvider = mysqltx + //TODO move purge logic into the NewSagaStore factory method + sagaStore = mysql.NewSagaStore(gb.SvcName, mysqltx) + if builder.purgeOnStartup { + err := sagaStore.Purge() if err != nil { panic(err) } - gb.TxProvider = mysqltx - //TODO move purge logic into the NewSagaStore factory method - sagaStore = mysql.NewSagaStore(gb.SvcName, mysqltx) - if builder.purgeOnStartup { - err := sagaStore.Purge() - if err != nil { - panic(err) - } - } - gb.Outbox = mysql.NewOutbox(gb.SvcName, mysqltx, builder.purgeOnStartup) - 
timeoutManager = mysql.NewTimeoutManager(gb, gb.TxProvider, gb.Log, svcName, builder.purgeOnStartup) - - default: - err := fmt.Errorf("no provider found for passed in value %v", builder.txnlProvider) - panic(err) } - } else { - sagaStore = stores.NewInMemoryStore() - timeoutManager = &saga.InMemoryTimeoutManager{} - } + gb.Outbox = mysql.NewOutbox(gb.SvcName, mysqltx, builder.purgeOnStartup) + timeoutManager = mysql.NewTimeoutManager(gb, gb.TxProvider, gb.Log, svcName, builder.purgeOnStartup) + default: + err := fmt.Errorf("no provider found for passed in value %v", builder.txnlProvider) + panic(err) + } if builder.usingPingTimeout { gb.DbPingTimeout = builder.dbPingTimeout } diff --git a/gbus/bus.go b/gbus/bus.go index b86af33..a0e7c4c 100644 --- a/gbus/bus.go +++ b/gbus/bus.go @@ -26,7 +26,6 @@ var _ Bus = &DefaultBus{} type DefaultBus struct { *Safety *Glogged - Outgoing *AMQPOutbox Outbox TxOutbox PrefetchCount uint AmqpConnStr string @@ -41,6 +40,7 @@ type DefaultBus struct { amqpErrors chan *amqp.Error amqpBlocks chan amqp.Blocking Registrations []*Registration + amqpOutbox *AMQPOutbox RPCHandlers map[string]MessageHandler deadletterHandler func(tx *sql.Tx, poision amqp.Delivery) error @@ -54,16 +54,16 @@ type DefaultBus struct { started bool Glue SagaGlue TxProvider TxProvider - IsTxnl bool - WorkerNum uint - Serializer Serializer - DLX string - DefaultPolicies []MessagePolicy - Confirm bool - healthChan chan error - backpressure bool - DbPingTimeout time.Duration - amqpConnected bool + + WorkerNum uint + Serializer Serializer + DLX string + DefaultPolicies []MessagePolicy + Confirm bool + healthChan chan error + backpressure bool + DbPingTimeout time.Duration + amqpConnected bool } var ( @@ -203,37 +203,28 @@ func (b *DefaultBus) Start() error { b.egressConn.NotifyClose(b.amqpErrors) b.egressConn.NotifyBlocked(b.amqpBlocks) b.egressChannel.NotifyClose(b.amqpErrors) - //TODO:Figure out what should be done - //init the outbox that sends the messages to the 
amqp transport and handles publisher confirms - if e := b.Outgoing.init(b.egressChannel, b.Confirm, true); e != nil { - return e - } /* start the transactional outbox, make sure calling b.TxOutgoing.Start() is done only after b.Outgoing.init is called TODO://the design is crap and needs to be refactored */ - if b.IsTxnl { - - var amqpChan *amqp.Channel - if amqpChan, e = b.createAMQPChannel(b.egressConn); e != nil { - b.Log().WithError(e).Error("failed to create amqp channel for transactional outbox") - return e - } - amqpChan.NotifyClose(b.amqpErrors) - amqpOutbox := &AMQPOutbox{ - SvcName: b.SvcName, - } - err := amqpOutbox.init(amqpChan, b.Confirm, false) - if err != nil { - b.Log().WithError(err).Error("failed initializing amqpOutbox") - return err - } - if startErr := b.Outbox.Start(amqpOutbox); startErr != nil { - b.Log().WithError(startErr).Error("failed to start transactional outbox") - return startErr - } - + var amqpChan *amqp.Channel + if amqpChan, e = b.createAMQPChannel(b.egressConn); e != nil { + b.Log().WithError(e).Error("failed to create amqp channel for transactional outbox") + return e + } + amqpChan.NotifyClose(b.amqpErrors) + b.amqpOutbox = &AMQPOutbox{ + SvcName: b.SvcName, + } + err := b.amqpOutbox.init(amqpChan, b.Confirm, false) + if err != nil { + b.Log().WithError(err).Error("failed initializing amqpOutbox") + return err + } + if startErr := b.Outbox.Start(b.amqpOutbox); startErr != nil { + b.Log().WithError(startErr).Error("failed to start transactional outbox") + return startErr } //declare queue @@ -244,10 +235,10 @@ func (b *DefaultBus) Start() error { b.serviceQueue = q //bind queue - err := b.bindServiceQueue() - if err != nil { - b.Log().WithError(err).Error("could not bind service to queue") - return err + bindErr := b.bindServiceQueue() + if bindErr != nil { + b.Log().WithError(bindErr).Error("could not bind service to queue") + return bindErr } //declare rpc queue @@ -299,7 +290,6 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) { q: b.serviceQueue, rpcq: b.rpcQueue, 
svcName: b.SvcName, - isTxnl: b.IsTxnl, txProvider: b.TxProvider, rpcLock: b.RPCLock, rpcHandlers: b.RPCHandlers, @@ -339,23 +329,19 @@ func (b *DefaultBus) Shutdown() (shutdwonErr error) { return err } } - b.Outgoing.shutdown() if err := b.Glue.Stop(); err != nil { return err } b.started = false - if b.IsTxnl { - - err := b.Outbox.Stop() - - if err != nil { - b.Log().WithError(err).Error("could not shutdown outbox") - return err - } - b.TxProvider.Dispose() + err := b.Outbox.Stop() + if err != nil { + b.Log().WithError(err).Error("could not shutdown outbox") + return err } + b.amqpOutbox.Shutdown() + b.TxProvider.Dispose() return nil } @@ -370,11 +356,8 @@ func (b *DefaultBus) NotifyHealth(health chan error) { //GetHealth implements Health.GetHealth func (b *DefaultBus) GetHealth() HealthCard { - var dbConnected bool - if b.IsTxnl { - dbConnected = b.TxProvider.Ping(b.DbPingTimeout) - } + dbConnected := b.TxProvider.Ping(b.DbPingTimeout) return HealthCard{ DbConnected: dbConnected, @@ -387,11 +370,11 @@ func (b *DefaultBus) withTx(action func(tx *sql.Tx) error, ambientTx *sql.Tx) er var shouldCommitTx bool var activeTx *sql.Tx //create a new transaction only if there is no active one already passed in - if b.IsTxnl && ambientTx == nil { + if ambientTx == nil { /* if the passed in ambient transaction is not nil it means that some caller has created the transaction - and knows when should this transaction bee committed or rolledback. + and knows when should this transaction be committed or rolledback. 
In these cases we only invoke the passed in action with the passed in transaction and do not commit/rollback the transaction.action If no ambient transaction is passed in then we create a new transaction and commit or rollback after @@ -414,11 +397,7 @@ func (b *DefaultBus) withTx(action func(tx *sql.Tx) error, ambientTx *sql.Tx) er } actionErr := b.SafeWithRetries(retryAction, MaxRetryCount) - /* - if the bus is transactional and there is no ambient transaction then create a new one else use the ambient tranaction. - if the bus is not transactional a nil transaction reference will be passed - */ - if b.IsTxnl && shouldCommitTx { + if shouldCommitTx { if actionErr != nil { err := activeTx.Rollback() if err != nil { @@ -464,7 +443,12 @@ func (b *DefaultBus) RPC(ctx context.Context, service string, request, reply *Bu rpcID: rpcID} b.Serializer.Register(reply.Payload) - err := b.sendImpl(ctx, nil, service, b.rpcQueue.Name, "", "", request, rpc) + + sendRPC := func(tx *sql.Tx) error { + return b.sendImpl(ctx, tx, service, b.rpcQueue.Name, "", "", request, rpc) + } + + err := b.withTx(sendRPC, nil) if err != nil { b.Log().WithError(err).Error("could not send message") return nil, err @@ -624,22 +608,13 @@ func (b *DefaultBus) monitorAMQPErrors() { func (b *DefaultBus) publish(tx *sql.Tx, exchange, routingKey string, msg *amqp.Publishing) error { publish := func() error { - //send to the transactional outbox if the bus is transactional - //otherwise send directly to amqp - if b.IsTxnl && tx != nil { - b.Log().WithField("message_id", msg.MessageId).Debug("sending message to outbox") - saveErr := b.Outbox.Save(tx, exchange, routingKey, *msg) - if saveErr != nil { - b.Log().WithError(saveErr).Error("failed to save to transactional outbox") - } - return saveErr - } - //do not attempt to contact the borker if backpressure is being applied - if b.backpressure { - return errors.New("can't send message due to backpressure from amqp broker") + + b.Log().WithField("message_id", 
msg.MessageId).Debug("sending message to outbox") + saveErr := b.Outbox.Save(tx, exchange, routingKey, *msg) + if saveErr != nil { + b.Log().WithError(saveErr).Error("failed to save to transactional outbox") } - _, outgoingErr := b.Outgoing.Post(exchange, routingKey, *msg) - return outgoingErr + return saveErr } //currently only one thread can publish at a time //TODO:add a publishing workers diff --git a/gbus/outbox.go b/gbus/outbox.go index 323a37a..3358250 100644 --- a/gbus/outbox.go +++ b/gbus/outbox.go @@ -50,8 +50,9 @@ func (out *AMQPOutbox) init(amqp *amqp.Channel, confirm, resendOnNack bool) erro return nil } -func (out *AMQPOutbox) shutdown() { - out.stop <- true +//Shutdown closes the outbox's stop channel +func (out *AMQPOutbox) Shutdown() { + close(out.stop) } diff --git a/gbus/saga/inmemory_timeout.go b/gbus/saga/inmemory_timeout.go deleted file mode 100644 index f9fd499..0000000 --- a/gbus/saga/inmemory_timeout.go +++ /dev/null @@ -1,72 +0,0 @@ -package saga - -import ( - "database/sql" - "time" - - "github.com/wework/grabbit/gbus" -) - -var _ gbus.TimeoutManager = &InMemoryTimeoutManager{} - -//InMemoryTimeoutManager should not be used in production -type InMemoryTimeoutManager struct { - glue *Glue - txp gbus.TxProvider -} - -//RegisterTimeout requests a timeout from the timeout manager -func (tm *InMemoryTimeoutManager) RegisterTimeout(tx *sql.Tx, sagaID string, duration time.Duration) error { - - go func(svcName, sagaID string, tm *InMemoryTimeoutManager) { - c := time.After(duration) - <-c - //TODO:if the bus is not transactional, moving forward we should not allow using sagas in a non transactional bus - if tm.txp == nil { - tme := tm.glue.TimeoutSaga(nil, sagaID) - if tme != nil { - tm.glue.Log().WithError(tme).WithField("sagaID", sagaID).Error("timing out a saga failed") - } - return - } - tx, txe := tm.txp.New() - if txe != nil { - tm.glue.Log().WithError(txe).Warn("timeout manager failed to create a transaction") - } else { - callErr := tm.glue.TimeoutSaga(tx, sagaID) - if callErr != 
nil { - tm.glue.Log().WithError(callErr).WithField("sagaID", sagaID).Error("timing out a saga failed") - rlbe := tx.Rollback() - if rlbe != nil { - tm.glue.Log().WithError(rlbe).Warn("timeout manager failed to rollback transaction") - } - } else { - cmte := tx.Commit() - if cmte != nil { - tm.glue.Log().WithError(cmte).Warn("timeout manager failed to rollback transaction") - } - } - } - - }(tm.glue.svcName, sagaID, tm) - - return nil -} - -//ClearTimeout clears a timeout for a specific saga -func (tm *InMemoryTimeoutManager) ClearTimeout(tx *sql.Tx, sagaID string) error { - return nil -} - -//SetTimeoutFunction accepts the timeouting function -func (tm *InMemoryTimeoutManager) SetTimeoutFunction(fun func(tx *sql.Tx, sagaID string) error) {} - -//Start starts the timeout manager -func (tm *InMemoryTimeoutManager) Start() error { - return nil -} - -//Stop starts the timeout manager -func (tm *InMemoryTimeoutManager) Stop() error { - return nil -} diff --git a/gbus/saga/invocation.go b/gbus/saga/invocation.go index 18c85e6..b2cfbed 100644 --- a/gbus/saga/invocation.go +++ b/gbus/saga/invocation.go @@ -83,7 +83,3 @@ func (si *sagaInvocation) Routing() (exchange, routingKey string) { func (si *sagaInvocation) DeliveryInfo() gbus.DeliveryInfo { return si.decoratedInvocation.DeliveryInfo() } - -//func (si *sagaInvocation) Log() logrus.FieldLogger { -// return si.decoratedInvocation.Log().WithField("saga_id", si.sagaID) -//} diff --git a/gbus/saga/stores/memstore.go b/gbus/saga/stores/memstore.go deleted file mode 100644 index 0d43734..0000000 --- a/gbus/saga/stores/memstore.go +++ /dev/null @@ -1,100 +0,0 @@ -package stores - -import ( - "database/sql" - "errors" - "reflect" - - "github.com/wework/grabbit/gbus" - "github.com/wework/grabbit/gbus/saga" -) - -//InMemorySagaStore stores the saga instances in-memory, not intended for production use -type InMemorySagaStore struct { - instances map[reflect.Type][]*saga.Instance -} - -//GetSagaByID implements 
SagaStore.GetSagaByID -func (store *InMemorySagaStore) GetSagaByID(tx *sql.Tx, sagaID string) (*saga.Instance, error) { - for _, instances := range store.instances { - for _, instance := range instances { - if instance.ID == sagaID { - return instance, nil - } - } - } - return nil, errors.New("no saga found for provided id") -} - -//RegisterSagaType implements SagaStore.RegisterSagaType -func (store *InMemorySagaStore) RegisterSagaType(saga gbus.Saga) { - -} - -//SaveNewSaga implements SagaStore.SaveNewSaga -func (store *InMemorySagaStore) SaveNewSaga(tx *sql.Tx, sagaType reflect.Type, newInstance *saga.Instance) error { - instances := store.instances[sagaType] - if instances == nil { - instances = make([]*saga.Instance, 0) - - } - instances = append(instances, newInstance) - store.instances[sagaType] = instances - - return nil - -} - -//UpdateSaga implements SagaStore.UpdateSaga -func (store *InMemorySagaStore) UpdateSaga(tx *sql.Tx, instance *saga.Instance) error { - - return nil -} - -//DeleteSaga implements SagaStore.DeleteSaga -func (store *InMemorySagaStore) DeleteSaga(tx *sql.Tx, instance *saga.Instance) error { - - for key, value := range store.instances { - var sagaIndexFound bool - var sagaIndexToDelete int - for i := 0; i < len(value); i++ { - candidate := value[i] - if candidate.ID == instance.ID { - sagaIndexToDelete = i - sagaIndexFound = true - break - } - } - if sagaIndexFound { - value[sagaIndexToDelete] = value[len(value)-1] - value = value[:len(value)-1] - store.instances[key] = value - - } - } - return nil -} - -//GetSagasByType implements SagaStore.GetSagasByType -func (store *InMemorySagaStore) GetSagasByType(tx *sql.Tx, t reflect.Type) ([]*saga.Instance, error) { - instances := make([]*saga.Instance, 0) - - for key, val := range store.instances { - if key == t { - instances = append(instances, val...) 
- } - } - - return instances, nil -} - -//Purge is used for nothing in this case since the store is not persisted use pgstore or mysqlstore -func (store *InMemorySagaStore) Purge() error { - return nil -} - -//NewInMemoryStore is a factory method for the InMemorySagaStore -func NewInMemoryStore() saga.Store { - return &InMemorySagaStore{ - instances: make(map[reflect.Type][]*saga.Instance)} -} diff --git a/gbus/worker.go b/gbus/worker.go index 3dc3b05..1a16a02 100644 --- a/gbus/worker.go +++ b/gbus/worker.go @@ -37,7 +37,6 @@ type worker struct { registrations []*Registration rpcHandlers map[string]MessageHandler deadletterHandler func(tx *sql.Tx, poision amqp.Delivery) error - isTxnl bool b *DefaultBus serializer Serializer txProvider TxProvider @@ -338,16 +337,13 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan // each retry should run a new and separate transaction which should end with a commit or rollback action := func(attempt uint) (actionErr error) { - var tx *sql.Tx - var txCreateErr error - if worker.isTxnl { - tx, txCreateErr = worker.txProvider.New() + + tx, txCreateErr := worker.txProvider.New() if txCreateErr != nil { worker.log().WithError(txCreateErr).Error("failed creating new tx") worker.span.LogFields(slog.Error(txCreateErr)) return txCreateErr } - } worker.span, sctx = opentracing.StartSpanFromContext(sctx, "invokeHandlers") worker.span.LogFields(slog.Uint64("attempt", uint64(attempt+1))) @@ -356,12 +352,10 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan pncMsg := fmt.Sprintf("%v\n%s", p, debug.Stack()) worker.log().WithField("stack", pncMsg).Error("recovered from panic while invoking handler") actionErr = errors.New(pncMsg) - if worker.isTxnl { - rbkErr := tx.Rollback() + rbkErr := tx.Rollback() if rbkErr != nil { worker.log().WithError(rbkErr).Error("failed rolling back transaction when recovering from handler panic") } - } worker.span.LogFields(slog.Error(actionErr)) } 
worker.span.Finish() @@ -397,22 +391,18 @@ func (worker *worker) invokeHandlers(sctx context.Context, handlers []MessageHan } if handlerErr != nil { hspan.LogFields(slog.Error(handlerErr)) - if worker.isTxnl { - rbkErr := tx.Rollback() + rbkErr := tx.Rollback() if rbkErr != nil { worker.log().WithError(rbkErr).Error("failed rolling back transaction when recovering from handler error") } - } hspan.Finish() return handlerErr } - if worker.isTxnl { - cmtErr := tx.Commit() + cmtErr := tx.Commit() if cmtErr != nil { worker.log().WithError(cmtErr).Error("failed committing transaction after invoking handlers") return cmtErr } - } return nil }