Skip to content

Commit

Permalink
Tools: rewrite delete can delete series only if it matches all matche…
Browse files Browse the repository at this point in the history
…rs (#3886) (#3890)

* rewrite delete should delete a series only if it matches all matchers in a deletion request

Signed-off-by: yeya24 <yb532204897@gmail.com>

* add test case

Signed-off-by: yeya24 <yb532204897@gmail.com>

Co-authored-by: Ben Ye <yb532204897@gmail.com>
  • Loading branch information
bwplotka and yeya24 committed Mar 5, 2021
1 parent 26077e0 commit 92b4452
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 22 deletions.
32 changes: 32 additions & 0 deletions pkg/compactv2/compactor_test.go
Expand Up @@ -274,6 +274,38 @@ func TestCompactor_WriteSeries_e2e(t *testing.T) {
NumChunks: 2,
},
},
{
name: "1 blocks + delete modifier, deletion request contains multiple matchers, delete second series",
input: [][]seriesSamples{
{
{lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}},
{lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}},
{lset: labels.Labels{{Name: "a", Value: "3"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}},
},
},
modifiers: []Modifier{WithDeletionModifier(
metadata.DeletionRequest{
Matchers: []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "a", "1"),
labels.MustNewMatcher(labels.MatchEqual, "b", "2"),
},
})},
expected: []seriesSamples{
{lset: labels.Labels{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}},
{lset: labels.Labels{{Name: "a", Value: "3"}},
chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}},
},
expectedChanges: "Deleted {a=\"1\", b=\"2\"} [{0 20}]\n",
expectedStats: tsdb.BlockStats{
NumSamples: 12,
NumSeries: 2,
NumChunks: 2,
},
},
} {
t.Run(tcase.name, func(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "test-series-writer")
Expand Down
46 changes: 24 additions & 22 deletions pkg/compactv2/modifiers.go
Expand Up @@ -58,41 +58,43 @@ SeriesLoop:
lbls := s.Labels()

var intervals tombstones.Intervals
DeletionsLoop:
for _, deletions := range d.d.deletions {
for _, m := range deletions.Matchers {
v := lbls.Get(m.Name)
if v == "" {
continue
}

// Only if all matchers in the deletion request are matched can we proceed to deletion.
if !m.Matches(v) {
continue
continue DeletionsLoop
}
if len(deletions.Intervals) > 0 {
for _, in := range deletions.Intervals {
intervals = intervals.Add(in)
}
break
}
if len(deletions.Intervals) > 0 {
for _, in := range deletions.Intervals {
intervals = intervals.Add(in)
}
continue
}

// Special case: Delete whole series.
chksIter := s.Iterator()
var chks []chunks.Meta
for chksIter.Next() {
chks = append(chks, chksIter.At())
}
if d.err = chksIter.Err(); d.err != nil {
return false
}
// Special case: Delete whole series.
chksIter := s.Iterator()
var chks []chunks.Meta
for chksIter.Next() {
chks = append(chks, chksIter.At())
}
if d.err = chksIter.Err(); d.err != nil {
return false
}

var deleted tombstones.Intervals
if len(chks) > 0 {
deleted = deleted.Add(tombstones.Interval{Mint: chks[0].MinTime, Maxt: chks[len(chks)-1].MaxTime})
}
d.log.DeleteSeries(lbls, deleted)
d.p.SeriesProcessed()
continue SeriesLoop
var deleted tombstones.Intervals
if len(chks) > 0 {
deleted = deleted.Add(tombstones.Interval{Mint: chks[0].MinTime, Maxt: chks[len(chks)-1].MaxTime})
}
d.log.DeleteSeries(lbls, deleted)
d.p.SeriesProcessed()
continue SeriesLoop
}

d.curr = &storage.ChunkSeriesEntry{
Expand Down

0 comments on commit 92b4452

Please sign in to comment.