Skip to content

Commit

Permalink
Protect DataCoord from calculating segment lines by stale log entries…
Browse files Browse the repository at this point in the history
… num

Signed-off-by: yah01 <yang.cen@zilliz.com>
  • Loading branch information
yah01 committed Mar 28, 2023
1 parent a3e4b74 commit 5a32fc6
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 7 deletions.
10 changes: 6 additions & 4 deletions internal/datacoord/meta.go
Expand Up @@ -519,14 +519,15 @@ func (m *meta) UpdateFlushSegmentsInfo(

if importing {
s := clonedSegment
s.NumOfRows = s.currRows
count := segmentutil.CalcRowCountFromBinLog(s.SegmentInfo)
if count != segment.currRows {
if count != segment.currRows && count > 0 {
log.Info("check point reported inconsistent with bin log row count",
zap.Int64("segment ID", segment.GetID()),
zap.Int64("current rows (wrong)", segment.currRows),
zap.Int64("segment bin log row count (correct)", count))
s.NumOfRows = count

Check warning on line 529 in internal/datacoord/meta.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/meta.go#L529

Added line #L529 was not covered by tests
}
s.NumOfRows = count
modSegments[segmentID] = s
} else {
for _, cp := range checkpoints {
Expand All @@ -543,15 +544,16 @@ func (m *meta) UpdateFlushSegmentsInfo(
continue
}

s.NumOfRows = cp.NumOfRows
count := segmentutil.CalcRowCountFromBinLog(s.SegmentInfo)
// count should smaller than or equal to cp reported
if count != cp.NumOfRows {
if count != cp.NumOfRows && count > 0 {
log.Info("check point reported inconsistent with bin log row count",
zap.Int64("segment ID", segment.GetID()),
zap.Int64("check point (wrong)", cp.NumOfRows),
zap.Int64("segment bin log row count (correct)", count))
s.NumOfRows = count
}
s.NumOfRows = count

s.DmlPosition = cp.GetPosition()
modSegments[cp.GetSegmentID()] = s
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/meta_test.go
Expand Up @@ -384,7 +384,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
err = meta.AddSegment(segment1)
assert.Nil(t, err)

err = meta.UpdateFlushSegmentsInfo(1, true, false, true, []*datapb.FieldBinlog{getFieldBinlogPathsWithEntry(1, 10, getInsertLogPath("binlog1", 1))},
err = meta.UpdateFlushSegmentsInfo(1, true, false, false, []*datapb.FieldBinlog{getFieldBinlogPathsWithEntry(1, 10, getInsertLogPath("binlog1", 1))},
[]*datapb.FieldBinlog{getFieldBinlogPaths(1, getStatsLogPath("statslog1", 1))},
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog1", 1)}}}},
[]*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}, []*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}}}})
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/services.go
Expand Up @@ -724,7 +724,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
segment2Binlogs[id] = append(segment2Binlogs[id], fieldBinlogs)
}

if newCount := segmentutil.CalcRowCountFromBinLog(segment.SegmentInfo); newCount != segment.NumOfRows {
if newCount := segmentutil.CalcRowCountFromBinLog(segment.SegmentInfo); newCount != segment.NumOfRows && newCount > 0 {
log.Warn("segment row number meta inconsistent with bin log row count and will be corrected",
zap.Int64("segment ID", segment.GetID()),
zap.Int64("segment meta row count (wrong)", segment.GetNumOfRows()),
Expand Down
6 changes: 5 additions & 1 deletion internal/util/segmentutil/utils.go
Expand Up @@ -11,7 +11,7 @@ import (
// Note that `segCloned` should be a copied version of `seg`.
func ReCalcRowCount(seg, segCloned *datapb.SegmentInfo) {
// `segment` is not mutated but only cloned above and is safe to be referred here.
if newCount := CalcRowCountFromBinLog(seg); newCount != seg.GetNumOfRows() {
if newCount := CalcRowCountFromBinLog(seg); newCount != seg.GetNumOfRows() && newCount > 0 {
log.Warn("segment row number meta inconsistent with bin log row count and will be corrected",
zap.Int64("segment ID", seg.GetID()),
zap.Int64("segment meta row count (wrong)", seg.GetNumOfRows()),
Expand All @@ -27,6 +27,10 @@ func CalcRowCountFromBinLog(seg *datapb.SegmentInfo) int64 {
if len(seg.GetBinlogs()) > 0 {
for _, ct := range seg.GetBinlogs()[0].GetBinlogs() {
rowCt += ct.GetEntriesNum()
// This segment contains stale log with incorrect entries num,
if ct.GetEntriesNum() <= 0 {
return -1
}
}
}
return rowCt
Expand Down

0 comments on commit 5a32fc6

Please sign in to comment.