*: implement query params
This adds a parameter to the storage selection interface which allows
query engine(s) to pass information about the operations surrounding a
data selection.
This can for example be used by remote storage backends to infer the
correct downsampling aggregates that need to be provided.
fabxc committed Feb 13, 2018
1 parent 4801573 commit 7ccd4b3
Showing 18 changed files with 156 additions and 138 deletions.
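The idea in the commit message can be pictured with a small, self-contained Go sketch. Nothing below is the API introduced by this commit; the SelectParams name, its fields, and the toy Queryable interface are assumptions chosen only to illustrate how a hint about the surrounding operation lets a remote backend choose a downsampling aggregate:

package main

import "fmt"

// SelectParams is an illustrative hint struct: the query engine fills it in
// with information about the operation surrounding a data selection.
// The name and fields are assumptions for this sketch, not the committed API.
type SelectParams struct {
	Step int64  // resolution step of the surrounding query, in milliseconds
	Func string // surrounding function or aggregation, e.g. "rate" or "max_over_time"
}

// Queryable is a toy stand-in for a storage backend's selection interface.
type Queryable interface {
	Select(params *SelectParams, matchers ...string) ([]string, error)
}

// remoteStore shows how a remote backend might use the hint to serve a
// pre-computed downsampling aggregate instead of raw samples.
type remoteStore struct{}

func (remoteStore) Select(params *SelectParams, matchers ...string) ([]string, error) {
	agg := "raw"
	if params != nil {
		switch params.Func {
		case "max_over_time", "max":
			agg = "max"
		case "min_over_time", "min":
			agg = "min"
		case "rate", "increase", "sum":
			agg = "sum"
		}
	}
	return []string{fmt.Sprintf("series for %v from %q downsampling level", matchers, agg)}, nil
}

func main() {
	var q Queryable = remoteStore{}
	res, _ := q.Select(&SelectParams{Step: 60000, Func: "rate"}, "http_requests_total")
	fmt.Println(res)
}

The engine's only obligation is to fill in the hint; a backend remains free to ignore it and serve raw samples.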
23 changes: 12 additions & 11 deletions cmd/prometheus/main.go
@@ -88,12 +88,12 @@ func main() {
  localStoragePath string
  notifier notifier.Options
  notifierTimeout model.Duration
- queryEngine promql.EngineOptions
  web web.Options
  tsdb tsdb.Options
  lookbackDelta model.Duration
  webTimeout model.Duration
  queryTimeout model.Duration
+ queryConcurrency int

  prometheusURL string

@@ -102,9 +102,6 @@ func main() {
  notifier: notifier.Options{
  Registerer: prometheus.DefaultRegisterer,
  },
- queryEngine: promql.EngineOptions{
- Metrics: prometheus.DefaultRegisterer,
- },
  }

a := kingpin.New(filepath.Base(os.Args[0]), "The Prometheus monitoring server")
@@ -178,7 +175,7 @@ func main() {
  Default("2m").SetValue(&cfg.queryTimeout)

  a.Flag("query.max-concurrency", "Maximum number of queries executed concurrently.").
- Default("20").IntVar(&cfg.queryEngine.MaxConcurrentQueries)
+ Default("20").IntVar(&cfg.queryConcurrency)

  promlogflag.AddFlags(a, &cfg.logLevel)

@@ -209,8 +206,6 @@ func main() {

  promql.LookbackDelta = time.Duration(cfg.lookbackDelta)

- cfg.queryEngine.Timeout = time.Duration(cfg.queryTimeout)
-
  logger := promlog.New(cfg.logLevel)

  // XXX(fabxc): Kubernetes does background logging which we can only customize by modifying
@@ -233,7 +228,6 @@ func main() {
  fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
  )

- cfg.queryEngine.Logger = log.With(logger, "component", "query engine")
  var (
  ctxWeb, cancelWeb = context.WithCancel(context.Background())
  ctxRule = context.Background()
@@ -247,10 +241,17 @@ func main() {
  discoveryManagerNotify = discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"))

  scrapeManager = scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage)
- queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine)
- ruleManager = rules.NewManager(&rules.ManagerOptions{
+
+ queryEngine = promql.NewEngine(
+ log.With(logger, "component", "query engine"),
+ prometheus.DefaultRegisterer,
+ cfg.queryConcurrency,
+ time.Duration(cfg.queryTimeout),
+ )
+
+ ruleManager = rules.NewManager(&rules.ManagerOptions{
  Appendable: fanoutStorage,
- QueryFunc: rules.EngineQueryFunc(queryEngine),
+ QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage),
  NotifyFunc: sendAlerts(notifier, cfg.web.ExternalURL.String()),
  Context: ctxRule,
  ExternalURL: cfg.web.ExternalURL,
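The hunk above decouples the engine from a fixed storage: the engine is now built from a logger, a registerer, a concurrency limit, and a timeout, and the storage is only bound later, when the rules query function is created. A sketch of that pattern, assuming only the signatures implied by the call sites in this diff (the values 20 and 2m stand in for cfg.queryConcurrency and cfg.queryTimeout; the rules.QueryFunc return type and the go-kit/kit/log import path reflect the Prometheus 2.x tree of that era):

package main

import (
	"time"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/rules"
	"github.com/prometheus/prometheus/storage"
)

// newEngineAndQueryFunc mirrors the construction in the hunk above: the engine
// is built without a storage, and the queryable is only bound when the rules
// query function is created.
func newEngineAndQueryFunc(logger log.Logger, q storage.Queryable) (*promql.Engine, rules.QueryFunc) {
	engine := promql.NewEngine(
		log.With(logger, "component", "query engine"),
		prometheus.DefaultRegisterer,
		20,            // --query.max-concurrency default
		2*time.Minute, // --query.timeout default
	)
	return engine, rules.EngineQueryFunc(engine, q)
}

func main() {}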
54 changes: 28 additions & 26 deletions promql/ast.go
@@ -238,56 +238,58 @@ type VectorMatching struct {
  }

  // Visitor allows visiting a Node and its child nodes. The Visit method is
- // invoked for each node encountered by Walk. If the result visitor w is not
- // nil, Walk visits each of the children of node with the visitor w, followed
- // by a call of w.Visit(nil).
+ // invoked for each node with the path leading to the node provided additionally.
+ // If the result visitor w is not nil, Walk visits each of the children
+ // of node with the visitor w, followed by a call of w.Visit(nil, nil).
  type Visitor interface {
- Visit(node Node) (w Visitor)
+ Visit(node Node, path []Node) (w Visitor)
  }

  // Walk traverses an AST in depth-first order: It starts by calling
- // v.Visit(node); node must not be nil. If the visitor w returned by
- // v.Visit(node) is not nil, Walk is invoked recursively with visitor
+ // v.Visit(node, path); node must not be nil. If the visitor w returned by
+ // v.Visit(node, path) is not nil, Walk is invoked recursively with visitor
  // w for each of the non-nil children of node, followed by a call of
  // w.Visit(nil).
- func Walk(v Visitor, node Node) {
- if v = v.Visit(node); v == nil {
+ // As the tree is descended the path of previous nodes is provided.
+ func Walk(v Visitor, node Node, path []Node) {
+ if v = v.Visit(node, path); v == nil {
  return
  }
+ path = append(path, node)

  switch n := node.(type) {
  case Statements:
  for _, s := range n {
- Walk(v, s)
+ Walk(v, s, path)
  }
  case *AlertStmt:
- Walk(v, n.Expr)
+ Walk(v, n.Expr, path)

  case *EvalStmt:
- Walk(v, n.Expr)
+ Walk(v, n.Expr, path)

  case *RecordStmt:
- Walk(v, n.Expr)
+ Walk(v, n.Expr, path)

  case Expressions:
  for _, e := range n {
- Walk(v, e)
+ Walk(v, e, path)
  }
  case *AggregateExpr:
- Walk(v, n.Expr)
+ Walk(v, n.Expr, path)

  case *BinaryExpr:
- Walk(v, n.LHS)
- Walk(v, n.RHS)
+ Walk(v, n.LHS, path)
+ Walk(v, n.RHS, path)

  case *Call:
- Walk(v, n.Args)
+ Walk(v, n.Args, path)

  case *ParenExpr:
- Walk(v, n.Expr)
+ Walk(v, n.Expr, path)

  case *UnaryExpr:
- Walk(v, n.Expr)
+ Walk(v, n.Expr, path)

  case *MatrixSelector, *NumberLiteral, *StringLiteral, *VectorSelector:
  // nothing to do
@@ -296,21 +298,21 @@ func Walk(v Visitor, node Node) {
  panic(fmt.Errorf("promql.Walk: unhandled node type %T", node))
  }

- v.Visit(nil)
+ v.Visit(nil, nil)
  }

- type inspector func(Node) bool
+ type inspector func(Node, []Node) bool

- func (f inspector) Visit(node Node) Visitor {
- if f(node) {
+ func (f inspector) Visit(node Node, path []Node) Visitor {
+ if f(node, path) {
  return f
  }
  return nil
  }

  // Inspect traverses an AST in depth-first order: It starts by calling
- // f(node); node must not be nil. If f returns true, Inspect invokes f
+ // f(node, path); node must not be nil. If f returns true, Inspect invokes f
  // for all the non-nil children of node, recursively.
- func Inspect(node Node, f func(Node) bool) {
- Walk(inspector(f), node)
+ func Inspect(node Node, f func(Node, []Node) bool) {
+ Walk(inspector(f), node, nil)
  }
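With the path threaded through Walk, a visitor sees not just each node but the chain of ancestors leading to it, which is exactly the context a query engine needs to describe the operation surrounding a selection. A small usage sketch against the new Inspect signature; the example query and the printed message are illustrative, not part of this commit:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/promql"
)

func main() {
	expr, err := promql.ParseExpr(`sum(rate(http_requests_total[5m]))`)
	if err != nil {
		panic(err)
	}
	// For every matrix selector, report the innermost surrounding call,
	// walking the ancestor path from the selector outwards.
	promql.Inspect(expr, func(node promql.Node, path []promql.Node) bool {
		if _, ok := node.(*promql.MatrixSelector); ok {
			for i := len(path) - 1; i >= 0; i-- {
				if call, ok := path[i].(*promql.Call); ok {
					fmt.Printf("selector is evaluated inside %s()\n", call.Func.Name)
					break
				}
			}
		}
		return true
	})
}

Here the innermost call found on the path is rate(), which is the kind of hint the storage selection interface described in the commit message can then carry to a remote backend.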
