Skip to content

Commit

Permalink
Query timeout added.
Browse files Browse the repository at this point in the history
This is related to #454. Queries now time out after a duration set by
the -query.timeout flag. The TotalEvalTimer is now started/stopped
inside any of the ast.Eval* functions.
  • Loading branch information
fabxc committed Feb 3, 2015
1 parent bd4d04f commit fa1e900
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 7 deletions.
24 changes: 22 additions & 2 deletions rules/ast/ast.go
Expand Up @@ -29,7 +29,16 @@ import (
"github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/storage/metric"
) )


var stalenessDelta = flag.Duration("query.staleness-delta", 300*time.Second, "Staleness delta allowance during expression evaluations.") var (
// stalenessDelta holds the -query.staleness-delta flag (default 300s). It is
// handed to PreloadRange when data is preloaded for a query.
stalenessDelta = flag.Duration("query.staleness-delta", 300*time.Second, "Staleness delta allowance during expression evaluations.")
// queryTimeout holds the -query.timeout flag (default 2m). Preloading and
// evaluation compare the elapsed total eval time against it and abort with a
// queryTimeoutError once it is exceeded.
queryTimeout = flag.Duration("query.timeout", 2*time.Minute, "Maximum time a query may take before being aborted.")
)

// queryTimeoutError is the error returned when a query is aborted because its
// elapsed evaluation time went past the configured query timeout.
type queryTimeoutError struct {
	// timeoutAfter is the elapsed time measured when the query was aborted.
	timeoutAfter time.Duration
}

// Error implements the error interface and reports the duration after which
// the query was aborted.
func (qe queryTimeoutError) Error() string {
	elapsed := qe.timeoutAfter
	return fmt.Sprintf("query timeout after %v", elapsed)
}


// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
// Raw data value types. // Raw data value types.
Expand Down Expand Up @@ -391,16 +400,24 @@ func labelsToKey(labels clientmodel.Metric) uint64 {


// EvalVectorInstant evaluates a VectorNode with an instant query. // EvalVectorInstant evaluates a VectorNode with an instant query.
// The TotalEvalTime timer of queryStats is started on entry and stopped on
// return. A nil Vector and an error are returned if query preparation fails or
// if the elapsed time after preparation exceeds the -query.timeout flag.
func EvalVectorInstant(node VectorNode, timestamp clientmodel.Timestamp, storage local.Storage, queryStats *stats.TimerGroup) (Vector, error) { func EvalVectorInstant(node VectorNode, timestamp clientmodel.Timestamp, storage local.Storage, queryStats *stats.TimerGroup) (Vector, error) {
totalEvalTimer := queryStats.GetTimer(stats.TotalEvalTime).Start()
defer totalEvalTimer.Stop()

closer, err := prepareInstantQuery(node, timestamp, storage, queryStats) closer, err := prepareInstantQuery(node, timestamp, storage, queryStats)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer closer.Close() defer closer.Close()
// Preparation may already have used up the time budget; check before evaluating.
if et := totalEvalTimer.ElapsedTime(); et > *queryTimeout {
return nil, queryTimeoutError{et}
}
return node.Eval(timestamp), nil return node.Eval(timestamp), nil
} }


// EvalVectorRange evaluates a VectorNode with a range query. // EvalVectorRange evaluates a VectorNode with a range query.
func EvalVectorRange(node VectorNode, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage local.Storage, queryStats *stats.TimerGroup) (Matrix, error) { func EvalVectorRange(node VectorNode, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage local.Storage, queryStats *stats.TimerGroup) (Matrix, error) {
totalEvalTimer := queryStats.GetTimer(stats.TotalEvalTime).Start()
defer totalEvalTimer.Stop()
// Explicitly initialize to an empty matrix since a nil Matrix encodes to // Explicitly initialize to an empty matrix since a nil Matrix encodes to
// null in JSON. // null in JSON.
matrix := Matrix{} matrix := Matrix{}
Expand All @@ -413,10 +430,13 @@ func EvalVectorRange(node VectorNode, start clientmodel.Timestamp, end clientmod
} }
defer closer.Close() defer closer.Close()


// TODO implement watchdog timer for long-running queries.
evalTimer := queryStats.GetTimer(stats.InnerEvalTime).Start() evalTimer := queryStats.GetTimer(stats.InnerEvalTime).Start()
sampleStreams := map[uint64]*SampleStream{} sampleStreams := map[uint64]*SampleStream{}
for t := start; !t.After(end); t = t.Add(interval) { for t := start; !t.After(end); t = t.Add(interval) {
if et := totalEvalTimer.ElapsedTime(); et > *queryTimeout {
evalTimer.Stop()
return nil, queryTimeoutError{et}
}
vector := node.Eval(t) vector := node.Eval(t)
for _, sample := range vector { for _, sample := range vector {
samplePair := metric.SamplePair{ samplePair := metric.SamplePair{
Expand Down
6 changes: 6 additions & 0 deletions rules/ast/printer.go
Expand Up @@ -160,6 +160,9 @@ func TypedValueToJSON(data interface{}, typeStr string) string {


// EvalToString evaluates the given node into a string of the given format. // EvalToString evaluates the given node into a string of the given format.
func EvalToString(node Node, timestamp clientmodel.Timestamp, format OutputFormat, storage local.Storage, queryStats *stats.TimerGroup) string { func EvalToString(node Node, timestamp clientmodel.Timestamp, format OutputFormat, storage local.Storage, queryStats *stats.TimerGroup) string {
totalEvalTimer := queryStats.GetTimer(stats.TotalEvalTime).Start()
defer totalEvalTimer.Stop()

prepareTimer := queryStats.GetTimer(stats.TotalQueryPreparationTime).Start() prepareTimer := queryStats.GetTimer(stats.TotalQueryPreparationTime).Start()
closer, err := prepareInstantQuery(node, timestamp, storage, queryStats) closer, err := prepareInstantQuery(node, timestamp, storage, queryStats)
prepareTimer.Stop() prepareTimer.Stop()
Expand Down Expand Up @@ -212,6 +215,9 @@ func EvalToString(node Node, timestamp clientmodel.Timestamp, format OutputForma


// EvalToVector evaluates the given node into a Vector. Matrices aren't supported. // EvalToVector evaluates the given node into a Vector. Matrices aren't supported.
func EvalToVector(node Node, timestamp clientmodel.Timestamp, storage local.Storage, queryStats *stats.TimerGroup) (Vector, error) { func EvalToVector(node Node, timestamp clientmodel.Timestamp, storage local.Storage, queryStats *stats.TimerGroup) (Vector, error) {
totalEvalTimer := queryStats.GetTimer(stats.TotalEvalTime).Start()
defer totalEvalTimer.Stop()

prepareTimer := queryStats.GetTimer(stats.TotalQueryPreparationTime).Start() prepareTimer := queryStats.GetTimer(stats.TotalQueryPreparationTime).Start()
closer, err := prepareInstantQuery(node, timestamp, storage, queryStats) closer, err := prepareInstantQuery(node, timestamp, storage, queryStats)
prepareTimer.Stop() prepareTimer.Stop()
Expand Down
30 changes: 28 additions & 2 deletions rules/ast/query_analyzer.go
Expand Up @@ -110,22 +110,35 @@ func (i *iteratorInitializer) Visit(node Node) {
} }


func prepareInstantQuery(node Node, timestamp clientmodel.Timestamp, storage local.Storage, queryStats *stats.TimerGroup) (local.Preloader, error) { func prepareInstantQuery(node Node, timestamp clientmodel.Timestamp, storage local.Storage, queryStats *stats.TimerGroup) (local.Preloader, error) {
totalTimer := queryStats.GetTimer(stats.TotalEvalTime)

analyzeTimer := queryStats.GetTimer(stats.QueryAnalysisTime).Start() analyzeTimer := queryStats.GetTimer(stats.QueryAnalysisTime).Start()
analyzer := NewQueryAnalyzer(storage) analyzer := NewQueryAnalyzer(storage)
Walk(analyzer, node) Walk(analyzer, node)
analyzeTimer.Stop() analyzeTimer.Stop()


// TODO: Preloading should time out after a given duration.
preloadTimer := queryStats.GetTimer(stats.PreloadTime).Start() preloadTimer := queryStats.GetTimer(stats.PreloadTime).Start()
p := storage.NewPreloader() p := storage.NewPreloader()
for fp, rangeDuration := range analyzer.FullRanges { for fp, rangeDuration := range analyzer.FullRanges {
if et := totalTimer.ElapsedTime(); et > *queryTimeout {
preloadTimer.Stop()
p.Close()
return nil, queryTimeoutError{et}
}
if err := p.PreloadRange(fp, timestamp.Add(-rangeDuration), timestamp, *stalenessDelta); err != nil { if err := p.PreloadRange(fp, timestamp.Add(-rangeDuration), timestamp, *stalenessDelta); err != nil {
preloadTimer.Stop()
p.Close() p.Close()
return nil, err return nil, err
} }
} }
for fp := range analyzer.IntervalRanges { for fp := range analyzer.IntervalRanges {
if et := totalTimer.ElapsedTime(); et > *queryTimeout {
preloadTimer.Stop()
p.Close()
return nil, queryTimeoutError{et}
}
if err := p.PreloadRange(fp, timestamp, timestamp, *stalenessDelta); err != nil { if err := p.PreloadRange(fp, timestamp, timestamp, *stalenessDelta); err != nil {
preloadTimer.Stop()
p.Close() p.Close()
return nil, err return nil, err
} }
Expand All @@ -141,16 +154,23 @@ func prepareInstantQuery(node Node, timestamp clientmodel.Timestamp, storage loc
} }


func prepareRangeQuery(node Node, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage local.Storage, queryStats *stats.TimerGroup) (local.Preloader, error) { func prepareRangeQuery(node Node, start clientmodel.Timestamp, end clientmodel.Timestamp, interval time.Duration, storage local.Storage, queryStats *stats.TimerGroup) (local.Preloader, error) {
totalTimer := queryStats.GetTimer(stats.TotalEvalTime)

analyzeTimer := queryStats.GetTimer(stats.QueryAnalysisTime).Start() analyzeTimer := queryStats.GetTimer(stats.QueryAnalysisTime).Start()
analyzer := NewQueryAnalyzer(storage) analyzer := NewQueryAnalyzer(storage)
Walk(analyzer, node) Walk(analyzer, node)
analyzeTimer.Stop() analyzeTimer.Stop()


// TODO: Preloading should time out after a given duration.
preloadTimer := queryStats.GetTimer(stats.PreloadTime).Start() preloadTimer := queryStats.GetTimer(stats.PreloadTime).Start()
p := storage.NewPreloader() p := storage.NewPreloader()
for fp, rangeDuration := range analyzer.FullRanges { for fp, rangeDuration := range analyzer.FullRanges {
if et := totalTimer.ElapsedTime(); et > *queryTimeout {
preloadTimer.Stop()
p.Close()
return nil, queryTimeoutError{et}
}
if err := p.PreloadRange(fp, start.Add(-rangeDuration), end, *stalenessDelta); err != nil { if err := p.PreloadRange(fp, start.Add(-rangeDuration), end, *stalenessDelta); err != nil {
preloadTimer.Stop()
p.Close() p.Close()
return nil, err return nil, err
} }
Expand All @@ -169,7 +189,13 @@ func prepareRangeQuery(node Node, start clientmodel.Timestamp, end clientmodel.T
*/ */
} }
for fp := range analyzer.IntervalRanges { for fp := range analyzer.IntervalRanges {
if et := totalTimer.ElapsedTime(); et > *queryTimeout {
preloadTimer.Stop()
p.Close()
return nil, queryTimeoutError{et}
}
if err := p.PreloadRange(fp, start, end, *stalenessDelta); err != nil { if err := p.PreloadRange(fp, start, end, *stalenessDelta); err != nil {
preloadTimer.Stop()
p.Close() p.Close()
return nil, err return nil, err
} }
Expand Down
5 changes: 5 additions & 0 deletions stats/timer.go
Expand Up @@ -40,6 +40,11 @@ func (t *Timer) Stop() {
t.duration += time.Since(t.start) t.duration += time.Since(t.start)
} }


// ElapsedTime returns the time that has passed since the timer's recorded
// start instant.
//
// A timer whose start instant is still the zero time.Time (i.e. it has not
// been started) reports 0. Without this guard time.Since would measure from
// the zero time and return a saturated, maximal Duration, which makes any
// "elapsed > timeout" comparison fire immediately.
func (t *Timer) ElapsedTime() time.Duration {
	if t.start.IsZero() {
		return 0
	}
	return time.Since(t.start)
}

// Return a string representation of the Timer. // Return a string representation of the Timer.
func (t *Timer) String() string { func (t *Timer) String() string {
return fmt.Sprintf("%s: %s", t.name, t.duration) return fmt.Sprintf("%s: %s", t.name, t.duration)
Expand Down
3 changes: 0 additions & 3 deletions web/api/query.go
Expand Up @@ -65,7 +65,6 @@ func (serv MetricsService) Query(w http.ResponseWriter, r *http.Request) {
} }


timestamp := clientmodel.TimestampFromTime(serv.time.Now()) timestamp := clientmodel.TimestampFromTime(serv.time.Now())

queryStats := stats.NewTimerGroup() queryStats := stats.NewTimerGroup()
result := ast.EvalToString(exprNode, timestamp, format, serv.Storage, queryStats) result := ast.EvalToString(exprNode, timestamp, format, serv.Storage, queryStats)
glog.V(1).Infof("Instant query: %s\nQuery stats:\n%s\n", expr, queryStats) glog.V(1).Infof("Instant query: %s\nQuery stats:\n%s\n", expr, queryStats)
Expand Down Expand Up @@ -123,7 +122,6 @@ func (serv MetricsService) QueryRange(w http.ResponseWriter, r *http.Request) {


queryStats := stats.NewTimerGroup() queryStats := stats.NewTimerGroup()


evalTimer := queryStats.GetTimer(stats.TotalEvalTime).Start()
matrix, err := ast.EvalVectorRange( matrix, err := ast.EvalVectorRange(
exprNode.(ast.VectorNode), exprNode.(ast.VectorNode),
clientmodel.TimestampFromUnixNano(end-duration), clientmodel.TimestampFromUnixNano(end-duration),
Expand All @@ -135,7 +133,6 @@ func (serv MetricsService) QueryRange(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, ast.ErrorToJSON(err)) fmt.Fprint(w, ast.ErrorToJSON(err))
return return
} }
evalTimer.Stop()


sortTimer := queryStats.GetTimer(stats.ResultSortTime).Start() sortTimer := queryStats.GetTimer(stats.ResultSortTime).Start()
sort.Sort(matrix) sort.Sort(matrix)
Expand Down

0 comments on commit fa1e900

Please sign in to comment.