Skip to content

Commit

Permalink
removed non transactional bus mode (#120)
Browse files Browse the repository at this point in the history
  • Loading branch information
Guy Baron committed Aug 14, 2019
1 parent de202b1 commit 5b6de51
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 300 deletions.
48 changes: 19 additions & 29 deletions gbus/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/saga"
"github.com/wework/grabbit/gbus/saga/stores"
"github.com/wework/grabbit/gbus/serialization"
"github.com/wework/grabbit/gbus/tx/mysql"
)
Expand Down Expand Up @@ -37,17 +36,14 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
gb := &gbus.DefaultBus{
AmqpConnStr: builder.connStr,
PrefetchCount: builder.PrefetchCount,
Outgoing: &gbus.AMQPOutbox{
SvcName: svcName,
},

SvcName: svcName,
PurgeOnStartup: builder.purgeOnStartup,
DelayedSubscriptions: [][]string{},
HandlersLock: &sync.Mutex{},
RPCLock: &sync.Mutex{},
SenderLock: &sync.Mutex{},
ConsumerLock: &sync.Mutex{},
IsTxnl: builder.txnl,
Registrations: make([]*gbus.Registration, 0),
RPCHandlers: make(map[string]gbus.MessageHandler),
Serializer: builder.serializer,
Expand All @@ -72,36 +68,30 @@ func (builder *defaultBuilder) Build(svcName string) gbus.Bus {
sagaStore saga.Store
timeoutManager gbus.TimeoutManager
)
if builder.txnl {
gb.IsTxnl = true
switch builder.txnlProvider {

case "mysql":
mysqltx, err := mysql.NewTxProvider(builder.txConnStr)
switch builder.txnlProvider {

case "mysql":
mysqltx, err := mysql.NewTxProvider(builder.txConnStr)
if err != nil {
panic(err)
}
gb.TxProvider = mysqltx
//TODO move purge logic into the NewSagaStore factory method
sagaStore = mysql.NewSagaStore(gb.SvcName, mysqltx)
if builder.purgeOnStartup {
err := sagaStore.Purge()
if err != nil {
panic(err)
}
gb.TxProvider = mysqltx
//TODO move purge logic into the NewSagaStore factory method
sagaStore = mysql.NewSagaStore(gb.SvcName, mysqltx)
if builder.purgeOnStartup {
err := sagaStore.Purge()
if err != nil {
panic(err)
}
}
gb.Outbox = mysql.NewOutbox(gb.SvcName, mysqltx, builder.purgeOnStartup)
timeoutManager = mysql.NewTimeoutManager(gb, gb.TxProvider, gb.Log, svcName, builder.purgeOnStartup)

default:
err := fmt.Errorf("no provider found for passed in value %v", builder.txnlProvider)
panic(err)
}
} else {
sagaStore = stores.NewInMemoryStore()
timeoutManager = &saga.InMemoryTimeoutManager{}
}
gb.Outbox = mysql.NewOutbox(gb.SvcName, mysqltx, builder.purgeOnStartup)
timeoutManager = mysql.NewTimeoutManager(gb, gb.TxProvider, gb.Log, svcName, builder.purgeOnStartup)

default:
err := fmt.Errorf("no provider found for passed in value %v", builder.txnlProvider)
panic(err)
}
if builder.usingPingTimeout {
gb.DbPingTimeout = builder.dbPingTimeout
}
Expand Down
131 changes: 53 additions & 78 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ var _ Bus = &DefaultBus{}
type DefaultBus struct {
*Safety
*Glogged
Outgoing *AMQPOutbox
Outbox TxOutbox
PrefetchCount uint
AmqpConnStr string
Expand All @@ -41,6 +40,7 @@ type DefaultBus struct {
amqpErrors chan *amqp.Error
amqpBlocks chan amqp.Blocking
Registrations []*Registration
amqpOutbox *AMQPOutbox

RPCHandlers map[string]MessageHandler
deadletterHandler func(tx *sql.Tx, poision amqp.Delivery) error
Expand All @@ -54,16 +54,16 @@ type DefaultBus struct {
started bool
Glue SagaGlue
TxProvider TxProvider
IsTxnl bool
WorkerNum uint
Serializer Serializer
DLX string
DefaultPolicies []MessagePolicy
Confirm bool
healthChan chan error
backpressure bool
DbPingTimeout time.Duration
amqpConnected bool

WorkerNum uint
Serializer Serializer
DLX string
DefaultPolicies []MessagePolicy
Confirm bool
healthChan chan error
backpressure bool
DbPingTimeout time.Duration
amqpConnected bool
}

var (
Expand Down Expand Up @@ -203,37 +203,28 @@ func (b *DefaultBus) Start() error {
b.egressConn.NotifyClose(b.amqpErrors)
b.egressConn.NotifyBlocked(b.amqpBlocks)
b.egressChannel.NotifyClose(b.amqpErrors)
//TODO:Figure out what should be done

//init the outbox that sends the messages to the amqp transport and handles publisher confirms
if e := b.Outgoing.init(b.egressChannel, b.Confirm, true); e != nil {
return e
}
/*
start the transactional outbox, make sure calling b.TxOutgoing.Start() is done only after b.Outgoing.init is called
TODO://the design is crap and needs to be refactored
*/
if b.IsTxnl {

var amqpChan *amqp.Channel
if amqpChan, e = b.createAMQPChannel(b.egressConn); e != nil {
b.Log().WithError(e).Error("failed to create amqp channel for transactional outbox")
return e
}
amqpChan.NotifyClose(b.amqpErrors)
amqpOutbox := &AMQPOutbox{
SvcName: b.SvcName,
}
err := amqpOutbox.init(amqpChan, b.Confirm, false)
if err != nil {
b.Log().WithError(err).Error("failed initializing amqpOutbox")
return err
}
if startErr := b.Outbox.Start(amqpOutbox); startErr != nil {
b.Log().WithError(startErr).Error("failed to start transactional outbox")
return startErr
}

var amqpChan *amqp.Channel
if amqpChan, e = b.createAMQPChannel(b.egressConn); e != nil {
b.Log().WithError(e).Error("failed to create amqp channel for transactional outbox")
return e
}
amqpChan.NotifyClose(b.amqpErrors)
b.amqpOutbox = &AMQPOutbox{
SvcName: b.SvcName,
}
err := b.amqpOutbox.init(amqpChan, b.Confirm, false)
if err != nil {
b.Log().WithError(err).Error("failed initializing amqpOutbox")
return err
}
if startErr := b.Outbox.Start(b.amqpOutbox); startErr != nil {
b.Log().WithError(startErr).Error("failed to start transactional outbox")
return startErr
}

//declare queue
Expand All @@ -244,10 +235,10 @@ func (b *DefaultBus) Start() error {
b.serviceQueue = q

//bind queue
err := b.bindServiceQueue()
if err != nil {
bindErr := b.bindServiceQueue()
if bindErr != nil {
b.Log().WithError(err).Error("could not bind service to queue")
return err
return bindErr
}

//declare rpc queue
Expand Down Expand Up @@ -299,7 +290,6 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) {
q: b.serviceQueue,
rpcq: b.rpcQueue,
svcName: b.SvcName,
isTxnl: b.IsTxnl,
txProvider: b.TxProvider,
rpcLock: b.RPCLock,
rpcHandlers: b.RPCHandlers,
Expand Down Expand Up @@ -339,23 +329,19 @@ func (b *DefaultBus) Shutdown() (shutdwonErr error) {
return err
}
}
b.Outgoing.shutdown()

if err := b.Glue.Stop(); err != nil {
return err
}
b.started = false
if b.IsTxnl {

err := b.Outbox.Stop()

if err != nil {
b.Log().WithError(err).Error("could not shutdown outbox")
return err
}
b.TxProvider.Dispose()
err := b.Outbox.Stop()

if err != nil {
b.Log().WithError(err).Error("could not shutdown outbox")
return err
}
b.amqpOutbox.Shutdown()
b.TxProvider.Dispose()

return nil
}
Expand All @@ -370,11 +356,8 @@ func (b *DefaultBus) NotifyHealth(health chan error) {

//GetHealth implements Health.GetHealth
func (b *DefaultBus) GetHealth() HealthCard {
var dbConnected bool

if b.IsTxnl {
dbConnected = b.TxProvider.Ping(b.DbPingTimeout)
}
dbConnected := b.TxProvider.Ping(b.DbPingTimeout)

return HealthCard{
DbConnected: dbConnected,
Expand All @@ -387,11 +370,11 @@ func (b *DefaultBus) withTx(action func(tx *sql.Tx) error, ambientTx *sql.Tx) er
var shouldCommitTx bool
var activeTx *sql.Tx
//create a new transaction only if there is no active one already passed in
if b.IsTxnl && ambientTx == nil {
if ambientTx == nil {

/*
if the passed in ambient transaction is not nil it means that some caller has created the transaction
and knows when should this transaction bee committed or rolledback.
and knows when should this transaction be committed or rolledback.
In these cases we only invoke the passed in action with the passed in transaction
and do not commit/rollback the transaction.action
If no ambient transaction is passed in then we create a new transaction and commit or rollback after
Expand All @@ -414,11 +397,7 @@ func (b *DefaultBus) withTx(action func(tx *sql.Tx) error, ambientTx *sql.Tx) er
}
actionErr := b.SafeWithRetries(retryAction, MaxRetryCount)

/*
if the bus is transactional and there is no ambient transaction then create a new one else use the ambient tranaction.
if the bus is not transactional a nil transaction reference will be passed
*/
if b.IsTxnl && shouldCommitTx {
if shouldCommitTx {
if actionErr != nil {
err := activeTx.Rollback()
if err != nil {
Expand Down Expand Up @@ -464,7 +443,12 @@ func (b *DefaultBus) RPC(ctx context.Context, service string, request, reply *Bu
rpcID: rpcID}

b.Serializer.Register(reply.Payload)
err := b.sendImpl(ctx, nil, service, b.rpcQueue.Name, "", "", request, rpc)

sendRPC := func(tx *sql.Tx) error {
return b.sendImpl(ctx, tx, service, b.rpcQueue.Name, "", "", request, rpc)
}

err := b.withTx(sendRPC, nil)
if err != nil {
b.Log().WithError(err).Error("could not send message")
return nil, err
Expand Down Expand Up @@ -624,22 +608,13 @@ func (b *DefaultBus) monitorAMQPErrors() {

func (b *DefaultBus) publish(tx *sql.Tx, exchange, routingKey string, msg *amqp.Publishing) error {
publish := func() error {
//send to the transactional outbox if the bus is transactional
//otherwise send directly to amqp
if b.IsTxnl && tx != nil {
b.Log().WithField("message_id", msg.MessageId).Debug("sending message to outbox")
saveErr := b.Outbox.Save(tx, exchange, routingKey, *msg)
if saveErr != nil {
b.Log().WithError(saveErr).Error("failed to save to transactional outbox")
}
return saveErr
}
//do not attempt to contact the borker if backpressure is being applied
if b.backpressure {
return errors.New("can't send message due to backpressure from amqp broker")

b.Log().WithField("message_id", msg.MessageId).Debug("sending message to outbox")
saveErr := b.Outbox.Save(tx, exchange, routingKey, *msg)
if saveErr != nil {
b.Log().WithError(saveErr).Error("failed to save to transactional outbox")
}
_, outgoingErr := b.Outgoing.Post(exchange, routingKey, *msg)
return outgoingErr
return saveErr
}
//currently only one thread can publish at a time
//TODO:add a publishing workers
Expand Down
4 changes: 2 additions & 2 deletions gbus/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func (out *AMQPOutbox) init(amqp *amqp.Channel, confirm, resendOnNack bool) erro
return nil
}

func (out *AMQPOutbox) shutdown() {
out.stop <- true
func (out *AMQPOutbox) Shutdown() {
close(out.stop)

}

Expand Down
72 changes: 0 additions & 72 deletions gbus/saga/inmemory_timeout.go

This file was deleted.

Loading

0 comments on commit 5b6de51

Please sign in to comment.