Skip to content

Commit

Permalink
Добавил indexes.BTreeLeaf и простой тест для итератору по листу без п…
Browse files Browse the repository at this point in the history
…ереполнения #76
  • Loading branch information
unhandled-exception committed Apr 26, 2023
1 parent 6fdc13f commit d3aec78
Show file tree
Hide file tree
Showing 5 changed files with 499 additions and 114 deletions.
244 changes: 244 additions & 0 deletions internal/pkg/indexes/btree_leaf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
package indexes

import (
"github.com/unhandled-exception/sophiadb/internal/pkg/records"
"github.com/unhandled-exception/sophiadb/internal/pkg/scan"
"github.com/unhandled-exception/sophiadb/internal/pkg/types"
)

type BTreeLeaf struct {
trx scan.TRXInt
layout records.Layout
searchKey scan.Constant
contents *BTreePage
currentSlot types.SlotID
filename string
}

func NewBTreeLeaf(trx scan.TRXInt, block types.Block, layout records.Layout, searchKey scan.Constant) (*BTreeLeaf, error) {
contents, err := NewBTreePage(trx, block, layout)
if err != nil {
return nil, err
}

currentSlot, err := contents.FindSlotBefore(searchKey)
if err != nil {
return nil, err
}

leaf := &BTreeLeaf{
trx: trx,
layout: layout,
searchKey: searchKey,
filename: block.Filename,
contents: contents,
currentSlot: currentSlot,
}

return leaf, nil
}

func (l *BTreeLeaf) Close() {
l.contents.Close()
}

func (l *BTreeLeaf) RID() (types.RID, error) {
return l.contents.GetDataRID(l.currentSlot)
}

func (l *BTreeLeaf) Next() (bool, error) {
l.currentSlot++

records, err := l.contents.GetRecords()
if err != nil {
return false, err
}

if records > int64(l.currentSlot) {
dataval, err1 := l.contents.GetVal(l.currentSlot)
if err1 != nil {
return false, err1
}

if dataval.CompareTo(l.searchKey) == scan.CompEqual {
return true, nil
}
}

return l.tryOverflow()
}

func (l *BTreeLeaf) Delete(dataRID types.RID) (bool, error) {
for {
if ok, err := l.Next(); !ok || err != nil {
return false, err
}

dr, err := l.RID()
if err != nil {
return false, err
}

if dr.Equals(dataRID) {
if err1 := l.contents.Delete(l.currentSlot); err1 != nil {
return false, err
}

return true, nil
}
}
}

func (l *BTreeLeaf) Insert(dataRID types.RID) (*BTreeDirEntry, error) {
flag, err := l.contents.GetFlag()
if err != nil {
return nil, err
}

firstKey, err := l.contents.GetVal(0)
if err != nil {
return nil, err
}

// Если блок ссылается на блок переполнения и ключи меньш первого в блоке,
// то текущий блок целиком отщепляем в новый и возвращаем каталожную запись для нового блока
// dataRID вставляем в текущую страницу
if flag >= 0 && firstKey.CompareTo(l.searchKey) == scan.CompGreat {
newBlock, err1 := l.contents.Split(0, flag)
if err1 != nil {
return nil, err1
}

if err1 = l.contents.SetFlag(BTreeNewFlag); err1 != nil {
return nil, err1
}

l.currentSlot = 0

if err1 = l.contents.InsertLeaf(l.currentSlot, l.searchKey, dataRID); err1 != nil {
return nil, err1
}

return &BTreeDirEntry{
BlockNumber: newBlock.Number,
Dataval: l.searchKey,
}, nil
}

l.currentSlot++

if err = l.contents.InsertLeaf(l.currentSlot, l.searchKey, dataRID); err != nil {
return nil, err
}

if ok, err1 := l.contents.IsFull(); !ok || err1 != nil {
return nil, err1
}

records, err := l.contents.GetRecords()
if err != nil {
return nil, err
}

lastKey, err := l.contents.GetVal(types.SlotID(records - 1))
if err != nil {
return nil, err
}

// Если блок содержит одно значение во всех слотах, то создать блок переполнения и перенести в него все записи кроме первой
if lastKey.CompareTo(firstKey) == scan.CompEqual {
newBlock, err1 := l.contents.Split(1, flag)
if err1 != nil {
return nil, err1
}

if err1 = l.contents.SetFlag(int64(newBlock.Number)); err1 != nil {
return nil, err1
}

return nil, nil //nolint:nilnil
}

// Расщепляем блок
splitPos := types.SlotID(records / 2) //nolint:gomnd

splitKey, err := l.contents.GetVal(splitPos)
if err != nil {
return nil, err
}

// Если средний ключи равен первому, то двигаемся вправо до нового ключа
if splitKey.CompareTo(firstKey) == scan.CompEqual { //nolint:nestif
for {
splitPos++

newKey, err1 := l.contents.GetVal(splitPos)
if err1 != nil {
return nil, err1
}

if newKey.CompareTo(splitKey) != scan.CompEqual {
splitKey = newKey

break
}
}
} else {
// Иначе двигаемся влево до первой записи с текущим ключем
for {
newKey, err1 := l.contents.GetVal(splitPos - 1)
if err1 != nil {
return nil, err1
}

if newKey.CompareTo(splitKey) != scan.CompEqual {
break
}

splitPos--
}
}

newBlock, err := l.contents.Split(splitPos, BTreeNewFlag)
if err != nil {
return nil, err
}

return &BTreeDirEntry{
BlockNumber: newBlock.Number,
Dataval: splitKey,
}, nil
}

func (l *BTreeLeaf) tryOverflow() (bool, error) {
firstKey, err := l.contents.GetVal(l.currentSlot)
if err != nil {
return false, err
}

flag, err := l.contents.GetFlag()
if err != nil {
return false, err
}

if l.searchKey.CompareTo(firstKey) != scan.CompEqual || flag < 0 {
return false, nil
}

l.contents.Close()

newBlock := types.Block{
Filename: l.filename,
Number: types.BlockID(flag),
}

newContents, err := NewBTreePage(l.trx, newBlock, l.layout)
if err != nil {
return false, err
}

l.contents = newContents
l.currentSlot = 0

return true, nil
}
125 changes: 125 additions & 0 deletions internal/pkg/indexes/btree_leaf_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package indexes_test

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/unhandled-exception/sophiadb/internal/pkg/indexes"
"github.com/unhandled-exception/sophiadb/internal/pkg/records"
"github.com/unhandled-exception/sophiadb/internal/pkg/scan"
"github.com/unhandled-exception/sophiadb/internal/pkg/storage"
"github.com/unhandled-exception/sophiadb/internal/pkg/tx/transaction"
"github.com/unhandled-exception/sophiadb/internal/pkg/types"
)

type BTreeLeafTestSuite struct {
Suite
}

func TestBTreeLeaftestSuite(t *testing.T) {
suite.Run(t, new(BTreeLeafTestSuite))
}

func (ts *BTreeLeafTestSuite) newSUT(layout records.Layout, searchkey scan.Constant, data []btreeLeafTestsPageData) (*indexes.BTreeLeaf, *indexes.BTreePage, *transaction.Transaction, *storage.Manager) {
t := ts.T()

testFile := "btleaf_test.dat"

trxMan, fm := ts.newTRXManager(defaultLockTimeout, t.TempDir())

trx, err := trxMan.Transaction()
require.NoError(t, err)

block, err := fm.Append(testFile)
require.NoError(t, err)

btp, err := indexes.NewBTreePage(trx, block, layout)
require.NoError(t, err)

require.NoError(t, btp.FormatBlock(block, 0))

ts.fillTestPage(btp, data)

btl, err := indexes.NewBTreeLeaf(trx, block, layout, searchkey)
require.NoError(t, err)

return btl, btp, trx, fm
}

type btreeLeafTestsPageData struct {
SearchKey scan.Constant
Count int
FirstRID types.RID
}

func (ts *BTreeLeafTestSuite) fillTestPage(page *indexes.BTreePage, data []btreeLeafTestsPageData) {
t := ts.T()
total := 0

for _, d := range data {
for i := 0; i < d.Count; i++ {
require.NoError(t, page.InsertLeaf(
types.SlotID(total+i),
d.SearchKey,
types.RID{
BlockNumber: d.FirstRID.BlockNumber + types.BlockID(i),
Slot: d.FirstRID.Slot + types.SlotID(i),
},
))
}

total += d.Count
}
}

func (ts *BTreeLeafTestSuite) TestNextWithouOverflow() {
t := ts.T()

searchVal := int64(33)
searchKey := scan.NewInt64Constant(searchVal)

firstRID := types.RID{
BlockNumber: 145,
Slot: 0,
}

searchKeyCount := 15

pageData := []btreeLeafTestsPageData{
{
SearchKey: scan.NewInt64Constant(searchVal - 13),
Count: 25,
FirstRID: types.RID{
BlockNumber: 235,
Slot: 350,
},
},
{
SearchKey: searchKey,
Count: searchKeyCount,
FirstRID: firstRID,
},
{
SearchKey: scan.NewInt64Constant(searchVal + 13),
Count: 27,
FirstRID: types.RID{
BlockNumber: 335,
Slot: 150,
},
},
}

sut, _, _, fm := ts.newSUT(indexes.NewBTreeLeafPageLayout(records.Int64Field, 0), searchKey, pageData)
defer sut.Close()
defer func() {
require.NoError(t, fm.Close())
}()

for i := 0; i < searchKeyCount; i++ {
ok, err := sut.Next()
assert.NoError(t, err)
assert.Truef(t, ok, "i = %d", i)
}
}
Loading

0 comments on commit d3aec78

Please sign in to comment.