Reference all messages through the message store

jeffjenkins committed Nov 9, 2015
1 parent ee9aca2 commit 5da45516f8b82fecd798b7b24a28f122e66fc814
Showing with 281 additions and 118 deletions.
  1. +10 −9 amqp/messages.proto
  2. +11 −3 server/basicMethods.go
  3. +83 −34 server/channel.go
  4. +32 −6 server/consumer.go
  5. +31 −15 server/exchange.go
  6. +86 −31 server/persistMessages.go
  7. +20 −17 server/queue.go
  8. +5 −3 server/server.go
  9. +3 −0 static/admin.html
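
A note on the design before the per-file diffs: after this commit, queues hold only lightweight QueueMessage references, while the message store keeps a single copy of each body together with a count of the queues that still reference it. The sketch below illustrates that refcounting idea only; Store and its methods are illustrative names modeled on the msgStore calls visible in the diff (Get, removeRef, getAndDecrRef), not the actual implementation.

    package msgstore

    import (
    	"errors"
    	"sync"
    )

    // Message stands in for the full AMQP message body.
    type Message struct {
    	Id      int64
    	Payload []byte
    }

    // Store keeps one copy of each message body plus a count of how many
    // queues still reference it.
    type Store struct {
    	mu   sync.Mutex
    	msgs map[int64]*Message
    	refs map[int64]int
    }

    func NewStore() *Store {
    	return &Store{msgs: make(map[int64]*Message), refs: make(map[int64]int)}
    }

    // Add stores a message body with no references yet.
    func (s *Store) Add(msg *Message) {
    	s.mu.Lock()
    	defer s.mu.Unlock()
    	s.msgs[msg.Id] = msg
    }

    // AddRef records that one more queue holds a reference to id.
    func (s *Store) AddRef(id int64) {
    	s.mu.Lock()
    	defer s.mu.Unlock()
    	s.refs[id]++
    }

    // Get reads a body without touching the refcount (used while a
    // delivery still awaits an ack).
    func (s *Store) Get(id int64) (*Message, bool) {
    	s.mu.Lock()
    	defer s.mu.Unlock()
    	msg, ok := s.msgs[id]
    	return msg, ok
    }

    // RemoveRef releases one reference, deleting the body at zero.
    func (s *Store) RemoveRef(id int64) {
    	s.mu.Lock()
    	defer s.mu.Unlock()
    	s.refs[id]--
    	if s.refs[id] <= 0 {
    		delete(s.msgs, id)
    		delete(s.refs, id)
    	}
    }

    // GetAndDecrRef reads the body and releases one reference in a single
    // step (used for terminal deliveries: acks, drops, noAck consumes).
    func (s *Store) GetAndDecrRef(id int64) (*Message, error) {
    	s.mu.Lock()
    	defer s.mu.Unlock()
    	msg, ok := s.msgs[id]
    	if !ok {
    		return nil, errors.New("message not in store")
    	}
    	s.refs[id]--
    	if s.refs[id] <= 0 {
    		delete(s.msgs, id)
    		delete(s.refs, id)
    	}
    	return msg, nil
    }
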
amqp/messages.proto
@@ -10,7 +10,7 @@ message WireFrame {
option (gogoproto.goproto_getters) = false;
optional uint32 frameType = 1 [(gogoproto.nullable) = false, (gogoproto.casttype) = "uint8"];
optional uint32 channel = 2 [(gogoproto.nullable) = false, (gogoproto.casttype) = "uint16"];
- optional bytes payload = 3 ;
+ optional bytes payload = 3 ;
}
message IndexMessage {
@@ -36,6 +36,7 @@ message Message {
message QueueMessage {
optional int64 id = 1 [(gogoproto.nullable) = false];
optional int32 deliveryCount = 2 [(gogoproto.nullable) = false];
+ optional bool durable = 3 [(gogoproto.nullable) = false];
}
message ContentHeaderFrame {
@@ -47,19 +48,19 @@ message ContentHeaderFrame {
}
message TxMessage {
- optional Message msg = 1;
- optional string queue_name = 2 [(gogoproto.nullable) = false];
+ optional Message msg = 1;
+ optional string queue_name = 2 [(gogoproto.nullable) = false];
}
message TxAck {
optional uint64 tag = 1 [(gogoproto.nullable) = false];
- optional bool multiple = 2 [(gogoproto.nullable) = false];
- optional bool nack = 3 [(gogoproto.nullable) = false];
- optional bool requeue_nack = 4 [(gogoproto.nullable) = false];
+ optional bool multiple = 2 [(gogoproto.nullable) = false];
+ optional bool nack = 3 [(gogoproto.nullable) = false];
+ optional bool requeue_nack = 4 [(gogoproto.nullable) = false];
}
message UnackedMessage {
- optional string consumer_tag = 1 [(gogoproto.nullable) = false];
- optional Message msg = 2;
- optional string queue_name = 3 [(gogoproto.nullable) = false];
+ optional string consumer_tag = 1 [(gogoproto.nullable) = false];
+ optional QueueMessage msg = 2;
+ optional string queue_name = 3 [(gogoproto.nullable) = false];
}
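
For orientation, here is roughly what the changed messages become as Go structs once gogoproto applies nullable=false and the casttype options (hand-simplified, not the exact generated code). The important shift is that UnackedMessage now carries a QueueMessage handle rather than the full Message body:

    // Simplified view of the generated types after this commit:
    type QueueMessage struct {
    	Id            int64 // key into the message store
    	DeliveryCount int32 // drives the Redelivered flag (> 0 means redelivered)
    	Durable       bool  // new: whether this queue's ref survives restart
    }

    type UnackedMessage struct {
    	ConsumerTag string
    	Msg         *QueueMessage // was a full *Message before this commit
    	QueueName   string
    }
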
server/basicMethods.go
@@ -137,14 +137,22 @@ func (channel *Channel) basicGet(method *amqp.BasicGet) error {
var classId, methodId = method.MethodIdentifier()
channel.channelErrorWithMethod(404, "Queue not found", classId, methodId)
}
- var msg = queue.getOneForced()
- if msg == nil {
+ var qm = queue.getOneForced()
+ if qm == nil {
channel.sendMethod(&amqp.BasicGetEmpty{})
return nil
}
+ msg, err := channel.server.msgStore.getAndDecrRef(qm.Id, queue.name)
+ if err != nil {
+ // TODO: return 500 error
+ channel.sendMethod(&amqp.BasicGetEmpty{})
+ return nil
+ }
channel.sendContent(&amqp.BasicGetOk{
DeliveryTag: channel.nextDeliveryTag(),
- Redelivered: msg.Redelivered > 0,
+ Redelivered: qm.DeliveryCount > 0,
Exchange: msg.Exchange,
RoutingKey: msg.Key,
MessageCount: 1,
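
The reworked basicGet is a two-step lookup: pop a QueueMessage handle from the queue, then resolve the body through the store, releasing the queue's reference immediately since this delivery is terminal. A sketch of the flow, reusing the illustrative Store above (getOneForced stands for the queue-side pop in the diff; its exact semantics are assumed):

    // basicGetFlow mirrors the hunk above: queue pop, then store lookup.
    // Returns (nil, false) where the server would send BasicGetEmpty.
    func basicGetFlow(pop func() *QueueMessage, store *Store) (*Message, bool) {
    	qm := pop() // stands in for queue.getOneForced()
    	if qm == nil {
    		return nil, false
    	}
    	msg, err := store.GetAndDecrRef(qm.Id)
    	if err != nil {
    		// The diff leaves a TODO to surface this as a 500;
    		// for now it answers BasicGetEmpty instead.
    		return nil, false
    	}
    	return msg, true
    }
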
server/channel.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"github.com/jeffjenkins/mq/amqp"
"math"
"sync"
)
@@ -55,15 +56,29 @@ func (channel *Channel) commitTx() {
channel.txLock.Lock()
defer channel.txLock.Unlock()
// messages
- // todo: persist all messages to all queues once persistence exists
- for _, msg := range channel.txMessages {
- queue, found := channel.server.queues[msg.QueueName]
+ queueMessagesByQueue, err := channel.server.msgStore.addTxMessages(channel.txMessages)
+ if err != nil {
+ channel.channelErrorWithMethod(500, err.Error(), 60, 40)
+ return
+ }
+ for queueName, qms := range queueMessagesByQueue {
+ queue, found := channel.server.queues[queueName]
+ // the
if !found {
continue
}
- queue.add(msg.Msg.Id)
+ for _, qm := range qms {
+ if !queue.add(qm) {
+ // If we couldn't add it, that means the queue is closed and we should
+ // remove the ref from the message store. The queue being closed means
+ // it is going away, so worst case if the server dies we have to process
+ // and discard the message on boot.
+ channel.server.msgStore.removeRef(qm.Id, queueName)
+ }
+ }
}
// Acks
// todo: remove acked messages from persistent storage in a single
// transaction
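
The commitTx hunk implies that addTxMessages persists every transactional publish once and hands back the per-queue references to enqueue, so a queue that refuses an add (because it is closing) costs only a removeRef rather than an orphaned body. A speculative sketch of that fan-out shape, again built on the illustrative Store (the real signature and any multi-queue routing are inferred, not confirmed):

    // TxMessage pairs a publish with its target queue, as in messages.proto.
    type TxMessage struct {
    	Msg       *Message
    	QueueName string
    }

    // addTxMessages persists each body once, takes one ref per target
    // queue, and groups the resulting handles by queue name.
    func addTxMessages(store *Store, txs []*TxMessage) map[string][]*QueueMessage {
    	out := make(map[string][]*QueueMessage)
    	for _, tx := range txs {
    		store.Add(tx.Msg)
    		store.AddRef(tx.Msg.Id)
    		out[tx.QueueName] = append(out[tx.QueueName], &QueueMessage{Id: tx.Msg.Id})
    	}
    	return out
    }
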
@@ -106,21 +121,26 @@ func (channel *Channel) recover(requeue bool) {
defer channel.ackLock.Unlock()
// Requeue. Make sure we update stats
for _, unacked := range channel.awaitingAcks {
- unacked.Msg.Redelivered += 1
// re-add to queue
queue, qFound := channel.server.queues[unacked.QueueName]
if qFound {
- queue.readd(unacked.Msg)
+ queue.readd(unacked.QueueName, unacked.Msg)
}
+ // else: The queue is gone. The reference would have been removed
+ // then, so we don't remove it now in an else clause.
consumer, cFound := channel.consumers[unacked.ConsumerTag]
+ msg, found := channel.server.msgStore.Get(unacked.Msg.Id)
+ if !found {
+ panic("Integrity error, message not found in message store")
+ }
// decr channel active
- var size = messageSize(unacked.Msg)
+ var size = messageSize(msg)
channel.decrActive(1, size)
// decr consumer active
if cFound {
consumer.decrActive(1, size)
}
}
// Clear awaiting acks
channel.awaitingAcks = make(map[uint64]amqp.UnackedMessage)
@@ -131,11 +151,18 @@ func (channel *Channel) recover(requeue bool) {
go func() {
for tag, unacked := range channel.awaitingAcks {
consumer, cFound := channel.consumers[unacked.ConsumerTag]
- var size = messageSize(unacked.Msg)
- unacked.Msg.Redelivered += 1
+ msg, found := channel.server.msgStore.Get(unacked.Msg.Id)
+ if !found {
+ panic("Integrity error, message not found in message store")
+ }
+ var size = messageSize(msg)
if cFound {
- consumer.redeliver(tag, unacked.Msg)
+ // Consumer exists, try to deliver again
+ channel.server.msgStore.incrDeliveryCount(unacked.QueueName, unacked.Msg)
+ consumer.redeliver(tag, msg)
} else {
+ // no consumer, drop message
+ channel.server.msgStore.removeRef(unacked.Msg.Id, unacked.QueueName)
consumer.decrActive(1, size)
}
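
Two shapes change in the recover path above: redelivery state moves off the shared body (incrDeliveryCount bumps the per-queue DeliveryCount instead of mutating msg.Redelivered), and the body is fetched from the store only for sizing and redelivery. A sketch of the no-requeue branch under those assumptions (Consumer is a stub; the panic mirrors the diff's integrity check):

    type Consumer interface {
    	Redeliver(tag uint64, msg *Message)
    }

    // recoverOne sketches recover(requeue=false) for a single unacked entry.
    func recoverOne(store *Store, u *UnackedMessage, tag uint64, c Consumer) {
    	msg, found := store.Get(u.Msg.Id)
    	if !found {
    		panic("Integrity error, message not found in message store")
    	}
    	if c != nil {
    		u.Msg.DeliveryCount++ // stands in for msgStore.incrDeliveryCount
    		c.Redeliver(tag, msg)
    	} else {
    		store.RemoveRef(u.Msg.Id) // no consumer left: drop this queue's ref
    	}
    }
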
@@ -182,8 +209,13 @@ func (channel *Channel) ackBelow(tag uint64, commitTx bool) bool {
var count = 0
for k, unacked := range channel.awaitingAcks {
if k <= tag || tag == 0 {
+ msg, err := channel.server.msgStore.getAndDecrRef(unacked.Msg.Id, unacked.QueueName)
+ if err != nil {
+ channel.channelErrorWithMethod(500, err.Error(), 0, 0)
+ return false
+ }
delete(channel.awaitingAcks, k)
- var size = messageSize(unacked.Msg)
+ var size = messageSize(msg)
channel.decrActive(1, size)
consumer, cFound := channel.consumers[unacked.ConsumerTag]
if cFound {
@@ -216,8 +248,13 @@ func (channel *Channel) ackOne(tag uint64, commitTx bool) bool {
})
return true
}
+ msg, err := channel.server.msgStore.getAndDecrRef(unacked.Msg.Id, unacked.QueueName)
+ if err != nil {
+ channel.channelErrorWithMethod(500, err.Error(), 0, 0)
+ return false
+ }
delete(channel.awaitingAcks, tag)
- var size = messageSize(unacked.Msg)
+ var size = messageSize(msg)
channel.decrActive(1, size)
consumer, cFound := channel.consumers[unacked.ConsumerTag]
if cFound {
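
ackBelow and ackOne above, and the two nack variants below, all settle a delivery the same way: resolve-and-release through getAndDecrRef, then unwind the channel's and consumer's in-flight counters by the body's size (nacks may first re-add the handle to the queue). A condensed sketch of that shared spine, with assumed stand-ins for decrActive and messageSize:

    // settle releases one unacked delivery: drop the store ref, then
    // unwind flow-control counters by the message's size.
    func settle(store *Store, u *UnackedMessage,
    	decrChannel, decrConsumer func(count int, size int)) error {
    	msg, err := store.GetAndDecrRef(u.Msg.Id)
    	if err != nil {
    		return err // the diff surfaces this as a 500 channel error
    	}
    	size := len(msg.Payload) // stands in for messageSize(msg)
    	decrChannel(1, size)
    	if decrConsumer != nil { // consumer may already be gone
    		decrConsumer(1, size)
    	}
    	return nil
    }
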
@@ -246,12 +283,17 @@ func (channel *Channel) nackBelow(tag uint64, requeue bool, commitTx bool) bool
for k, unacked := range channel.awaitingAcks {
fmt.Printf("%d(%d), ", k, tag)
if k <= tag || tag == 0 {
+ msg, err := channel.server.msgStore.getAndDecrRef(unacked.Msg.Id, unacked.QueueName)
+ if err != nil {
+ channel.channelErrorWithMethod(500, err.Error(), 0, 0)
+ return false
+ }
delete(channel.awaitingAcks, k)
consumer, cFound := channel.consumers[unacked.ConsumerTag]
if requeue && cFound {
- consumer.queue.readd(unacked.Msg)
+ consumer.queue.readd(unacked.QueueName, unacked.Msg)
}
- var size = messageSize(unacked.Msg)
+ var size = messageSize(msg)
channel.decrActive(1, size)
if cFound {
consumer.decrActive(1, size)
@@ -284,11 +326,16 @@ func (channel *Channel) nackOne(tag uint64, requeue bool, commitTx bool) bool {
})
return true
}
+ msg, err := channel.server.msgStore.getAndDecrRef(unacked.Msg.Id, unacked.QueueName)
+ if err != nil {
+ channel.channelErrorWithMethod(500, err.Error(), 0, 0)
+ return false
+ }
consumer, cFound := channel.consumers[unacked.ConsumerTag]
if requeue && cFound {
- consumer.queue.readd(unacked.Msg)
+ consumer.queue.readd(unacked.QueueName, unacked.Msg)
}
- var size = messageSize(unacked.Msg)
+ var size = messageSize(msg)
channel.decrActive(1, size)
if cFound {
consumer.decrActive(1, size)
@@ -298,7 +345,7 @@ func (channel *Channel) nackOne(tag uint64, requeue bool, commitTx bool) bool {
return true
}
- func (channel *Channel) addUnackedMessage(consumer *Consumer, msg *amqp.Message) uint64 {
+ func (channel *Channel) addUnackedMessage(consumer *Consumer, msg *amqp.QueueMessage) uint64 {
var tag = channel.nextDeliveryTag()
var unacked = amqp.UnackedMessage{
ConsumerTag: consumer.consumerTag,
@@ -460,22 +507,24 @@ func (channel *Channel) shutdown() {
consumer.stop()
}
// Any unacked messages should be re-added
- for _, unacked := range channel.awaitingAcks {
- // TODO(MUST): If we want at-most-once delivery we can't re-add these
- // messages. Need to figure out if the spec specifies, and after that
- // provide a way to have both options. Maybe a message header?
- consumer, cFound := channel.consumers[unacked.ConsumerTag]
- if cFound {
- consumer.queue.readd(unacked.Msg)
- }
- // this probably isn't needed, but for debugging purposes it's nice to
- // ensure that all the active counts/sizes get back to 0
- var size = messageSize(unacked.Msg)
- channel.decrActive(1, size)
- if cFound {
- consumer.decrActive(1, size)
- }
- }
+ // for tag, unacked := range channel.awaitingAcks {
+ // TODO(MUST): If we want at-most-once delivery we can't re-add these
+ // messages. Need to figure out if the spec specifies, and after that
+ // provide a way to have both options. Maybe a message header?
+ // TODO(MUST): Is it safe to treat these as nacks?
+ channel.nackBelow(math.MaxUint64, true, false)
+ // consumer, cFound := channel.consumers[unacked.ConsumerTag]
+ // if cFound {
+ // consumer.queue.readd(unacked.Msg)
+ // }
+ // // this probably isn't needed, but for debugging purposes it's nice to
+ // // ensure that all the active counts/sizes get back to 0
+ // var size = messageSize(unacked.Msg)
+ // channel.decrActive(1, size)
+ // if cFound {
+ // consumer.decrActive(1, size)
+ // }
+ // }
}
func (channel *Channel) removeConsumer(consumerTag string) error {
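
The rewritten shutdown leans on an existing property of nackBelow: its filter is k <= tag || tag == 0, so passing math.MaxUint64 selects every outstanding delivery tag, and requeue=true pushes each handle back onto its queue. A small runnable demonstration of why that tag matches everything:

    package main

    import (
    	"fmt"
    	"math"
    )

    func main() {
    	awaiting := map[uint64]string{1: "a", 7: "b", 42: "c"}
    	var tag uint64 = math.MaxUint64
    	for k, v := range awaiting {
    		if k <= tag || tag == 0 { // nackBelow's filter: always true here
    			fmt.Println("requeue", k, v)
    		}
    	}
    }
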
server/consumer.go
@@ -103,42 +103,68 @@ func (consumer *Consumer) consume(id uint16) {
}
func (consumer *Consumer) consumeOne() {
+ var err error
// Check local limit
consumer.consumeLock.Lock()
defer consumer.consumeLock.Unlock()
if !consumer.consumerReady() {
return
}
// Try to get message/check channel limit
var msg = consumer.queue.getOne(consumer.channel, consumer)
if msg == nil {
var qm = consumer.queue.getOne(consumer.channel, consumer)
if qm == nil {
return
}
var tag uint64 = 0
+ var msg *amqp.Message
if !consumer.noAck {
- tag = consumer.channel.addUnackedMessage(consumer, msg)
+ tag = consumer.channel.addUnackedMessage(consumer, qm)
+ // We get the message out without decrementing the ref because we're
+ // expecting an ack. The ack code will decrement.
+ msg, found := consumer.channel.server.msgStore.Get(qm.Id)
+ if !found {
+ panic("Integrity error, message id not found")
+ }
consumer.incrActive(1, messageSize(msg))
+ } else {
+ msg, err = consumer.channel.server.msgStore.getAndDecrRef(qm.Id, consumer.queue.name)
+ if err != nil {
+ panic("Error getting queue message")
+ }
}
consumer.channel.sendContent(&amqp.BasicDeliver{
ConsumerTag: consumer.consumerTag,
DeliveryTag: tag,
- Redelivered: msg.Redelivered > 0,
+ Redelivered: qm.DeliveryCount > 0,
Exchange: msg.Exchange,
RoutingKey: msg.Key,
}, msg)
consumer.statCount += 1
}
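
consumeOne above and consumeImmediate below now split on delivery mode: when an ack is expected, the body is read with Get and the queue's reference stays held until the ack path releases it; with noAck the delivery is terminal, so getAndDecrRef releases the reference immediately. A condensed sketch under the same Store assumptions as above (errors comes from the standard library):

    // fetchForDelivery resolves a queue handle to a body for delivery.
    func fetchForDelivery(store *Store, qm *QueueMessage, noAck bool) (*Message, error) {
    	if !noAck {
    		// Ref stays held; the eventual ack/nack will release it.
    		msg, found := store.Get(qm.Id)
    		if !found {
    			return nil, errors.New("Integrity error, message id not found")
    		}
    		return msg, nil
    	}
    	// No ack will ever arrive: release this queue's ref now.
    	return store.GetAndDecrRef(qm.Id)
    }
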
- func (consumer *Consumer) consumeImmediate(msg *amqp.Message) bool {
+ func (consumer *Consumer) consumeImmediate(qm *amqp.QueueMessage) bool {
+ var err error
consumer.consumeLock.Lock()
defer consumer.consumeLock.Unlock()
if !consumer.consumerReady() {
return false
}
var tag uint64 = 0
+ var msg *amqp.Message
if !consumer.noAck {
- tag = consumer.channel.addUnackedMessage(consumer, msg)
+ tag = consumer.channel.addUnackedMessage(consumer, qm)
+ // We get the message out without decrementing the ref because we're
+ // expecting an ack. The ack code will decrement.
+ msg, found := consumer.channel.server.msgStore.Get(qm.Id)
+ if !found {
+ panic("Integrity error, message id not found")
+ }
consumer.incrActive(1, messageSize(msg))
+ } else {
+ msg, err = consumer.channel.server.msgStore.getAndDecrRef(qm.Id, consumer.queue.name)
+ if err != nil {
+ panic("Error getting queue message")
+ }
}
consumer.channel.sendContent(&amqp.BasicDeliver{
ConsumerTag: consumer.consumerTag,
[Diff truncated: the remaining files (server/exchange.go, server/persistMessages.go, server/queue.go, server/server.go, static/admin.html) are not shown.]