enhance: legacy code clean up #33838

Merged
merged 1 commit on Jun 14, 2024
216 changes: 0 additions & 216 deletions internal/storage/binlog_iterator.go
@@ -127,69 +127,6 @@ func (itr *InsertBinlogIterator) isDisposed() bool {
return atomic.LoadInt32(&itr.dispose) == 1
}

/*
type DeltalogIterator struct {
dispose int32
values []*Value
pos int
}
func NewDeltalogIterator(blob *Blob) (*DeltalogIterator, error) {
deltaCodec := NewDeleteCodec()
_, _, serData, err := deltaCodec.Deserialize(blob)
if err != nil {
return nil, err
}
values := make([]*Value, 0, len(serData.Data))
for pkstr, ts := range serData.Data {
pk, err := strconv.ParseInt(pkstr, 10, 64)
if err != nil {
return nil, err
}
values = append(values, &Value{pk, ts, true, nil})
}
sort.Slice(values, func(i, j int) bool { return values[i].id < values[j].id })
return &DeltalogIterator{values: values}, nil
}
// HasNext returns true if the iterator has an unread record
func (itr *DeltalogIterator) HasNext() bool {
return !itr.isDisposed() && itr.hasNext()
}
// Next returns the next record
func (itr *DeltalogIterator) Next() (interface{}, error) {
if itr.isDisposed() {
return nil, ErrDisposed
}
if !itr.hasNext() {
return nil, ErrNoMoreRecord
}
tmp := itr.values[itr.pos]
itr.pos++
return tmp, nil
}
// Dispose disposes the iterator
func (itr *DeltalogIterator) Dispose() {
atomic.CompareAndSwapInt32(&itr.dispose, 0, 1)
}
func (itr *DeltalogIterator) hasNext() bool {
return itr.pos < len(itr.values)
}
func (itr *DeltalogIterator) isDisposed() bool {
return atomic.LoadInt32(&itr.dispose) == 1
}
*/
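
The removed DeltalogIterator followed the same HasNext/Next/Dispose contract as the iterators that remain in this file. A minimal sketch of draining any such iterator (a hypothetical helper, not part of this PR, assuming the package's Iterator interface and *Value record type shown above):

func drainIterator(itr Iterator) ([]*Value, error) {
	// Always release the iterator, even on early return.
	defer itr.Dispose()
	values := make([]*Value, 0)
	for itr.HasNext() {
		record, err := itr.Next()
		if err != nil {
			return nil, err
		}
		// Next returns interface{}; the iterators in this file yield *Value.
		values = append(values, record.(*Value))
	}
	return values, nil
}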

// MergeIterator merges iterators.
type MergeIterator struct {
disposed int32
@@ -278,156 +215,3 @@ func (itr *MergeIterator) hasNext() bool {
itr.nextRecord = minRecord
return true
}

/*
func NewInsertlogMergeIterator(blobs [][]*Blob) (*MergeIterator, error) {
iterators := make([]Iterator, 0, len(blobs))
for _, fieldBlobs := range blobs {
itr, err := NewInsertBinlogIterator(fieldBlobs)
if err != nil {
return nil, err
}
iterators = append(iterators, itr)
}
return NewMergeIterator(iterators), nil
}
func NewDeltalogMergeIterator(blobs []*Blob) (*MergeIterator, error) {
iterators := make([]Iterator, 0, len(blobs))
for _, blob := range blobs {
itr, err := NewDeltalogIterator(blob)
if err != nil {
return nil, err
}
iterators = append(iterators, itr)
}
return NewMergeIterator(iterators), nil
}
type MergeSingleSegmentIterator struct {
disposed int32
insertItr Iterator
deltaItr Iterator
timetravel int64
nextRecord *Value
insertTmpRecord *Value
deltaTmpRecord *Value
}
func NewMergeSingleSegmentIterator(insertBlobs [][]*Blob, deltaBlobs []*Blob, timetravel int64) (*MergeSingleSegmentIterator, error) {
insertMergeItr, err := NewInsertlogMergeIterator(insertBlobs)
if err != nil {
return nil, err
}
deltaMergeItr, err := NewDeltalogMergeIterator(deltaBlobs)
if err != nil {
return nil, err
}
return &MergeSingleSegmentIterator{
insertItr: insertMergeItr,
deltaItr: deltaMergeItr,
timetravel: timetravel,
}, nil
}
// HasNext returns true if the iterator has an unread record
func (itr *MergeSingleSegmentIterator) HasNext() bool {
return !itr.isDisposed() && itr.hasNext()
}
// Next returns the next record
func (itr *MergeSingleSegmentIterator) Next() (interface{}, error) {
if itr.isDisposed() {
return nil, ErrDisposed
}
if !itr.hasNext() {
return nil, ErrNoMoreRecord
}
tmp := itr.nextRecord
itr.nextRecord = nil
return tmp, nil
}
// Dispose disposes the iterator
func (itr *MergeSingleSegmentIterator) Dispose() {
if itr.isDisposed() {
return
}
if itr.insertItr != nil {
itr.insertItr.Dispose()
}
if itr.deltaItr != nil {
itr.deltaItr.Dispose()
}
atomic.CompareAndSwapInt32(&itr.disposed, 0, 1)
}
func (itr *MergeSingleSegmentIterator) isDisposed() bool {
return atomic.LoadInt32(&itr.disposed) == 1
}
func (itr *MergeSingleSegmentIterator) hasNext() bool {
if itr.nextRecord != nil {
return true
}
for {
if itr.insertTmpRecord == nil && itr.insertItr.HasNext() {
r, _ := itr.insertItr.Next()
itr.insertTmpRecord = r.(*Value)
}
if itr.deltaTmpRecord == nil && itr.deltaItr.HasNext() {
r, _ := itr.deltaItr.Next()
itr.deltaTmpRecord = r.(*Value)
}
if itr.insertTmpRecord == nil && itr.deltaTmpRecord == nil {
return false
} else if itr.insertTmpRecord == nil {
itr.nextRecord = itr.deltaTmpRecord
itr.deltaTmpRecord = nil
return true
} else if itr.deltaTmpRecord == nil {
itr.nextRecord = itr.insertTmpRecord
itr.insertTmpRecord = nil
return true
} else {
// merge records
if itr.insertTmpRecord.timestamp >= itr.timetravel {
itr.nextRecord = itr.insertTmpRecord
itr.insertTmpRecord = nil
return true
}
if itr.deltaTmpRecord.timestamp >= itr.timetravel {
itr.nextRecord = itr.deltaTmpRecord
itr.deltaTmpRecord = nil
return true
}
if itr.insertTmpRecord.id < itr.deltaTmpRecord.id {
itr.nextRecord = itr.insertTmpRecord
itr.insertTmpRecord = nil
return true
} else if itr.insertTmpRecord.id > itr.deltaTmpRecord.id {
itr.deltaTmpRecord = nil
continue
} else if itr.insertTmpRecord.id == itr.deltaTmpRecord.id {
if itr.insertTmpRecord.timestamp <= itr.deltaTmpRecord.timestamp {
itr.insertTmpRecord = nil
continue
} else {
itr.deltaTmpRecord = nil
continue
}
}
}
}
}
*/
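
For reference, the removed MergeSingleSegmentIterator resolved insert/delete pairs against a timetravel cutoff: a record timestamped at or after the cutoff is always emitted, while before the cutoff a delete suppresses an insert with the same primary key and an equal-or-newer timestamp. A condensed sketch of the insert-side rule (a hypothetical standalone function, simplified from the hasNext logic above):

// keepInsert reports whether an insert record survives the merge against a
// candidate delete record with the same role as deltaTmpRecord above.
func keepInsert(insert, del *Value, timetravel int64) bool {
	if insert.timestamp >= timetravel {
		// Records at or after the timetravel point are always kept.
		return true
	}
	if del == nil || insert.id != del.id {
		// No delete matches this primary key.
		return true
	}
	// Before the cutoff, a delete with an equal-or-newer timestamp wins.
	return insert.timestamp > del.timestamp
}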
25 changes: 0 additions & 25 deletions internal/storage/data_codec.go
@@ -849,31 +849,6 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
return collectionID, partitionID, segmentID, nil
}

// func deserializeEntity[T any, U any](
// eventReader *EventReader,
// binlogReader *BinlogReader,
// insertData *InsertData,
// getPayloadFunc func() (U, error),
// fillDataFunc func() FieldData,
// ) error {
// fieldID := binlogReader.FieldID
// stringPayload, err := getPayloadFunc()
// if err != nil {
// eventReader.Close()
// binlogReader.Close()
// return err
// }
//
// if insertData.Data[fieldID] == nil {
// insertData.Data[fieldID] = fillDataFunc()
// }
// stringFieldData := insertData.Data[fieldID].(*T)
//
// stringFieldData.Data = append(stringFieldData.Data, stringPayload...)
// totalLength += len(stringPayload)
// insertData.Data[fieldID] = stringFieldData
// }
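
The removed sketch would not compile as written: totalLength is undeclared, and Go does not allow field access through a bare type parameter (stringFieldData.Data, where stringFieldData has type *T). A compiling Go 1.18+ variant would route the append through a callback instead (a hypothetical signature, not part of this PR, assuming the EventReader and BinlogReader types used above):

// deserializeField reads one field's payload and hands it to an append
// callback, closing both readers on failure. U is the payload element type.
func deserializeField[U any](
	eventReader *EventReader,
	binlogReader *BinlogReader,
	getPayload func() ([]U, error),
	appendPayload func(fieldID int64, payload []U) error,
) error {
	payload, err := getPayload()
	if err != nil {
		eventReader.Close()
		binlogReader.Close()
		return err
	}
	return appendPayload(int64(binlogReader.FieldID), payload)
}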

// Deserialize transfers blobs back into insert data.
// From the schema, it gets all fields.
// For each field, it creates a binlog reader and reads all events into the buffer.
38 changes: 0 additions & 38 deletions internal/storage/utils.go
@@ -982,44 +982,6 @@ func binaryWrite(endian binary.ByteOrder, data interface{}) ([]byte, error) {
return buf.Bytes(), nil
}

// FieldDataToBytes encodes field data into a byte slice.
// For fixed-length data, such as int32, int64, and float vectors, it uses binary.Write directly.
// For binary vectors, it returns the data directly.
// For bool data, it first converts to schemapb.BoolArray and then marshals it. (TODO: handle bool like other scalar data.)
// For variable-length data, such as strings, it first converts to schemapb.StringArray and then marshals it.
// TODO: find a proper way to store variable-length data. Or should we unify on protobuf?
func FieldDataToBytes(endian binary.ByteOrder, fieldData FieldData) ([]byte, error) {
switch field := fieldData.(type) {
case *BoolFieldData:
// return binaryWrite(endian, field.Data)
return boolFieldDataToPbBytes(field)
case *StringFieldData:
return stringFieldDataToPbBytes(field)
case *ArrayFieldData:
return arrayFieldDataToPbBytes(field)
case *JSONFieldData:
return jsonFieldDataToPbBytes(field)
case *BinaryVectorFieldData:
return field.Data, nil
case *FloatVectorFieldData:
return binaryWrite(endian, field.Data)
case *Int8FieldData:
return binaryWrite(endian, field.Data)
case *Int16FieldData:
return binaryWrite(endian, field.Data)
case *Int32FieldData:
return binaryWrite(endian, field.Data)
case *Int64FieldData:
return binaryWrite(endian, field.Data)
case *FloatFieldData:
return binaryWrite(endian, field.Data)
case *DoubleFieldData:
return binaryWrite(endian, field.Data)
default:
return nil, fmt.Errorf("unsupported field data: %s", field)
}
}

func TransferInsertDataToInsertRecord(insertData *InsertData) (*segcorepb.InsertRecord, error) {
insertRecord := &segcorepb.InsertRecord{}
for fieldID, rawData := range insertData.Data {
88 changes: 0 additions & 88 deletions internal/storage/utils_test.go
@@ -1551,94 +1551,6 @@ func binaryRead(endian binary.ByteOrder, bs []byte, receiver interface{}) error
return binary.Read(reader, endian, receiver)
}

func TestFieldDataToBytes(t *testing.T) {
// TODO: test big endian.
endian := common.Endian

var bs []byte
var err error
var receiver interface{}

f1 := &BoolFieldData{Data: []bool{true, false}}
bs, err = FieldDataToBytes(endian, f1)
assert.NoError(t, err)
var barr schemapb.BoolArray
err = proto.Unmarshal(bs, &barr)
assert.NoError(t, err)
assert.ElementsMatch(t, f1.Data, barr.Data)

f2 := &StringFieldData{Data: []string{"true", "false"}}
bs, err = FieldDataToBytes(endian, f2)
assert.NoError(t, err)
var sarr schemapb.StringArray
err = proto.Unmarshal(bs, &sarr)
assert.NoError(t, err)
assert.ElementsMatch(t, f2.Data, sarr.Data)

f3 := &Int8FieldData{Data: []int8{0, 1}}
bs, err = FieldDataToBytes(endian, f3)
assert.NoError(t, err)
receiver = make([]int8, 2)
err = binaryRead(endian, bs, receiver)
assert.NoError(t, err)
assert.ElementsMatch(t, f3.Data, receiver)

f4 := &Int16FieldData{Data: []int16{0, 1}}
bs, err = FieldDataToBytes(endian, f4)
assert.NoError(t, err)
receiver = make([]int16, 2)
err = binaryRead(endian, bs, receiver)
assert.NoError(t, err)
assert.ElementsMatch(t, f4.Data, receiver)

f5 := &Int32FieldData{Data: []int32{0, 1}}
bs, err = FieldDataToBytes(endian, f5)
assert.NoError(t, err)
receiver = make([]int32, 2)
err = binaryRead(endian, bs, receiver)
assert.NoError(t, err)
assert.ElementsMatch(t, f5.Data, receiver)

f6 := &Int64FieldData{Data: []int64{0, 1}}
bs, err = FieldDataToBytes(endian, f6)
assert.NoError(t, err)
receiver = make([]int64, 2)
err = binaryRead(endian, bs, receiver)
assert.NoError(t, err)
assert.ElementsMatch(t, f6.Data, receiver)

// in fact, it is hard to compare floating-point values exactly.

f7 := &FloatFieldData{Data: []float32{0, 1}}
bs, err = FieldDataToBytes(endian, f7)
assert.NoError(t, err)
receiver = make([]float32, 2)
err = binaryRead(endian, bs, receiver)
assert.NoError(t, err)
assert.ElementsMatch(t, f7.Data, receiver)

f8 := &DoubleFieldData{Data: []float64{0, 1}}
bs, err = FieldDataToBytes(endian, f8)
assert.NoError(t, err)
receiver = make([]float64, 2)
err = binaryRead(endian, bs, receiver)
assert.NoError(t, err)
assert.ElementsMatch(t, f8.Data, receiver)

f9 := &BinaryVectorFieldData{Data: []byte{0, 1, 0}}
bs, err = FieldDataToBytes(endian, f9)
assert.NoError(t, err)
assert.ElementsMatch(t, f9.Data, bs)

f10 := &FloatVectorFieldData{Data: []float32{0, 1}}
bs, err = FieldDataToBytes(endian, f10)
assert.NoError(t, err)
receiver = make([]float32, 2)
err = binaryRead(endian, bs, receiver)
assert.NoError(t, err)
assert.ElementsMatch(t, f10.Data, receiver)
}

func TestJson(t *testing.T) {
extras := make(map[string]string)
extras["IndexBuildID"] = "10"