Skip to content

Commit

Permalink
refactor: Optimize the code structure so that DB and Tx use a set of indexing processes (#479)
Browse files Browse the repository at this point in the history

* refactor: Optimize the code structure so that DB and Tx use a set of indexing processes

* style: Rename some variable names that were warned

* refactor: Remove the obsolete ioutil.ReadDir and use os.ReadDir instead
  • Loading branch information
bigboss2063 committed Oct 27, 2023
1 parent b569ba8 commit e6f2306
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 197 deletions.
105 changes: 53 additions & 52 deletions db.go
Expand Up @@ -19,7 +19,6 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"path"
Expand Down Expand Up @@ -178,14 +177,14 @@ func open(opt Options) (*DB, error) {
}
}

flock := flock.New(filepath.Join(opt.Dir, FLockName))
if ok, err := flock.TryLock(); err != nil {
fileLock := flock.New(filepath.Join(opt.Dir, FLockName))
if ok, err := fileLock.TryLock(); err != nil {
return nil, err
} else if !ok {
return nil, ErrDirLocked
}

db.flock = flock
db.flock = fileLock

if err := db.buildIndexes(); err != nil {
return nil, fmt.Errorf("db.buildIndexes error: %s", err)
Expand Down Expand Up @@ -466,8 +465,8 @@ func (db *DB) doWrites() {

// setActiveFile sets the ActiveFile (DataFile object).
func (db *DB) setActiveFile() (err error) {
filepath := getDataPath(db.MaxFileID, db.opt.Dir)
db.ActiveFile, err = db.fm.getDataFile(filepath, db.opt.SegmentSize)
activeFilePath := getDataPath(db.MaxFileID, db.opt.Dir)
db.ActiveFile, err = db.fm.getDataFile(activeFilePath, db.opt.SegmentSize)
if err != nil {
return
}
Expand All @@ -479,21 +478,22 @@ func (db *DB) setActiveFile() (err error) {

// getMaxFileIDAndFileIds returns max fileId and fileIds.
func (db *DB) getMaxFileIDAndFileIDs() (maxFileID int64, dataFileIds []int) {
files, _ := ioutil.ReadDir(db.opt.Dir)
files, _ := os.ReadDir(db.opt.Dir)

if len(files) == 0 {
return 0, nil
}

for _, f := range files {
id := f.Name()
fileSuffix := path.Ext(path.Base(id))
for _, file := range files {
filename := file.Name()
fileSuffix := path.Ext(path.Base(filename))
if fileSuffix != DataSuffix {
continue
}

id = strings.TrimSuffix(id, DataSuffix)
idVal, _ := strconv2.StrToInt(id)
dataFileIds = append(dataFileIds, idVal)
filename = strings.TrimSuffix(filename, DataSuffix)
id, _ := strconv2.StrToInt(filename)
dataFileIds = append(dataFileIds, id)
}

if len(dataFileIds) == 0 {
Expand Down Expand Up @@ -670,24 +670,10 @@ func (db *DB) buildBTreeIdx(r *Record) {
b.Delete(key)
} else {
if meta.TTL != Persistent {
now := time.UnixMilli(time.Now().UnixMilli())
expireTime := time.UnixMilli(int64(meta.Timestamp))
expireTime = expireTime.Add(time.Duration(int64(meta.TTL)) * time.Second)
expire := expireTime.Sub(now)

callback := func() {
err := db.Update(func(tx *Tx) error {
if tx.db.tm.exist(bucket, string(key)) {
return tx.Delete(bucket, key)
}
return nil
})
if err != nil {
log.Printf("occur error when expired deletion, error: %v", err.Error())
}
}
expireTime := db.expireTime(meta.Timestamp, meta.TTL)
callback := db.buildExpireCallback(bucket, key)

db.tm.add(bucket, string(key), expire, callback)
db.tm.add(bucket, string(key), expireTime, callback)
} else {
db.tm.del(bucket, string(key))
}
Expand All @@ -696,6 +682,13 @@ func (db *DB) buildBTreeIdx(r *Record) {
}
}

// expireTime computes the remaining time-to-live for an entry from its
// creation timestamp (milliseconds since epoch) and TTL (seconds).
// The current time is truncated to millisecond precision so the result
// matches the granularity of the stored timestamp.
func (db *DB) expireTime(timestamp uint64, ttl uint32) time.Duration {
	nowMs := time.UnixMilli(time.Now().UnixMilli())
	deadline := time.UnixMilli(int64(timestamp)).Add(time.Duration(int64(ttl)) * time.Second)
	return deadline.Sub(nowMs)
}

func (db *DB) buildIdxes(r *Record) error {
switch r.H.Meta.Ds {
case DataStructureBTree:
Expand Down Expand Up @@ -787,9 +780,7 @@ func (db *DB) buildSortedSetIdx(r *Record) error {
case DataZRemFlag:
_, err = ss.ZRem(string(key), val)
case DataZRemRangeByRankFlag:
startAndEnd := strings.Split(string(val), SeparatorForZSetKey)
start, _ := strconv2.StrToInt(startAndEnd[0])
end, _ := strconv2.StrToInt(startAndEnd[1])
start, end := splitIntIntStr(string(val), SeparatorForZSetKey)
err = ss.ZRemRangeByRank(string(key), start, end)
case DataZPopMaxFlag:
_, _, err = ss.ZPopMax(string(key))
Expand Down Expand Up @@ -828,33 +819,17 @@ func (db *DB) buildListIdx(r *Record) error {
case DataRPushFlag:
err = l.RPush(string(key), r)
case DataLRemFlag:
countAndValueIndex := strings.Split(string(val), SeparatorForListKey)
count, _ := strconv2.StrToInt(countAndValueIndex[0])
value := []byte(countAndValueIndex[1])

err = l.LRem(string(key), count, func(r *Record) (bool, error) {
v, err := db.getValueByRecord(r)
if err != nil {
return false, err
}
return bytes.Equal(value, v), nil
})
err = db.buildListLRemIdx(val, l, key)
case DataLPopFlag:
_, err = l.LPop(string(key))
case DataRPopFlag:
_, err = l.RPop(string(key))
case DataLTrimFlag:
keyAndStartIndex := strings.Split(string(key), SeparatorForListKey)
newKey := keyAndStartIndex[0]
start, _ := strconv2.StrToInt(keyAndStartIndex[1])
newKey, start := splitStringIntStr(string(key), SeparatorForListKey)
end, _ := strconv2.StrToInt(string(val))
err = l.LTrim(newKey, start, end)
case DataLRemByIndex:
var indexes []int
indexes, err = UnmarshalInts(val)
if err != nil {
break
}
indexes, _ := UnmarshalInts(val)
err = l.LRemByIndex(string(key), indexes)
}

Expand All @@ -865,6 +840,18 @@ func (db *DB) buildListIdx(r *Record) error {
return nil
}

// buildListLRemIdx replays a recorded LRem operation against list l.
// The stored value encodes "<count><sep><target>"; list entries whose
// resolved record value equals target are removed per LRem semantics.
func (db *DB) buildListLRemIdx(value []byte, l *List, key []byte) error {
	count, target := splitIntStringStr(string(value), SeparatorForListKey)

	matches := func(r *Record) (bool, error) {
		stored, err := db.getValueByRecord(r)
		if err != nil {
			return false, err
		}
		return bytes.Equal([]byte(target), stored), nil
	}

	return l.LRem(string(key), count, matches)
}

// buildIndexes builds indexes when db initialize resource.
func (db *DB) buildIndexes() (err error) {
var (
Expand Down Expand Up @@ -948,3 +935,17 @@ func (db *DB) checkListExpired() {
// IsClose reports whether the database has been marked closed.
func (db *DB) IsClose() bool {
	return db.closed
}

// buildExpireCallback returns the callback the ttl manager invokes when the
// entry identified by bucket/key expires. The callback deletes the key inside
// an Update transaction, but only if the ttl manager still tracks it (the key
// may have been overwritten or removed since the timer was armed).
func (db *DB) buildExpireCallback(bucket string, key []byte) func() {
	return func() {
		err := db.Update(func(tx *Tx) error {
			if db.tm.exist(bucket, string(key)) {
				return tx.Delete(bucket, key)
			}
			return nil
		})
		if err != nil {
			// Pass err directly: %v formats an error via Error() itself,
			// so the explicit err.Error() call was redundant.
			log.Printf("occur error when expired deletion, error: %v", err)
		}
	}
}
158 changes: 13 additions & 145 deletions tx.go
Expand Up @@ -17,13 +17,10 @@ package nutsdb
import (
"bytes"
"errors"
"log"
"strings"
"sync/atomic"
"time"

"github.com/bwmarrin/snowflake"
"github.com/xujiajun/utils/strconv2"
"strings"
"sync/atomic"
)

const (
Expand Down Expand Up @@ -268,7 +265,7 @@ func (tx *Tx) Commit() (err error) {
}

if err := tx.buildIdxes(records); err != nil {
return err
panic(err.Error())
}
tx.db.RecordCount += curWriteCount

Expand Down Expand Up @@ -510,141 +507,6 @@ func (tx *Tx) allocCommitBuffer() *bytes.Buffer {
return buff
}

// buildTreeIdx applies a BTree-typed record to the in-memory btree index of
// its bucket: DataSetFlag inserts the key (arming or clearing a TTL timer as
// needed), DataDeleteFlag removes the key and any pending expiry.
func (tx *Tx) buildTreeIdx(record *Record) {
	bucket, key, meta, offset := record.Bucket, record.H.Key, record.H.Meta, record.H.DataPos

	b := tx.db.Index.bTree.getWithDefault(bucket)

	if meta.Flag == DataSetFlag {
		var value []byte
		// Only keep the value in RAM when the index mode stores values in memory.
		if tx.db.opt.EntryIdxMode == HintKeyValAndRAMIdxMode {
			value = record.V
		}

		if meta.TTL != Persistent {
			db := tx.db

			// Deletes the key when its TTL fires, but only if the ttl manager
			// still tracks it (it may have been overwritten meanwhile).
			callback := func() {
				err := db.Update(func(tx *Tx) error {
					if db.tm.exist(bucket, string(key)) {
						return tx.Delete(bucket, key)
					}
					return nil
				})
				if err != nil {
					log.Printf("occur error when expired deletion, error: %v", err.Error())
				}
			}

			// Truncate "now" to millisecond precision to match the granularity
			// of the stored timestamp before computing the remaining TTL.
			now := time.UnixMilli(time.Now().UnixMilli())
			expireTime := time.UnixMilli(int64(record.H.Meta.Timestamp))
			expireTime = expireTime.Add(time.Duration(record.H.Meta.TTL) * time.Second)

			// Already expired: drop the record without indexing it.
			if now.After(expireTime) {
				return
			}

			tx.db.tm.add(bucket, string(key), expireTime.Sub(now), callback)
		} else {
			// Persistent write: cancel any expiry left over from an earlier TTL write.
			tx.db.tm.del(bucket, string(key))
		}

		hint := NewHint().WithFileId(tx.db.ActiveFile.fileID).WithKey(key).WithMeta(meta).WithDataPos(offset)
		b.Insert(key, value, hint)
	} else if meta.Flag == DataDeleteFlag {
		tx.db.tm.del(bucket, string(key))
		b.Delete(key)
	}
}

// buildSetIdx applies a set-typed record to the in-memory set index of its
// bucket: DataSetFlag adds the member, DataDeleteFlag removes it. Index
// maintenance errors are deliberately ignored (best-effort replay).
func (tx *Tx) buildSetIdx(record *Record) {
	bucket, key, value, meta := record.Bucket, record.H.Key, record.V, record.H.Meta

	tx.db.resetRecordByMode(record)

	set := tx.db.Index.set.getWithDefault(bucket)

	switch meta.Flag {
	case DataDeleteFlag:
		_ = set.SRem(string(key), value)
	case DataSetFlag:
		_ = set.SAdd(string(key), [][]byte{value}, []*Record{record})
	}
}

// buildSortedSetIdx applies a sorted-set-typed record to the in-memory
// sorted-set index of its bucket, replaying the operation encoded in the
// record's flag. Index maintenance errors are deliberately ignored.
func (tx *Tx) buildSortedSetIdx(record *Record) {
	bucket, rawKey, value, meta := record.Bucket, record.H.Key, record.V, record.H.Meta

	tx.db.resetRecordByMode(record)

	ss := tx.db.Index.sortedSet.getWithDefault(bucket, tx.db)

	switch meta.Flag {
	case DataZAddFlag:
		// The key encodes "<member-key><sep><score>".
		member, score := splitStringFloat64Str(string(rawKey), SeparatorForZSetKey)
		_ = ss.ZAdd(member, SCORE(score), value, record)
	case DataZRemFlag:
		_, _ = ss.ZRem(string(rawKey), value)
	case DataZRemRangeByRankFlag:
		// The value encodes "<start><sep><end>".
		start, end := splitIntIntStr(string(value), SeparatorForZSetKey)
		_ = ss.ZRemRangeByRank(string(rawKey), start, end)
	case DataZPopMaxFlag:
		_, _, _ = ss.ZPopMax(string(rawKey))
	case DataZPopMinFlag:
		_, _, _ = ss.ZPopMin(string(rawKey))
	}
}

// buildListIdx applies a list-typed record to the in-memory list index of its
// bucket, dispatching on the record's flag to replay the original list
// operation. Index maintenance errors are deliberately ignored.
func (tx *Tx) buildListIdx(record *Record) {
	bucket, key, value, meta := record.Bucket, record.H.Key, record.V, record.H.Meta

	tx.db.resetRecordByMode(record)

	l := tx.db.Index.list.getWithDefault(bucket)

	// Skip records whose list-level TTL has already elapsed.
	if IsExpired(meta.TTL, meta.Timestamp) {
		return
	}

	switch meta.Flag {
	case DataExpireListFlag:
		// The TTL (seconds) is stored as the record value.
		t, _ := strconv2.StrToInt64(string(value))
		ttl := uint32(t)
		l.TTL[string(key)] = ttl
		l.TimeStamp[string(key)] = meta.Timestamp
	case DataLPushFlag:
		_ = l.LPush(string(key), record)
	case DataRPushFlag:
		_ = l.RPush(string(key), record)
	case DataLRemFlag:
		tx.buildListLRemIdx(value, l, key)
	case DataLPopFlag:
		_, _ = l.LPop(string(key))
	case DataRPopFlag:
		_, _ = l.RPop(string(key))
	case DataLTrimFlag:
		// The key encodes "<real-key><sep><start>"; the value holds the end index.
		newKey, start := splitStringIntStr(string(key), SeparatorForListKey)
		end, _ := strconv2.StrToInt(string(value))
		_ = l.LTrim(newKey, start, end)
	case DataLRemByIndex:
		indexes, _ := UnmarshalInts(value)
		_ = l.LRemByIndex(string(key), indexes)
	}
}

// buildListLRemIdx replays a recorded LRem operation against list l. The
// stored value encodes "<count><sep><target>"; entries whose resolved record
// value equals target are removed. The result is deliberately ignored.
func (tx *Tx) buildListLRemIdx(value []byte, l *List, key []byte) {
	count, target := splitIntStringStr(string(value), SeparatorForListKey)

	matches := func(r *Record) (bool, error) {
		stored, err := tx.db.getValueByRecord(r)
		if err != nil {
			return false, err
		}
		return bytes.Equal([]byte(target), stored), nil
	}

	_ = l.LRem(string(key), count, matches)
}

// rotateActiveFile rotates log file when active file is not enough space to store the entry.
func (tx *Tx) rotateActiveFile() error {
var err error
Expand Down Expand Up @@ -825,18 +687,19 @@ func (tx *Tx) isClosed() bool {
}

func (tx *Tx) buildIdxes(records []*Record) error {
var err error
for _, record := range records {
bucket, meta := record.Bucket, record.H.Meta

switch meta.Ds {
case DataStructureBTree:
tx.buildTreeIdx(record)
tx.db.buildBTreeIdx(record)
case DataStructureList:
tx.buildListIdx(record)
err = tx.db.buildListIdx(record)
case DataStructureSet:
tx.buildSetIdx(record)
err = tx.db.buildSetIdx(record)
case DataStructureSortedSet:
tx.buildSortedSetIdx(record)
err = tx.db.buildSortedSetIdx(record)
case DataStructureNone:
switch meta.Flag {
case DataBPTreeBucketDeleteFlag:
Expand All @@ -849,6 +712,11 @@ func (tx *Tx) buildIdxes(records []*Record) error {
tx.db.deleteBucket(DataStructureList, bucket)
}
}

if err != nil {
return err
}

tx.db.KeyCount++
}
return nil
Expand Down

0 comments on commit e6f2306

Please sign in to comment.