From c79f261b882d838dd016a266bef57a519f4fe564 Mon Sep 17 00:00:00 2001 From: yah01 Date: Tue, 28 Mar 2023 21:30:04 +0800 Subject: [PATCH] Protect DataCoord from calculating segment lines by stale log entries num (#23069) Signed-off-by: yah01 --- internal/datacoord/meta.go | 10 ++++++---- internal/datacoord/meta_test.go | 2 +- internal/datacoord/services.go | 2 +- internal/util/segmentutil/utils.go | 6 +++++- 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index f074323bc811c..efa0ed55b9997 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -519,14 +519,15 @@ func (m *meta) UpdateFlushSegmentsInfo( if importing { s := clonedSegment + s.NumOfRows = s.currRows count := segmentutil.CalcRowCountFromBinLog(s.SegmentInfo) - if count != segment.currRows { + if count != segment.currRows && count > 0 { log.Info("check point reported inconsistent with bin log row count", zap.Int64("segment ID", segment.GetID()), zap.Int64("current rows (wrong)", segment.currRows), zap.Int64("segment bin log row count (correct)", count)) + s.NumOfRows = count } - s.NumOfRows = count modSegments[segmentID] = s } else { for _, cp := range checkpoints { @@ -543,15 +544,16 @@ func (m *meta) UpdateFlushSegmentsInfo( continue } + s.NumOfRows = cp.NumOfRows count := segmentutil.CalcRowCountFromBinLog(s.SegmentInfo) // count should smaller than or equal to cp reported - if count != cp.NumOfRows { + if count != cp.NumOfRows && count > 0 { log.Info("check point reported inconsistent with bin log row count", zap.Int64("segment ID", segment.GetID()), zap.Int64("check point (wrong)", cp.NumOfRows), zap.Int64("segment bin log row count (correct)", count)) + s.NumOfRows = count } - s.NumOfRows = count s.DmlPosition = cp.GetPosition() modSegments[cp.GetSegmentID()] = s diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 98ddaeae78699..64c88e1bc0f6e 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -384,7 +384,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) { err = meta.AddSegment(segment1) assert.Nil(t, err) - err = meta.UpdateFlushSegmentsInfo(1, true, false, true, []*datapb.FieldBinlog{getFieldBinlogPathsWithEntry(1, 10, getInsertLogPath("binlog1", 1))}, + err = meta.UpdateFlushSegmentsInfo(1, true, false, false, []*datapb.FieldBinlog{getFieldBinlogPathsWithEntry(1, 10, getInsertLogPath("binlog1", 1))}, []*datapb.FieldBinlog{getFieldBinlogPaths(1, getStatsLogPath("statslog1", 1))}, []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog1", 1)}}}}, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}, []*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index c01736f9827a8..bcb2de6dc8d80 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -724,7 +724,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf segment2Binlogs[id] = append(segment2Binlogs[id], fieldBinlogs) } - if newCount := segmentutil.CalcRowCountFromBinLog(segment.SegmentInfo); newCount != segment.NumOfRows { + if newCount := segmentutil.CalcRowCountFromBinLog(segment.SegmentInfo); newCount != segment.NumOfRows && newCount > 0 { log.Warn("segment row number meta inconsistent with bin log row count and will be corrected", zap.Int64("segment ID", segment.GetID()), zap.Int64("segment meta row count (wrong)", segment.GetNumOfRows()), diff --git a/internal/util/segmentutil/utils.go b/internal/util/segmentutil/utils.go index 07f3046dfafd3..36ceeda269b4e 100644 --- a/internal/util/segmentutil/utils.go +++ b/internal/util/segmentutil/utils.go @@ -11,7 +11,7 @@ import ( // Note that `segCloned` should be a copied version of `seg`. func ReCalcRowCount(seg, segCloned *datapb.SegmentInfo) { // `segment` is not mutated but only cloned above and is safe to be referred here. - if newCount := CalcRowCountFromBinLog(seg); newCount != seg.GetNumOfRows() { + if newCount := CalcRowCountFromBinLog(seg); newCount != seg.GetNumOfRows() && newCount > 0 { log.Warn("segment row number meta inconsistent with bin log row count and will be corrected", zap.Int64("segment ID", seg.GetID()), zap.Int64("segment meta row count (wrong)", seg.GetNumOfRows()), @@ -27,6 +27,10 @@ func CalcRowCountFromBinLog(seg *datapb.SegmentInfo) int64 { if len(seg.GetBinlogs()) > 0 { for _, ct := range seg.GetBinlogs()[0].GetBinlogs() { rowCt += ct.GetEntriesNum() + // This segment contains stale log with incorrect entries num, + if ct.GetEntriesNum() <= 0 { + return -1 + } } } return rowCt