Skip to content

Commit

Permalink
use goleveldb (github.com/syndtr/goleveldb/leveldb)
Browse files Browse the repository at this point in the history
Introduce conditional compilation by build tags
  • Loading branch information
masahide committed Aug 23, 2014
1 parent 30745ad commit 8d23be9
Show file tree
Hide file tree
Showing 5 changed files with 256 additions and 5 deletions.
10 changes: 5 additions & 5 deletions Makefile
Expand Up @@ -7,8 +7,8 @@ build:
mkdir build

build/queued: build $(wildcard queued.go queued/*.go)
go get -d
go build -o build/queued
go get -d -tags=${BUILD_TAGS}
go build -o build/queued -tags=${BUILD_TAGS}

clean:
rm -rf build
Expand All @@ -21,7 +21,7 @@ uninstall:
rm ${BINDIR}/queued

test:
go get -d
cd queued; go test
go get -d -tags=${BUILD_TAGS}
cd queued; go test -tags=${BUILD_TAGS}

.PHONY: install uninstall clean all test
.PHONY: install uninstall clean all test
173 changes: 173 additions & 0 deletions queued/goleveldb_store.go
@@ -0,0 +1,173 @@
// +build use_goleveldb

package queued

import (
"bytes"
"encoding/gob"
"fmt"
"os"
"strconv"
"sync"

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/filter"
leveldb_iterator "github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)

// Iterator

type LevelIterator struct {
leveldb_iterator.Iterator
}

func (it *LevelIterator) NextRecord() (*Record, bool) {
if !it.Valid() {
return nil, false
}

id, err := strconv.Atoi(string(it.Key()))
if err != nil {
panic(fmt.Sprintf("queued.LevelIterator: Error loading db: %v", err))
}

value := it.Value()
if value == nil {
return nil, false
}

var record Record
buf := bytes.NewBuffer(value)
dec := gob.NewDecoder(buf)
err = dec.Decode(&record)
if err != nil {
panic(fmt.Sprintf("queued.LevelIterator: Error decoding value: %v", err))
}

record.Id = id

it.Next()
return &record, true
}

// Store

type LevelStore struct {
path string
sync bool
db *leveldb.DB
id int
mutex sync.Mutex
}

func NewLevelStore(path string, sync bool) *LevelStore {
opts := &opt.Options{
Filter: filter.NewBloomFilter(10),
ErrorIfMissing: false,
}
db, err := leveldb.OpenFile(path, opts)
if err != nil {
panic(fmt.Sprintf("queued.LevelStore: Unable to open db: %v", err))
}

id := 0

iter := db.NewIterator(nil, nil)
iter.Last()
if iter.Valid() {
id, err = strconv.Atoi(string(iter.Key()))
if err != nil {
panic(fmt.Sprintf("queued.LevelStore: Error loading db: %v", err))
}
}

return &LevelStore{
id: id,
path: path,
sync: sync,
db: db,
}
}

func (s *LevelStore) Get(id int) (*Record, error) {
value, err := s.db.Get(key(id), nil)
if err == util.ErrNotFound {
return nil, nil
}
if err != nil {
return nil, err
}
if value == nil {
return nil, nil
}

var record Record
buf := bytes.NewBuffer(value)
dec := gob.NewDecoder(buf)
err = dec.Decode(&record)
if err != nil {
panic(fmt.Sprintf("queued.LevelStore: Error decoding value: %v", err))
}

record.Id = id

return &record, nil
}

func (s *LevelStore) Put(record *Record) error {
s.mutex.Lock()
defer s.mutex.Unlock()

id := s.id + 1

var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := enc.Encode(record)
if err != nil {
panic(fmt.Sprintf("queued.LevelStore: Error encoding record: %v", err))
}

err = s.db.Put(key(id), buf.Bytes(), &opt.WriteOptions{Sync: s.sync})
if err != nil {
return err
}

record.Id = id
s.id = id

return nil
}

func (s *LevelStore) Remove(id int) error {
s.mutex.Lock()
defer s.mutex.Unlock()

return s.db.Delete(key(id), &opt.WriteOptions{Sync: s.sync})
}

func (s *LevelStore) Close() {
s.db.Close()
}

func (s *LevelStore) Drop() {
s.Close()

err := os.RemoveAll(s.path)
if err != nil {
panic(fmt.Sprintf("queued.LevelStore: Error dropping db: %v", err))
}
}

func (s *LevelStore) Iterator() Iterator {
it := s.db.NewIterator(nil, nil)
it.First()
return &LevelIterator{it}
}

// Helpers

func key(id int) []byte {
return []byte(fmt.Sprintf("%d", id))
}
74 changes: 74 additions & 0 deletions queued/goleveldb_store_test.go
@@ -0,0 +1,74 @@
// +build use_goleveldb

package queued

import (
"testing"

"github.com/bmizerany/assert"
)

func TestLevelStore(t *testing.T) {
store := NewLevelStore("./test1.db", true)
defer store.Drop()

assert.Equal(t, store.id, 0)

record := NewRecord([]byte("foo"), "testqueue")

err := store.Put(record)
assert.Equal(t, err, nil)
assert.Equal(t, record.Id, 1)

record, err = store.Get(1)
assert.Equal(t, err, nil)
assert.Equal(t, record.Id, 1)
assert.Equal(t, record.Value, []byte("foo"))
assert.Equal(t, record.Queue, "testqueue")

err = store.Remove(1)
assert.Equal(t, err, nil)

record, err = store.Get(1)
assert.Equal(t, err, nil)
assert.T(t, record == nil)
}

func TestLevelStoreLoad(t *testing.T) {
temp := NewLevelStore("./test2.db", true)
temp.Put(NewRecord([]byte("foo"), "testqueue"))
temp.Put(NewRecord([]byte("bar"), "testqueue"))
temp.Close()

store := NewLevelStore("./test2.db", true)
defer store.Drop()

assert.Equal(t, store.id, 2)
}

func TestLevelStoreIterator(t *testing.T) {
temp := NewLevelStore("./test3.db", true)
temp.Put(NewRecord([]byte("foo"), "testqueue"))
temp.Put(NewRecord([]byte("bar"), "testqueue"))
temp.Close()

store := NewLevelStore("./test3.db", true)
defer store.Drop()

it := store.Iterator()

one, ok := it.NextRecord()
assert.Equal(t, ok, true)
assert.Equal(t, one.Id, 1)
assert.Equal(t, one.Value, []byte("foo"))
assert.Equal(t, one.Queue, "testqueue")

two, ok := it.NextRecord()
assert.Equal(t, ok, true)
assert.Equal(t, two.Id, 2)
assert.Equal(t, two.Value, []byte("bar"))
assert.Equal(t, two.Queue, "testqueue")

_, ok = it.NextRecord()
assert.Equal(t, ok, false)
}
2 changes: 2 additions & 0 deletions queued/level_store.go
@@ -1,3 +1,5 @@
// +build !use_goleveldb

package queued

import (
Expand Down
2 changes: 2 additions & 0 deletions queued/level_store_test.go
@@ -1,3 +1,5 @@
// +build !use_goleveldb

package queued

import (
Expand Down

0 comments on commit 8d23be9

Please sign in to comment.