Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Batch Multi-Metric Queries
Browse files Browse the repository at this point in the history
This is a drive-by performance optimization: there's no need to send
each of these queries individually, we can batch them together and save
on multiple roundtrip times.
  • Loading branch information
JLockerman committed Aug 13, 2020
1 parent 1e1f231 commit cd76d85
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 15 deletions.
14 changes: 12 additions & 2 deletions pkg/pgmodel/sql_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,19 +294,29 @@ func (q *pgxQuerier) getResultRows(startTimestamp int64, endTimestamp int64, hin
// TODO this assume on average on row per-metric. Is this right?
results := make([]timescaleRow, 0, len(metrics))

numQueries := 0
batch := q.conn.NewBatch()
for i, metric := range metrics {
//TODO batch getMetricTableName
tableName, err := q.getMetricTableName(metric)
if err != nil {
// If the metric table is missing, there are no results for this query.
if err == errMissingTableName {
continue
}

return nil, nil, err
}
filter.metric = tableName
sqlQuery = buildTimeseriesBySeriesIDQuery(filter, series[i])
rows, err = q.conn.Query(context.Background(), sqlQuery)
batch.Queue(sqlQuery)
numQueries += 1
}

batchResults, err := q.conn.SendBatch(context.Background(), batch)
defer batchResults.Close()

for i := 0; i < numQueries; i++ {
rows, err = batchResults.Query()
if err != nil {
rows.Close()
return nil, nil, err
Expand Down
27 changes: 14 additions & 13 deletions pkg/pgmodel/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,7 @@ func TestPGXQuerierQuery(t *testing.T) {
{Type: prompb.LabelMatcher_NEQ, Name: "foo", Value: "bar"},
},
},
result: []*prompb.TimeSeries{},
sqlQueries: []sqlQuery{
{
sql: "SELECT m.metric_name, array_agg(s.id)\n\t" +
Expand Down Expand Up @@ -1054,6 +1055,12 @@ func TestPGXQuerierQuery(t *testing.T) {
results: rowResults{{"foo"}},
err: error(nil),
},
{
sql: "SELECT table_name FROM _prom_catalog.get_metric_table_name_if_exists($1)",
args: []interface{}{"bar"},
results: rowResults{{"bar"}},
err: error(nil),
},
{
sql: "SELECT s.labels, array_agg(m.time ORDER BY time), array_agg(m.value ORDER BY time)\n\t" +
"FROM \"prom_data\".\"foo\" m\n\t" +
Expand All @@ -1067,12 +1074,6 @@ func TestPGXQuerierQuery(t *testing.T) {
results: rowResults{{[]int64{3}, []time.Time{time.Unix(0, 0)}, []float64{1}}},
err: error(nil),
},
{
sql: "SELECT table_name FROM _prom_catalog.get_metric_table_name_if_exists($1)",
args: []interface{}{"bar"},
results: rowResults{{"bar"}},
err: error(nil),
},
{
sql: "SELECT s.labels, array_agg(m.time ORDER BY time), array_agg(m.value ORDER BY time)\n\t" +
"FROM \"prom_data\".\"bar\" m\n\t" +
Expand Down Expand Up @@ -1139,6 +1140,12 @@ func TestPGXQuerierQuery(t *testing.T) {
results: rowResults{{"foo"}},
err: error(nil),
},
{
sql: "SELECT table_name FROM _prom_catalog.get_metric_table_name_if_exists($1)",
args: []interface{}{"bar"},
results: rowResults{{"bar"}},
err: error(nil),
},
{
sql: "SELECT s.labels, array_agg(m.time ORDER BY time), array_agg(m.value ORDER BY time)\n\t" +
"FROM \"prom_data\".\"foo\" m\n\t" +
Expand All @@ -1152,12 +1159,6 @@ func TestPGXQuerierQuery(t *testing.T) {
results: rowResults{{[]int64{5}, []time.Time{time.Unix(0, 0)}, []float64{1}}},
err: error(nil),
},
{
sql: "SELECT table_name FROM _prom_catalog.get_metric_table_name_if_exists($1)",
args: []interface{}{"bar"},
results: rowResults{{"bar"}},
err: error(nil),
},
{
sql: "SELECT s.labels, array_agg(m.time ORDER BY time), array_agg(m.value ORDER BY time)\n\t" +
"FROM \"prom_data\".\"bar\" m\n\t" +
Expand Down Expand Up @@ -1398,7 +1399,7 @@ func TestPGXQuerierQuery(t *testing.T) {
}

if !reflect.DeepEqual(result, c.result) {
t.Errorf("unexpected result:\ngot\n%v\nwanted\n%v", result, c.result)
t.Errorf("unexpected result:\ngot\n%#v\nwanted\n%+v", result, c.result)
}
})
}
Expand Down

0 comments on commit cd76d85

Please sign in to comment.