Skip to content

Commit

Permalink
Limit the maximum number of concurrent queries
Browse files Browse the repository at this point in the history
Fixes #6079.
  • Loading branch information
jsternberg committed Mar 22, 2016
1 parent 6bf6491 commit abae1cf
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -7,6 +7,7 @@
- [#5744](https://github.com/influxdata/influxdb/issues/5744): Add integer literal support to the query language.
- [#5939](https://github.com/influxdata/influxdb/issues/5939): Support viewing and killing running queries.
- [#6073](https://github.com/influxdata/influxdb/pulls/6073): Iterator stats
- [#6079](https://github.com/influxdata/influxdb/issues/6079): Limit the maximum number of concurrent queries.

### Bugfixes

Expand Down
6 changes: 6 additions & 0 deletions cluster/config.go
Expand Up @@ -19,6 +19,10 @@ const (
// DefaultMaxRemoteWriteConnections is the maximum number of open connections
// that will be available for remote writes to another host.
DefaultMaxRemoteWriteConnections = 3

// DefaultMaxConcurrentQueries is the maximum number of running queries.
// A value of zero will make the maximum query limit unlimited.
DefaultMaxConcurrentQueries = 0
)

// Config represents the configuration for the clustering service.
Expand All @@ -28,6 +32,7 @@ type Config struct {
ShardWriterTimeout toml.Duration `toml:"shard-writer-timeout"`
MaxRemoteWriteConnections int `toml:"max-remote-write-connections"`
ShardMapperTimeout toml.Duration `toml:"shard-mapper-timeout"`
MaxConcurrentQueries int `toml:"max-concurrent-queries"`
}

// NewConfig returns an instance of Config with defaults.
Expand All @@ -37,5 +42,6 @@ func NewConfig() Config {
ShardWriterTimeout: toml.Duration(DefaultShardWriterTimeout),
ShardMapperTimeout: toml.Duration(DefaultShardMapperTimeout),
MaxRemoteWriteConnections: DefaultMaxRemoteWriteConnections,
MaxConcurrentQueries: DefaultMaxConcurrentQueries,
}
}
2 changes: 1 addition & 1 deletion cmd/influxd/run/server.go
Expand Up @@ -167,7 +167,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
s.QueryExecutor.TSDBStore = s.TSDBStore
s.QueryExecutor.Monitor = s.Monitor
s.QueryExecutor.PointsWriter = s.PointsWriter
s.QueryExecutor.QueryManager = influxql.DefaultQueryManager()
s.QueryExecutor.QueryManager = influxql.DefaultQueryManager(c.Cluster.MaxConcurrentQueries)
if c.Data.QueryLogEnabled {
s.QueryExecutor.LogOutput = os.Stderr
}
Expand Down
22 changes: 16 additions & 6 deletions influxql/query_manager.go
Expand Up @@ -11,6 +11,10 @@ import (

var (
ErrNoQueryManager = errors.New("no query manager available")

// ErrMaxConcurrentQueriesReached is an error when a query cannot be run
// because the maximum number of queries has been reached.
ErrMaxConcurrentQueriesReached = errors.New("max concurrent queries reached")
)

// QueryTaskInfo holds information about a currently running query.
Expand Down Expand Up @@ -53,10 +57,11 @@ type QueryManager interface {
Queries() []QueryTaskInfo
}

func DefaultQueryManager() QueryManager {
func DefaultQueryManager(maxQueries int) QueryManager {
return &defaultQueryManager{
queries: make(map[uint64]*queryTask),
nextID: 1,
queries: make(map[uint64]*queryTask),
nextID: 1,
maxQueries: maxQueries,
}
}

Expand Down Expand Up @@ -99,15 +104,20 @@ type queryTask struct {
}

type defaultQueryManager struct {
queries map[uint64]*queryTask
nextID uint64
mu sync.Mutex
queries map[uint64]*queryTask
nextID uint64
maxQueries int
mu sync.Mutex
}

func (qm *defaultQueryManager) AttachQuery(params *QueryParams) (uint64, <-chan struct{}, error) {
qm.mu.Lock()
defer qm.mu.Unlock()

if qm.maxQueries > 0 && len(qm.queries) >= qm.maxQueries {
return 0, nil, ErrMaxConcurrentQueriesReached
}

qid := qm.nextID
query := &queryTask{
query: params.Query.String(),
Expand Down
32 changes: 28 additions & 4 deletions influxql/query_manager_test.go
Expand Up @@ -14,7 +14,7 @@ func TestQueryManager_AttachQuery(t *testing.T) {
t.Fatal(err)
}

qm := influxql.DefaultQueryManager()
qm := influxql.DefaultQueryManager(0)
params := influxql.QueryParams{
Query: q,
Database: `mydb`,
Expand All @@ -37,7 +37,7 @@ func TestQueryManager_KillQuery(t *testing.T) {
t.Fatal(err)
}

qm := influxql.DefaultQueryManager()
qm := influxql.DefaultQueryManager(0)
params := influxql.QueryParams{
Query: q,
Database: `mydb`,
Expand Down Expand Up @@ -67,7 +67,7 @@ func TestQueryManager_Interrupt(t *testing.T) {
}

closing := make(chan struct{})
qm := influxql.DefaultQueryManager()
qm := influxql.DefaultQueryManager(0)
params := influxql.QueryParams{
Query: q,
Database: `mydb`,
Expand All @@ -93,7 +93,7 @@ func TestQueryManager_Queries(t *testing.T) {
t.Fatal(err)
}

qm := influxql.DefaultQueryManager()
qm := influxql.DefaultQueryManager(0)
params := influxql.QueryParams{
Query: q,
Database: `mydb`,
Expand Down Expand Up @@ -126,3 +126,27 @@ func TestQueryManager_Queries(t *testing.T) {
t.Errorf("expected 0 queries, got %d", len(queries))
}
}

func TestQueryManager_Limit_ConcurrentQueries(t *testing.T) {
q, err := influxql.ParseQuery(`SELECT count(value) FROM cpu`)
if err != nil {
t.Fatal(err)
}

qm := influxql.DefaultQueryManager(1)
params := influxql.QueryParams{
Query: q,
Database: `mydb`,
}

qid, _, err := qm.AttachQuery(&params)
if err != nil {
t.Fatal(err)
}
defer qm.KillQuery(qid)

_, _, err = qm.AttachQuery(&params)
if err == nil || err != influxql.ErrMaxConcurrentQueriesReached {
t.Errorf("unexpected error: %s", err)
}
}

0 comments on commit abae1cf

Please sign in to comment.