
Commit

Add explanation and minor refactoring around pushdown functions
JamesGuthrie committed Feb 9, 2022
1 parent f51923b commit 0f51d32
Showing 5 changed files with 144 additions and 66 deletions.
129 changes: 86 additions & 43 deletions pkg/pgmodel/querier/query_builder.go
@@ -207,6 +207,8 @@ var (
rateIncreaseExtensionRange = semver.MustParseRange(">= 0.2.0")
)

// aggregators represents the Postgres functions used for the array
// aggregation of series values.
type aggregators struct {
timeClause string
timeParams []interface{}
@@ -216,68 +218,109 @@ type aggregators struct {
tsSeries TimestampSeries //can be NULL and only present if timeClause == ""
}

/* The path is the list of ancestors (direct parent last) returned node is the most-ancestral node processed by the pushdown */
// todo: investigate if query hints can have only node and lookback
func getAggregators(md *promqlMetadata) (*aggregators, parser.Node, error) {
path := md.path // PromQL AST.
qh := md.queryHints
hints := md.selectHints
if !extension.ExtensionIsInstalled || qh == nil || hasSubquery(path) || hints == nil {
return getDefaultAggregators(), nil, nil
// getAggregators returns the aggregator which should be used to fetch data for
// a single metric. It may apply pushdowns to functions.
func getAggregators(metadata *promqlMetadata) (*aggregators, parser.Node, error) {
// todo: investigate if query hints can have only node and lookback
if canAttemptPushdown(metadata) {
agg, node, err := tryPushDown(metadata)
if err == nil && agg != nil {
return agg, node, nil
}
}

//switch on the current node being processed
vs, isVectorSelector := qh.CurrentNode.(*parser.VectorSelector)
defaultAggregators := &aggregators{
timeClause: "array_agg(time)",
valueClause: "array_agg(value)",
unOrdered: false,
}

return defaultAggregators, nil, nil
}

func canAttemptPushdown(metadata *promqlMetadata) bool {
path := metadata.path // PromQL AST.
queryHints := metadata.queryHints
selectHints := metadata.selectHints
return extension.ExtensionIsInstalled && queryHints != nil && !hasSubquery(path) && selectHints != nil
}

// tryPushDown inspects the AST above the current node to determine if it's
// possible to make use of a known pushdown function.
//
// We can push down some PromQL functions, as well as a VectorSelector. Refer
// to tryExtractPushdownableFunctionName to see which PromQL pushdowns are
// available.
//
// If pushdown is possible, tryPushDown returns the aggregator representing the
// pushed down function, as well as the new top node resulting from the
// pushdown. If no pushdown is possible, it returns nil.
// For more on top nodes, see `engine.populateSeries`
func tryPushDown(metadata *promqlMetadata) (*aggregators, parser.Node, error) {

// A function call like `rate(metric[5m])` parses to this AST:
//
// Call -> MatrixSelector -> VectorSelector
//
	// This forms the basis for how we determine if we can do a PromQL
	// pushdown: if the current node is a vector selector, we look up the AST
	// to check whether the "grandparent" node of the vector selector is a
	// known function which we can push down to the database.

path := metadata.path
queryHints := metadata.queryHints
selectHints := metadata.selectHints

vs, isVectorSelector := queryHints.CurrentNode.(*parser.VectorSelector)
if isVectorSelector {
/* Try to optimize the aggregation first since that would return less data than a plain vector selector */
if len(path) >= 2 {
//switch on the 2nd-to-last last path node
node := path[len(path)-2]
callNode, isCall := node.(*parser.Call)
if isCall {
switch callNode.Func.Name {
case "delta":
agg, err := callAggregator(hints, callNode.Func.Name)
return agg, node, err
case "rate", "increase":
if rateIncreaseExtensionRange(extension.PromscaleExtensionVersion) {
agg, err := callAggregator(hints, callNode.Func.Name)
return agg, node, err
}
}
grandparent := path[len(path)-2]
funcName, canPushDown := tryExtractPushdownableFunctionName(grandparent)
if canPushDown {
agg, err := callAggregator(selectHints, funcName)
return agg, grandparent, err
}
}

		//TODO: handle the instant query (selectHints.Step == 0) case too.
/* vector selector pushdown improves performance by selecting from the database only the last point
* in a vector selector window(step) this decreases the amount of samples transferred from the DB to Promscale
* by orders of magnitude. A vector selector aggregate also does not require ordered inputs which saves
* a sort and allows for parallel evaluation. */
if hints.Step > 0 &&
hints.Range == 0 && /* So this is not an aggregate. That's optimized above */

		// Vector selector pushdown improves performance by selecting from the
		// database only the last point in a vector selector window (step).
		// This decreases the number of samples transferred from the DB to
		// Promscale by orders of magnitude. A vector selector aggregate also
		// does not require ordered inputs, which saves a sort and allows for
		// parallel evaluation.
if selectHints.Step > 0 &&
selectHints.Range == 0 && // So this is not an aggregate. That's optimized above
!calledByTimestamp(path) &&
vs.OriginalOffset == time.Duration(0) &&
vs.Offset == time.Duration(0) &&
vectorSelectorExtensionRange(extension.PromscaleExtensionVersion) {
qf := aggregators{
valueClause: "vector_selector($%d, $%d,$%d, $%d, time, value)",
valueParams: []interface{}{qh.StartTime, qh.EndTime, hints.Step, qh.Lookback.Milliseconds()},
valueClause: "vector_selector($%d, $%d, $%d, $%d, time, value)",
valueParams: []interface{}{queryHints.StartTime, queryHints.EndTime, selectHints.Step, queryHints.Lookback.Milliseconds()},
unOrdered: true,
tsSeries: newRegularTimestampSeries(qh.StartTime, qh.EndTime, time.Duration(hints.Step)*time.Millisecond),
tsSeries: newRegularTimestampSeries(queryHints.StartTime, queryHints.EndTime, time.Duration(selectHints.Step)*time.Millisecond),
}
return &qf, qh.CurrentNode, nil
return &qf, queryHints.CurrentNode, nil
}
}

return getDefaultAggregators(), nil, nil
return nil, nil, nil
}

func getDefaultAggregators() *aggregators {
return &aggregators{
timeClause: "array_agg(time)",
valueClause: "array_agg(value)",
unOrdered: false,
func tryExtractPushdownableFunctionName(node parser.Node) (string, bool) {
callNode, isCall := node.(*parser.Call)
if isCall {
switch callNode.Func.Name {
case "delta":
return callNode.Func.Name, true
case "rate", "increase":
if rateIncreaseExtensionRange(extension.PromscaleExtensionVersion) {
return callNode.Func.Name, true
}
}
}
return "", false
}

func callAggregator(hints *storage.SelectHints, funcName string) (*aggregators, error) {
@@ -294,7 +337,7 @@ func callAggregator(hints *storage.SelectHints, funcName string) (*aggregators,
}
}
qf := aggregators{
valueClause: "prom_" + funcName + "($%d, $%d,$%d, $%d, time, value)",
valueClause: "prom_" + funcName + "($%d, $%d, $%d, $%d, time, value)",
valueParams: []interface{}{model.Time(hints.Start).Time(), model.Time(queryEnd).Time(), stepDuration.Milliseconds(), rangeDuration.Milliseconds()},
unOrdered: false,
tsSeries: newRegularTimestampSeries(model.Time(queryStart).Time(), model.Time(queryEnd).Time(), stepDuration),
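The grandparent check described in tryPushDown above can be made concrete with a small, self-contained sketch. It is not part of this commit; it only assumes the upstream github.com/prometheus/prometheus/promql/parser package, whose Inspect passes each visited node together with the path of its ancestors (direct parent last), matching the path handling in getAggregators/tryPushDown.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/promql/parser"
)

func main() {
	// rate(metric[5m]) parses to Call -> MatrixSelector -> VectorSelector.
	expr, err := parser.ParseExpr(`rate(metric[5m])`)
	if err != nil {
		panic(err)
	}
	parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
		// When visiting the VectorSelector, path holds its ancestors with the
		// direct parent last, so path[len(path)-2] is the "grandparent".
		if _, ok := node.(*parser.VectorSelector); ok && len(path) >= 2 {
			if call, ok := path[len(path)-2].(*parser.Call); ok {
				fmt.Printf("vector selector is wrapped by %q, a pushdown candidate\n", call.Func.Name)
			}
		}
		return nil
	})
}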
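The vector selector pushdown above omits the time array entirely and instead relies on a regular timestamp series (newRegularTimestampSeries). The sketch below, also not part of this commit and using only the standard library, shows the kind of regular start-to-end grid such a series represents; the actual TimestampSeries interface in the package may differ.

package main

import (
	"fmt"
	"time"
)

// regularTimestamps illustrates the timestamp grid implied by a query's start,
// end and step: one timestamp per step, so the database does not need to
// return a time array alongside the values.
func regularTimestamps(start, end time.Time, step time.Duration) []time.Time {
	var ts []time.Time
	for t := start; !t.After(end); t = t.Add(step) {
		ts = append(ts, t)
	}
	return ts
}

func main() {
	start := time.Date(2022, 2, 9, 0, 0, 0, 0, time.UTC)
	end := start.Add(2 * time.Minute)
	for _, t := range regularTimestamps(start, end, 30*time.Second) {
		fmt.Println(t)
	}
}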
37 changes: 29 additions & 8 deletions pkg/pgmodel/querier/query_builder_samples.go
@@ -63,7 +63,7 @@ const (
WHERE
labels && (SELECT COALESCE(array_agg(l.id), array[]::int[]) FROM _prom_catalog.label l WHERE l.key = 'job' and l.value = 'demo');
*/
timeseriesByMetricSQLFormat = `SELECT series.labels, %[7]s
timeseriesByMetricSQLFormat = `SELECT series.labels, %[7]s
FROM %[2]s series
INNER JOIN LATERAL (
SELECT %[6]s
@@ -80,10 +80,9 @@ const (
WHERE
%[3]s`

/* optimized for no clauses besides __name__
uses a inner join without a lateral to allow for better parallel execution
*/
timeseriesByMetricSQLFormatNoClauses = `SELECT series.labels, %[7]s
	// This is optimized for no clauses besides __name__. It uses an inner join
	// without a lateral to allow for better parallel execution.
timeseriesByMetricSQLFormatNoClauses = `SELECT series.labels, %[7]s
FROM %[2]s series
INNER JOIN (
SELECT series_id, %[6]s
@@ -102,10 +101,32 @@ const (
defaultColumnName = "value"
)

// buildSingleMetricSamplesQuery builds a SQL query which fetches the data for
// one metric.
func buildSingleMetricSamplesQuery(metadata *evalMetadata) (string, []interface{}, parser.Node, TimestampSeries, error) {
// Aggregators are not in exemplar queries. In sample query, we have aggregations since they are
// to serve promql evaluations. But, exemplar queries are fetch-only queries. Their responses are not meant to be
// served by any PromQL function.
// The basic structure of the SQL query which this function produces is:
// SELECT
// series.labels
// , <array_aggregator>(metric.value ORDER BY time) as value_array
// [, <array_aggregator>(metric.time ORDER BY time) as time_array] (optional)
// FROM
// <metric name> metric
// INNER JOIN
// <series name> series ON metric.series_id = series.id
// WHERE
// <some WHERE clauses>
// GROUP BY series.id;
//
// The <array_aggregator> produces an array of values, so each result row
// consists of an array of labels, an array of values, and optionally an
// array of timestamps.
//
// In the absence of available pushdowns, the <array_aggregator> is the
// `array_agg` Postgres function, and the `time_array` result set is
// returned.
// When pushdowns are available, the <array_aggregator> is a pushdown
// function which the promscale extension provides.

qf, node, err := getAggregators(metadata.promqlMetadata)
if err != nil {
return "", nil, nil, nil, err
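The SELECT list sketched in the buildSingleMetricSamplesQuery comment is assembled from the aggregator clause templates seen in query_builder.go. The snippet below is an illustration only, not part of this commit: it assumes the $%d placeholders in those templates are later filled in with positional statement parameter indices (for example via fmt.Sprintf); the exact substitution mechanics and column aliases live elsewhere in the query builder.

package main

import "fmt"

func main() {
	// Default aggregator: plain array aggregation of the values (and times).
	defaultValueClause := "array_agg(value)"

	// Pushdown aggregator, e.g. for rate: the promscale extension's prom_rate
	// function replaces array_agg. The $%d placeholders stand for positional
	// statement parameters (start, end, step, range).
	funcName := "rate"
	pushdownTemplate := "prom_" + funcName + "($%d, $%d, $%d, $%d, time, value)"
	pushdownValueClause := fmt.Sprintf(pushdownTemplate, 1, 2, 3, 4)

	fmt.Println(defaultValueClause)  // array_agg(value)
	fmt.Println(pushdownValueClause) // prom_rate($1, $2, $3, $4, time, value)
}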
8 changes: 5 additions & 3 deletions pkg/pgmodel/querier/query_sample.go
@@ -67,9 +67,11 @@ func (q *querySamples) fetchSamplesRows(mint, maxt int64, hints *storage.SelectH
return sampleRows, nil, nil
}

// fetchSingleMetricSamples returns all the result rows for a single metric using the
// query metadata and the tools. It uses the hints and node path to try to push
// down query functions where possible.
// fetchSingleMetricSamples returns all the result rows for a single metric
// using the query metadata and the tools. It uses the hints and node path to
// try to push down query functions where possible. When a pushdown is
// successfully applied, the new top node is returned together with the metric
// rows. For more information about top nodes, see `engine.populateSeries`.
func fetchSingleMetricSamples(tools *queryTools, metadata *evalMetadata) ([]sampleRow, parser.Node, error) {
sqlQuery, values, topNode, tsSeries, err := buildSingleMetricSamplesQuery(metadata)
if err != nil {
2 changes: 1 addition & 1 deletion pkg/pgmodel/querier/row.go
@@ -162,7 +162,7 @@ func (r *sampleRow) GetAdditionalLabels() (ll labels.Labels) {
return ll
}

// appendTsRows adds new results rows to already existing result rows and
// appendSampleRows adds new result rows to already existing result rows and
// returns them as the result.
func appendSampleRows(out []sampleRow, in pgxconn.PgxRows, tsSeries TimestampSeries, metric, schema, column string) ([]sampleRow, error) {
if in.Err() != nil {
34 changes: 23 additions & 11 deletions pkg/promql/engine.go
@@ -800,7 +800,19 @@ func (ng *Engine) getTimeRangesForSelector(s *parser.EvalStmt, n *parser.VectorS
return start, end
}

func (ng *Engine) populateSeries(querier SamplesQuerier, s *parser.EvalStmt) map[parser.Node]struct{} {
// populateSeries traverses the PromQL AST of evalStmt and augments nodes of
// type VectorSelector with the series data for that node. It uses the querier
// to fetch the series data from the database.
//
// It's possible that when fetching data for a VectorSelector, a pushdown is
// applied to the function which was applied to the VectorSelector. An example
// is the expression `rate(metric[5m])`: the `rate` function is pushed down,
// but the data is stored in the VectorSelector corresponding to `metric[5m]`.
// In this case, the `rate` function becomes a terminal node for later
// expression evaluation. These terminal nodes are called "top nodes".
//
// populateSeries returns a map keyed by all top nodes.
func (ng *Engine) populateSeries(querier SamplesQuerier, evalStmt *parser.EvalStmt) map[parser.Node]struct{} {
var (
// Whenever a MatrixSelector is evaluated, evalRange is set to the corresponding range.
// The evaluation of the VectorSelector inside then evaluates the given range and unsets
@@ -809,22 +821,22 @@ func (ng *Engine) populateSeries(querier SamplesQuerier, s *parser.EvalStmt) map
topNodes map[parser.Node]struct{} = make(map[parser.Node]struct{})
)

parser.Inspect(s.Expr, func(node parser.Node, path []parser.Node) error {
parser.Inspect(evalStmt.Expr, func(node parser.Node, path []parser.Node) error {
switch n := node.(type) {
case *parser.VectorSelector:
var qh *pgquerier.QueryHints
start, end := ng.getTimeRangesForSelector(s, n, path, evalRange)
start, end := ng.getTimeRangesForSelector(evalStmt, n, path, evalRange)
hints := &storage.SelectHints{
Start: start,
End: end,
Step: durationMilliseconds(s.Interval),
Step: durationMilliseconds(evalStmt.Interval),
Range: durationMilliseconds(evalRange),
Func: extractFuncFromPath(path),
}

qh = &pgquerier.QueryHints{
StartTime: s.Start,
EndTime: s.End,
StartTime: evalStmt.Start,
EndTime: evalStmt.End,
CurrentNode: n,
Lookback: ng.lookbackDelta,
}
@@ -1244,16 +1256,16 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
defer span.Finish()

if _, isTopNode := ev.topNodes[expr]; isTopNode {
/* the storage layer has already processed this node. Just return
the result. */
// the storage layer has already processed this node. Just return
// the result.
var (
mat Matrix
warnings storage.Warnings
err error
)
parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
switch n := node.(type) {
/* Note that a MatrixSelector cascades down to it's VectorSelector in Inspect */
// Note that a MatrixSelector cascades down to its VectorSelector in Inspect.
case *parser.VectorSelector:
warnings, err = checkAndExpandSeriesSet(ev.ctx, n)
if err != nil {
Expand All @@ -1264,8 +1276,8 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, storage.Warnings) {
err = fmt.Errorf("Matrix is already filled in")
return err
}
//all range-vector function calls have their metric name dropped
//see eval() function
			// All range-vector function calls have their metric name
			// dropped; see the eval() function.
_, isCall := expr.(*parser.Call)
mat = ev.getPushdownResult(n, numSteps, isCall)
}
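The "top node" bookkeeping described in populateSeries can be illustrated with a small sketch. It is not part of this commit and only assumes the upstream Prometheus parser package: it records the pushed-down Call node in a set, the way populateSeries records topNodes, and then shows the membership check the evaluator performs before short-circuiting to the stored result.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/promql/parser"
)

func main() {
	expr, err := parser.ParseExpr(`rate(metric[5m])`)
	if err != nil {
		panic(err)
	}

	// While walking the AST, record the most-ancestral node covered by a
	// pushdown (here the rate Call) instead of the VectorSelector itself.
	topNodes := make(map[parser.Node]struct{})
	parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
		if _, ok := node.(*parser.VectorSelector); ok && len(path) >= 2 {
			if call, ok := path[len(path)-2].(*parser.Call); ok && call.Func.Name == "rate" {
				topNodes[call] = struct{}{}
			}
		}
		return nil
	})

	// During evaluation, a top node means the storage layer has already
	// produced the result, so evaluation can stop here and reuse it.
	if _, isTopNode := topNodes[expr]; isTopNode {
		fmt.Println("rate(...) was pushed down; using the stored matrix")
	}
}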
