Skip to content

Commit

Permalink
fix(influxql): make meta queries respect query timeout (#21545)
Browse files Browse the repository at this point in the history

Co-authored-by: davidby-influx <dbyrne@influxdata.com>
  • Loading branch information
danxmoran and davidby-influx committed May 25, 2021
1 parent b1e1125 commit 00420fb
Show file tree
Hide file tree
Showing 9 changed files with 289 additions and 95 deletions.
6 changes: 3 additions & 3 deletions cmd/influxd/launcher/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Engine interface {
influxdb.BackupService
influxdb.RestoreService

SeriesCardinality(orgID, bucketID platform.ID) int64
SeriesCardinality(ctx context.Context, bucketID platform.ID) int64

TSDBStore() storage.TSDBStore
MetaClient() storage.MetaClient
Expand Down Expand Up @@ -116,8 +116,8 @@ func (t *TemporaryEngine) WritePoints(ctx context.Context, orgID platform.ID, bu
}

// SeriesCardinality returns the number of series in the engine.
func (t *TemporaryEngine) SeriesCardinality(orgID, bucketID platform.ID) int64 {
return t.engine.SeriesCardinality(orgID, bucketID)
func (t *TemporaryEngine) SeriesCardinality(ctx context.Context, bucketID platform.ID) int64 {
return t.engine.SeriesCardinality(ctx, bucketID)
}

// DeleteBucketRangePredicate will delete a bucket from the range and predicate.
Expand Down
4 changes: 2 additions & 2 deletions cmd/influxd/launcher/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestLauncher_BucketDelete(t *testing.T) {

// Verify the cardinality in the engine.
engine := l.Launcher.Engine()
if got, exp := engine.SeriesCardinality(l.Org.ID, l.Bucket.ID), int64(1); got != exp {
if got, exp := engine.SeriesCardinality(ctx, l.Bucket.ID), int64(1); got != exp {
t.Fatalf("got %d, exp %d", got, exp)
}

Expand All @@ -150,7 +150,7 @@ func TestLauncher_BucketDelete(t *testing.T) {
}

// Verify that the data has been removed from the storage engine.
if got, exp := engine.SeriesCardinality(l.Org.ID, l.Bucket.ID), int64(0); got != exp {
if got, exp := engine.SeriesCardinality(ctx, l.Bucket.ID), int64(0); got != exp {
t.Fatalf("after bucket delete got %d, exp %d", got, exp)
}
}
Expand Down
19 changes: 10 additions & 9 deletions internal/tsdb_store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package internal

import (
"context"
"io"
"time"

Expand Down Expand Up @@ -30,7 +31,7 @@ type TSDBStoreMock struct {
ImportShardFn func(id uint64, r io.Reader) error
MeasurementSeriesCountsFn func(database string) (measurements int, series int)
MeasurementsCardinalityFn func(database string) (int64, error)
MeasurementNamesFn func(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
MeasurementNamesFn func(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
OpenFn func() error
PathFn func() string
RestoreShardFn func(id uint64, r io.Reader) error
Expand All @@ -43,8 +44,8 @@ type TSDBStoreMock struct {
ShardRelativePathFn func(id uint64) (string, error)
ShardsFn func(ids []uint64) []*tsdb.Shard
StatisticsFn func(tags map[string]string) []models.Statistic
TagKeysFn func(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
TagValuesFn func(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
TagKeysFn func(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
TagValuesFn func(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
WithLoggerFn func(log *zap.Logger)
WriteToShardFn func(shardID uint64, points []models.Point) error
}
Expand Down Expand Up @@ -92,8 +93,8 @@ func (s *TSDBStoreMock) ExpandSources(sources influxql.Sources) (influxql.Source
func (s *TSDBStoreMock) ImportShard(id uint64, r io.Reader) error {
return s.ImportShardFn(id, r)
}
func (s *TSDBStoreMock) MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) {
return s.MeasurementNamesFn(auth, database, cond)
func (s *TSDBStoreMock) MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) {
return s.MeasurementNamesFn(ctx, auth, database, cond)
}
func (s *TSDBStoreMock) MeasurementSeriesCounts(database string) (measurements int, series int) {
return s.MeasurementSeriesCountsFn(database)
Expand Down Expand Up @@ -137,11 +138,11 @@ func (s *TSDBStoreMock) Shards(ids []uint64) []*tsdb.Shard {
func (s *TSDBStoreMock) Statistics(tags map[string]string) []models.Statistic {
return s.StatisticsFn(tags)
}
func (s *TSDBStoreMock) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) {
return s.TagKeysFn(auth, shardIDs, cond)
func (s *TSDBStoreMock) TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error) {
return s.TagKeysFn(ctx, auth, shardIDs, cond)
}
func (s *TSDBStoreMock) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) {
return s.TagValuesFn(auth, shardIDs, cond)
func (s *TSDBStoreMock) TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error) {
return s.TagValuesFn(ctx, auth, shardIDs, cond)
}
func (s *TSDBStoreMock) WithLogger(log *zap.Logger) {
s.WithLoggerFn(log)
Expand Down
10 changes: 5 additions & 5 deletions storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ type MetaClient interface {
type TSDBStore interface {
DeleteMeasurement(database, name string) error
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error)
ShardGroup(ids []uint64) tsdb.ShardGroup
Shards(ids []uint64) []*tsdb.Shard
TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
}

// NewEngine initialises a new storage engine, including a series file, index and
Expand Down Expand Up @@ -463,14 +463,14 @@ func (e *Engine) RestoreShard(ctx context.Context, shardID uint64, r io.Reader)
}

// SeriesCardinality returns the number of series in the engine.
func (e *Engine) SeriesCardinality(orgID, bucketID platform.ID) int64 {
func (e *Engine) SeriesCardinality(ctx context.Context, bucketID platform.ID) int64 {
e.mu.RLock()
defer e.mu.RUnlock()
if e.closing == nil {
return 0
}

n, err := e.tsdbStore.SeriesCardinality(bucketID.String())
n, err := e.tsdbStore.SeriesCardinality(ctx, bucketID.String())
if err != nil {
return 0
}
Expand Down
83 changes: 67 additions & 16 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,13 @@ func (s *Store) Statistics(tags map[string]string) []models.Statistic {
statistics := make([]models.Statistic, 0, len(databases))
for _, database := range databases {
log := s.Logger.With(logger.Database(database))
sc, err := s.SeriesCardinality(database)
sc, err := s.SeriesCardinality(context.Background(), database)
if err != nil {
log.Info("Cannot retrieve series cardinality", zap.Error(err))
continue
}

mc, err := s.MeasurementsCardinality(database)
mc, err := s.MeasurementsCardinality(context.Background(), database)
if err != nil {
log.Info("Cannot retrieve measurement cardinality", zap.Error(err))
continue
Expand Down Expand Up @@ -1049,15 +1049,20 @@ func (s *Store) sketchesForDatabase(dbName string, getSketches func(*Shard) (est
// Cardinality is calculated exactly by unioning all shards' bitsets of series
// IDs. The result of this method cannot be combined with any other results.
//
func (s *Store) SeriesCardinality(database string) (int64, error) {
func (s *Store) SeriesCardinality(ctx context.Context, database string) (int64, error) {
s.mu.RLock()
shards := s.filterShards(byDatabase(database))
s.mu.RUnlock()

var setMu sync.Mutex
others := make([]*SeriesIDSet, 0, len(shards))

s.walkShards(shards, func(sh *Shard) error {
err := s.walkShards(shards, func(sh *Shard) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
index, err := sh.Index()
if err != nil {
return err
Expand All @@ -1070,9 +1075,17 @@ func (s *Store) SeriesCardinality(database string) (int64, error) {

return nil
})
if err != nil {
return 0, err
}

ss := NewSeriesIDSet()
ss.Merge(others...)
select {
case <-ctx.Done():
return 0, ctx.Err()
default:
}
return int64(ss.Cardinality()), nil
}

Expand All @@ -1081,8 +1094,13 @@ func (s *Store) SeriesCardinality(database string) (int64, error) {
//
// The returned sketches can be combined with other sketches to provide an
// estimation across distributed databases.
func (s *Store) SeriesSketches(database string) (estimator.Sketch, estimator.Sketch, error) {
func (s *Store) SeriesSketches(ctx context.Context, database string) (estimator.Sketch, estimator.Sketch, error) {
return s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
default:
}
if sh == nil {
return nil, nil, errors.New("shard nil, can't get cardinality")
}
Expand All @@ -1095,13 +1113,8 @@ func (s *Store) SeriesSketches(database string) (estimator.Sketch, estimator.Ske
//
// Cardinality is calculated using a sketch-based estimation. The result of this
// method cannot be combined with any other results.
func (s *Store) MeasurementsCardinality(database string) (int64, error) {
ss, ts, err := s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
if sh == nil {
return nil, nil, errors.New("shard nil, can't get cardinality")
}
return sh.MeasurementsSketches()
})
func (s *Store) MeasurementsCardinality(ctx context.Context, database string) (int64, error) {
ss, ts, err := s.MeasurementsSketches(ctx, database)

if err != nil {
return 0, err
Expand All @@ -1114,8 +1127,14 @@ func (s *Store) MeasurementsCardinality(database string) (int64, error) {
//
// The returned sketches can be combined with other sketches to provide an
// estimation across distributed databases.
func (s *Store) MeasurementsSketches(database string) (estimator.Sketch, estimator.Sketch, error) {
func (s *Store) MeasurementsSketches(ctx context.Context, database string) (estimator.Sketch, estimator.Sketch, error) {
return s.sketchesForDatabase(database, func(sh *Shard) (estimator.Sketch, estimator.Sketch, error) {
// every iteration, check for timeout.
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
default:
}
if sh == nil {
return nil, nil, errors.New("shard nil, can't get cardinality")
}
Expand Down Expand Up @@ -1430,7 +1449,7 @@ func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
// MeasurementNames returns a slice of all measurements. Measurements accepts an
// optional condition expression. If cond is nil, then all measurements for the
// database will be returned.
func (s *Store) MeasurementNames(auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) {
func (s *Store) MeasurementNames(ctx context.Context, auth query.Authorizer, database string, cond influxql.Expr) ([][]byte, error) {
s.mu.RLock()
shards := s.filterShards(byDatabase(database))
s.mu.RUnlock()
Expand All @@ -1449,6 +1468,11 @@ func (s *Store) MeasurementNames(auth query.Authorizer, database string, cond in
}
is.Indexes = append(is.Indexes, index)
}
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
return is.MeasurementNamesByExpr(auth, cond)
}

Expand All @@ -1471,7 +1495,7 @@ func (a TagKeysSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a TagKeysSlice) Less(i, j int) bool { return a[i].Measurement < a[j].Measurement }

// TagKeys returns the tag keys in the given database, matching the condition.
func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagKeys, error) {
func (s *Store) TagKeys(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagKeys, error) {
if len(shardIDs) == 0 {
return nil, nil
}
Expand Down Expand Up @@ -1543,6 +1567,13 @@ func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.
var results []TagKeys
for _, name := range names {

// Check for timeouts.
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

// Build keyset over all indexes for measurement.
tagKeySet, err := is.MeasurementTagKeysByExpr(name, nil)
if err != nil {
Expand All @@ -1556,6 +1587,12 @@ func (s *Store) TagKeys(auth query.Authorizer, shardIDs []uint64, cond influxql.
// If they have authorized series associated with them.
if filterExpr == nil {
for tagKey := range tagKeySet {
// check for timeouts
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
ok, err := is.TagKeyHasAuthorizedSeries(auth, []byte(name), []byte(tagKey))
if err != nil {
return nil, err
Expand Down Expand Up @@ -1636,7 +1673,7 @@ func (a tagValuesSlice) Less(i, j int) bool { return bytes.Compare(a[i].name, a[

// TagValues returns the tag keys and values for the provided shards, where the
// tag values satisfy the provided condition.
func (s *Store) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagValues, error) {
func (s *Store) TagValues(ctx context.Context, auth query.Authorizer, shardIDs []uint64, cond influxql.Expr) ([]TagValues, error) {
if cond == nil {
return nil, errors.New("a condition is required")
}
Expand Down Expand Up @@ -1724,6 +1761,13 @@ func (s *Store) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxq
// values from matching series. Series may be filtered using a WHERE
// filter.
for _, name := range names {
// check for timeouts
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

// Determine a list of keys from condition.
keySet, err := is.MeasurementTagKeysByExpr(name, cond)
if err != nil {
Expand Down Expand Up @@ -1786,6 +1830,13 @@ func (s *Store) TagValues(auth query.Authorizer, shardIDs []uint64, cond influxq
// instances of tagValues for a given measurement.
idxBuf := make([][2]int, 0, len(is.Indexes))
for i < len(allResults) {
// check for timeouts
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

// Gather all occurrences of the same measurement for merging.
for j+1 < len(allResults) && bytes.Equal(allResults[j+1].name, allResults[i].name) {
j++
Expand Down
Loading

0 comments on commit 00420fb

Please sign in to comment.