Stop fetching timestamps for regular series from the DB
Regular series contain a timestamp series of pre-determined
timestamp values. Previously, these were provided by the database
via a generate_series call, which is wasteful not only in terms of
DB work but also in the memory used for the fetched timestamp
array. Here we change that to provide the series through a Go
interface instead; regular series are trivially implemented as a
simple Go type.
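
For context, here is a minimal sketch of the idea (a hypothetical helper, not part of this commit): for a regular series the timestamp at index i is fully determined by the query start and step, so it can be computed in Go rather than generated by and fetched from the database.

package main

import (
	"fmt"
	"time"
)

// regularTimestamps computes the i-th timestamp of a regular series as
// start + i*step, in milliseconds since the Unix epoch, so no timestamp
// array has to be generated by or transferred from the database.
func regularTimestamps(start time.Time, step time.Duration, n int) []int64 {
	out := make([]int64, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, start.Add(time.Duration(i)*step).UnixNano()/int64(time.Millisecond))
	}
	return out
}

func main() {
	start := time.Date(2021, 7, 1, 0, 0, 0, 0, time.UTC)
	fmt.Println(regularTimestamps(start, 30*time.Second, 3))
	// Output: [1625097600000 1625097630000 1625097660000]
}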
cevian committed Jul 1, 2021
1 parent 87b7f2b commit 4bbb867
Showing 6 changed files with 117 additions and 43 deletions.
6 changes: 3 additions & 3 deletions pkg/pgmodel/querier/querier.go
@@ -189,7 +189,7 @@ func (q *pgxQuerier) querySingleMetric(metric string, filter metricTimeRangeFilt
}
filter.metric = tableName

sqlQuery, values, topNode, err := buildTimeseriesByLabelClausesQuery(filter, cases, values, hints, qh, path)
sqlQuery, values, topNode, tsSeries, err := buildTimeseriesByLabelClausesQuery(filter, cases, values, hints, qh, path)
if err != nil {
return nil, nil, err
}
@@ -206,7 +206,7 @@ func (q *pgxQuerier) querySingleMetric(metric string, filter metricTimeRangeFilt
defer rows.Close()

// TODO this allocation assumes we usually have 1 row, if not, refactor
tsRows, err := appendTsRows(make([]timescaleRow, 0, 1), rows)
tsRows, err := appendTsRows(make([]timescaleRow, 0, 1), rows, tsSeries)
return tsRows, topNode, err
}

@@ -262,7 +262,7 @@ func (q *pgxQuerier) queryMultipleMetrics(filter metricTimeRangeFilter, cases []
return nil, nil, err
}
// Append all rows into results.
results, err = appendTsRows(results, rows)
results, err = appendTsRows(results, rows, nil)
// Can't defer because we need to Close before the next loop iteration.
rows.Close()
if err != nil {
51 changes: 30 additions & 21 deletions pkg/pgmodel/querier/query_builder.go
@@ -101,10 +101,10 @@ const (
WHERE
labels && (SELECT COALESCE(array_agg(l.id), array[]::int[]) FROM _prom_catalog.label l WHERE l.key = 'job' and l.value = 'demo');
*/
timeseriesByMetricSQLFormat = `SELECT series.labels, result.time_array, result.value_array
timeseriesByMetricSQLFormat = `SELECT series.labels, %[7]s
FROM %[2]s series
INNER JOIN LATERAL (
SELECT %[6]s as time_array, %[7]s as value_array
SELECT %[6]s
FROM
(
SELECT time, value
@@ -121,10 +121,10 @@ const (
/* optimized for no clauses besides __name__
uses an inner join without a lateral to allow for better parallel execution
*/
timeseriesByMetricSQLFormatNoClauses = `SELECT series.labels, result.time_array, result.value_array
timeseriesByMetricSQLFormatNoClauses = `SELECT series.labels, %[7]s
FROM %[2]s series
INNER JOIN (
SELECT series_id, %[6]s as time_array, %[7]s as value_array
SELECT series_id, %[6]s
FROM
(
SELECT series_id, time, value
@@ -322,7 +322,7 @@ func buildTimeSeries(rows []timescaleRow, lr lreader.LabelsReader) ([]*prompb.Ti
return nil, row.err
}

if len(row.times.Elements) != len(row.values.Elements) {
if row.times.Len() != len(row.values.Elements) {
return nil, errors.ErrQueryMismatchTimestampValue
}

@@ -348,12 +348,12 @@ func buildTimeSeries(rows []timescaleRow, lr lreader.LabelsReader) ([]*prompb.Ti

result := &prompb.TimeSeries{
Labels: promLabels,
Samples: make([]prompb.Sample, 0, len(row.times.Elements)),
Samples: make([]prompb.Sample, 0, row.times.Len()),
}

for i := range row.times.Elements {
for i := 0; i < row.times.Len(); i++ {
result.Samples = append(result.Samples, prompb.Sample{
Timestamp: pgmodel.TimestamptzToMs(row.times.Elements[i]),
Timestamp: row.times.At(i),
Value: row.values.Elements[i].Float,
})
}
@@ -384,20 +384,30 @@ func buildTimeseriesBySeriesIDQuery(filter metricTimeRangeFilter, series []pgmod
}

func buildTimeseriesByLabelClausesQuery(filter metricTimeRangeFilter, cases []string, values []interface{},
hints *storage.SelectHints, qh *QueryHints, path []parser.Node) (string, []interface{}, parser.Node, error) {
hints *storage.SelectHints, qh *QueryHints, path []parser.Node) (string, []interface{}, parser.Node, TimestampSeries, error) {
qf, node, err := getAggregators(hints, qh, path)
if err != nil {
return "", nil, nil, err
return "", nil, nil, nil, err
}

timeClauseBound, values, err := setParameterNumbers(qf.timeClause, values, qf.timeParams...)
if err != nil {
return "", nil, nil, err
selectors := []string{}
selectorClauses := []string{}

if qf.timeClause != "" {
var timeClauseBound string
timeClauseBound, values, err = setParameterNumbers(qf.timeClause, values, qf.timeParams...)
if err != nil {
return "", nil, nil, nil, err
}
selectors = append(selectors, "result.time_array")
selectorClauses = append(selectorClauses, timeClauseBound+" as time_array")
}
valueClauseBound, values, err := setParameterNumbers(qf.valueClause, values, qf.valueParams...)
if err != nil {
return "", nil, nil, err
return "", nil, nil, nil, err
}
selectors = append(selectors, "result.value_array")
selectorClauses = append(selectorClauses, valueClauseBound+" as value_array")

orderByClause := "ORDER BY time"
if qf.unOrdered {
@@ -418,12 +428,12 @@ func buildTimeseriesByLabelClausesQuery(filter metricTimeRangeFilter, cases []st
strings.Join(cases, " AND "),
filter.startTime,
filter.endTime,
timeClauseBound,
valueClauseBound,
strings.Join(selectorClauses, ","),
strings.Join(selectors, ","),
orderByClause,
)

return finalSQL, values, node, nil
return finalSQL, values, node, qf.tsSeries, nil
}

func hasSubquery(path []parser.Node) bool {
@@ -442,6 +452,7 @@ type aggregators struct {
valueClause string
valueParams []interface{}
unOrdered bool
tsSeries TimestampSeries // can be nil; only set if timeClause == ""
}

/* vectorSelectors called by the timestamp function have special handling; see engine.go */
@@ -473,11 +484,10 @@ func callAggregator(hints *storage.SelectHints, funcName string) (*aggregators,
}
}
qf := aggregators{
timeClause: "ARRAY(SELECT generate_series($%d::timestamptz, $%d::timestamptz, $%d))",
timeParams: []interface{}{model.Time(queryStart).Time(), model.Time(queryEnd).Time(), stepDuration},
valueClause: "prom_" + funcName + "($%d, $%d,$%d, $%d, time, value)",
valueParams: []interface{}{model.Time(hints.Start).Time(), model.Time(queryEnd).Time(), int64(stepDuration.Milliseconds()), int64(rangeDuration.Milliseconds())},
unOrdered: false,
tsSeries: NewRegularTimestampSeries(model.Time(queryStart).Time(), model.Time(queryEnd).Time(), stepDuration),
}
return &qf, nil
}
@@ -530,11 +540,10 @@ func getAggregators(hints *storage.SelectHints, qh *QueryHints, path []parser.No
vs.Offset == time.Duration(0) &&
vectorSelectorExtensionRange(extension.PromscaleExtensionVersion) {
qf := aggregators{
timeClause: "ARRAY(SELECT generate_series($%d::timestamptz, $%d::timestamptz, $%d))",
timeParams: []interface{}{qh.StartTime, qh.EndTime, time.Duration(hints.Step) * time.Millisecond},
valueClause: "vector_selector($%d, $%d,$%d, $%d, time, value)",
valueParams: []interface{}{qh.StartTime, qh.EndTime, hints.Step, qh.Lookback.Milliseconds()},
unOrdered: true,
tsSeries: NewRegularTimestampSeries(qh.StartTime, qh.EndTime, time.Duration(hints.Step)*time.Millisecond),
}
return &qf, qh.CurrentNode, nil
}
29 changes: 20 additions & 9 deletions pkg/pgmodel/querier/row.go
@@ -131,34 +131,45 @@ func (dstwrapper *float8ArrayWrapper) DecodeBinary(ci *pgtype.ConnInfo, src []by

type timescaleRow struct {
labelIds []int64
times *pgtype.TimestamptzArray
times TimestampSeries
values *pgtype.Float8Array
err error

//only used to hold ownership for releasing to pool
timeArrayOwnership *pgtype.TimestamptzArray
}

func (r *timescaleRow) Close() {
tPool.Put(r.times)
if r.timeArrayOwnership != nil {
tPool.Put(r.timeArrayOwnership)
}
fPool.Put(r.values)
}

// appendTsRows appends new result rows to the existing result rows and
// returns them as the combined result.
func appendTsRows(out []timescaleRow, in pgxconn.PgxRows) ([]timescaleRow, error) {
func appendTsRows(out []timescaleRow, in pgxconn.PgxRows, tsSeries TimestampSeries) ([]timescaleRow, error) {
if in.Err() != nil {
return out, in.Err()
}
for in.Next() {
var row timescaleRow
values := fPool.Get().(*pgtype.Float8Array)
times := tPool.Get().(*pgtype.TimestamptzArray)
values.Elements = values.Elements[:0]
times.Elements = times.Elements[:0]

valuesWrapper := float8ArrayWrapper{values}
timesWrapper := timestamptzArrayWrapper{times}
row.err = in.Scan(&row.labelIds, &timesWrapper, &valuesWrapper)

row.times = times
if tsSeries == nil {
times := tPool.Get().(*pgtype.TimestamptzArray)
times.Elements = times.Elements[:0]
timesWrapper := timestamptzArrayWrapper{times}
row.err = in.Scan(&row.labelIds, &timesWrapper, &valuesWrapper)
row.timeArrayOwnership = times
row.times = NewRowTimestampSeries(times)
} else {
row.err = in.Scan(&row.labelIds, &valuesWrapper)
row.times = tsSeries
}

row.values = values

out = append(out, row)
16 changes: 7 additions & 9 deletions pkg/pgmodel/querier/series_set.go
@@ -13,7 +13,6 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/timescale/promscale/pkg/pgmodel/common/errors"
"github.com/timescale/promscale/pkg/pgmodel/model"
)

const (
@@ -77,7 +76,7 @@ func (p *pgxSeriesSet) At() storage.Series {
if row.err != nil {
return nil
}
if len(row.times.Elements) != len(row.values.Elements) {
if row.times.Len() != len(row.values.Elements) {
p.err = errors.ErrInvalidRowData
return nil
}
@@ -137,7 +136,7 @@ func (p *pgxSeriesSet) Close() {
// pgxSeries implements storage.Series.
type pgxSeries struct {
labels labels.Labels
times *pgtype.TimestamptzArray
times TimestampSeries
values *pgtype.Float8Array
}

@@ -155,15 +154,15 @@ func (p *pgxSeries) Iterator() chunkenc.Iterator {
type pgxSeriesIterator struct {
cur int
totalSamples int
times *pgtype.TimestamptzArray
times TimestampSeries
values *pgtype.Float8Array
}

// newIterator returns an iterator over the samples. It expects times and values to be the same length.
func newIterator(times *pgtype.TimestamptzArray, values *pgtype.Float8Array) *pgxSeriesIterator {
func newIterator(times TimestampSeries, values *pgtype.Float8Array) *pgxSeriesIterator {
return &pgxSeriesIterator{
cur: -1,
totalSamples: len(times.Elements),
totalSamples: times.Len(),
times: times,
values: values,
}
@@ -184,7 +183,7 @@ func (p *pgxSeriesIterator) Seek(t int64) bool {

// getTs returns a Unix timestamp in milliseconds.
func (p *pgxSeriesIterator) getTs() int64 {
return model.TimestamptzToMs(p.times.Elements[p.cur])
return p.times.At(p.cur)
}

func (p *pgxSeriesIterator) getVal() float64 {
@@ -206,8 +205,7 @@ func (p *pgxSeriesIterator) Next() bool {
if p.cur >= p.totalSamples {
return false
}
if p.times.Elements[p.cur].Status == pgtype.Present &&
p.values.Elements[p.cur].Status == pgtype.Present {
if p.values.Elements[p.cur].Status == pgtype.Present {
return true
}
}
2 changes: 1 addition & 1 deletion pkg/pgmodel/querier/series_set_test.go
@@ -413,7 +413,7 @@ func genPgxRows(m [][]seriesSetRow, err error) []timescaleRow {
for _, r := range mm {
result = append(result, timescaleRow{
labelIds: r.labels,
times: toTimestampTzArray(r.timestamps),
times: NewRowTimestampSeries(toTimestampTzArray(r.timestamps)),
values: toFloat8Array(r.values),
err: err,
})
56 changes: 56 additions & 0 deletions pkg/pgmodel/querier/timestamp_series.go
@@ -0,0 +1,56 @@
package querier

import (
"time"

"github.com/jackc/pgtype"
prommodel "github.com/prometheus/common/model"
"github.com/timescale/promscale/pkg/pgmodel/model"
)

type TimestampSeries interface {
At(index int) int64
Len() int
}

type rowTimestampSeries struct {
times *pgtype.TimestamptzArray
}

func NewRowTimestampSeries(times *pgtype.TimestamptzArray) *rowTimestampSeries {
return &rowTimestampSeries{times: times}
}

func (t *rowTimestampSeries) At(index int) int64 {
return model.TimestamptzToMs(t.times.Elements[index])
}

func (t *rowTimestampSeries) Len() int {
return len(t.times.Elements)
}

type regularTimestampSeries struct {
start time.Time
end time.Time
step time.Duration
len int
}

func NewRegularTimestampSeries(start time.Time, end time.Time, step time.Duration) *regularTimestampSeries {
len := (end.Sub(start) / step) + 1
return &regularTimestampSeries{
start: start,
end: end,
step: step,
len: int(len),
}
}

func (t *regularTimestampSeries) Len() int {
return t.len
}

func (t *regularTimestampSeries) At(index int) int64 {
time := t.start.Add(time.Duration(index) * t.step)
return int64(prommodel.TimeFromUnixNano(time.UnixNano()))
}
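
As a quick illustration (a hypothetical snippet, not part of the commit), both implementations satisfy the same TimestampSeries interface, so the row-scanning and iterator code above can consume either one without knowing whether the timestamps came from the database or were computed:

// sumTimestamps is a hypothetical helper: it behaves the same for a
// DB-backed rowTimestampSeries and for a computed regularTimestampSeries.
func sumTimestamps(ts TimestampSeries) int64 {
	var sum int64
	for i := 0; i < ts.Len(); i++ {
		sum += ts.At(i)
	}
	return sum
}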
