Skip to content

Commit

Permalink
[unitdb] update db sync and add update db tests
Browse files Browse the repository at this point in the history
  • Loading branch information
unit-adm committed Nov 19, 2020
1 parent 6191cc3 commit 970cac8
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 98 deletions.
2 changes: 1 addition & 1 deletion block_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func newBlockReader(fs *_FileSet) *_BlockReader {

indexFile, err := fs.getFile(_FileDesc{fileType: typeIndex})
if err != nil {
return nil
return r
}
r.indexFile = indexFile

Expand Down
37 changes: 23 additions & 14 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ func Open(path string, opts ...Options) (*DB, error) {
}
}

// fsys := options.fileSystem
lock, err := createLockFile(path + lockPostfix)
lock, err := createLockFile(path)
if err != nil {
if err == os.ErrExist {
err = errLocked
Expand Down Expand Up @@ -129,18 +128,28 @@ func Open(path string, opts ...Options) (*DB, error) {

fileset := &_FileSet{mu: new(sync.RWMutex), list: []_FileSet{infoFile, winFile, indexFile, dataFile, leaseFile, filterFile}}
internal := &_DB{
mutex: newMutex(),
dbInfo: dbInfo,
info: infoFile,
mutex: newMutex(),
start: time.Now(),
meter: NewMeter(),

dbInfo: dbInfo,

bufPool: bpool.NewBufferPool(options.bufferSize, &bpool.Options{MaxElapsedTime: 10 * time.Second}),

info: infoFile,
filter: Filter{file: filterFile, filterBlock: fltr.NewFilterGenerator()},
freeList: lease,

timeWindow: newTimeWindowBucket(timeOptions),
filter: Filter{file: filterFile, filterBlock: fltr.NewFilterGenerator()},
freeList: lease,
reader: newBlockReader(fileset),
syncLockC: make(chan struct{}, 1),
bufPool: bpool.NewBufferPool(options.bufferSize, &bpool.Options{MaxElapsedTime: 10 * time.Second}),
trie: newTrie(),
start: time.Now(),
meter: NewMeter(),

// Trie
trie: newTrie(),

// Block reader
reader: newBlockReader(fileset),

// Sync Handler
syncLockC: make(chan struct{}, 1),

// Close
closeC: make(chan struct{}),
Expand Down Expand Up @@ -178,7 +187,7 @@ func Open(path string, opts ...Options) (*DB, error) {
logger.Error().Err(err).Str("context", "db.loadTrie")
}

// Read freeList before DB recovery
// Read freeList.
if err := db.internal.freeList.read(); err != nil {
logger.Error().Err(err).Str("context", "db.readHeader")
return nil, err
Expand Down
28 changes: 11 additions & 17 deletions db_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,27 +69,29 @@ const (
type (
_DB struct {
mutex _Mutex
mac *crypto.MAC

// The db start time.
start time.Time

// The metrics to measure timeseries on message events.
meter *Meter

dbInfo _DBInfo
mac *crypto.MAC

mem *memdb.DB
bufPool *bpool.BufferPool
info _FileSet
filter Filter
freeList *_Lease
reader *_BlockReader
mem *memdb.DB
bufPool *bpool.BufferPool

timeWindow *_TimeWindowBucket

//trie
// Trie
trie *_Trie

// Block reader
reader *_BlockReader

// sync handler
syncLockC chan struct{}
syncWrites bool
Expand Down Expand Up @@ -166,12 +168,8 @@ func (db *DB) close() error {

// loadTopicHash loads topic and offset from window file.
func (db *DB) loadTrie() error {
winFile, err := db.fs.getFile(_FileDesc{fileType: typeTimeWindow})
if err != nil {
return err
}
r := newWindowReader(winFile)
err = r.foreachWindowBlock(func(startSeq, topicHash uint64, off int64) (bool, error) {
r := newWindowReader(db.fs)
err := r.foreachWindowBlock(func(startSeq, topicHash uint64, off int64) (bool, error) {
// fmt.Println("db.loadTrie: topicHash, seq ", topicHash, startSeq)
e, err := db.internal.reader.readIndexEntry(startSeq)
if err != nil {
Expand Down Expand Up @@ -228,16 +226,12 @@ func (db *DB) lookup(q *Query) error {
sort.Slice(topics[:], func(i, j int) bool {
return topics[i].offset > topics[j].offset
})
winFile, err := db.fs.getFile(_FileDesc{fileType: typeTimeWindow})
if err != nil {
return err
}
for _, topic := range topics {
if len(q.internal.winEntries) > q.Limit {
break
}
limit := q.Limit - len(q.internal.winEntries)
wEntries := db.internal.timeWindow.lookup(winFile, topic.hash, topic.offset, q.internal.cutoff, limit)
wEntries := db.internal.timeWindow.lookup(db.fs, topic.hash, topic.offset, q.internal.cutoff, limit)
for _, we := range wEntries {
q.internal.winEntries = append(q.internal.winEntries, _Query{topicHash: topic.hash, seq: we.seq()})
}
Expand Down
10 changes: 4 additions & 6 deletions db_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ type (

rawWindow *bpool.Buffer
rawBlock *bpool.Buffer

// offsets for rollback in case of sync error.
winOff int64
}
)

Expand All @@ -58,16 +55,17 @@ func (db *_SyncHandle) startSync() bool {
db.rawWindow = db.internal.bufPool.Get()
db.rawBlock = db.internal.bufPool.Get()

winFile, err := db.fs.getFile(_FileDesc{fileType: typeTimeWindow})
var err error
db.windowWriter, err = newWindowWriter(db.fs, db.rawWindow)
if err != nil {
logger.Error().Err(err).Str("context", "startSync").Msg("Error syncing to db")
return false
}
db.windowWriter = newWindowWriter(winFile, db.rawWindow)
db.blockWriter, err = newBlockWriter(db.fs, db.internal.freeList, db.rawBlock)
if err != nil {
logger.Error().Err(err).Str("context", "startSync").Msg("Error syncing to db")
return false
}
db.winOff = winFile.currSize()
db.syncInfo.syncStatusOk = true

return db.syncInfo.syncStatusOk
Expand Down
38 changes: 21 additions & 17 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,20 @@ import (
"time"
)

func cleanup(path string) {
// os.Remove(path + indexPostfix)
// os.Remove(path + dataPostfix)
os.Remove(path + lockPostfix)
// os.Remove(path + windowPostfix)
// os.Remove(path + filterPostfix)
var (
dbPath = "test"
)

func cleanup() {
os.RemoveAll(dbPath)
}

func TestSimple(t *testing.T) {
cleanup("test.db")
db, err := Open("test.db", WithBufferSize(1<<4), WithBlockCacheSize(1<<16), WithLogSize(1<<16), WithMinimumFreeBlocksSize(1<<16))
db, err := Open(dbPath, WithBufferSize(1<<4), WithBlockCacheSize(1<<16), WithLogSize(1<<16), WithMinimumFreeBlocksSize(1<<16))
if err != nil {
t.Fatal(err)
}

var i uint16
var n uint16 = 1000

Expand Down Expand Up @@ -103,11 +103,13 @@ func TestSimple(t *testing.T) {
}
verifyMsgsAndClose()

db, err = Open("test.db", WithMutable())
db, err = Open(dbPath, WithMutable())
if err != nil {
t.Fatal(err)
}
defer db.Close()
defer cleanup()

_, err = db.Get(NewQuery(topic).WithContract(contract).WithLimit(int(n)))
if err != nil {
t.Fatal(err)
Expand All @@ -133,12 +135,12 @@ func TestSimple(t *testing.T) {
}

func TestBatch(t *testing.T) {
cleanup("test.db")
db, err := Open("test.db", WithBufferSize(1<<16), WithBlockCacheSize(1<<16), WithLogSize(1<<16), WithMinimumFreeBlocksSize(1<<16), WithMutable(), WithBackgroundKeyExpiry())
db, err := Open(dbPath, WithBufferSize(1<<16), WithBlockCacheSize(1<<16), WithLogSize(1<<16), WithMinimumFreeBlocksSize(1<<16), WithMutable(), WithBackgroundKeyExpiry())
if err != nil {
t.Fatal(err)
}
defer db.Close()
defer cleanup()

contract, err := db.NewContract()
if err != nil {
Expand Down Expand Up @@ -196,12 +198,13 @@ func TestBatch(t *testing.T) {
}

func TestExpiry(t *testing.T) {
cleanup("test.db")
db, err := Open("test.db", WithMutable(), WithBackgroundKeyExpiry())
db, err := Open(dbPath, WithMutable(), WithBackgroundKeyExpiry())
if err != nil {
t.Fatal(err)
}
defer db.Close()
defer cleanup()

contract, err := db.NewContract()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -237,12 +240,13 @@ func TestExpiry(t *testing.T) {
}

func TestLeasing(t *testing.T) {
cleanup("test.db")
db, err := Open("test.db", WithBufferSize(1<<16), WithBlockCacheSize(1<<16), WithLogSize(1<<16), WithMinimumFreeBlocksSize(1<<4), WithMutable(), WithBackgroundKeyExpiry())
db, err := Open(dbPath, WithBufferSize(1<<16), WithBlockCacheSize(1<<16), WithLogSize(1<<16), WithMinimumFreeBlocksSize(1<<4), WithMutable(), WithBackgroundKeyExpiry())
if err != nil {
t.Fatal(err)
}
defer db.Close()
defer cleanup()

var i uint16
var n uint16 = 100

Expand Down Expand Up @@ -282,12 +286,12 @@ func TestLeasing(t *testing.T) {
}

func TestWildcardTopics(t *testing.T) {
cleanup("test.db")
db, err := Open("test.db", WithBufferSize(1<<16), WithBlockCacheSize(1<<16), WithLogSize(1<<16), WithMinimumFreeBlocksSize(1<<16), WithMutable(), WithBackgroundKeyExpiry())
db, err := Open(dbPath, WithBufferSize(1<<16), WithBlockCacheSize(1<<16), WithLogSize(1<<16), WithMinimumFreeBlocksSize(1<<16), WithMutable(), WithBackgroundKeyExpiry())
if err != nil {
t.Fatal(err)
}
defer db.Close()
defer cleanup()

tests := []struct {
wtopic []byte
Expand Down
34 changes: 20 additions & 14 deletions file.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (

typeAll = typeInfo | typeTimeWindow | typeIndex | typeData | typeLease | typeFilter

prefix = "unitdb"
indexDir = "index"
dataDir = "data"
winDir = "window"
Expand All @@ -51,36 +52,36 @@ type _FileDesc struct {
fd uintptr
}

func filePath(prefix string, fd _FileDesc) string {
func filePath(dirName string, fd _FileDesc) string {
name := fmt.Sprintf("%#x-%d", fd.fileType, fd.num)
if err := ensureDir(indexDir); err != nil {
if err := ensureDir(path.Join(dirName, indexDir)); err != nil {
return name
}
if err := ensureDir(dataDir); err != nil {
if err := ensureDir(path.Join(dirName, dataDir)); err != nil {
return name
}
if err := ensureDir(winDir); err != nil {
if err := ensureDir(path.Join(dirName, winDir)); err != nil {
return name
}
switch fd.fileType {
case typeInfo:
suffix := fmt.Sprintf("%s.info", prefix)
return suffix
return path.Join(dirName, suffix)
case typeTimeWindow:
suffix := fmt.Sprintf("%s%04d.win", prefix, fd.num)
return path.Join(winDir, suffix)
return path.Join(dirName, winDir, suffix)
case typeIndex:
suffix := fmt.Sprintf("%s%04d.index", prefix, fd.num)
return path.Join(indexDir, suffix)
return path.Join(dirName, indexDir, suffix)
case typeData:
suffix := fmt.Sprintf("%s%04d.data", prefix, fd.num)
return path.Join(dataDir, suffix)
return path.Join(dirName, dataDir, suffix)
case typeLease:
suffix := fmt.Sprintf("%s.lease", prefix)
return suffix
return path.Join(dirName, suffix)
case typeFilter:
suffix := fmt.Sprintf("%s.filter", prefix)
return suffix
return path.Join(dirName, suffix)
default:
return fmt.Sprintf("%#x-%d", fd.fileType, fd.num)
}
Expand All @@ -107,11 +108,16 @@ type (
)

// createLockFile to create lock file.
func createLockFile(name string) (_LockFile, error) {
return newLockFile(name)
func createLockFile(dirName string) (_LockFile, error) {
if err := ensureDir(dirName); err != nil {
return nil, err
}
suffix := fmt.Sprintf("%s.lock", prefix)

return newLockFile(path.Join(dirName, suffix))
}

func newFile(name string, nFiles int16, fd _FileDesc) (_FileSet, error) {
func newFile(path string, nFiles int16, fd _FileDesc) (_FileSet, error) {
if nFiles == 0 {
return _FileSet{}, errors.New("no new file")
}
Expand All @@ -121,7 +127,7 @@ func newFile(name string, nFiles int16, fd _FileDesc) (_FileSet, error) {
fs := _FileSet{mu: new(sync.RWMutex), fileMap: make(map[int16]_File, nFiles)}
for i := int16(0); i < nFiles; i++ {
fd.num = i
path := filePath(name, fd)
path := filePath(path, fd)
fi, err := os.OpenFile(path, fileFlag, fileMode)
if err != nil {
return fs, err
Expand Down
4 changes: 2 additions & 2 deletions time_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,13 @@ func (tw *_TimeWindowBucket) ilookup(topicHash uint64, limit int) (winEntries _W
}

// lookup lookups window entries from window file.
func (tw *_TimeWindowBucket) lookup(winFile *_File, topicHash uint64, off, cutoff int64, limit int) (winEntries _WindowEntries) {
func (tw *_TimeWindowBucket) lookup(fileSet *_FileSet, topicHash uint64, off, cutoff int64, limit int) (winEntries _WindowEntries) {
winEntries = make([]_WinEntry, 0)
winEntries = tw.ilookup(topicHash, limit)
if len(winEntries) >= limit {
return winEntries
}
r := newWindowReader(winFile)
r := newWindowReader(fileSet)
next := func(blockOff int64, f func(_WinBlock) (bool, error)) error {
for {
b, err := r.readBlock(blockOff)
Expand Down

0 comments on commit 970cac8

Please sign in to comment.