From d1643330995fc58f8bd92f391fe39d603595a9d1 Mon Sep 17 00:00:00 2001 From: shaoting-huang Date: Thu, 13 Jun 2024 12:17:56 +0800 Subject: [PATCH] legacy code clean up Signed-off-by: shaoting-huang --- internal/storage/binlog_iterator.go | 216 ---------------------------- internal/storage/data_codec.go | 25 ---- internal/storage/unsafe.go | 61 -------- internal/storage/unsafe_test.go | 49 ------- internal/storage/utils.go | 38 ----- internal/storage/utils_test.go | 88 ------------ 6 files changed, 477 deletions(-) delete mode 100644 internal/storage/unsafe.go delete mode 100644 internal/storage/unsafe_test.go diff --git a/internal/storage/binlog_iterator.go b/internal/storage/binlog_iterator.go index f620483982f61..2eb291a1496ab 100644 --- a/internal/storage/binlog_iterator.go +++ b/internal/storage/binlog_iterator.go @@ -127,69 +127,6 @@ func (itr *InsertBinlogIterator) isDisposed() bool { return atomic.LoadInt32(&itr.dispose) == 1 } -/* -type DeltalogIterator struct { - dispose int32 - values []*Value - pos int -} - -func NewDeltalogIterator(blob *Blob) (*DeltalogIterator, error) { - deltaCodec := NewDeleteCodec() - _, _, serData, err := deltaCodec.Deserialize(blob) - if err != nil { - return nil, err - } - - values := make([]*Value, 0, len(serData.Data)) - for pkstr, ts := range serData.Data { - pk, err := strconv.ParseInt(pkstr, 10, 64) - if err != nil { - return nil, err - } - values = append(values, &Value{pk, ts, true, nil}) - } - - sort.Slice(values, func(i, j int) bool { return values[i].id < values[j].id }) - - return &DeltalogIterator{values: values}, nil -} - -// HasNext returns true if the iterator have unread record -func (itr *DeltalogIterator) HasNext() bool { - return !itr.isDisposed() && itr.hasNext() -} - -// Next returns the next record -func (itr *DeltalogIterator) Next() (interface{}, error) { - if itr.isDisposed() { - return nil, ErrDisposed - } - - if !itr.hasNext() { - return nil, ErrNoMoreRecord - } - - tmp := itr.values[itr.pos] - itr.pos++ - return tmp, nil -} - -// Dispose disposes the iterator -func (itr *DeltalogIterator) Dispose() { - atomic.CompareAndSwapInt32(&itr.dispose, 0, 1) -} - -func (itr *DeltalogIterator) hasNext() bool { - return itr.pos < len(itr.values) -} - -func (itr *DeltalogIterator) isDisposed() bool { - return atomic.LoadInt32(&itr.dispose) == 1 -} - -*/ - // MergeIterator merge iterators. type MergeIterator struct { disposed int32 @@ -278,156 +215,3 @@ func (itr *MergeIterator) hasNext() bool { itr.nextRecord = minRecord return true } - -/* -func NewInsertlogMergeIterator(blobs [][]*Blob) (*MergeIterator, error) { - iterators := make([]Iterator, 0, len(blobs)) - for _, fieldBlobs := range blobs { - itr, err := NewInsertBinlogIterator(fieldBlobs) - if err != nil { - return nil, err - } - iterators = append(iterators, itr) - } - - return NewMergeIterator(iterators), nil -} - -func NewDeltalogMergeIterator(blobs []*Blob) (*MergeIterator, error) { - iterators := make([]Iterator, 0, len(blobs)) - for _, blob := range blobs { - itr, err := NewDeltalogIterator(blob) - if err != nil { - return nil, err - } - iterators = append(iterators, itr) - } - return NewMergeIterator(iterators), nil -} - -type MergeSingleSegmentIterator struct { - disposed int32 - insertItr Iterator - deltaItr Iterator - timetravel int64 - nextRecord *Value - insertTmpRecord *Value - deltaTmpRecord *Value -} - -func NewMergeSingleSegmentIterator(insertBlobs [][]*Blob, deltaBlobs []*Blob, timetravel int64) (*MergeSingleSegmentIterator, error) { - insertMergeItr, err := NewInsertlogMergeIterator(insertBlobs) - if err != nil { - return nil, err - } - - deltaMergeItr, err := NewDeltalogMergeIterator(deltaBlobs) - if err != nil { - return nil, err - } - return &MergeSingleSegmentIterator{ - insertItr: insertMergeItr, - deltaItr: deltaMergeItr, - timetravel: timetravel, - }, nil -} - -// HasNext returns true if the iterator have unread record -func (itr *MergeSingleSegmentIterator) HasNext() bool { - return !itr.isDisposed() && itr.hasNext() -} - -// Next returns the next record -func (itr *MergeSingleSegmentIterator) Next() (interface{}, error) { - if itr.isDisposed() { - return nil, ErrDisposed - } - if !itr.hasNext() { - return nil, ErrNoMoreRecord - } - - tmp := itr.nextRecord - itr.nextRecord = nil - return tmp, nil -} - -// Dispose disposes the iterator -func (itr *MergeSingleSegmentIterator) Dispose() { - if itr.isDisposed() { - return - } - - if itr.insertItr != nil { - itr.insertItr.Dispose() - } - if itr.deltaItr != nil { - itr.deltaItr.Dispose() - } - - atomic.CompareAndSwapInt32(&itr.disposed, 0, 1) -} - -func (itr *MergeSingleSegmentIterator) isDisposed() bool { - return atomic.LoadInt32(&itr.disposed) == 1 -} - -func (itr *MergeSingleSegmentIterator) hasNext() bool { - if itr.nextRecord != nil { - return true - } - - for { - if itr.insertTmpRecord == nil && itr.insertItr.HasNext() { - r, _ := itr.insertItr.Next() - itr.insertTmpRecord = r.(*Value) - } - - if itr.deltaTmpRecord == nil && itr.deltaItr.HasNext() { - r, _ := itr.deltaItr.Next() - itr.deltaTmpRecord = r.(*Value) - } - - if itr.insertTmpRecord == nil && itr.deltaTmpRecord == nil { - return false - } else if itr.insertTmpRecord == nil { - itr.nextRecord = itr.deltaTmpRecord - itr.deltaTmpRecord = nil - return true - } else if itr.deltaTmpRecord == nil { - itr.nextRecord = itr.insertTmpRecord - itr.insertTmpRecord = nil - return true - } else { - // merge records - if itr.insertTmpRecord.timestamp >= itr.timetravel { - itr.nextRecord = itr.insertTmpRecord - itr.insertTmpRecord = nil - return true - } - if itr.deltaTmpRecord.timestamp >= itr.timetravel { - itr.nextRecord = itr.deltaTmpRecord - itr.deltaTmpRecord = nil - return true - } - - if itr.insertTmpRecord.id < itr.deltaTmpRecord.id { - itr.nextRecord = itr.insertTmpRecord - itr.insertTmpRecord = nil - return true - } else if itr.insertTmpRecord.id > itr.deltaTmpRecord.id { - itr.deltaTmpRecord = nil - continue - } else if itr.insertTmpRecord.id == itr.deltaTmpRecord.id { - if itr.insertTmpRecord.timestamp <= itr.deltaTmpRecord.timestamp { - itr.insertTmpRecord = nil - continue - } else { - itr.deltaTmpRecord = nil - continue - } - } - } - - } -} -*/ diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index 757d683b9356c..50de1efb9d35c 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -849,31 +849,6 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int return collectionID, partitionID, segmentID, nil } -// func deserializeEntity[T any, U any]( -// eventReader *EventReader, -// binlogReader *BinlogReader, -// insertData *InsertData, -// getPayloadFunc func() (U, error), -// fillDataFunc func() FieldData, -// ) error { -// fieldID := binlogReader.FieldID -// stringPayload, err := getPayloadFunc() -// if err != nil { -// eventReader.Close() -// binlogReader.Close() -// return err -// } -// -// if insertData.Data[fieldID] == nil { -// insertData.Data[fieldID] = fillDataFunc() -// } -// stringFieldData := insertData.Data[fieldID].(*T) -// -// stringFieldData.Data = append(stringFieldData.Data, stringPayload...) -// totalLength += len(stringPayload) -// insertData.Data[fieldID] = stringFieldData -// } - // Deserialize transfer blob back to insert data. // From schema, it get all fields. // For each field, it will create a binlog reader, and read all event to the buffer. diff --git a/internal/storage/unsafe.go b/internal/storage/unsafe.go deleted file mode 100644 index 33056788ae556..0000000000000 --- a/internal/storage/unsafe.go +++ /dev/null @@ -1,61 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package storage - -import "unsafe" - -/* #nosec G103 */ -func UnsafeReadByte(buf []byte, idx int) byte { - ptr := unsafe.Pointer(&(buf[idx])) - return *((*byte)(ptr)) -} - -/* #nosec G103 */ -func UnsafeReadInt8(buf []byte, idx int) int8 { - ptr := unsafe.Pointer(&(buf[idx])) - return *((*int8)(ptr)) -} - -/* #nosec G103 */ -func UnsafeReadInt16(buf []byte, idx int) int16 { - ptr := unsafe.Pointer(&(buf[idx])) - return *((*int16)(ptr)) -} - -/* #nosec G103 */ -func UnsafeReadInt32(buf []byte, idx int) int32 { - ptr := unsafe.Pointer(&(buf[idx])) - return *((*int32)(ptr)) -} - -/* #nosec G103 */ -func UnsafeReadInt64(buf []byte, idx int) int64 { - ptr := unsafe.Pointer(&(buf[idx])) - return *((*int64)(ptr)) -} - -/* #nosec G103 */ -func UnsafeReadFloat32(buf []byte, idx int) float32 { - ptr := unsafe.Pointer(&(buf[idx])) - return *((*float32)(ptr)) -} - -/* #nosec G103 */ -func UnsafeReadFloat64(buf []byte, idx int) float64 { - ptr := unsafe.Pointer(&(buf[idx])) - return *((*float64)(ptr)) -} diff --git a/internal/storage/unsafe_test.go b/internal/storage/unsafe_test.go deleted file mode 100644 index ecb60c0e79e89..0000000000000 --- a/internal/storage/unsafe_test.go +++ /dev/null @@ -1,49 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package storage - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestUnsafe(t *testing.T) { - buf := []byte{16} - int8Res := UnsafeReadInt8(buf, 0) - assert.Equal(t, int8Res, int8(16)) - - buf = []byte{16, 16} - int16Res := UnsafeReadInt16(buf, 0) - assert.Equal(t, int16Res, int16(4112)) - - buf = []byte{16, 16, 16, 16} - int32Res := UnsafeReadInt32(buf, 0) - assert.Equal(t, int32Res, int32(269488144)) - - buf = []byte{16, 16, 16, 16, 16, 16, 16, 16} - int64Res := UnsafeReadInt64(buf, 0) - assert.Equal(t, int64Res, int64(1157442765409226768)) - - buf = []byte{16, 16, 16, 16} - float32Res := UnsafeReadFloat32(buf, 0) - assert.Equal(t, float32Res, float32(2.8411367e-29)) - - buf = []byte{16, 16, 16, 16, 16, 16, 16, 16} - float64Res := UnsafeReadFloat64(buf, 0) - assert.Equal(t, float64Res, float64(2.586563270614692e-231)) -} diff --git a/internal/storage/utils.go b/internal/storage/utils.go index f3a90a88726e2..06e8d1ca7c947 100644 --- a/internal/storage/utils.go +++ b/internal/storage/utils.go @@ -982,44 +982,6 @@ func binaryWrite(endian binary.ByteOrder, data interface{}) ([]byte, error) { return buf.Bytes(), nil } -// FieldDataToBytes encode field data to byte slice. -// For some fixed-length data, such as int32, int64, float vector, use binary.Write directly. -// For binary vector, return it directly. -// For bool data, first transfer to schemapb.BoolArray and then marshal it. (TODO: handle bool like other scalar data.) -// For variable-length data, such as string, first transfer to schemapb.StringArray and then marshal it. -// TODO: find a proper way to store variable-length data. Or we should unify to use protobuf? -func FieldDataToBytes(endian binary.ByteOrder, fieldData FieldData) ([]byte, error) { - switch field := fieldData.(type) { - case *BoolFieldData: - // return binaryWrite(endian, field.Data) - return boolFieldDataToPbBytes(field) - case *StringFieldData: - return stringFieldDataToPbBytes(field) - case *ArrayFieldData: - return arrayFieldDataToPbBytes(field) - case *JSONFieldData: - return jsonFieldDataToPbBytes(field) - case *BinaryVectorFieldData: - return field.Data, nil - case *FloatVectorFieldData: - return binaryWrite(endian, field.Data) - case *Int8FieldData: - return binaryWrite(endian, field.Data) - case *Int16FieldData: - return binaryWrite(endian, field.Data) - case *Int32FieldData: - return binaryWrite(endian, field.Data) - case *Int64FieldData: - return binaryWrite(endian, field.Data) - case *FloatFieldData: - return binaryWrite(endian, field.Data) - case *DoubleFieldData: - return binaryWrite(endian, field.Data) - default: - return nil, fmt.Errorf("unsupported field data: %s", field) - } -} - func TransferInsertDataToInsertRecord(insertData *InsertData) (*segcorepb.InsertRecord, error) { insertRecord := &segcorepb.InsertRecord{} for fieldID, rawData := range insertData.Data { diff --git a/internal/storage/utils_test.go b/internal/storage/utils_test.go index f9d1be2d136d8..25eb19cf064eb 100644 --- a/internal/storage/utils_test.go +++ b/internal/storage/utils_test.go @@ -1551,94 +1551,6 @@ func binaryRead(endian binary.ByteOrder, bs []byte, receiver interface{}) error return binary.Read(reader, endian, receiver) } -func TestFieldDataToBytes(t *testing.T) { - // TODO: test big endian. - endian := common.Endian - - var bs []byte - var err error - var receiver interface{} - - f1 := &BoolFieldData{Data: []bool{true, false}} - bs, err = FieldDataToBytes(endian, f1) - assert.NoError(t, err) - var barr schemapb.BoolArray - err = proto.Unmarshal(bs, &barr) - assert.NoError(t, err) - assert.ElementsMatch(t, f1.Data, barr.Data) - - f2 := &StringFieldData{Data: []string{"true", "false"}} - bs, err = FieldDataToBytes(endian, f2) - assert.NoError(t, err) - var sarr schemapb.StringArray - err = proto.Unmarshal(bs, &sarr) - assert.NoError(t, err) - assert.ElementsMatch(t, f2.Data, sarr.Data) - - f3 := &Int8FieldData{Data: []int8{0, 1}} - bs, err = FieldDataToBytes(endian, f3) - assert.NoError(t, err) - receiver = make([]int8, 2) - err = binaryRead(endian, bs, receiver) - assert.NoError(t, err) - assert.ElementsMatch(t, f3.Data, receiver) - - f4 := &Int16FieldData{Data: []int16{0, 1}} - bs, err = FieldDataToBytes(endian, f4) - assert.NoError(t, err) - receiver = make([]int16, 2) - err = binaryRead(endian, bs, receiver) - assert.NoError(t, err) - assert.ElementsMatch(t, f4.Data, receiver) - - f5 := &Int32FieldData{Data: []int32{0, 1}} - bs, err = FieldDataToBytes(endian, f5) - assert.NoError(t, err) - receiver = make([]int32, 2) - err = binaryRead(endian, bs, receiver) - assert.NoError(t, err) - assert.ElementsMatch(t, f5.Data, receiver) - - f6 := &Int64FieldData{Data: []int64{0, 1}} - bs, err = FieldDataToBytes(endian, f6) - assert.NoError(t, err) - receiver = make([]int64, 2) - err = binaryRead(endian, bs, receiver) - assert.NoError(t, err) - assert.ElementsMatch(t, f6.Data, receiver) - - // in fact, hard to compare float point value. - - f7 := &FloatFieldData{Data: []float32{0, 1}} - bs, err = FieldDataToBytes(endian, f7) - assert.NoError(t, err) - receiver = make([]float32, 2) - err = binaryRead(endian, bs, receiver) - assert.NoError(t, err) - assert.ElementsMatch(t, f7.Data, receiver) - - f8 := &DoubleFieldData{Data: []float64{0, 1}} - bs, err = FieldDataToBytes(endian, f8) - assert.NoError(t, err) - receiver = make([]float64, 2) - err = binaryRead(endian, bs, receiver) - assert.NoError(t, err) - assert.ElementsMatch(t, f8.Data, receiver) - - f9 := &BinaryVectorFieldData{Data: []byte{0, 1, 0}} - bs, err = FieldDataToBytes(endian, f9) - assert.NoError(t, err) - assert.ElementsMatch(t, f9.Data, bs) - - f10 := &FloatVectorFieldData{Data: []float32{0, 1}} - bs, err = FieldDataToBytes(endian, f10) - assert.NoError(t, err) - receiver = make([]float32, 2) - err = binaryRead(endian, bs, receiver) - assert.NoError(t, err) - assert.ElementsMatch(t, f10.Data, receiver) -} - func TestJson(t *testing.T) { extras := make(map[string]string) extras["IndexBuildID"] = "10"