Skip to content

Commit

Permalink
Добавил тесты на indexes.BTreePage #76
Browse files Browse the repository at this point in the history
  • Loading branch information
unhandled-exception committed Jul 18, 2024
1 parent 04ce6a3 commit e5705a9
Show file tree
Hide file tree
Showing 2 changed files with 1,031 additions and 91 deletions.
229 changes: 138 additions & 91 deletions internal/pkg/indexes/btree_page.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,51 @@ type BTreePage struct {
recordsCountOffset uint32
flagOffset uint32
dataOffset uint32
}

const (
idFieldName = "id"
blockFieldName = "block"
datavalFieldName = "dataval"
)

func NewBTreeDirPageLayout(datavalFieldType records.FieldType, length int64) records.Layout {
schema := records.NewSchema()
schema.AddInt64Field(blockFieldName)

//nolint:exhaustive
switch datavalFieldType {
case records.Int64Field:
schema.AddInt64Field(datavalFieldName)
case records.Int8Field:
schema.AddInt8Field(datavalFieldName)
case records.StringField:
schema.AddStringField(datavalFieldName, length)
}

layout := records.NewLayout(schema)

return layout
}

func NewBTreeLeafPageLayout(datavalFieldType records.FieldType, length int64) records.Layout {
schema := records.NewSchema()
schema.AddInt64Field(idFieldName)
schema.AddInt64Field(blockFieldName)

//nolint:exhaustive
switch datavalFieldType {
case records.Int64Field:
schema.AddInt64Field(datavalFieldName)
case records.Int8Field:
schema.AddInt8Field(datavalFieldName)
case records.StringField:
schema.AddStringField(datavalFieldName, length)
}

datavalField string
blockField string
idField string
layout := records.NewLayout(schema)

return layout
}

func NewBTreePage(trx scan.TRXInt, block types.Block, layout records.Layout) (*BTreePage, error) {
Expand All @@ -29,10 +70,6 @@ func NewBTreePage(trx scan.TRXInt, block types.Block, layout records.Layout) (*B
recordsCountOffset: types.Int64Size,
flagOffset: 0,
dataOffset: 2 * types.Int64Size, //nolint:gomnd

idField: "id",
blockField: "block",
datavalField: "dataval",
}

if err := trx.Pin(block); err != nil {
Expand Down Expand Up @@ -66,6 +103,8 @@ func (p *BTreePage) FindSlotBefore(searchKey scan.Constant) (int32, error) {

if value.CompareTo(searchKey) == scan.CompLess {
slot++
} else {
break
}
}

Expand Down Expand Up @@ -105,46 +144,6 @@ func (p *BTreePage) Split(splitPos int32, flag int64) (types.Block, error) {
return block, nil
}

func (p *BTreePage) transferRecords(splitPos int32, dest *BTreePage) error {
var destSlot int32 = 0

schema := p.layout.Schema

for {
records, err := p.GetRecords()
if err != nil {
return err
}

if int32(records) < splitPos {
break
}

if err = dest.Insert(destSlot); err != nil {
return err
}

for _, fieldName := range schema.Fields() {
value, err1 := p.getVal(splitPos, fieldName)
if err1 != nil {
return err1
}

if err1 = dest.SetVal(destSlot, fieldName, value); err1 != nil {
return err
}
}

if err = p.Delete(splitPos); err != nil {
return err
}

destSlot++
}

return nil
}

func (p *BTreePage) GetFlag() (int64, error) {
return p.trx.GetInt64(*p.curBlock, p.flagOffset)
}
Expand Down Expand Up @@ -193,14 +192,20 @@ func (p *BTreePage) FormatBlock(block types.Block, flag int64) error {

func (p *BTreePage) makeDefaultBlockRecord(block types.Block, pos uint32) error {
for _, fieldName := range p.layout.Schema.Fields() {
var err error

//nolint:exhaustive
switch p.layout.Schema.Type(fieldName) {
case records.Int64Field:
return p.trx.SetInt64(block, pos+p.layout.Offset(fieldName), 0, false)
err = p.trx.SetInt64(block, pos+p.layout.Offset(fieldName), 0, false)
case records.Int8Field:
return p.trx.SetInt8(block, pos+p.layout.Offset(fieldName), 0, false)
err = p.trx.SetInt8(block, pos+p.layout.Offset(fieldName), 0, false)
case records.StringField:
return p.trx.SetString(block, pos+p.layout.Offset(fieldName), "", false)
err = p.trx.SetString(block, pos+p.layout.Offset(fieldName), "", false)
}

if err != nil {
return err
}
}

Expand All @@ -211,37 +216,35 @@ func (p *BTreePage) GetRecords() (int64, error) {
return p.trx.GetInt64(*p.curBlock, p.recordsCountOffset)
}

func (p *BTreePage) setRecords(records int64) error {
return p.trx.SetInt64(*p.curBlock, p.recordsCountOffset, records, true)
}

func (p *BTreePage) InsertDir(slot int32, value scan.Constant, blockID types.BlockID) error {
if err := p.Insert(slot); err != nil {
return err
}

if err := p.SetVal(slot, p.datavalField, value); err != nil {
if err := p.SetVal(slot, datavalFieldName, value); err != nil {
return err
}

if err := p.SetInt64(slot, p.blockField, int64(blockID)); err != nil {
if err := p.setInt64(slot, blockFieldName, int64(blockID)); err != nil {
return err
}

return nil
}

func (p *BTreePage) GetChildNum(slot int32) (int64, error) {
return p.getInt64(slot, p.blockField)
func (p *BTreePage) GetChildNum(slot int32) (types.BlockID, error) {
block, err := p.getInt64(slot, blockFieldName)

return types.BlockID(block), err
}

func (p *BTreePage) GetDataRID(slot int32) (types.RID, error) {
blockID, err := p.getInt64(slot, p.blockField)
blockID, err := p.getInt64(slot, blockFieldName)
if err != nil {
return types.RID{}, err
}

id, err := p.getInt64(slot, p.idField)
id, err := p.getInt64(slot, idFieldName)
if err != nil {
return types.RID{}, err
}
Expand All @@ -257,15 +260,15 @@ func (p *BTreePage) InsertLeaf(slot int32, value scan.Constant, rid types.RID) e
return err
}

if err := p.SetVal(slot, p.datavalField, value); err != nil {
if err := p.SetVal(slot, datavalFieldName, value); err != nil {
return err
}

if err := p.SetInt64(slot, p.blockField, int64(rid.BlockNumber)); err != nil {
if err := p.setInt64(slot, blockFieldName, int64(rid.BlockNumber)); err != nil {
return err
}

if err := p.SetInt64(slot, p.idField, int64(rid.Slot)); err != nil {
if err := p.setInt64(slot, idFieldName, int64(rid.Slot)); err != nil {
return err
}

Expand Down Expand Up @@ -306,23 +309,32 @@ func (p *BTreePage) Insert(slot int32) error {
return p.setRecords(records + 1)
}

func (p *BTreePage) copyRecord(from, to int32) error {
for _, fieldName := range p.layout.Schema.Fields() {
value, err := p.getVal(from, fieldName)
if err != nil {
return err
}
func (p *BTreePage) GetVal(slot int32) (scan.Constant, error) {
return p.getVal(slot, datavalFieldName)
}

if err = p.SetVal(to, fieldName, value); err != nil {
return err
func (p *BTreePage) SetVal(slot int32, field string, value scan.Constant) error {
//nolint:exhaustive
switch p.layout.Schema.Type(field) {
case records.Int64Field:
if val, ok := value.Value().(int64); ok {
return p.setInt64(slot, field, val)
}
case records.Int8Field:
if val, ok := value.Value().(int8); ok {
return p.setInt8(slot, field, val)
}
case records.StringField:
if val, ok := value.Value().(string); ok {
return p.setString(slot, field, val)
}
}

return nil
return ErrUnknownFieldType
}

func (p *BTreePage) GetVal(slot int32) (scan.Constant, error) {
return p.getVal(slot, p.datavalField)
func (p *BTreePage) setRecords(records int64) error {
return p.trx.SetInt64(*p.curBlock, p.recordsCountOffset, records, true)
}

func (p *BTreePage) getInt64(slot int32, field string) (int64, error) {
Expand Down Expand Up @@ -366,36 +378,31 @@ func (p *BTreePage) getVal(slot int32, field string) (scan.Constant, error) {
}
}

func (p *BTreePage) SetInt64(slot int32, field string, value int64) error {
func (p *BTreePage) setInt64(slot int32, field string, value int64) error {
return p.trx.SetInt64(*p.curBlock, uint32(p.fieldPos(slot, field)), value, true)
}

func (p *BTreePage) SetInt8(slot int32, field string, value int8) error {
func (p *BTreePage) setInt8(slot int32, field string, value int8) error {
return p.trx.SetInt8(*p.curBlock, uint32(p.fieldPos(slot, field)), value, true)
}

func (p *BTreePage) SetString(slot int32, field string, value string) error {
func (p *BTreePage) setString(slot int32, field string, value string) error {
return p.trx.SetString(*p.curBlock, uint32(p.fieldPos(slot, field)), value, true)
}

func (p *BTreePage) SetVal(slot int32, field string, value scan.Constant) error {
//nolint:exhaustive
switch p.layout.Schema.Type(field) {
case records.Int64Field:
if val, ok := value.Value().(int64); ok {
return p.SetInt64(slot, field, val)
}
case records.Int8Field:
if val, ok := value.Value().(int8); ok {
return p.SetInt8(slot, field, val)
func (p *BTreePage) copyRecord(from, to int32) error {
for _, fieldName := range p.layout.Schema.Fields() {
value, err := p.getVal(from, fieldName)
if err != nil {
return err
}
case records.StringField:
if val, ok := value.Value().(string); ok {
return p.SetString(slot, field, val)

if err = p.SetVal(to, fieldName, value); err != nil {
return err
}
}

return ErrUnknownFieldType
return nil
}

func (p *BTreePage) fieldPos(slot int32, field string) int32 {
Expand All @@ -405,3 +412,43 @@ func (p *BTreePage) fieldPos(slot int32, field string) int32 {
func (p *BTreePage) slotPos(slot int32) int32 {
return int32(p.dataOffset) + (slot * int32(p.layout.SlotSize))
}

func (p *BTreePage) transferRecords(splitPos int32, dest *BTreePage) error {
var destSlot int32 = 0

schema := p.layout.Schema

for {
records, err := p.GetRecords()
if err != nil {
return err
}

if int32(records) <= splitPos {
break
}

if err = dest.Insert(destSlot); err != nil {
return err
}

for _, fieldName := range schema.Fields() {
value, err1 := p.getVal(splitPos, fieldName)
if err1 != nil {
return err1
}

if err1 = dest.SetVal(destSlot, fieldName, value); err1 != nil {
return err
}
}

if err = p.Delete(splitPos); err != nil {
return err
}

destSlot++
}

return nil
}
Loading

0 comments on commit e5705a9

Please sign in to comment.