Skip to content
Permalink
Browse files

Try to remove the pudge dep

  • Loading branch information...
tsileo committed Nov 5, 2019
1 parent 148195a commit 5902b8576076107522acfc506fe82137213d059f
Showing with 56 additions and 40 deletions.
  1. +11 −8 pkg/backend/s3/index/index.go
  2. +35 −29 pkg/queue/queue.go
  3. +6 −3 pkg/queue/queue_test.go
  4. +4 −0 pkg/rangedb/rangedb.go
@@ -4,19 +4,19 @@ import (
"encoding/hex"
"sync"

"github.com/recoilme/pudge"
"a4.io/blobstash/pkg/rangedb"
)

// Queue is a FIFO queue,
type Index struct {
db *pudge.Db
db *rangedb.RangeDB
path string
sync.Mutex
}

// New creates a new database.
func New(path string) (*Index, error) {
kvdb, err := pudge.Open(path, nil)
kvdb, err := rangedb.New(path)
if err != nil {
return nil, err
}
@@ -29,7 +29,7 @@ func New(path string) (*Index, error) {

// Remove the underlying db file.
func (i *Index) Remove() error {
return i.db.DeleteFile()
return i.db.Destroy()
}

// Close the underlying db file.
@@ -58,11 +58,14 @@ func (i *Index) Exists(hash string) (bool, error) {
if err != nil {
return false, err
}
exists, err := i.db.Has(bhash)
exists, err := i.db.Get(bhash)
if err != nil {
return false, err
}
return exists, nil
if exists != nil {
return true, nil
}
return false, nil
}

func (i *Index) Get(hash string) (string, error) {
@@ -72,8 +75,8 @@ func (i *Index) Get(hash string) (string, error) {
if err != nil {
return "", err
}
var v []byte
if err := i.db.Get(bhash, &v); err != nil {
v, err := i.db.Get(bhash)
if err != nil {
return "", err
}
if v != nil {
@@ -8,23 +8,23 @@ package queue // import "a4.io/blobstash/pkg/queue"
import (
"encoding/json"
"fmt"
"io"
"time"

"github.com/recoilme/pudge"

"a4.io/blobstash/pkg/blob"
"a4.io/blobstash/pkg/docstore/id"
"a4.io/blobstash/pkg/rangedb"
)

// Queue is a FIFO queue,
type Queue struct {
db *pudge.Db
db *rangedb.RangeDB
path string
}

// New creates a new database.
func New(path string) (*Queue, error) {
db, err := pudge.Open(path, nil)
db, err := rangedb.New(path)
if err != nil {
return nil, err
}
@@ -42,29 +42,35 @@ func (q *Queue) Close() error {

// Remove the underlying db file.
func (q *Queue) Remove() error {
return q.db.DeleteFile()
return q.db.Destroy()
}

// Size returns the number of items currently enqueued
func (q *Queue) Size() (int, error) {
return q.db.Count()
cnt := 0
c := q.db.PrefixRange([]byte(""), false)
defer c.Close()

// Iterate the range
c.Next()
var err error
for ; err == nil; _, _, err = c.Next() {
cnt++
}
return cnt, nil
}

func (q *Queue) Blobs() ([]*blob.Blob, error) {
keys, err := q.db.Keys(nil, 0, 0, true)
if err != nil {
return nil, fmt.Errorf("failed to list keys: %v", err)
}
out := []*blob.Blob{}

for _, k := range keys {
b := &blob.Blob{}
d := []byte{}
if err := q.db.Get(k, &d); err != nil {
return nil, err
c := q.db.PrefixRange([]byte(""), false)
defer c.Close()

}
if err := json.Unmarshal(d, b); err != nil {
// Iterate the range
_, v, err := c.Next()
for ; err == nil; _, v, err = c.Next() {
b := &blob.Blob{}
if err := json.Unmarshal(v, b); err != nil {
return nil, err
}

@@ -75,7 +81,7 @@ func (q *Queue) Blobs() ([]*blob.Blob, error) {

// Enqueue the given `item`. Must be JSON serializable.
func (q *Queue) Enqueue(item interface{}) (*id.ID, error) {
id, err := id.New(time.Now().Unix())
id, err := id.New(time.Now().UnixNano())
if err != nil {
return nil, err
}
@@ -98,19 +104,19 @@ func (q *Queue) InstantDequeue(id *id.ID) error {
// Dequeue the older item, unserialize the given item.
// Returns false if the queue is empty.
func (q *Queue) Dequeue(item interface{}) (bool, func(bool), error) {
keys, err := q.db.Keys(nil, 1, 0, true)
if err != nil {
return false, nil, err
c := q.db.PrefixRange([]byte(""), false)
defer c.Close()

// Iterate the range
k, js, err := c.Next()
if err != nil && err != io.EOF {
return false, nil, fmt.Errorf("next failed: %v", err)
}
if len(keys) == 0 {

if js == nil || len(js) == 0 {
return false, nil, nil
}
k := keys[0]
v := []byte{}
if err := q.db.Get(k, &v); err != nil {
fmt.Printf("failed to get %v\n", err)
return false, nil, err
}

deqFunc := func(remove bool) {
if !remove {
return
@@ -121,7 +127,7 @@ func (q *Queue) Dequeue(item interface{}) (bool, func(bool), error) {
}
}

return true, deqFunc, json.Unmarshal(v, item)
return true, deqFunc, json.Unmarshal(js, item)
}

// TODO(tsileo): func (q *Queue) Items() ([]*blob.Blob, error)
@@ -2,7 +2,6 @@ package queue

import (
"testing"
"time"
)

func check(e error) {
@@ -25,11 +24,15 @@ func TestQueue(t *testing.T) {
item2 := &Item{"ok2"}
_, err = q.Enqueue(item1)
check(err)
// FIXME test InstantDequeue
time.Sleep(time.Second)
_, err = q.Enqueue(item2)
check(err)

cnt, err := q.Size()
check(err)
if cnt != 2 {
t.Errorf("2 items should queued, got %d", cnt)
}

deq := &Item{}
ok, deqFunc, err := q.Dequeue(deq)
if !ok {
@@ -44,6 +44,10 @@ func (db *RangeDB) Set(k, v []byte) error {
return db.db.Put(k, v, nil)
}

func (db *RangeDB) Delete(k []byte) error {
return db.db.Delete(k, nil)
}

func (db *RangeDB) Get(k []byte) ([]byte, error) {
v, err := db.db.Get(k, nil)
if err != nil {

0 comments on commit 5902b85

Please sign in to comment.
You can’t perform that action at this time.