diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index b37bda537a90..3ef9e0dc3a22 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -689,7 +689,7 @@ func TestUpdateSegmentsInfo(t *testing.T) { AddBinlogsOperator(1, []*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(1, 10, 1)}, []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1)}, - []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog1", 1), LogID: 2}}}}, + []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: "", LogID: 2}}}}, ), UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}), UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}), @@ -830,7 +830,7 @@ func TestUpdateSegmentsInfo(t *testing.T) { AddBinlogsOperator(1, []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, - []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog", 1), LogID: 2}}}}, + []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: "", LogID: 2}}}}, ), UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}), UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}), diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go index 5946934e8828..d796c0c1a2dc 100644 --- a/internal/datacoord/segment_info.go +++ b/internal/datacoord/segment_info.go @@ -150,7 +150,7 @@ func (s *SegmentsInfo) DropSegment(segmentID UniqueID) { } // SetSegment sets SegmentInfo with segmentID, perform overwrite if already exists -// set the logPath of segement in meta empty, to save space +// set the logPath of segment in meta empty, to save space // if segment has logPath, make it empty func (s *SegmentsInfo) SetSegment(segmentID UniqueID, segment *SegmentInfo) { if segment, ok := s.segments[segmentID]; ok { diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go index c3b45d6cc1ba..42a0e9efc6ef 100644 --- a/internal/datacoord/segment_manager_test.go +++ b/internal/datacoord/segment_manager_test.go @@ -35,7 +35,6 @@ import ( "github.com/milvus-io/milvus/internal/metastore/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/util/etcd" - "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -649,7 +648,6 @@ func TestTryToSealSegment(t *testing.T) { { EntriesNum: 10, LogID: 3, - LogPath: metautil.BuildInsertLogPath("", 1, 1, seg.ID, 2, 3), }, }, }, @@ -674,12 +672,10 @@ func TestTryToSealSegment(t *testing.T) { { EntriesNum: 10, LogID: 1, - LogPath: metautil.BuildInsertLogPath("", 1, 1, seg.ID, 1, 3), }, { EntriesNum: 20, LogID: 2, - LogPath: metautil.BuildInsertLogPath("", 1, 1, seg.ID, 1, 2), }, }, }, diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 69b59f4ec9dd..94ed5ca73802 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -57,7 +57,6 @@ import ( "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" - "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tikv" @@ -319,17 +318,14 @@ func TestGetSegmentInfo(t *testing.T) { Binlogs: []*datapb.Binlog{ { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 801), LogID: 801, }, { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 802), LogID: 802, }, { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 803), LogID: 803, }, }, @@ -343,10 +339,10 @@ func TestGetSegmentInfo(t *testing.T) { SegmentIDs: []int64{0}, } resp, err := svr.GetSegmentInfo(svr.ctx, req) + assert.NoError(t, err) assert.Equal(t, 1, len(resp.GetInfos())) // Check that # of rows is corrected from 100 to 60. assert.EqualValues(t, 60, resp.GetInfos()[0].GetNumOfRows()) - assert.NoError(t, err) assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) t.Run("with wrong segmentID", func(t *testing.T) { @@ -1823,17 +1819,14 @@ func TestGetRecoveryInfo(t *testing.T) { Binlogs: []*datapb.Binlog{ { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 901), LogID: 901, }, { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 902), LogID: 902, }, { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 903), LogID: 903, }, }, @@ -1846,12 +1839,10 @@ func TestGetRecoveryInfo(t *testing.T) { Binlogs: []*datapb.Binlog{ { EntriesNum: 30, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 801), LogID: 801, }, { EntriesNum: 70, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 802), LogID: 802, }, }, @@ -1925,17 +1916,14 @@ func TestGetRecoveryInfo(t *testing.T) { Binlogs: []*datapb.Binlog{ { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 901), LogID: 901, }, { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 902), LogID: 902, }, { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 903), LogID: 903, }, }, @@ -1948,12 +1936,10 @@ func TestGetRecoveryInfo(t *testing.T) { Binlogs: []*datapb.Binlog{ { EntriesNum: 30, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 801), LogID: 801, }, { EntriesNum: 70, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 802), LogID: 802, }, }, diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index 0c7728f2eb17..d79fc5f0cf03 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -1138,17 +1138,14 @@ func TestGetRecoveryInfoV2(t *testing.T) { Binlogs: []*datapb.Binlog{ { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 901), LogID: 901, }, { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 902), LogID: 902, }, { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 903), LogID: 903, }, }, @@ -1161,12 +1158,10 @@ func TestGetRecoveryInfoV2(t *testing.T) { Binlogs: []*datapb.Binlog{ { EntriesNum: 30, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 801), LogID: 801, }, { EntriesNum: 70, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 802), LogID: 802, }, }, @@ -1243,17 +1238,14 @@ func TestGetRecoveryInfoV2(t *testing.T) { Binlogs: []*datapb.Binlog{ { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 901), LogID: 901, }, { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 902), LogID: 902, }, { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 903), LogID: 903, }, }, @@ -1266,12 +1258,10 @@ func TestGetRecoveryInfoV2(t *testing.T) { Binlogs: []*datapb.Binlog{ { EntriesNum: 30, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 801), LogID: 801, }, { EntriesNum: 70, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 802), LogID: 802, }, }, @@ -1318,12 +1308,10 @@ func TestGetRecoveryInfoV2(t *testing.T) { FieldID: 1, Binlogs: []*datapb.Binlog{ { - LogPath: metautil.BuildInsertLogPath("a", 0, 100, 0, 1, 801), - LogID: 801, + LogID: 801, }, { - LogPath: metautil.BuildInsertLogPath("a", 0, 100, 0, 1, 801), - LogID: 801, + LogID: 801, }, }, }, @@ -1333,12 +1321,10 @@ func TestGetRecoveryInfoV2(t *testing.T) { FieldID: 1, Binlogs: []*datapb.Binlog{ { - LogPath: metautil.BuildStatsLogPath("a", 0, 100, 0, 1000, 10000), - LogID: 10000, + LogID: 10000, }, { - LogPath: metautil.BuildStatsLogPath("a", 0, 100, 0, 1000, 10000), - LogID: 10000, + LogID: 10000, }, }, }, diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go index fab1a9f9418b..f7aaa47ac6b3 100644 --- a/internal/metastore/kv/datacoord/kv_catalog_test.go +++ b/internal/metastore/kv/datacoord/kv_catalog_test.go @@ -313,6 +313,53 @@ func Test_AddSegments(t *testing.T) { assert.Equal(t, 4, len(savedKvs)) verifySavedKvsForSegment(t, savedKvs) }) + + t.Run("no need to store log path", func(t *testing.T) { + metakv := mocks.NewMetaKv(t) + catalog := NewCatalog(metakv, rootPath, "") + + validFieldBinlog := []*datapb.FieldBinlog{{ + FieldID: 1, + Binlogs: []*datapb.Binlog{ + { + LogID: 1, + LogPath: "", + }, + }, + }} + + invalidFieldBinlog := []*datapb.FieldBinlog{{ + FieldID: 1, + Binlogs: []*datapb.Binlog{ + { + LogID: 1, + LogPath: "no need to store", + }, + }, + }} + + segment := &datapb.SegmentInfo{ + ID: segmentID, + CollectionID: collectionID, + PartitionID: partitionID, + NumOfRows: 100, + State: commonpb.SegmentState_Flushed, + } + + segment.Statslogs = invalidFieldBinlog + err := catalog.AddSegment(context.TODO(), segment) + assert.Error(t, err) + segment.Statslogs = validFieldBinlog + + segment.Binlogs = invalidFieldBinlog + err = catalog.AddSegment(context.TODO(), segment) + assert.Error(t, err) + segment.Binlogs = validFieldBinlog + + segment.Deltalogs = invalidFieldBinlog + err = catalog.AddSegment(context.TODO(), segment) + assert.Error(t, err) + }) } func Test_AlterSegments(t *testing.T) { diff --git a/internal/metastore/kv/datacoord/util.go b/internal/metastore/kv/datacoord/util.go index c94459e3c649..aa14da999822 100644 --- a/internal/metastore/kv/datacoord/util.go +++ b/internal/metastore/kv/datacoord/util.go @@ -169,6 +169,9 @@ func buildBinlogKvs(collectionID, partitionID, segmentID typeutil.UniqueID, binl if binlog.GetLogID() == 0 { return fmt.Errorf("invalid log id, binlog:%v", binlog) } + if binlog.GetLogPath() != "" { + return fmt.Errorf("fieldBinlog no need to store logpath, binlog:%v", binlog) + } } return nil }