Skip to content

Commit

Permalink
Add ability to swap stored messages to disk on memory pressure
Browse files Browse the repository at this point in the history
Queue can store into memmory only maxMessagesInRam messages. That param can be configured.
More messages will me swapped to disk.
  • Loading branch information
valinurovam committed Oct 5, 2018
1 parent d960fe3 commit a9f8e37
Show file tree
Hide file tree
Showing 15 changed files with 384 additions and 120 deletions.
7 changes: 6 additions & 1 deletion amqp/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ var msgID = uint64(time.Now().UnixNano())
// NewMessage returns new message instance
func NewMessage(method *BasicPublish) *Message {
return &Message{
ID: atomic.AddUint64(&msgID, 1),
Exchange: method.Exchange,
RoutingKey: method.RoutingKey,
Mandatory: method.Mandatory,
Expand All @@ -83,6 +82,12 @@ func (message *Message) IsPersistent() bool {
return deliveryMode != nil && *deliveryMode == 2
}

func (message *Message) GenerateSeq() {
if message.ID == 0 {
message.ID = atomic.AddUint64(&msgID, 1)
}
}

// Append appends new body-frame into message and increase bodySize
func (message *Message) Append(body *Frame) {
message.Body = append(message.Body, body)
Expand Down
3 changes: 2 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ type TCPConfig struct {

// Queue settings
type Queue struct {
ShardSize int `yaml:"shardSize"`
ShardSize int `yaml:"shardSize"`
MaxMessagesInRam uint64 `yaml:"maxMessagesInRam"`
}

// Db settings, such as path to load/save and engine
Expand Down
1 change: 1 addition & 0 deletions etc/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ connection:
frameMaxSize: 65536
queue:
shardSize: 8192
maxMessagesInRam: 131072
db:
defaultPath: db
engine: badger
Expand Down
7 changes: 6 additions & 1 deletion interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ type DbStorage interface {
Del(key string) (err error)
Get(key string) (value []byte, err error)
Iterate(fn func(key []byte, value []byte))
IterateByPrefix(prefix []byte, fn func(key []byte, value []byte))
IterateByPrefix(prefix []byte, limit uint64, fn func(key []byte, value []byte)) uint64
IterateByPrefixFrom(prefix []byte, from []byte, limit uint64, fn func(key []byte, value []byte)) uint64
DeleteByPrefix(prefix []byte)
KeysByPrefixCount(prefix []byte) uint64
ProcessBatch(batch []*Operation) (err error)
Close() error
}
Expand All @@ -48,4 +51,6 @@ type MsgStorage interface {
PurgeQueue(queue string)
Add(message *amqp.Message, queue string) error
Update(message *amqp.Message, queue string) error
IterateByQueueFromMsgID(queue string, msgId uint64, limit uint64, fn func(message *amqp.Message)) uint64
GetQueueLength(queue string) uint64
}
9 changes: 8 additions & 1 deletion metrics/registry.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package metrics

import "time"
import (
"sync"
"time"
)

var r *TrackRegistry

// TrackRegistry is a registry of track counters or other track metrics
type TrackRegistry struct {
cntLock sync.Mutex
Counters map[string]*TrackCounter
trackLength int
trackTick *time.Ticker
Expand Down Expand Up @@ -36,6 +40,9 @@ func Destroy() {
// AddCounter add counter into registry andd return it
// TODO check if already exists
func AddCounter(name string) *TrackCounter {
r.cntLock.Lock()
defer r.cntLock.Unlock()

c := NewTrackCounter(r.trackLength, r.isNil)
r.Counters[name] = c
return c
Expand Down
34 changes: 24 additions & 10 deletions msgstorage/msgstorage.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package msgstorage

import (
"bytes"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -190,10 +189,11 @@ func (storage *MsgStorage) Iterate(fn func(queue string, message *amqp.Message))
}

// Iterate with func fn over messages
func (storage *MsgStorage) IterateByQueue(queue string, fn func(message *amqp.Message)) {
func (storage *MsgStorage) IterateByQueue(queue string, limit uint64, fn func(message *amqp.Message)) {
prefix := "msg." + queue + "."
storage.db.IterateByPrefix(
[]byte(prefix),
limit,
func(key []byte, value []byte) {
message := &amqp.Message{}
message.Unmarshal(value, storage.protoVersion)
Expand All @@ -202,19 +202,33 @@ func (storage *MsgStorage) IterateByQueue(queue string, fn func(message *amqp.Me
)
}

// PurgeQueue delete messages
func (storage *MsgStorage) PurgeQueue(queue string) {
prefix := []byte("msg." + queue)
storage.db.Iterate(
// Iterate with func fn over messages
func (storage *MsgStorage) IterateByQueueFromMsgID(queue string, msgId uint64, limit uint64, fn func(message *amqp.Message)) uint64 {
prefix := "msg." + queue + "."
from := makeKey(msgId, queue)
return storage.db.IterateByPrefixFrom(
[]byte(prefix),
[]byte(from),
limit,
func(key []byte, value []byte) {
if !bytes.HasPrefix(key, prefix) {
return
}
storage.db.Del(string(key))
message := &amqp.Message{}
message.Unmarshal(value, storage.protoVersion)
fn(message)
},
)
}

func (storage *MsgStorage) GetQueueLength(queue string) uint64 {
prefix := "msg." + queue + "."
return storage.db.KeysByPrefixCount([]byte(prefix))
}

// PurgeQueue delete messages
func (storage *MsgStorage) PurgeQueue(queue string) {
prefix := []byte("msg." + queue + ".")
storage.db.DeleteByPrefix(prefix)
}

// Close properly "stop" message storage
func (storage *MsgStorage) Close() error {
storage.closeCh <- true
Expand Down
Loading

0 comments on commit a9f8e37

Please sign in to comment.