Skip to content

Commit

Permalink
cunsume the messages channel via ranging over the channel to prevent (#…
Browse files Browse the repository at this point in the history
…125)

empty delivreies
  • Loading branch information
Guy Baron committed Aug 18, 2019
1 parent d5d04b9 commit 2b91211
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 45 deletions.
16 changes: 4 additions & 12 deletions gbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,6 @@ func (b *DefaultBus) bindServiceQueue() error {
return nil
}

func (b *DefaultBus) createAMQPChannel(conn *amqp.Connection) (*amqp.Channel, error) {
channel, e := conn.Channel()
if e != nil {
return nil, e
}
return channel, nil
}

//Start implements GBus.Start()
func (b *DefaultBus) Start() error {

Expand All @@ -188,10 +180,10 @@ func (b *DefaultBus) Start() error {
return e
}

if b.ingressChannel, e = b.createAMQPChannel(b.ingressConn); e != nil {
if b.ingressChannel, e = b.ingressConn.Channel(); e != nil {
return e
}
if b.egressChannel, e = b.createAMQPChannel(b.egressConn); e != nil {
if b.egressChannel, e = b.egressConn.Channel(); e != nil {
return e
}

Expand All @@ -209,7 +201,7 @@ func (b *DefaultBus) Start() error {
TODO://the design is crap and needs to be refactored
*/
var amqpChan *amqp.Channel
if amqpChan, e = b.createAMQPChannel(b.egressConn); e != nil {
if amqpChan, e = b.egressConn.Channel(); e != nil {
b.Log().WithError(e).Error("failed to create amqp channel for transactional outbox")
return e
}
Expand Down Expand Up @@ -272,7 +264,7 @@ func (b *DefaultBus) createBusWorkers(workerNum uint) ([]*worker, error) {
workers := make([]*worker, 0)
for i := uint(0); i < workerNum; i++ {
//create a channel per worker as we can't share channels across go routines
amqpChan, createChanErr := b.createAMQPChannel(b.ingressConn)
amqpChan, createChanErr := b.ingressConn.Channel()
if createChanErr != nil {
return nil, createChanErr
}
Expand Down
45 changes: 12 additions & 33 deletions gbus/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (worker *worker) Start() error {
worker.rpcMessages = rpcmsgs

go worker.consumeMessages()

go worker.consumeRPC()
return nil
}

Expand All @@ -93,43 +93,22 @@ func (worker *worker) createMessagesChannel(q amqp.Queue, consumerTag string) (<

func (worker *worker) consumeMessages() {

//TODO:Handle panics due to tx errors so the consumption of messages will continue
for {

var isRPCreply bool
var delivery amqp.Delivery
var shouldProceed bool

select {

case <-worker.stop:
worker.log().Info("stopped consuming messages")
return
case msgDelivery, ok := <-worker.messages:
if ok {
shouldProceed = true
}
delivery = msgDelivery
isRPCreply = false
case rpcDelivery, ok := <-worker.rpcMessages:
if ok {
shouldProceed = true
}
delivery = rpcDelivery
isRPCreply = true
for msg := range worker.messages {
if msg.Body == nil || len(msg.Body) == 0 {
continue
}
worker.processMessage(msg, false)
}
}

/*
as the bus shuts down and amqp connection is killed the messages channel (b.msgs) gets closed
and delivery is a zero value so in order not to panic down the road we return if bus is shutdown
*/
if shouldProceed {
func (worker *worker) consumeRPC() {

worker.processMessage(delivery, isRPCreply)
for msg := range worker.rpcMessages {
if msg.Body == nil || len(msg.Body) == 0 {
continue
}

worker.processMessage(msg, true)
}

}

func (worker *worker) extractBusMessage(delivery amqp.Delivery) (*BusMessage, error) {
Expand Down

0 comments on commit 2b91211

Please sign in to comment.