Permalink
Browse files

Limit maximum number of concurrent queries.

A high number of concurrent queries can slow each other down
so that none of them is reasonbly responsive. This commit limits
the number of queries being concurrently executed.
  • Loading branch information...
fabxc committed Apr 30, 2015
1 parent d59d1cb commit 9ab1f6c690940187b227cdd6ec591349abf0f5aa
Showing with 107 additions and 24 deletions.
  1. +52 −7 promql/engine.go
  2. +52 −14 promql/engine_test.go
  3. +3 −3 stats/query_stats.go
@@ -31,8 +31,9 @@ import (
)

var (
stalenessDelta = flag.Duration("query.staleness-delta", 300*time.Second, "Staleness delta allowance during expression evaluations.")
defaultQueryTimeout = flag.Duration("query.timeout", 2*time.Minute, "Maximum time a query may take before being aborted.")
stalenessDelta = flag.Duration("query.staleness-delta", 300*time.Second, "Staleness delta allowance during expression evaluations.")
defaultQueryTimeout = flag.Duration("query.timeout", 2*time.Minute, "Maximum time a query may take before being aborted.")
maxConcurrentQueries = flag.Int("query.max-concurrency", 20, "Maximum number of queries executed concurrently.")
)

// SampleStream is a stream of Values belonging to an attached COWMetric.
@@ -215,10 +216,7 @@ func (q *query) Cancel() {

// Exec implements the Query interface.
func (q *query) Exec() *Result {
ctx, cancel := context.WithTimeout(q.ng.baseCtx, *defaultQueryTimeout)
q.cancel = cancel

res, err := q.ng.exec(ctx, q)
res, err := q.ng.exec(q)
return &Result{Err: err, Value: res}
}

@@ -249,6 +247,8 @@ type Engine struct {
// The base context for all queries and its cancellation function.
baseCtx context.Context
cancelQueries func()
// The gate limiting the maximum number of concurrent and waiting queries.
gate *queryGate
}

// NewEngine returns a new engine.
@@ -258,6 +258,7 @@ func NewEngine(storage local.Storage) *Engine {
storage: storage,
baseCtx: ctx,
cancelQueries: cancel,
gate: newQueryGate(*maxConcurrentQueries),
}
}

@@ -316,9 +317,21 @@ func (ng *Engine) newTestQuery(stmts ...Statement) Query {
//
// At this point per query only one EvalStmt is evaluated. Alert and record
// statements are not handled by the Engine.
func (ng *Engine) exec(ctx context.Context, q *query) (Value, error) {
func (ng *Engine) exec(q *query) (Value, error) {
const env = "query execution"

ctx, cancel := context.WithTimeout(q.ng.baseCtx, *defaultQueryTimeout)
q.cancel = cancel

queueTimer := q.stats.GetTimer(stats.ExecQueueTime).Start()

if err := ng.gate.Start(ctx); err != nil {
return nil, err
}
defer ng.gate.Done()

queueTimer.Stop()

// Cancel when execution is done or an error was raised.
defer q.cancel()

@@ -1125,3 +1138,35 @@ func interpolateSamples(first, second *metric.SamplePair, timestamp clientmodel.
Timestamp: timestamp,
}
}

// A queryGate controls the maximum number of concurrently running and waiting queries.
type queryGate struct {
ch chan struct{}
}

// newQueryGate returns a query gate that limits the number of queries
// being concurrently executed.
func newQueryGate(length int) *queryGate {
return &queryGate{
ch: make(chan struct{}, length),
}
}

// Start blocks until the gate has a free spot or the context is done.
func (g *queryGate) Start(ctx context.Context) error {
select {
case <-ctx.Done():
return contextDone(ctx, "query queue")
case g.ch <- struct{}{}:
return nil
}
}

// Done releases a single spot in the gate.
func (g *queryGate) Done() {
select {
case <-g.ch:
default:
panic("engine.queryGate.Done: more operations done than started")
}
}
@@ -6,25 +6,69 @@ import (
"time"

"golang.org/x/net/context"

"github.com/prometheus/prometheus/storage/local"
)

var noop = testStmt(func(context.Context) error {
return nil
})

func TestQueryConcurreny(t *testing.T) {
engine := NewEngine(nil)
defer engine.Stop()

block := make(chan struct{})
processing := make(chan struct{})
f1 := testStmt(func(context.Context) error {
processing <- struct{}{}
<-block
return nil
})

for i := 0; i < *maxConcurrentQueries; i++ {
q := engine.newTestQuery(f1)
go q.Exec()
select {
case <-processing:
// Expected.
case <-time.After(5 * time.Millisecond):
t.Fatalf("Query within concurrency threshold not being executed")
}
}

q := engine.newTestQuery(f1)
go q.Exec()

select {
case <-processing:
t.Fatalf("Query above concurrency threhosld being executed")
case <-time.After(5 * time.Millisecond):
// Expected.
}

// Terminate a running query.
block <- struct{}{}

select {
case <-processing:
// Expected.
case <-time.After(5 * time.Millisecond):
t.Fatalf("Query within concurrency threshold not being executed")
}

// Terminate remaining queries.
for i := 0; i < *maxConcurrentQueries; i++ {
block <- struct{}{}
}
}

func TestQueryTimeout(t *testing.T) {
*defaultQueryTimeout = 5 * time.Millisecond
defer func() {
// Restore default query timeout
*defaultQueryTimeout = 2 * time.Minute
}()

storage, closer := local.NewTestStorage(t, 1)
defer closer.Close()

engine := NewEngine(storage)
engine := NewEngine(nil)
defer engine.Stop()

f1 := testStmt(func(context.Context) error {
@@ -46,10 +90,7 @@ func TestQueryTimeout(t *testing.T) {
}

func TestQueryCancel(t *testing.T) {
storage, closer := local.NewTestStorage(t, 1)
defer closer.Close()

engine := NewEngine(storage)
engine := NewEngine(nil)
defer engine.Stop()

// As for timeouts, cancellation is only checked at designated points. We ensure
@@ -91,10 +132,7 @@ func TestQueryCancel(t *testing.T) {
}

func TestEngineShutdown(t *testing.T) {
storage, closer := local.NewTestStorage(t, 1)
defer closer.Close()

engine := NewEngine(storage)
engine := NewEngine(nil)

handlerExecutions := 0
// Shutdown engine on first handler execution. Should handler execution ever become
@@ -31,7 +31,7 @@ const (
GetValueAtTimeTime
GetBoundaryValuesTime
GetRangeValuesTime
ViewQueueTime
ExecQueueTime
ViewDiskPreparationTime
ViewDataExtractionTime
ViewDiskExtractionTime
@@ -64,8 +64,8 @@ func (s QueryTiming) String() string {
return "GetBoundaryValues() time"
case GetRangeValuesTime:
return "GetRangeValues() time"
case ViewQueueTime:
return "View queue wait time"
case ExecQueueTime:
return "Exec queue wait time"
case ViewDiskPreparationTime:
return "View building disk preparation time"
case ViewDataExtractionTime:

0 comments on commit 9ab1f6c

Please sign in to comment.