Skip to content

Commit

Permalink
Merge dadf555 into 5825f00
Browse files Browse the repository at this point in the history
  • Loading branch information
valinurovam committed Oct 23, 2018
2 parents 5825f00 + dadf555 commit 1ef0c37
Show file tree
Hide file tree
Showing 13 changed files with 404 additions and 120 deletions.
32 changes: 24 additions & 8 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ type Consumer struct {
noAck bool
channel interfaces.Channel
queue *queue.Queue
statusLock sync.RWMutex
status int
qos []*qos.AmqpQos
consume chan bool
stopLock sync.RWMutex
}

// NewConsumer returns new instance of Consumer
Expand Down Expand Up @@ -73,8 +73,8 @@ func (consumer *Consumer) startConsume() {

func (consumer *Consumer) retrieveAndSendMessage() {
var message *amqp.Message
consumer.stopLock.RLock()
defer consumer.stopLock.RUnlock()
consumer.statusLock.RLock()
defer consumer.statusLock.RUnlock()
if consumer.status == stopped {
return
}
Expand Down Expand Up @@ -117,39 +117,55 @@ func (consumer *Consumer) retrieveAndSendMessage() {
consumer.queue.GetMetrics().Deliver.Counter.Inc(1)
consumer.queue.GetMetrics().ServerDeliver.Counter.Inc(1)

consumer.Consume()
consumer.consumeMsg()

return
}

// Pause pause consumer, used by channel.flow change
func (consumer *Consumer) Pause() {
consumer.statusLock.Lock()
defer consumer.statusLock.Unlock()
consumer.status = paused
}

// UnPause unpause consumer, used by channel.flow change
func (consumer *Consumer) UnPause() {
consumer.statusLock.Lock()
defer consumer.statusLock.Unlock()
consumer.status = started
}

// Consume send signal into consumer channel, than consumer can try to pop message from queue
func (consumer *Consumer) Consume() {
func (consumer *Consumer) Consume() bool {
consumer.statusLock.RLock()
defer consumer.statusLock.RUnlock()

return consumer.consumeMsg()
}

func (consumer *Consumer) consumeMsg() bool {
if consumer.status == stopped || consumer.status == paused {
return
return false
}

select {
case consumer.consume <- true:
return true
default:
return false
}
}

// Stop stops consumer and remove it from queue consumers list
func (consumer *Consumer) Stop() {
consumer.stopLock.Lock()
defer consumer.stopLock.Unlock()
consumer.statusLock.Lock()
if consumer.status == stopped {
consumer.statusLock.Unlock()
return
}
consumer.status = stopped
consumer.statusLock.Unlock()
consumer.queue.RemoveConsumer(consumer.ConsumerTag)
close(consumer.consume)
}
Expand Down
2 changes: 1 addition & 1 deletion interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Channel interface {

// Consumer represents base consumer public interface
type Consumer interface {
Consume()
Consume() bool
Tag() string
Cancel()
}
Expand Down
1 change: 1 addition & 0 deletions qos/qos.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (qos *AmqpQos) Release() {
qos.currentSize = 0
}

// Copy safe copy current qos instance to new one
func (qos *AmqpQos) Copy() *AmqpQos {
qos.Lock()
defer qos.Unlock()
Expand Down
16 changes: 16 additions & 0 deletions qos/qos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,19 @@ func TestAmqpQos_Release(t *testing.T) {
t.Fatalf("Release: Expected currentSize %d, actual %d", 0, q.currentCount)
}
}

func TestAmqpQos_Copy(t *testing.T) {
q := NewAmqpQos(5, 10)
q.Inc(1, 6)

q2 := q.Copy()
q.Release()

if q2.currentCount != 1 {
t.Fatalf("Expected currentCount %d, actual %d", 0, q.currentCount)
}

if q2.currentSize != 6 {
t.Fatalf("Expected currentSize %d, actual %d", 0, q.currentCount)
}
}
17 changes: 12 additions & 5 deletions queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type Queue struct {
lastMemMsgId uint64
swappedToDisk bool
maybeLoadFromStorageCh chan bool
wg *sync.WaitGroup
}

// NewQueue returns new instance of Queue
Expand All @@ -85,6 +86,7 @@ func NewQueue(name string, connID uint64, exclusive bool, autoDelete bool, durab
currentConsumer: 0,
autoDeleteQueue: autoDeleteQueue,
swappedToDisk: false,
wg: &sync.WaitGroup{},
metrics: &MetricsState{
Ready: metrics.NewTrackCounter(0, true),
Unacked: metrics.NewTrackCounter(0, true),
Expand All @@ -110,7 +112,9 @@ func (queue *Queue) Start() {
defer queue.actLock.Unlock()

queue.active = true
queue.wg.Add(1)
go func() {
defer queue.wg.Done()
for range queue.call {
func() {
queue.cmrLock.RLock()
Expand All @@ -122,13 +126,17 @@ func (queue *Queue) Start() {
}
queue.currentConsumer = (queue.currentConsumer + 1) % cmrCount
cmr := queue.consumers[queue.currentConsumer]
cmr.Consume()
if cmr.Consume() {
return
}
}
}()
}
}()

queue.wg.Add(1)
go func() {
defer queue.wg.Done()
for range queue.maybeLoadFromStorageCh {
queue.mayBeLoadFromStorage()
}
Expand All @@ -142,6 +150,9 @@ func (queue *Queue) Stop() error {
defer queue.actLock.Unlock()

queue.active = false
close(queue.maybeLoadFromStorageCh)
close(queue.call)
queue.wg.Wait()
return nil
}

Expand Down Expand Up @@ -468,10 +479,6 @@ func (queue *Queue) Delete(ifUnused bool, ifEmpty bool) (uint64, error) {
queue.metrics.ServerTotal.Counter.Dec(int64(length))
queue.metrics.ServerReady.Counter.Dec(int64(length))

// TODO Proper close channels
//close(queue.maybeLoadFromStorageCh)
//close(queue.call)

return length, nil
}

Expand Down
9 changes: 5 additions & 4 deletions queue/queue_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (

// ConsumerMock implements AMQP consumer mock
type ConsumerMock struct {
tag string
tag string
cancel bool
}

// Consume send signal into consumer channel, than consumer can try to pop message from queue
func (consumer *ConsumerMock) Consume() {

func (consumer *ConsumerMock) Consume() bool {
return true
}

// Stop stops consumer and remove it from queue consumers list
Expand All @@ -21,7 +22,7 @@ func (consumer *ConsumerMock) Stop() {

// Cancel stops consumer and send basic.cancel method to the client
func (consumer *ConsumerMock) Cancel() {

consumer.cancel = true
}

// Tag returns consumer tag
Expand Down
2 changes: 1 addition & 1 deletion queue/queue_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (storage *MsgStorageMock) PurgeQueue(queue string) {
}

func (storage *MsgStorageMock) GetQueueLength(queue string) uint64 {
return 0
return uint64(len(storage.messages))
}

func (storage *MsgStorageMock) IterateByQueueFromMsgID(queue string, msgId uint64, limit uint64, fn func(message *amqp.Message)) uint64 {
Expand Down

0 comments on commit 1ef0c37

Please sign in to comment.