Skip to content

Commit

Permalink
legacy code clean up
Browse files Browse the repository at this point in the history
Signed-off-by: shaoting-huang <shaoting.huang@zilliz.com>
  • Loading branch information
shaoting-huang committed Jun 13, 2024
1 parent ca1f7ab commit d164333
Show file tree
Hide file tree
Showing 6 changed files with 0 additions and 477 deletions.
216 changes: 0 additions & 216 deletions internal/storage/binlog_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,69 +127,6 @@ func (itr *InsertBinlogIterator) isDisposed() bool {
return atomic.LoadInt32(&itr.dispose) == 1
}

/*
type DeltalogIterator struct {
dispose int32
values []*Value
pos int
}
func NewDeltalogIterator(blob *Blob) (*DeltalogIterator, error) {
deltaCodec := NewDeleteCodec()
_, _, serData, err := deltaCodec.Deserialize(blob)
if err != nil {
return nil, err
}
values := make([]*Value, 0, len(serData.Data))
for pkstr, ts := range serData.Data {
pk, err := strconv.ParseInt(pkstr, 10, 64)
if err != nil {
return nil, err
}
values = append(values, &Value{pk, ts, true, nil})
}
sort.Slice(values, func(i, j int) bool { return values[i].id < values[j].id })
return &DeltalogIterator{values: values}, nil
}
// HasNext returns true if the iterator have unread record
func (itr *DeltalogIterator) HasNext() bool {
return !itr.isDisposed() && itr.hasNext()
}
// Next returns the next record
func (itr *DeltalogIterator) Next() (interface{}, error) {
if itr.isDisposed() {
return nil, ErrDisposed
}
if !itr.hasNext() {
return nil, ErrNoMoreRecord
}
tmp := itr.values[itr.pos]
itr.pos++
return tmp, nil
}
// Dispose disposes the iterator
func (itr *DeltalogIterator) Dispose() {
atomic.CompareAndSwapInt32(&itr.dispose, 0, 1)
}
func (itr *DeltalogIterator) hasNext() bool {
return itr.pos < len(itr.values)
}
func (itr *DeltalogIterator) isDisposed() bool {
return atomic.LoadInt32(&itr.dispose) == 1
}
*/

// MergeIterator merge iterators.
type MergeIterator struct {
disposed int32
Expand Down Expand Up @@ -278,156 +215,3 @@ func (itr *MergeIterator) hasNext() bool {
itr.nextRecord = minRecord
return true
}

/*
func NewInsertlogMergeIterator(blobs [][]*Blob) (*MergeIterator, error) {
iterators := make([]Iterator, 0, len(blobs))
for _, fieldBlobs := range blobs {
itr, err := NewInsertBinlogIterator(fieldBlobs)
if err != nil {
return nil, err
}
iterators = append(iterators, itr)
}
return NewMergeIterator(iterators), nil
}
func NewDeltalogMergeIterator(blobs []*Blob) (*MergeIterator, error) {
iterators := make([]Iterator, 0, len(blobs))
for _, blob := range blobs {
itr, err := NewDeltalogIterator(blob)
if err != nil {
return nil, err
}
iterators = append(iterators, itr)
}
return NewMergeIterator(iterators), nil
}
type MergeSingleSegmentIterator struct {
disposed int32
insertItr Iterator
deltaItr Iterator
timetravel int64
nextRecord *Value
insertTmpRecord *Value
deltaTmpRecord *Value
}
func NewMergeSingleSegmentIterator(insertBlobs [][]*Blob, deltaBlobs []*Blob, timetravel int64) (*MergeSingleSegmentIterator, error) {
insertMergeItr, err := NewInsertlogMergeIterator(insertBlobs)
if err != nil {
return nil, err
}
deltaMergeItr, err := NewDeltalogMergeIterator(deltaBlobs)
if err != nil {
return nil, err
}
return &MergeSingleSegmentIterator{
insertItr: insertMergeItr,
deltaItr: deltaMergeItr,
timetravel: timetravel,
}, nil
}
// HasNext returns true if the iterator have unread record
func (itr *MergeSingleSegmentIterator) HasNext() bool {
return !itr.isDisposed() && itr.hasNext()
}
// Next returns the next record
func (itr *MergeSingleSegmentIterator) Next() (interface{}, error) {
if itr.isDisposed() {
return nil, ErrDisposed
}
if !itr.hasNext() {
return nil, ErrNoMoreRecord
}
tmp := itr.nextRecord
itr.nextRecord = nil
return tmp, nil
}
// Dispose disposes the iterator
func (itr *MergeSingleSegmentIterator) Dispose() {
if itr.isDisposed() {
return
}
if itr.insertItr != nil {
itr.insertItr.Dispose()
}
if itr.deltaItr != nil {
itr.deltaItr.Dispose()
}
atomic.CompareAndSwapInt32(&itr.disposed, 0, 1)
}
func (itr *MergeSingleSegmentIterator) isDisposed() bool {
return atomic.LoadInt32(&itr.disposed) == 1
}
func (itr *MergeSingleSegmentIterator) hasNext() bool {
if itr.nextRecord != nil {
return true
}
for {
if itr.insertTmpRecord == nil && itr.insertItr.HasNext() {
r, _ := itr.insertItr.Next()
itr.insertTmpRecord = r.(*Value)
}
if itr.deltaTmpRecord == nil && itr.deltaItr.HasNext() {
r, _ := itr.deltaItr.Next()
itr.deltaTmpRecord = r.(*Value)
}
if itr.insertTmpRecord == nil && itr.deltaTmpRecord == nil {
return false
} else if itr.insertTmpRecord == nil {
itr.nextRecord = itr.deltaTmpRecord
itr.deltaTmpRecord = nil
return true
} else if itr.deltaTmpRecord == nil {
itr.nextRecord = itr.insertTmpRecord
itr.insertTmpRecord = nil
return true
} else {
// merge records
if itr.insertTmpRecord.timestamp >= itr.timetravel {
itr.nextRecord = itr.insertTmpRecord
itr.insertTmpRecord = nil
return true
}
if itr.deltaTmpRecord.timestamp >= itr.timetravel {
itr.nextRecord = itr.deltaTmpRecord
itr.deltaTmpRecord = nil
return true
}
if itr.insertTmpRecord.id < itr.deltaTmpRecord.id {
itr.nextRecord = itr.insertTmpRecord
itr.insertTmpRecord = nil
return true
} else if itr.insertTmpRecord.id > itr.deltaTmpRecord.id {
itr.deltaTmpRecord = nil
continue
} else if itr.insertTmpRecord.id == itr.deltaTmpRecord.id {
if itr.insertTmpRecord.timestamp <= itr.deltaTmpRecord.timestamp {
itr.insertTmpRecord = nil
continue
} else {
itr.deltaTmpRecord = nil
continue
}
}
}
}
}
*/
25 changes: 0 additions & 25 deletions internal/storage/data_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,31 +849,6 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int
return collectionID, partitionID, segmentID, nil
}

// func deserializeEntity[T any, U any](
// eventReader *EventReader,
// binlogReader *BinlogReader,
// insertData *InsertData,
// getPayloadFunc func() (U, error),
// fillDataFunc func() FieldData,
// ) error {
// fieldID := binlogReader.FieldID
// stringPayload, err := getPayloadFunc()
// if err != nil {
// eventReader.Close()
// binlogReader.Close()
// return err
// }
//
// if insertData.Data[fieldID] == nil {
// insertData.Data[fieldID] = fillDataFunc()
// }
// stringFieldData := insertData.Data[fieldID].(*T)
//
// stringFieldData.Data = append(stringFieldData.Data, stringPayload...)
// totalLength += len(stringPayload)
// insertData.Data[fieldID] = stringFieldData
// }

// Deserialize transfer blob back to insert data.
// From schema, it get all fields.
// For each field, it will create a binlog reader, and read all event to the buffer.
Expand Down
61 changes: 0 additions & 61 deletions internal/storage/unsafe.go

This file was deleted.

49 changes: 0 additions & 49 deletions internal/storage/unsafe_test.go

This file was deleted.

Loading

0 comments on commit d164333

Please sign in to comment.