Permalink
Browse files

Initialize all queue fields when loading from persisted state

  • Loading branch information...
jeffjenkins committed Dec 5, 2015
1 parent e248540 commit fa8a40f0aaccd36b902bea7a43f8d49b19f8f231
Showing with 17 additions and 1 deletion.
  1. +17 −1 queue/queue.go
View
@@ -84,6 +84,11 @@ func NewFromPersistedState(state *gen.QueueState, msgStore *msgstore.MessageStor
ConnId: -1,
msgStore: msgStore,
deleteChan: deleteChan,
// Fields that aren't passed in
statProcOne: stats.MakeHistogram("queue-proc-one"),
queue: list.New(),
consumers: make([]*consumer.Consumer, 0, 1),
maybeReady: make(chan bool, 1),
}
}
@@ -172,6 +177,10 @@ func (q *Queue) LoadFromMsgStore(msgStore *msgstore.MessageStore) {
panic("Integrity error reading queue from disk! " + err.Error())
}
q.queue = queueList
select {
case q.maybeReady <- true:
default:
}
}
func (q *Queue) Close() {
@@ -256,7 +265,10 @@ func (q *Queue) Readd(queueName string, msg *amqp.QueueMessage) {
// so it means the message was not acked.
q.msgStore.IncrDeliveryCount(queueName, msg)
q.queue.PushFront(msg)
q.maybeReady <- true
select {
case q.maybeReady <- true:
default:
}
}
func (q *Queue) removeConsumer(consumerTag string) {
@@ -331,6 +343,10 @@ func (q *Queue) AddConsumer(c *consumer.Consumer, exclusive bool) (uint16, error
func (q *Queue) Start() {
go func() {
select {
case q.maybeReady <- true:
default:
}
for _ = range q.maybeReady {
if q.Closed {
fmt.Printf("Queue closed!\n")

0 comments on commit fa8a40f

Please sign in to comment.