bulkinsert binlog didn't consider ts order when processing delta data
Signed-off-by: wayblink <anyang.wang@zilliz.com>
wayblink committed Dec 13, 2023
1 parent 3acb672 commit 0ee4ec5
Showing 8 changed files with 64 additions and 52 deletions.
20 changes: 14 additions & 6 deletions internal/util/importutil/binlog_adapter.go
@@ -305,14 +305,20 @@ func (p *BinlogAdapter) readDeltalogs(segmentHolder *SegmentFilesHolder) (map[in
if primaryKey.GetDataType() == schemapb.DataType_Int64 {
deletedIDDict := make(map[int64]uint64)
for _, deleteLog := range deleteLogs {
deletedIDDict[deleteLog.Pk.GetValue().(int64)] = deleteLog.Ts
_, exist := deletedIDDict[deleteLog.Pk.GetValue().(int64)]
if !exist || deleteLog.Ts > deletedIDDict[deleteLog.Pk.GetValue().(int64)] {
deletedIDDict[deleteLog.Pk.GetValue().(int64)] = deleteLog.Ts
}
}
log.Info("Binlog adapter: count of deleted entities", zap.Int("deletedCount", len(deletedIDDict)))
return deletedIDDict, nil, nil
} else if primaryKey.GetDataType() == schemapb.DataType_VarChar {
deletedIDDict := make(map[string]uint64)
for _, deleteLog := range deleteLogs {
deletedIDDict[deleteLog.Pk.GetValue().(string)] = deleteLog.Ts
_, exist := deletedIDDict[deleteLog.Pk.GetValue().(string)]
if !exist || deleteLog.Ts > deletedIDDict[deleteLog.Pk.GetValue().(string)] {
deletedIDDict[deleteLog.Pk.GetValue().(string)] = deleteLog.Ts
}
}
log.Info("Binlog adapter: count of deleted entities", zap.Int("deletedCount", len(deletedIDDict)))
return nil, deletedIDDict, nil
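
The hunk above changes how readDeltalogs deduplicates delete logs: instead of letting whichever log is read last overwrite the map entry, it keeps the latest (largest) delete timestamp per primary key. A minimal standalone sketch of the pattern; the deleteLog struct here is an illustrative stand-in for the storage type, not the real API:

package main

import "fmt"

type deleteLog struct {
	pk int64
	ts uint64
}

// latestDeleteTs builds a pk -> timestamp map in which a later delete always
// wins over an earlier one, regardless of the order the logs are read in.
func latestDeleteTs(logs []deleteLog) map[int64]uint64 {
	dict := make(map[int64]uint64)
	for _, l := range logs {
		if ts, exist := dict[l.pk]; !exist || l.ts > ts {
			dict[l.pk] = l.ts
		}
	}
	return dict
}

func main() {
	// two deletes of pk 7 arriving out of order: the newer ts (50) is kept
	fmt.Println(latestDeleteTs([]deleteLog{{7, 50}, {7, 20}})) // map[7:50]
}
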
@@ -530,9 +536,10 @@ func (p *BinlogAdapter) getShardingListByPrimaryInt64(primaryKeys []int64,
continue
}

_, deleted := intDeletedList[key]
deleteTs, deleted := intDeletedList[key]
// if the key exists in intDeletedList, that means this entity has been deleted
if deleted {
// only skip the entity when the delete happened after the insert
if deleted && deleteTs > uint64(ts) {
shardList = append(shardList, -1) // this entity has been deleted, set shardID = -1 and skip this entity
actualDeleted++
} else {
@@ -584,9 +591,10 @@ func (p *BinlogAdapter) getShardingListByPrimaryVarchar(primaryKeys []string,
continue
}

_, deleted := strDeletedList[key]
deleteTs, deleted := strDeletedList[key]
// if the key exists in strDeletedList, that means this entity has been deleted
if deleted {
// only skip the entity when the delete happened after the insert
if deleted && deleteTs > uint64(ts) {
shardList = append(shardList, -1) // this entity has been deleted, set shardID = -1 and skip this entity
actualDeleted++
} else {
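
Both getShardingListByPrimary* hunks apply the matching rule on the insert side: a delete record masks an entity only when the delete happened after that entity's insert, so an older delete can no longer suppress a newer insert of the same primary key. A hedged sketch of just that predicate; the names are illustrative, not the adapter's API:

package main

import "fmt"

// shouldSkip reports whether an entity with insert timestamp insertTs is
// masked by the deduplicated delete list built from the deltalogs.
func shouldSkip(pk int64, insertTs uint64, deleted map[int64]uint64) bool {
	deleteTs, ok := deleted[pk]
	return ok && deleteTs > insertTs // a delete wins only if it is newer than the insert
}

func main() {
	deleted := map[int64]uint64{42: 100}
	fmt.Println(shouldSkip(42, 90, deleted))  // true: deleted after its insert
	fmt.Println(shouldSkip(42, 150, deleted)) // false: re-inserted after the delete
}
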
48 changes: 26 additions & 22 deletions internal/util/importutil/binlog_adapter_test.go
@@ -35,7 +35,7 @@ const (
baseTimestamp = 43757345
)

func createDeltalogBuf(t *testing.T, deleteList interface{}, varcharType bool) []byte {
func createDeltalogBuf(t *testing.T, deleteList interface{}, varcharType bool, startTimestamp uint64) []byte {
deleteData := &storage.DeleteData{
Pks: make([]storage.PrimaryKey, 0),
Tss: make([]storage.Timestamp, 0),
@@ -47,15 +47,15 @@ func createDeltalogBuf(t *testing.T, deleteList interface{}, varcharType bool) [
assert.NotNil(t, deltaData)
for i, id := range deltaData {
deleteData.Pks = append(deleteData.Pks, storage.NewVarCharPrimaryKey(id))
deleteData.Tss = append(deleteData.Tss, baseTimestamp+uint64(i))
deleteData.Tss = append(deleteData.Tss, startTimestamp+uint64(i))
deleteData.RowCount++
}
} else {
deltaData := deleteList.([]int64)
assert.NotNil(t, deltaData)
for i, id := range deltaData {
deleteData.Pks = append(deleteData.Pks, storage.NewInt64PrimaryKey(id))
deleteData.Tss = append(deleteData.Tss, baseTimestamp+uint64(i))
deleteData.Tss = append(deleteData.Tss, startTimestamp+uint64(i))
deleteData.RowCount++
}
}
@@ -171,7 +171,7 @@ func Test_BinlogAdapterReadDeltalog(t *testing.T) {
ctx := context.Background()

deleteItems := []int64{1001, 1002, 1003}
buf := createDeltalogBuf(t, deleteItems, false)
buf := createDeltalogBuf(t, deleteItems, false, baseTimestamp)
chunkManager := &MockChunkManager{
readBuf: map[string][]byte{
"dummy": buf,
@@ -212,7 +212,7 @@ func Test_BinlogAdapterDecodeDeleteLogs(t *testing.T) {
ctx := context.Background()

deleteItems := []int64{1001, 1002, 1003, 1004, 1005}
buf := createDeltalogBuf(t, deleteItems, false)
buf := createDeltalogBuf(t, deleteItems, false, baseTimestamp)
chunkManager := &MockChunkManager{
readBuf: map[string][]byte{
"dummy": buf,
@@ -244,7 +244,7 @@ func Test_BinlogAdapterDecodeDeleteLogs(t *testing.T) {

// wrong data type of delta log
chunkManager.readBuf = map[string][]byte{
"dummy": createDeltalogBuf(t, []string{"1001", "1002"}, true),
"dummy": createDeltalogBuf(t, []string{"1001", "1002"}, true, baseTimestamp),
}

adapter, err = NewBinlogAdapter(ctx, collectionInfo, 1024, 2048, chunkManager, flushFunc, 0, math.MaxUint64)
@@ -317,7 +317,7 @@ func Test_BinlogAdapterReadDeltalogs(t *testing.T) {
ctx := context.Background()

deleteItems := []int64{1001, 1002, 1003, 1004, 1005}
buf := createDeltalogBuf(t, deleteItems, false)
buf := createDeltalogBuf(t, deleteItems, false, baseTimestamp)
chunkManager := &MockChunkManager{
readBuf: map[string][]byte{
"dummy": buf,
@@ -374,7 +374,7 @@ func Test_BinlogAdapterReadDeltalogs(t *testing.T) {
collectionInfo.resetSchema(schema)

chunkManager.readBuf = map[string][]byte{
"dummy": createDeltalogBuf(t, []string{"1001", "1002"}, true),
"dummy": createDeltalogBuf(t, []string{"1001", "1002"}, true, baseTimestamp),
}

adapter, err = NewBinlogAdapter(ctx, collectionInfo, 1024, 2048, chunkManager, flushFunc, 0, math.MaxUint64)
@@ -462,7 +462,7 @@ func Test_BinlogAdapterReadTimestamp(t *testing.T) {

// succeed
rowCount := 10
fieldsData := createFieldsData(sampleSchema(), rowCount)
fieldsData := createFieldsData(sampleSchema(), rowCount, baseTimestamp)
chunkManager.readBuf["dummy"] = createBinlogBuf(t, schemapb.DataType_Int64, fieldsData[106].([]int64))
ts, err = adapter.readTimestamp("dummy")
assert.NoError(t, err)
@@ -502,7 +502,7 @@ func Test_BinlogAdapterReadPrimaryKeys(t *testing.T) {

// wrong primary key type
rowCount := 10
fieldsData := createFieldsData(sampleSchema(), rowCount)
fieldsData := createFieldsData(sampleSchema(), rowCount, baseTimestamp)
chunkManager.readBuf["dummy"] = createBinlogBuf(t, schemapb.DataType_Bool, fieldsData[102].([]bool))

adapter.collectionInfo.PrimaryKey.DataType = schemapb.DataType_Bool
@@ -545,7 +545,7 @@ func Test_BinlogAdapterShardListInt64(t *testing.T) {
assert.NotNil(t, adapter)
assert.NoError(t, err)

fieldsData := createFieldsData(sampleSchema(), 0)
fieldsData := createFieldsData(sampleSchema(), 0, baseTimestamp)
shardsData := createShardsData(sampleSchema(), fieldsData, shardNum, []int64{1})

// wrong input
@@ -587,7 +587,7 @@ func Test_BinlogAdapterShardListVarchar(t *testing.T) {
assert.NotNil(t, adapter)
assert.NoError(t, err)

fieldsData := createFieldsData(strKeySchema(), 0)
fieldsData := createFieldsData(strKeySchema(), 0, baseTimestamp)
shardsData := createShardsData(strKeySchema(), fieldsData, shardNum, []int64{1})
// wrong input
shardList, err := adapter.getShardingListByPrimaryVarchar([]string{"1"}, []int64{1, 2}, shardsData, map[string]uint64{})
@@ -615,6 +615,7 @@ func Test_BinlogAdapterShardListVarchar(t *testing.T) {

func Test_BinlogAdapterReadInt64PK(t *testing.T) {
ctx := context.Background()
paramtable.Init()

chunkManager := &MockChunkManager{}

@@ -677,7 +678,7 @@ func Test_BinlogAdapterReadInt64PK(t *testing.T) {

// prepare binlog data
rowCount := 1000
fieldsData := createFieldsData(sampleSchema(), rowCount)
fieldsData := createFieldsData(sampleSchema(), rowCount, baseTimestamp)
deletedItems := []int64{41, 51, 100, 400, 600}

chunkManager.readBuf = map[string][]byte{
@@ -693,7 +694,7 @@ func Test_BinlogAdapterReadInt64PK(t *testing.T) {
"111_insertlog": createBinlogBuf(t, schemapb.DataType_FloatVector, fieldsData[111].([][]float32)),
"112_insertlog": createBinlogBuf(t, schemapb.DataType_JSON, fieldsData[112].([][]byte)),
"113_insertlog": createBinlogBuf(t, schemapb.DataType_Array, fieldsData[113].([]*schemapb.ScalarField)),
"deltalog": createDeltalogBuf(t, deletedItems, false),
"deltalog": createDeltalogBuf(t, deletedItems, false, baseTimestamp+300),
}

// failed to read primary keys
@@ -708,15 +709,18 @@ func Test_BinlogAdapterReadInt64PK(t *testing.T) {
// succeed flush
chunkManager.readBuf["1_insertlog"] = createBinlogBuf(t, schemapb.DataType_Int64, fieldsData[1].([]int64))

adapter.tsEndPoint = baseTimestamp + uint64(499) // 4 entities deleted, 500 entities excluded
// createDeltalogBuf above uses startTimestamp = baseTimestamp+300, so the deletes for pk = {41, 51, 100, 400, 600} carry ts = baseTimestamp + {300, 301, 302, 303, 304}
// only pks 41, 51 and 100 were inserted before their delete ts, so those 3 entities are deleted
adapter.tsEndPoint = baseTimestamp + uint64(499) // 3 entities deleted, 500 entities excluded
err = adapter.Read(holder)
assert.NoError(t, err)
assert.Equal(t, shardNum, int32(flushCounter))
assert.Equal(t, rowCount-4-500, flushRowCount)
assert.Equal(t, rowCount-3-500, flushRowCount)
}
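
For reference, a hedged recomputation of the expected flush count in the test above. It assumes row i of the data generated by createFieldsData carries timestamp baseTimestamp+i and primary key i, which is how the deleted pks line up with the assertion:

package main

import "fmt"

func main() {
	const base = uint64(43757345) // baseTimestamp in these tests
	deletes := map[int64]uint64{} // pk -> delete ts
	for i, pk := range []int64{41, 51, 100, 400, 600} {
		deletes[pk] = base + 300 + uint64(i) // createDeltalogBuf(..., baseTimestamp+300)
	}
	tsEnd := base + 499
	flushed := 0
	for pk := int64(0); pk < 1000; pk++ {
		insertTs := base + uint64(pk)
		if insertTs > tsEnd {
			continue // rows 500..999 are beyond tsEndPoint: 500 entities excluded
		}
		if deleteTs, ok := deletes[pk]; ok && deleteTs > insertTs {
			continue // pks 41, 51, 100 were deleted after their inserts
		}
		flushed++
	}
	fmt.Println(flushed) // 497 = rowCount - 500 - 3
}
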

func Test_BinlogAdapterReadVarcharPK(t *testing.T) {
ctx := context.Background()
paramtable.Init()

chunkManager := &MockChunkManager{}

@@ -788,7 +792,7 @@ func Test_BinlogAdapterReadVarcharPK(t *testing.T) {
"104_insertlog": createBinlogBuf(t, schemapb.DataType_VarChar, varcharData),
"105_insertlog": createBinlogBuf(t, schemapb.DataType_Bool, boolData),
"106_insertlog": createBinlogBuf(t, schemapb.DataType_FloatVector, floatVecData),
"deltalog": createDeltalogBuf(t, deletedItems, true),
"deltalog": createDeltalogBuf(t, deletedItems, true, baseTimestamp+300),
}

// succeed
Expand All @@ -800,7 +804,7 @@ func Test_BinlogAdapterReadVarcharPK(t *testing.T) {
assert.NotNil(t, adapter)
assert.NoError(t, err)

adapter.tsEndPoint = baseTimestamp + uint64(499) // 3 entities deleted, 500 entities excluded, the "999" is excluded, so totally 502 entities skipped
adapter.tsEndPoint = baseTimestamp + uint64(499) // 2 entities deleted, 500 entities excluded, and the "999" entity is excluded, so 502 entities are skipped in total
err = adapter.Read(holder)
assert.NoError(t, err)
assert.Equal(t, shardNum, int32(flushCounter))
@@ -823,7 +827,7 @@ func Test_BinlogAdapterDispatch(t *testing.T) {

// prepare empty in-memory segments data
partitionID := int64(1)
fieldsData := createFieldsData(sampleSchema(), 0)
fieldsData := createFieldsData(sampleSchema(), 0, baseTimestamp)
shardsData := createShardsData(sampleSchema(), fieldsData, shardNum, []int64{partitionID})

shardList := []int32{0, -1, 1}
@@ -1146,7 +1150,7 @@ func Test_BinlogAdapterVerifyField(t *testing.T) {

shardNum := int32(2)
partitionID := int64(1)
fieldsData := createFieldsData(sampleSchema(), 0)
fieldsData := createFieldsData(sampleSchema(), 0, baseTimestamp)
shardsData := createShardsData(sampleSchema(), fieldsData, shardNum, []int64{partitionID})

flushFunc := func(fields BlockData, shardID int, partID int64) error {
Expand All @@ -1173,7 +1177,7 @@ func Test_BinlogAdapterReadInsertlog(t *testing.T) {

shardNum := int32(2)
partitionID := int64(1)
fieldsData := createFieldsData(sampleSchema(), 0)
fieldsData := createFieldsData(sampleSchema(), 0, baseTimestamp)
shardsData := createShardsData(sampleSchema(), fieldsData, shardNum, []int64{partitionID})

flushFunc := func(fields BlockData, shardID int, partID int64) error {
@@ -1205,7 +1209,7 @@ func Test_BinlogAdapterReadInsertlog(t *testing.T) {

// prepare binlog data
rowCount := 3
fieldsData = createFieldsData(sampleSchema(), rowCount)
fieldsData = createFieldsData(sampleSchema(), rowCount, baseTimestamp)

failedFunc := func(fieldID int64, fieldName string, fieldType schemapb.DataType, wrongField int64, wrongType schemapb.DataType) {
// row count mismatch
24 changes: 12 additions & 12 deletions internal/util/importutil/binlog_file_test.go
@@ -331,7 +331,7 @@ func Test_BinlogFileBool(t *testing.T) {
binlogFile.Close()

// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
err = binlogFile.Open("dummy")
assert.NoError(t, err)

@@ -388,7 +388,7 @@ func Test_BinlogFileInt8(t *testing.T) {
binlogFile.Close()

// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
err = binlogFile.Open("dummy")
assert.NoError(t, err)

@@ -446,7 +446,7 @@ func Test_BinlogFileInt16(t *testing.T) {
binlogFile.Close()

// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
err = binlogFile.Open("dummy")
assert.NoError(t, err)

@@ -503,7 +503,7 @@ func Test_BinlogFileInt32(t *testing.T) {
binlogFile.Close()

// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
err = binlogFile.Open("dummy")
assert.NoError(t, err)

@@ -560,7 +560,7 @@ func Test_BinlogFileInt64(t *testing.T) {
binlogFile.Close()

// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
err = binlogFile.Open("dummy")
assert.NoError(t, err)

@@ -617,7 +617,7 @@ func Test_BinlogFileFloat(t *testing.T) {
binlogFile.Close()

// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
err = binlogFile.Open("dummy")
assert.NoError(t, err)

@@ -674,7 +674,7 @@ func Test_BinlogFileDouble(t *testing.T) {
binlogFile.Close()

// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
err = binlogFile.Open("dummy")
assert.NoError(t, err)

@@ -778,7 +778,7 @@ func Test_BinlogFileJSON(t *testing.T) {
binlogFile.Close()

// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
err = binlogFile.Open("dummy")
assert.NoError(t, err)

@@ -858,7 +858,7 @@ func Test_BinlogFileArray(t *testing.T) {
binlogFile.Close()

// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
err = binlogFile.Open("dummy")
assert.NoError(t, err)

@@ -937,7 +937,7 @@ func Test_BinlogFileBinaryVector(t *testing.T) {
binlogFile.Close()

// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
err = binlogFile.Open("dummy")
assert.NoError(t, err)

@@ -1004,7 +1004,7 @@ func Test_BinlogFileFloatVector(t *testing.T) {
binlogFile.Close()

// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
err = binlogFile.Open("dummy")
assert.NoError(t, err)

@@ -1072,7 +1072,7 @@ func Test_BinlogFileFloat16Vector(t *testing.T) {
binlogFile.Close()

// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
err = binlogFile.Open("dummy")
assert.NoError(t, err)

2 changes: 1 addition & 1 deletion internal/util/importutil/binlog_parser_test.go
@@ -321,7 +321,7 @@ func Test_BinlogParserParse(t *testing.T) {

// progress
rowCount := 100
fieldsData := createFieldsData(sampleSchema(), rowCount)
fieldsData := createFieldsData(sampleSchema(), rowCount, baseTimestamp)
chunkManager.listResult["deltaPath"] = []string{}
chunkManager.listResult["insertPath"] = []string{
"123/0/a",