Commit
change fdm to be not a singleton
elliotchenzichang committed May 17, 2022
2 parents c99c4ce + 1027a0e commit 9a12b71
Showing 25 changed files with 1,152 additions and 1,260 deletions.
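The heart of this commit: the fd manager (`fdm`) stops being a package-level singleton and becomes a field owned by each `DB`, built in `Open` from the options and torn down in `Close`. A minimal sketch of that shape — assuming a plain map-backed cache; the commit's real `fdManager` is richer (it honors `MaxFdNumsInCache` and `CleanFdsCacheThreshold`):

```go
package main

import (
	"fmt"
	"os"
)

// fdManager caches open file handles for a single DB instance. Because it is
// no longer process-wide, two DB instances cannot clobber each other's fds.
type fdManager struct {
	maxFds int
	cache  map[string]*os.File
}

func newFdm(maxFds int) *fdManager {
	return &fdManager{maxFds: maxFds, cache: make(map[string]*os.File)}
}

// getFd returns a cached handle, or opens (and possibly caches) a new one.
func (m *fdManager) getFd(path string) (*os.File, error) {
	if f, ok := m.cache[path]; ok {
		return f, nil
	}
	f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0o644)
	if err != nil {
		return nil, err
	}
	if len(m.cache) < m.maxFds {
		m.cache[path] = f
	}
	return f, nil
}

// close releases every cached descriptor; in the diff below, DB.Close calls
// db.fdm.close() so each instance cleans up only its own handles.
func (m *fdManager) close() error {
	for path, f := range m.cache {
		if err := f.Close(); err != nil {
			return err
		}
		delete(m.cache, path)
	}
	return nil
}

func main() {
	a, b := newFdm(256), newFdm(256) // one manager per DB, as in the diff
	fmt.Println(a != b)              // true: no shared singleton state
}
```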
2 changes: 1 addition & 1 deletion README-CN.md
@@ -1955,7 +1955,7 @@ BoltDB is the slowest.

* Segment configuration

NutsDB automatically splits data into blocks (segments); the default `SegmentSize` is 8MB. You can configure this parameter yourself (e.g. 16MB, 64MB), but **once set it cannot be changed**.
NutsDB automatically splits data into blocks (segments); the default `SegmentSize` is 8MB. You can configure this parameter as needed (e.g. 16MB, 32MB, 64MB, 128MB, 512MB), but **once set it cannot be changed**.

* Size limits on keys and values

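An illustrative sketch of this hunk's advice — assuming nutsdb's public `Options` API and the import path of this era; the directory and size are made up:

```go
package main

import (
	"log"

	"github.com/xujiajun/nutsdb"
)

func main() {
	opt := nutsdb.DefaultOptions
	opt.Dir = "/tmp/nutsdb-segments"   // hypothetical data dir
	opt.SegmentSize = 64 * 1024 * 1024 // 64MB: pick once; it must never change afterwards
	db, err := nutsdb.Open(opt)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```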
6 changes: 1 addition & 5 deletions bptree.go
@@ -34,9 +34,6 @@ var (
// ErrScansNoResult is returned when Range, prefixScan or prefixSearchScan are called and no result is found.
ErrScansNoResult = errors.New("range scans or prefix or prefix and search scans no result")

// ErrPrefixScansNoResult is returned when prefixScan is called and no result is found.
ErrPrefixScansNoResult = errors.New("prefix scans no result")

// ErrPrefixSearchScansNoResult is returned when prefixSearchScan is called and no result is found.
ErrPrefixSearchScansNoResult = errors.New("prefix and search scans no result")

@@ -91,7 +88,6 @@ type (
LastKey []byte
LastAddress int64
Filepath string
bucketSize uint32
keyPosMap map[string]int64
enabledKeyPosMap bool
}
@@ -511,7 +507,7 @@ func (t *BPTree) PrefixScan(prefix []byte, offsetNum int, limitNum int) (records
n = t.FindLeaf(prefix)

if n == nil {
return nil, off, ErrPrefixScansNoResult
return nil, off, ErrPrefixScan
}

for j = 0; j < n.KeysNum && compare(n.Keys[j], prefix) < 0; {
7 changes: 7 additions & 0 deletions bucket_meta_test.go
@@ -52,6 +52,13 @@ func (suite *BucketTestSuite) SetupSuite() {
defer fd.Close()
}

func (suite *BucketTestSuite) TearDownSuite() {
err := os.RemoveAll(suite.tempFile)
if err != nil {
require.Failf(suite.T(), "remove file fail", err.Error())
}
}

func (suite *BucketTestSuite) TestEncode() {
encodeValue := suite.bucketMeat.Encode()
assert.Equal(suite.T(), suite.expectedEncode, encodeValue)
4 changes: 2 additions & 2 deletions datafile.go
@@ -48,15 +48,15 @@ type DataFile struct {
}

// NewDataFile returns a newly initialized DataFile object.
func NewDataFile(path string, capacity int64, rwMode RWMode) (df *DataFile, err error) {
func NewDataFile(path string, capacity int64, rwMode RWMode, fdm *fdManager) (df *DataFile, err error) {
var rwManager RWManager

if capacity <= 0 {
return nil, ErrCapacity
}

if rwMode == FileIO {
rwManager, err = NewFileIORWManager(path, capacity)
rwManager, err = NewFileIORWManager(path, capacity, fdm)
if err != nil {
return nil, err
}
17 changes: 9 additions & 8 deletions datafile_test.go
@@ -43,10 +43,11 @@ func init() {
if err != nil {
return
}

InitOpt("", true)
db, err = Open(opt)
}
func TestDataFile_Err(t *testing.T) {
_, err := NewDataFile(filePath, -1, FileIO)
_, err := NewDataFile(filePath, -1, FileIO, db.fdm)
defer os.Remove(filePath)

if err == nil {
@@ -56,7 +57,7 @@ func TestDataFile_Err(t *testing.T) {
}

func TestDataFile1(t *testing.T) {
df, err := NewDataFile(filePath, 1024, MMap)
df, err := NewDataFile(filePath, 1024, MMap, db.fdm)
defer os.Remove(filePath)
if err != nil {
t.Fatal(err)
@@ -86,7 +87,7 @@ func TestDataFile1(t *testing.T) {

func TestDataFile2(t *testing.T) {
filePath2 := "/tmp/foo2"
df, err := NewDataFile(filePath2, 39, FileIO)
df, err := NewDataFile(filePath2, 39, FileIO, db.fdm)
defer os.Remove(filePath2)
if err != nil {
t.Fatal(err)
@@ -105,7 +106,7 @@ func TestDataFile2(t *testing.T) {
}

filePath3 := "/tmp/foo3"
df, err = NewDataFile(filePath3, 41, FileIO)
df, err = NewDataFile(filePath3, 41, FileIO, db.fdm)
defer os.Remove(filePath3)
if err != nil {
t.Fatal(err)
@@ -125,7 +126,7 @@ func TestDataFile2(t *testing.T) {
}

func TestDataFile_ReadAt(t *testing.T) {
df, err := NewDataFile(filePath, 1024, FileIO)
df, err := NewDataFile(filePath, 1024, FileIO, db.fdm)
defer os.Remove(filePath)
if err != nil {
t.Fatal(err)
@@ -145,7 +146,7 @@ func TestDataFile_ReadAt(t *testing.T) {

func TestDataFile_Err_Path(t *testing.T) {
filePath5 := ":/tmp/foo5"
df, err := NewDataFile(filePath5, entry.Size(), FileIO)
df, err := NewDataFile(filePath5, entry.Size(), FileIO, db.fdm)
if err == nil && df != nil {
t.Error("err TestDataFile_All open")
}
@@ -154,7 +155,7 @@ func TestDataFile_Err_Path(t *testing.T) {
func TestDataFile_Crc_Err(t *testing.T) {
filePath4 := "/tmp/foo4"

df, err := NewDataFile(filePath4, entry.Size(), FileIO)
df, err := NewDataFile(filePath4, entry.Size(), FileIO, db.fdm)
defer os.Remove(filePath4)
if err != nil {
t.Fatal(err)
59 changes: 35 additions & 24 deletions db.go
@@ -160,6 +160,7 @@ type (
KeyCount int // total key number ,include expired, deleted, repeated.
closed bool
isMerging bool
fdm *fdManager
}

// BPTreeIdx represents the B+ tree index
@@ -197,10 +198,9 @@ func Open(opt Options) (*DB, error) {
BPTreeKeyEntryPosMap: make(map[string]int64),
bucketMetas: make(map[string]*BucketMeta),
ActiveCommittedTxIdsIdx: NewTree(),
fdm: newFdm(opt.MaxFdNumsInCache, opt.CleanFdsCacheThreshold),
}

fdm.setOptions(opt.MaxFdNumsInCache, opt.CleanFdsCacheThreshold)

if ok := filesystem.PathIsExist(db.opt.Dir); !ok {
if err := os.MkdirAll(db.opt.Dir, os.ModePerm); err != nil {
return nil, err
@@ -329,7 +329,7 @@ func (db *DB) Merge() error {

for _, pendingMergeFId := range pendingMergeFIds {
off = 0
f, err := NewDataFile(db.getDataPath(int64(pendingMergeFId)), db.opt.SegmentSize, db.opt.RWMode)
f, err := NewDataFile(db.getDataPath(int64(pendingMergeFId)), db.opt.SegmentSize, db.opt.RWMode, db.fdm)
if err != nil {
db.isMerging = false
return err
@@ -434,8 +434,8 @@ func (db *DB) Close() error {
db.ActiveFile = nil

db.BPTreeIdx = nil
db.fdm.close()

err = fdm.close()
if err != nil {
return err
}
@@ -446,7 +446,7 @@
// setActiveFile sets the ActiveFile (DataFile object).
func (db *DB) setActiveFile() (err error) {
filepath := db.getDataPath(db.MaxFileID)
db.ActiveFile, err = NewDataFile(filepath, db.opt.SegmentSize, db.opt.RWMode)
db.ActiveFile, err = NewDataFile(filepath, db.opt.SegmentSize, db.opt.RWMode, db.fdm)
if err != nil {
return
}
@@ -530,7 +530,7 @@ func (db *DB) parseDataFiles(dataFileIds []int) (unconfirmedRecords []*Record, c
for _, dataID := range dataFileIds {
off = 0
fID := int64(dataID)
f, err := NewDataFile(db.getDataPath(fID), db.opt.SegmentSize, db.opt.StartFileLoadingMode)
f, err := NewDataFile(db.getDataPath(fID), db.opt.SegmentSize, db.opt.StartFileLoadingMode, db.fdm)
if err != nil {
return nil, nil, err
}
@@ -1008,42 +1008,53 @@ func (db *DB) getBPTRootTxIDPath(fID int64) string {

func (db *DB) getPendingMergeEntries(entry *Entry, pendingMergeEntries []*Entry) []*Entry {
if entry.Meta.Ds == DataStructureBPTree {
if r, err := db.BPTreeIdx[string(entry.Meta.Bucket)].Find(entry.Key); err == nil {
if r.H.Meta.Flag == DataSetFlag {
bptIdx, exist := db.BPTreeIdx[string(entry.Meta.Bucket)]
if exist {
r, err := bptIdx.Find(entry.Key)
if err == nil && r.H.Meta.Flag == DataSetFlag {
pendingMergeEntries = append(pendingMergeEntries, entry)
}
}
}

if entry.Meta.Ds == DataStructureSet {
if db.SetIdx[string(entry.Meta.Bucket)].SIsMember(string(entry.Key), entry.Value) {
pendingMergeEntries = append(pendingMergeEntries, entry)
setIdx, exist := db.SetIdx[string(entry.Meta.Bucket)]
if exist {
if setIdx.SIsMember(string(entry.Key), entry.Value) {
pendingMergeEntries = append(pendingMergeEntries, entry)
}
}
}

if entry.Meta.Ds == DataStructureSortedSet {
keyAndScore := strings.Split(string(entry.Key), SeparatorForZSetKey)
if len(keyAndScore) == 2 {
key := keyAndScore[0]
n := db.SortedSetIdx[string(entry.Meta.Bucket)].GetByKey(key)
if n != nil {
pendingMergeEntries = append(pendingMergeEntries, entry)
sortedSetIdx, exist := db.SortedSetIdx[string(entry.Meta.Bucket)]
if exist {
n := sortedSetIdx.GetByKey(key)
if n != nil {
pendingMergeEntries = append(pendingMergeEntries, entry)
}
}
}
}

if entry.Meta.Ds == DataStructureList {
items, _ := db.ListIdx[string(entry.Meta.Bucket)].LRange(string(entry.Key), 0, -1)
ok := false
if entry.Meta.Flag == DataRPushFlag || entry.Meta.Flag == DataLPushFlag {
for _, item := range items {
if string(entry.Value) == string(item) {
ok = true
break
listIdx, exist := db.ListIdx[string(entry.Meta.Bucket)]
if exist {
items, _ := listIdx.LRange(string(entry.Key), 0, -1)
ok := false
if entry.Meta.Flag == DataRPushFlag || entry.Meta.Flag == DataLPushFlag {
for _, item := range items {
if string(entry.Value) == string(item) {
ok = true
break
}
}
if ok {
pendingMergeEntries = append(pendingMergeEntries, entry)
}
}
if ok {
pendingMergeEntries = append(pendingMergeEntries, entry)
}
}
}
@@ -1061,7 +1072,7 @@ func (db *DB) reWriteData(pendingMergeEntries []*Entry) error {
return err
}

dataFile, err := NewDataFile(db.getDataPath(db.MaxFileID+1), db.opt.SegmentSize, db.opt.RWMode)
dataFile, err := NewDataFile(db.getDataPath(db.MaxFileID+1), db.opt.SegmentSize, db.opt.RWMode, db.fdm)
if err != nil {
db.isMerging = false
return err
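With `fdm` now built inside `Open` (the diff above replaces the package-level `fdm.setOptions` call with `newFdm(...)`), fd-cache tuning becomes per-instance configuration. A hedged usage sketch — option names are from the diff, values and directory are illustrative:

```go
package main

import (
	"log"

	"github.com/xujiajun/nutsdb"
)

func main() {
	opt := nutsdb.DefaultOptions
	opt.Dir = "/tmp/nutsdb-fdm"      // hypothetical data dir
	opt.MaxFdNumsInCache = 1024      // cap on cached descriptors for this DB only
	opt.CleanFdsCacheThreshold = 0.5 // assumed semantics: eviction trigger as a fraction of the cap
	db, err := nutsdb.Open(opt)      // newFdm(...) runs here, once per instance
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close() // now also closes this instance's fd manager
}
```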
9 changes: 9 additions & 0 deletions db_test.go
@@ -1108,3 +1108,12 @@ func withRAMIdxDB(t *testing.T, fn func(t *testing.T, db *DB)) {

withDBOption(t, opt, fn)
}

func withBPTSparseIdxDB(t *testing.T, fn func(t *testing.T, db *DB)) {
tmpdir, _ := ioutil.TempDir("", "nutsdb")
opt := DefaultOptions
opt.Dir = tmpdir
opt.EntryIdxMode = HintBPTSparseIdxMode

withDBOption(t, opt, fn)
}
38 changes: 38 additions & 0 deletions errors.go
@@ -0,0 +1,38 @@
package nutsdb

import "errors"

// IsDBClosed is true if the error indicates the db was closed.
func IsDBClosed(err error) bool {
return errors.Is(err, ErrDBClosed)
}

// IsKeyNotFound is true if the error indicates the key is not found.
func IsKeyNotFound(err error) bool {
return errors.Is(err, ErrKeyNotFound)
}

// IsBucketNotFound is true if the error indicates the bucket does not exist.
func IsBucketNotFound(err error) bool {
return errors.Is(err, ErrBucketNotFound)
}

// IsBucketEmpty is true if the bucket is empty.
func IsBucketEmpty(err error) bool {
return errors.Is(err, ErrBucketEmpty)
}

// IsKeyEmpty is true if the key is empty.
func IsKeyEmpty(err error) bool {
return errors.Is(err, ErrKeyEmpty)
}

// IsPrefixScan is true if the error indicates that a prefix scan found no result.
func IsPrefixScan(err error) bool {
return errors.Is(err, ErrPrefixScan)
}

// IsPrefixSearchScan is true if the error indicates that a prefix-and-search scan found no result.
func IsPrefixSearchScan(err error) bool {
return errors.Is(err, ErrPrefixSearchScansNoResult)
}
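A usage sketch for the new helpers — assuming the standard `DB.View`/`Tx.Get` API and that a missing key surfaces `ErrKeyNotFound`; bucket and key are made up. Because the helpers go through `errors.Is`, wrapped errors still match, which is exactly what the test below exercises with `errors.Wrap`:

```go
package main

import (
	"fmt"
	"log"

	"github.com/xujiajun/nutsdb"
)

func main() {
	opt := nutsdb.DefaultOptions
	opt.Dir = "/tmp/nutsdb-errors" // hypothetical data dir
	db, err := nutsdb.Open(opt)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	err = db.View(func(tx *nutsdb.Tx) error {
		_, err := tx.Get("bucket", []byte("missing-key"))
		return err
	})
	if nutsdb.IsKeyNotFound(err) {
		fmt.Println("key absent") // no string comparison on error text needed
	}
}
```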
36 changes: 36 additions & 0 deletions errors_test.go
@@ -0,0 +1,36 @@
package nutsdb

import (
"testing"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)

func TestIsKeyNotFound(t *testing.T) {

ts := []struct {
err error

want bool
}{
{
ErrKeyNotFound,
true,
},
{
errors.Wrap(ErrKeyNotFound, "foobar"),
true,
},
{
errors.New("foo bar"),
false,
},
}

for _, tc := range ts {
got := IsKeyNotFound(tc.err)

assert.Equal(t, tc.want, got)
}
}