Skip to content

Commit 21ab484

Browse files
authored
fix: Resolves RLock() leakage in Store.DeleteSeries() (#26647) (#26852)
1 parent 61d9216 commit 21ab484

File tree

2 files changed

+120
-29
lines changed

2 files changed

+120
-29
lines changed

tsdb/store.go

Lines changed: 41 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ var (
3838
// ErrMultipleIndexTypes is returned when trying to do deletes on a database with
3939
// multiple index types.
4040
ErrMultipleIndexTypes = errors.New("cannot delete data. DB contains shards using both inmem and tsi1 indexes. Please convert all shards to use the same index type to delete data.")
41+
// ErrNothingToDelete is returned when there is nothing to do for DeleteSeries;
42+
// this error is a no-op: DeleteSeries treats it as success and returns nil.
43+
ErrNothingToDelete = errors.New("nothing to delete")
4144
)
4245

4346
// Statistics gathered by the store.
@@ -1649,41 +1652,50 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
16491652
max = influxql.MaxTime
16501653
}
16511654

1652-
s.mu.RLock()
1653-
if s.databases[database].hasMultipleIndexTypes() {
1654-
s.mu.RUnlock()
1655-
return ErrMultipleIndexTypes
1656-
}
1657-
sfile := s.sfiles[database]
1658-
if sfile == nil {
1659-
s.mu.RUnlock()
1660-
// No series file means nothing has been written to this DB and thus nothing to delete.
1661-
return nil
1662-
}
1663-
1664-
shardFilterFn := byDatabase(database)
1665-
if len(sources) != 0 {
1666-
var rp string
1667-
for idx, source := range sources {
1668-
if measurement, ok := source.(*influxql.Measurement); ok {
1669-
if idx == 0 {
1670-
rp = measurement.RetentionPolicy
1671-
} else if rp != measurement.RetentionPolicy {
1672-
return fmt.Errorf("mixed retention policies not supported, wanted %q got %q", rp, measurement.RetentionPolicy)
1655+
getEpochsAndShards := func() (error, []*Shard, map[uint64]*epochTracker, *SeriesFile) {
1656+
s.mu.RLock()
1657+
defer s.mu.RUnlock()
1658+
if s.databases[database].hasMultipleIndexTypes() {
1659+
return ErrMultipleIndexTypes, nil, nil, nil
1660+
}
1661+
sfile := s.sfiles[database]
1662+
if sfile == nil {
1663+
// No series file means nothing has been written to this DB and thus nothing to delete.
1664+
return ErrNothingToDelete, nil, nil, nil
1665+
}
1666+
1667+
shardFilterFn := byDatabase(database)
1668+
if len(sources) != 0 {
1669+
var rp string
1670+
for idx, source := range sources {
1671+
if measurement, ok := source.(*influxql.Measurement); ok {
1672+
if idx == 0 {
1673+
rp = measurement.RetentionPolicy
1674+
} else if rp != measurement.RetentionPolicy {
1675+
return fmt.Errorf("mixed retention policies not supported, wanted %q got %q", rp, measurement.RetentionPolicy), nil, nil, nil
1676+
}
1677+
} else {
1678+
return fmt.Errorf("unsupported source type in delete %v", source), nil, nil, nil
16731679
}
1674-
} else {
1675-
return fmt.Errorf("unsupported source type in delete %v", source)
16761680
}
1677-
}
16781681

1679-
if rp != "" {
1680-
shardFilterFn = ComposeShardFilter(shardFilterFn, byRetentionPolicy(rp))
1682+
if rp != "" {
1683+
shardFilterFn = ComposeShardFilter(shardFilterFn, byRetentionPolicy(rp))
1684+
}
16811685
}
1686+
1687+
shards := s.filterShards(shardFilterFn)
1688+
epochs := s.epochsForShards(shards)
1689+
return nil, shards, epochs, sfile
16821690
}
1683-
shards := s.filterShards(shardFilterFn)
16841691

1685-
epochs := s.epochsForShards(shards)
1686-
s.mu.RUnlock()
1692+
err, shards, epochs, sfile := getEpochsAndShards()
1693+
if err != nil && !errors.Is(err, ErrNothingToDelete) {
1694+
s.Logger.Error("DeleteSeries failed", zap.String("error", err.Error()))
1695+
return err
1696+
} else if errors.Is(err, ErrNothingToDelete) {
1697+
return nil
1698+
}
16871699

16881700
// Limit deletes for each shard since expanding the measurement into the list
16891701
// of series keys can be very memory intensive if run concurrently.

tsdb/store_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2978,3 +2978,82 @@ func (m *mockStartupLogger) Tracked() []string {
29782978
copy(tracked, m._shardTracker)
29792979
return tracked
29802980
}
2981+
2982+
// TestStore_DeleteSeries_Deadlock tests the complete lock contention scenario
2983+
// where leaked read locks from DeleteSeries block write operations like CreateShard
2984+
func TestStore_DeleteSeries_Deadlock(t *testing.T) {
2985+
test := func(index string) {
2986+
s := MustOpenStore(t, index)
2987+
defer s.CloseStore(t, index)
2988+
2989+
err := s.CreateShard("db0", "rp0", 1, true)
2990+
require.NoError(t, err, "Create shard failure")
2991+
2992+
mixedSources := []influxql.Source{
2993+
&influxql.Measurement{Name: "measurement1", RetentionPolicy: "rp1"},
2994+
&influxql.Measurement{Name: "measurement2", RetentionPolicy: "rp2"},
2995+
}
2996+
2997+
leakCount := 10
2998+
for i := 0; i < leakCount; i++ {
2999+
err := s.DeleteSeries("db0", mixedSources, nil)
3000+
require.Contains(t, err.Error(), "mixed retention policies not supported")
3001+
}
3002+
3003+
results := make(chan string, 20)
3004+
startSignal := make(chan struct{})
3005+
3006+
writeOpsCount := 5
3007+
for i := 0; i < writeOpsCount; i++ {
3008+
go func(id int) {
3009+
<-startSignal
3010+
3011+
// Try CreateShard; this needs s.mu.Lock() and would be blocked by any leaked RLocks.
3012+
start := time.Now()
3013+
err := s.CreateShard("db0", "rp0", uint64(100+id), true)
3014+
duration := time.Since(start)
3015+
3016+
if err != nil {
3017+
results <- fmt.Sprintf("CreateShard[%d]: ERROR after %v: %v", id, duration, err)
3018+
} else {
3019+
results <- fmt.Sprintf("CreateShard[%d]: OK (%v)", id, duration)
3020+
}
3021+
}(i)
3022+
}
3023+
3024+
readOpsCount := 10
3025+
for i := 0; i < readOpsCount; i++ {
3026+
go func(id int) {
3027+
<-startSignal
3028+
3029+
// Try Shard(); this needs s.mu.RLock().
3030+
start := time.Now()
3031+
shard := s.Shard(1)
3032+
duration := time.Since(start)
3033+
3034+
if shard == nil {
3035+
results <- fmt.Sprintf("Shard[%d]: ERROR after %v: shard not found", id, duration)
3036+
} else {
3037+
results <- fmt.Sprintf("Shard[%d]: OK (%v)", id, duration)
3038+
}
3039+
}(i)
3040+
}
3041+
3042+
// Start all operations simultaneously to create maximum contention
3043+
close(startSignal)
3044+
3045+
timeout := time.After(time.Second * 5)
3046+
3047+
for i := 0; i < writeOpsCount+readOpsCount; i++ {
3048+
select {
3049+
case <-results:
3050+
case <-timeout:
3051+
t.Fatal("Operation timed out - indicating severe lock contention")
3052+
}
3053+
}
3054+
}
3055+
3056+
for _, indexType := range tsdb.RegisteredIndexes() {
3057+
t.Run(indexType, func(t *testing.T) { test(indexType) })
3058+
}
3059+
}

0 commit comments

Comments
 (0)