Skip to content

Commit ced2944

Browse files
devanbenz and gwossum authored
fix: correct locking bug when deleting a series (#26649) (#27143)
Co-authored-by: Geoffrey Wossum <gwossum@influxdata.com>
fix: correct locking bug when deleting a series (#26649)
closes: #26648
1 parent 2ff5d01 commit ced2944

2 files changed

Lines changed: 108 additions & 6 deletions

File tree

tsdb/engine/tsm1/engine.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1623,6 +1623,9 @@ func (e *Engine) deleteSeriesRange(ctx context.Context, seriesKeys [][]byte, min
16231623
// would delete it from the index.
16241624
minKey := seriesKeys[0]
16251625

1626+
// Ensure seriesKeys slice is correctly read and written concurrently in the Apply func.
1627+
var seriesKeysLock sync.RWMutex
1628+
16261629
// Apply runs this func concurrently. The seriesKeys slice is mutated concurrently
16271630
// by different goroutines setting positions to nil.
16281631
if err := e.FileStore.Apply(ctx, func(r TSMFile) error {
@@ -1639,18 +1642,28 @@ func (e *Engine) deleteSeriesRange(ctx context.Context, seriesKeys [][]byte, min
16391642
seriesKey, _ := SeriesAndFieldFromCompositeKey(indexKey)
16401643

16411644
// Skip over any deleted keys that are less than our tsm key
1642-
cmp := bytes.Compare(seriesKeys[j], seriesKey)
1643-
for j < len(seriesKeys) && cmp < 0 {
1644-
j++
1645-
if j >= len(seriesKeys) {
1646-
return nil
1645+
cmp, cont := func() (int, bool) {
1646+
seriesKeysLock.RLock()
1647+
defer seriesKeysLock.RUnlock()
1648+
cmp := bytes.Compare(seriesKeys[j], seriesKey)
1649+
for j < len(seriesKeys) && cmp < 0 {
1650+
j++
1651+
if j >= len(seriesKeys) {
1652+
return 0, false // don't continue processing seriesKeys.
1653+
}
1654+
cmp = bytes.Compare(seriesKeys[j], seriesKey)
16471655
}
1648-
cmp = bytes.Compare(seriesKeys[j], seriesKey)
1656+
return cmp, true // continue processing seriesKeys.
1657+
}()
1658+
if !cont {
1659+
return nil
16491660
}
16501661

16511662
// We've found a matching key, cross it out so we do not remove it from the index.
16521663
if j < len(seriesKeys) && cmp == 0 {
1664+
seriesKeysLock.Lock()
16531665
seriesKeys[j] = emptyBytes
1666+
seriesKeysLock.Unlock()
16541667
j++
16551668
}
16561669
}

tsdb/engine/tsm1/engine_test.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1355,6 +1355,95 @@ func TestEngine_DeleteSeriesRange(t *testing.T) {
13551355
}
13561356
}
13571357

1358+
// TestEngine_DeleteSeriesRange_MultiTSMLocking checks for locking issues when deleting series
1359+
// ranges from a large number of TSM files. This test is derived from TestEngine_DeleteSeriesRange.
1360+
func TestEngine_DeleteSeriesRange_MultiTSMLocking(t *testing.T) {
1361+
for _, index := range tsdb.RegisteredIndexes() {
1362+
t.Run(index, func(t *testing.T) {
1363+
e, err := NewEngine(t, index)
1364+
require.NoError(t, err)
1365+
1366+
// mock the planner so compactions don't run during the test
1367+
e.CompactionPlan = &mockPlanner{}
1368+
require.NoError(t, e.Open(t.Context()))
1369+
1370+
// In order to trigger potential locking issues within DeleteSeriesRange, we must create many more
1371+
// TSM files than the number of CPUs. This combined with how the test data interleaves deleted and not
1372+
// deleted series within a TSM file triggers locking issues if they are present.
1373+
numTSMFiles := runtime.NumCPU() * 20
1374+
var ts int64
1375+
for range numTSMFiles {
1376+
ts += 1000000000
1377+
1378+
// Create a few points.
1379+
p1 := MustParsePointString(fmt.Sprintf("cpu,host=0 value=1.1 %d", ts+5000000000)) // Should not be deleted
1380+
p2 := MustParsePointString(fmt.Sprintf("cpu,host=A value=1.2 %d", ts+1000000000))
1381+
p3 := MustParsePointString(fmt.Sprintf("cpu,host=A value=1.3 %d", ts+2000000000))
1382+
p4 := MustParsePointString(fmt.Sprintf("cpu,host=B value=1.3 %d", ts+3000000000)) // Should not be deleted
1383+
p5 := MustParsePointString(fmt.Sprintf("cpu,host=B value=1.3 %d", ts+4000000000)) // Should not be deleted
1384+
p6 := MustParsePointString(fmt.Sprintf("cpu,host=C value=1.3 %d", ts))
1385+
p7 := MustParsePointString(fmt.Sprintf("mem,host=C value=1.3 %d", ts)) // Should not be deleted
1386+
p8 := MustParsePointString(fmt.Sprintf("disk,host=C value=1.3 %d", ts)) // Should not be deleted
1387+
1388+
for _, p := range []models.Point{p1, p2, p3, p4, p5, p6, p7, p8} {
1389+
require.NoError(t, e.CreateSeriesIfNotExists(p.Key(), p.Name(), p.Tags()), "create series index error")
1390+
}
1391+
1392+
require.NoError(t, e.WritePoints(t.Context(), []models.Point{p1, p2, p3, p4, p5, p6, p7, p8}), "failed to write points")
1393+
require.NoError(t, e.WriteSnapshot(), "failed to snapshot")
1394+
}
1395+
1396+
keys := e.FileStore.Keys()
1397+
require.Len(t, keys, 6, "series count mismatch")
1398+
1399+
itr := &seriesIterator{keys: [][]byte{[]byte("cpu,host=0"), []byte("cpu,host=A"), []byte("cpu,host=B"), []byte("cpu,host=C")}}
1400+
require.NoError(t, e.DeleteSeriesRange(t.Context(), itr, 0, ts+2000000000), "failed to delete series")
1401+
1402+
keys = e.FileStore.Keys()
1403+
require.Len(t, keys, 4, "series count mismatch")
1404+
1405+
require.Contains(t, keys, "cpu,host=B#!~#value", "wrong series deleted")
1406+
1407+
// Check that the series still exists in the index
1408+
indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
1409+
iter, err := indexSet.MeasurementSeriesIDIterator([]byte("cpu"))
1410+
require.NoError(t, err, "iterator error")
1411+
defer func() {
1412+
if iter != nil {
1413+
require.NoError(t, iter.Close())
1414+
}
1415+
}()
1416+
1417+
elem, err := iter.Next()
1418+
require.NoError(t, err)
1419+
require.NotEqual(t, uint64(0), elem.SeriesID, "series index mismatch: EOF, exp 2 series")
1420+
1421+
// Lookup series.
1422+
name, tags := e.sfile.Series(elem.SeriesID)
1423+
require.Equal(t, []byte("cpu"), name, "series mismatch")
1424+
1425+
require.True(t, tags.Equal(models.NewTags(map[string]string{"host": "0"})) || tags.Equal(models.NewTags(map[string]string{"host": "B"})),
1426+
`series mismatch: got %s, exp either "host=0" or "host=B"`, tags)
1427+
require.NoError(t, iter.Close())
1428+
1429+
// Deleting remaining series should remove them from the index.
1430+
itr = &seriesIterator{keys: [][]byte{[]byte("cpu,host=0"), []byte("cpu,host=B")}}
1431+
require.NoError(t, e.DeleteSeriesRange(t.Context(), itr, 0, ts+9000000000), "failed to delete series")
1432+
1433+
indexSet = tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile}
1434+
iter, err = indexSet.MeasurementSeriesIDIterator([]byte("cpu"))
1435+
require.NoError(t, err, "iterator error")
1436+
if iter == nil {
1437+
return
1438+
}
1439+
1440+
elem, err = iter.Next()
1441+
require.NoError(t, err)
1442+
require.Equal(t, uint64(0), elem.SeriesID, "got an undeleted series id, but series should be dropped from index")
1443+
})
1444+
}
1445+
}
1446+
13581447
func TestEngine_DeleteSeriesRangeWithPredicate(t *testing.T) {
13591448
for _, index := range tsdb.RegisteredIndexes() {
13601449
t.Run(index, func(t *testing.T) {

0 commit comments

Comments
 (0)