Skip to content

Commit

Permalink
Merge pull request #1 from jrluis/add-peek-function
Browse files Browse the repository at this point in the history
Add peek function
  • Loading branch information
peterfraedrich committed Apr 19, 2022
2 parents f4c52f5 + 7726e76 commit 48ba4a1
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 1 deletion.
37 changes: 37 additions & 0 deletions consulmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,23 @@ func (mq *MQ) indexPop(queue string) (string, int, error) {
return id, len(idx), nil
}

func (mq *MQ) indexPeek(queue string) (string, int, error) {
kv, err := mq.lock(queue)
if err != nil {
return "", -1, err
}
defer mq.unlock(kv)
idx, _, err := mq.loadIndex(kv)
if err != nil {
return "", len(idx), err
}
var id string
if len(idx) > 0 {
id, idx = idx[0], idx[1:]
}
return id, len(idx), nil
}

func (mq *MQ) indexPopLast(queue string) (string, int, error) {
kv, err := mq.lock(queue)
if err != nil {
Expand Down Expand Up @@ -455,6 +472,26 @@ func (mq *MQ) Pop() ([]byte, *QueueObject, error) {
return qo.Body, &qo, nil
}

func (mq *MQ) Peek() ([]byte, *QueueObject, error) {
id, _, err := mq.indexPeek("q")
if err != nil {
return []byte{}, &QueueObject{}, err
}
obj, _, err := mq.kv.Get(mq.q.QueuePath+id, nil)
if err != nil {
return []byte{}, &QueueObject{}, err
}
if obj == nil {
return []byte{}, &QueueObject{}, fmt.Errorf("object at head is nil")
}
var qo QueueObject
err = json.Unmarshal(obj.Value, &qo)
if err != nil {
return []byte{}, &QueueObject{}, err
}
return qo.Body, &qo, nil
}

// PopLast removes the last (newest) object in the queue. PopLast returns the message body as bytes and a QueueObject with the object ID,
// the object's CTime, the TTL deadline, and the message body as bytes.
func (mq *MQ) PopLast() ([]byte, *QueueObject, error) {
Expand Down
28 changes: 27 additions & 1 deletion consulmq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"gopkg.in/yaml.v2"
)

var TestBytes = []byte("This is a test, this is only a test")
var config Config
var mq *MQ

Expand Down Expand Up @@ -95,6 +94,33 @@ func TestPushPop(t *testing.T) {
}
}

func TestPushPeekQueue(t *testing.T) {
testData := []byte(uuid.New().String())
err := mq.EmptyQueue()
if err != nil {
t.Error(err)
}
_, err = mq.Push(testData)
if err != nil {
t.Error(err)
}

for i := 0; i < 10; i++ {
b, _, err := mq.Peek()
if err != nil {
t.Error(err)
}
if string(testData[:]) != string(b[:]) {
t.Errorf("Test strings do not match! Had %s got %s", string(testData[:]), string(b[:]))
}
}

err = mq.EmptyQueue()
if err != nil {
t.Error(err)
}
}

func TestPushFirstPopLast(t *testing.T) {
testData := []byte(uuid.New().String())
err := mq.EmptyQueue()
Expand Down

0 comments on commit 48ba4a1

Please sign in to comment.