Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bulkinsert binlog didn't consider ts order when processing delta data #5

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions internal/util/importutil/binlog_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,14 +305,20 @@ func (p *BinlogAdapter) readDeltalogs(segmentHolder *SegmentFilesHolder) (map[in
if primaryKey.GetDataType() == schemapb.DataType_Int64 {
deletedIDDict := make(map[int64]uint64)
for _, deleteLog := range deleteLogs {
deletedIDDict[deleteLog.Pk.GetValue().(int64)] = deleteLog.Ts
_, exist := deletedIDDict[deleteLog.Pk.GetValue().(int64)]
if !exist || deleteLog.Ts > deletedIDDict[deleteLog.Pk.GetValue().(int64)] {
deletedIDDict[deleteLog.Pk.GetValue().(int64)] = deleteLog.Ts
}
}
log.Info("Binlog adapter: count of deleted entities", zap.Int("deletedCount", len(deletedIDDict)))
return deletedIDDict, nil, nil
} else if primaryKey.GetDataType() == schemapb.DataType_VarChar {
deletedIDDict := make(map[string]uint64)
for _, deleteLog := range deleteLogs {
deletedIDDict[deleteLog.Pk.GetValue().(string)] = deleteLog.Ts
_, exist := deletedIDDict[deleteLog.Pk.GetValue().(string)]
if !exist || deleteLog.Ts > deletedIDDict[deleteLog.Pk.GetValue().(string)] {
deletedIDDict[deleteLog.Pk.GetValue().(string)] = deleteLog.Ts
}
}
log.Info("Binlog adapter: count of deleted entities", zap.Int("deletedCount", len(deletedIDDict)))
return nil, deletedIDDict, nil
Expand Down Expand Up @@ -530,9 +536,10 @@ func (p *BinlogAdapter) getShardingListByPrimaryInt64(primaryKeys []int64,
continue
}

_, deleted := intDeletedList[key]
deleteTs, deleted := intDeletedList[key]
// if the key exists in intDeletedList, that means this entity has been deleted
if deleted {
// only skip the entity when the delete happened after the insert
if deleted && deleteTs > uint64(ts) {
shardList = append(shardList, -1) // this entity has been deleted, set shardID = -1 and skip this entity
actualDeleted++
} else {
Expand Down Expand Up @@ -584,9 +591,10 @@ func (p *BinlogAdapter) getShardingListByPrimaryVarchar(primaryKeys []string,
continue
}

_, deleted := strDeletedList[key]
deleteTs, deleted := strDeletedList[key]
// if exists in strDeletedList, that means this entity has been deleted
if deleted {
// only skip the entity when the delete happened after the insert
if deleted && deleteTs > uint64(ts) {
shardList = append(shardList, -1) // this entity has been deleted, set shardID = -1 and skip this entity
actualDeleted++
} else {
Expand Down
48 changes: 26 additions & 22 deletions internal/util/importutil/binlog_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const (
baseTimestamp = 43757345
)

func createDeltalogBuf(t *testing.T, deleteList interface{}, varcharType bool) []byte {
func createDeltalogBuf(t *testing.T, deleteList interface{}, varcharType bool, startTimestamp uint64) []byte {
deleteData := &storage.DeleteData{
Pks: make([]storage.PrimaryKey, 0),
Tss: make([]storage.Timestamp, 0),
Expand All @@ -47,15 +47,15 @@ func createDeltalogBuf(t *testing.T, deleteList interface{}, varcharType bool) [
assert.NotNil(t, deltaData)
for i, id := range deltaData {
deleteData.Pks = append(deleteData.Pks, storage.NewVarCharPrimaryKey(id))
deleteData.Tss = append(deleteData.Tss, baseTimestamp+uint64(i))
deleteData.Tss = append(deleteData.Tss, startTimestamp+uint64(i))
deleteData.RowCount++
}
} else {
deltaData := deleteList.([]int64)
assert.NotNil(t, deltaData)
for i, id := range deltaData {
deleteData.Pks = append(deleteData.Pks, storage.NewInt64PrimaryKey(id))
deleteData.Tss = append(deleteData.Tss, baseTimestamp+uint64(i))
deleteData.Tss = append(deleteData.Tss, startTimestamp+uint64(i))
deleteData.RowCount++
}
}
Expand Down Expand Up @@ -171,7 +171,7 @@ func Test_BinlogAdapterReadDeltalog(t *testing.T) {
ctx := context.Background()

deleteItems := []int64{1001, 1002, 1003}
buf := createDeltalogBuf(t, deleteItems, false)
buf := createDeltalogBuf(t, deleteItems, false, baseTimestamp)
chunkManager := &MockChunkManager{
readBuf: map[string][]byte{
"dummy": buf,
Expand Down Expand Up @@ -212,7 +212,7 @@ func Test_BinlogAdapterDecodeDeleteLogs(t *testing.T) {
ctx := context.Background()

deleteItems := []int64{1001, 1002, 1003, 1004, 1005}
buf := createDeltalogBuf(t, deleteItems, false)
buf := createDeltalogBuf(t, deleteItems, false, baseTimestamp)
chunkManager := &MockChunkManager{
readBuf: map[string][]byte{
"dummy": buf,
Expand Down Expand Up @@ -244,7 +244,7 @@ func Test_BinlogAdapterDecodeDeleteLogs(t *testing.T) {

// wrong data type of delta log
chunkManager.readBuf = map[string][]byte{
"dummy": createDeltalogBuf(t, []string{"1001", "1002"}, true),
"dummy": createDeltalogBuf(t, []string{"1001", "1002"}, true, baseTimestamp),
}

adapter, err = NewBinlogAdapter(ctx, collectionInfo, 1024, 2048, chunkManager, flushFunc, 0, math.MaxUint64)
Expand Down Expand Up @@ -317,7 +317,7 @@ func Test_BinlogAdapterReadDeltalogs(t *testing.T) {
ctx := context.Background()

deleteItems := []int64{1001, 1002, 1003, 1004, 1005}
buf := createDeltalogBuf(t, deleteItems, false)
buf := createDeltalogBuf(t, deleteItems, false, baseTimestamp)
chunkManager := &MockChunkManager{
readBuf: map[string][]byte{
"dummy": buf,
Expand Down Expand Up @@ -374,7 +374,7 @@ func Test_BinlogAdapterReadDeltalogs(t *testing.T) {
collectionInfo.resetSchema(schema)

chunkManager.readBuf = map[string][]byte{
"dummy": createDeltalogBuf(t, []string{"1001", "1002"}, true),
"dummy": createDeltalogBuf(t, []string{"1001", "1002"}, true, baseTimestamp),
}

adapter, err = NewBinlogAdapter(ctx, collectionInfo, 1024, 2048, chunkManager, flushFunc, 0, math.MaxUint64)
Expand Down Expand Up @@ -462,7 +462,7 @@ func Test_BinlogAdapterReadTimestamp(t *testing.T) {

// succeed
rowCount := 10
fieldsData := createFieldsData(sampleSchema(), rowCount)
fieldsData := createFieldsData(sampleSchema(), rowCount, baseTimestamp)
chunkManager.readBuf["dummy"] = createBinlogBuf(t, schemapb.DataType_Int64, fieldsData[106].([]int64))
ts, err = adapter.readTimestamp("dummy")
assert.NoError(t, err)
Expand Down Expand Up @@ -502,7 +502,7 @@ func Test_BinlogAdapterReadPrimaryKeys(t *testing.T) {

// wrong primary key type
rowCount := 10
fieldsData := createFieldsData(sampleSchema(), rowCount)
fieldsData := createFieldsData(sampleSchema(), rowCount, baseTimestamp)
chunkManager.readBuf["dummy"] = createBinlogBuf(t, schemapb.DataType_Bool, fieldsData[102].([]bool))

adapter.collectionInfo.PrimaryKey.DataType = schemapb.DataType_Bool
Expand Down Expand Up @@ -545,7 +545,7 @@ func Test_BinlogAdapterShardListInt64(t *testing.T) {
assert.NotNil(t, adapter)
assert.NoError(t, err)

fieldsData := createFieldsData(sampleSchema(), 0)
fieldsData := createFieldsData(sampleSchema(), 0, baseTimestamp)
shardsData := createShardsData(sampleSchema(), fieldsData, shardNum, []int64{1})

// wrong input
Expand Down Expand Up @@ -587,7 +587,7 @@ func Test_BinlogAdapterShardListVarchar(t *testing.T) {
assert.NotNil(t, adapter)
assert.NoError(t, err)

fieldsData := createFieldsData(strKeySchema(), 0)
fieldsData := createFieldsData(strKeySchema(), 0, baseTimestamp)
shardsData := createShardsData(strKeySchema(), fieldsData, shardNum, []int64{1})
// wrong input
shardList, err := adapter.getShardingListByPrimaryVarchar([]string{"1"}, []int64{1, 2}, shardsData, map[string]uint64{})
Expand Down Expand Up @@ -615,6 +615,7 @@ func Test_BinlogAdapterShardListVarchar(t *testing.T) {

func Test_BinlogAdapterReadInt64PK(t *testing.T) {
ctx := context.Background()
paramtable.Init()

chunkManager := &MockChunkManager{}

Expand Down Expand Up @@ -677,7 +678,7 @@ func Test_BinlogAdapterReadInt64PK(t *testing.T) {

// prepare binlog data
rowCount := 1000
fieldsData := createFieldsData(sampleSchema(), rowCount)
fieldsData := createFieldsData(sampleSchema(), rowCount, baseTimestamp)
deletedItems := []int64{41, 51, 100, 400, 600}

chunkManager.readBuf = map[string][]byte{
Expand All @@ -693,7 +694,7 @@ func Test_BinlogAdapterReadInt64PK(t *testing.T) {
"111_insertlog": createBinlogBuf(t, schemapb.DataType_FloatVector, fieldsData[111].([][]float32)),
"112_insertlog": createBinlogBuf(t, schemapb.DataType_JSON, fieldsData[112].([][]byte)),
"113_insertlog": createBinlogBuf(t, schemapb.DataType_Array, fieldsData[113].([]*schemapb.ScalarField)),
"deltalog": createDeltalogBuf(t, deletedItems, false),
"deltalog": createDeltalogBuf(t, deletedItems, false, baseTimestamp+300),
}

// failed to read primary keys
Expand All @@ -708,15 +709,18 @@ func Test_BinlogAdapterReadInt64PK(t *testing.T) {
// succeed flush
chunkManager.readBuf["1_insertlog"] = createBinlogBuf(t, schemapb.DataType_Int64, fieldsData[1].([]int64))

adapter.tsEndPoint = baseTimestamp + uint64(499) // 4 entities deleted, 500 entities excluded
// createDeltalogBuf is called with startTimestamp = baseTimestamp+300, so the delete pks {41, 51, 100, 400, 600}
// get delete ts offsets {300, 301, 302, 303, 304}. Only the deletes for pks 41, 51 and 100 happen after the
// corresponding inserts (presumably insert ts offset == pk — confirm against createFieldsData), so 3 entities are deleted.
adapter.tsEndPoint = baseTimestamp + uint64(499) // 3 entities deleted, 500 entities excluded
err = adapter.Read(holder)
assert.NoError(t, err)
assert.Equal(t, shardNum, int32(flushCounter))
assert.Equal(t, rowCount-4-500, flushRowCount)
assert.Equal(t, rowCount-3-500, flushRowCount)
}

func Test_BinlogAdapterReadVarcharPK(t *testing.T) {
ctx := context.Background()
paramtable.Init()

chunkManager := &MockChunkManager{}

Expand Down Expand Up @@ -788,7 +792,7 @@ func Test_BinlogAdapterReadVarcharPK(t *testing.T) {
"104_insertlog": createBinlogBuf(t, schemapb.DataType_VarChar, varcharData),
"105_insertlog": createBinlogBuf(t, schemapb.DataType_Bool, boolData),
"106_insertlog": createBinlogBuf(t, schemapb.DataType_FloatVector, floatVecData),
"deltalog": createDeltalogBuf(t, deletedItems, true),
"deltalog": createDeltalogBuf(t, deletedItems, true, baseTimestamp+300),
}

// succeed
Expand All @@ -800,7 +804,7 @@ func Test_BinlogAdapterReadVarcharPK(t *testing.T) {
assert.NotNil(t, adapter)
assert.NoError(t, err)

adapter.tsEndPoint = baseTimestamp + uint64(499) // 3 entities deleted, 500 entities excluded, the "999" is excluded, so totally 502 entities skipped
adapter.tsEndPoint = baseTimestamp + uint64(499) // 2 entities deleted, 500 entities excluded, the "999" is excluded, so totally 502 entities skipped
err = adapter.Read(holder)
assert.NoError(t, err)
assert.Equal(t, shardNum, int32(flushCounter))
Expand All @@ -823,7 +827,7 @@ func Test_BinlogAdapterDispatch(t *testing.T) {

// prepare empty in-memory segments data
partitionID := int64(1)
fieldsData := createFieldsData(sampleSchema(), 0)
fieldsData := createFieldsData(sampleSchema(), 0, baseTimestamp)
shardsData := createShardsData(sampleSchema(), fieldsData, shardNum, []int64{partitionID})

shardList := []int32{0, -1, 1}
Expand Down Expand Up @@ -1146,7 +1150,7 @@ func Test_BinlogAdapterVerifyField(t *testing.T) {

shardNum := int32(2)
partitionID := int64(1)
fieldsData := createFieldsData(sampleSchema(), 0)
fieldsData := createFieldsData(sampleSchema(), 0, baseTimestamp)
shardsData := createShardsData(sampleSchema(), fieldsData, shardNum, []int64{partitionID})

flushFunc := func(fields BlockData, shardID int, partID int64) error {
Expand All @@ -1173,7 +1177,7 @@ func Test_BinlogAdapterReadInsertlog(t *testing.T) {

shardNum := int32(2)
partitionID := int64(1)
fieldsData := createFieldsData(sampleSchema(), 0)
fieldsData := createFieldsData(sampleSchema(), 0, baseTimestamp)
shardsData := createShardsData(sampleSchema(), fieldsData, shardNum, []int64{partitionID})

flushFunc := func(fields BlockData, shardID int, partID int64) error {
Expand Down Expand Up @@ -1205,7 +1209,7 @@ func Test_BinlogAdapterReadInsertlog(t *testing.T) {

// prepare binlog data
rowCount := 3
fieldsData = createFieldsData(sampleSchema(), rowCount)
fieldsData = createFieldsData(sampleSchema(), rowCount, baseTimestamp)

failedFunc := func(fieldID int64, fieldName string, fieldType schemapb.DataType, wrongField int64, wrongType schemapb.DataType) {
// row count mismatch
Expand Down
24 changes: 12 additions & 12 deletions internal/util/importutil/binlog_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func Test_BinlogFileBool(t *testing.T) {
binlogFile.Close()

// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
err = binlogFile.Open("dummy")
assert.NoError(t, err)

Expand Down Expand Up @@ -388,7 +388,7 @@ func Test_BinlogFileInt8(t *testing.T) {
binlogFile.Close()

// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
err = binlogFile.Open("dummy")
assert.NoError(t, err)

Expand Down Expand Up @@ -446,7 +446,7 @@ func Test_BinlogFileInt16(t *testing.T) {
binlogFile.Close()

// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
err = binlogFile.Open("dummy")
assert.NoError(t, err)

Expand Down Expand Up @@ -503,7 +503,7 @@ func Test_BinlogFileInt32(t *testing.T) {
binlogFile.Close()

// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
err = binlogFile.Open("dummy")
assert.NoError(t, err)

Expand Down Expand Up @@ -560,7 +560,7 @@ func Test_BinlogFileInt64(t *testing.T) {
binlogFile.Close()

// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
err = binlogFile.Open("dummy")
assert.NoError(t, err)

Expand Down Expand Up @@ -617,7 +617,7 @@ func Test_BinlogFileFloat(t *testing.T) {
binlogFile.Close()

// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
err = binlogFile.Open("dummy")
assert.NoError(t, err)

Expand Down Expand Up @@ -674,7 +674,7 @@ func Test_BinlogFileDouble(t *testing.T) {
binlogFile.Close()

// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
err = binlogFile.Open("dummy")
assert.NoError(t, err)

Expand Down Expand Up @@ -778,7 +778,7 @@ func Test_BinlogFileJSON(t *testing.T) {
binlogFile.Close()

// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
err = binlogFile.Open("dummy")
assert.NoError(t, err)

Expand Down Expand Up @@ -858,7 +858,7 @@ func Test_BinlogFileArray(t *testing.T) {
binlogFile.Close()

// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
err = binlogFile.Open("dummy")
assert.NoError(t, err)

Expand Down Expand Up @@ -937,7 +937,7 @@ func Test_BinlogFileBinaryVector(t *testing.T) {
binlogFile.Close()

// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
err = binlogFile.Open("dummy")
assert.NoError(t, err)

Expand Down Expand Up @@ -1004,7 +1004,7 @@ func Test_BinlogFileFloatVector(t *testing.T) {
binlogFile.Close()

// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
err = binlogFile.Open("dummy")
assert.NoError(t, err)

Expand Down Expand Up @@ -1072,7 +1072,7 @@ func Test_BinlogFileFloat16Vector(t *testing.T) {
binlogFile.Close()

// wrong log type
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false)
chunkManager.readBuf["dummy"] = createDeltalogBuf(t, []int64{1}, false, baseTimestamp)
err = binlogFile.Open("dummy")
assert.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion internal/util/importutil/binlog_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func Test_BinlogParserParse(t *testing.T) {

// progress
rowCount := 100
fieldsData := createFieldsData(sampleSchema(), rowCount)
fieldsData := createFieldsData(sampleSchema(), rowCount, baseTimestamp)
chunkManager.listResult["deltaPath"] = []string{}
chunkManager.listResult["insertPath"] = []string{
"123/0/a",
Expand Down
Loading
Loading