Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Close pgx.Rows as soon as possible
Browse files Browse the repository at this point in the history
Keeping around a pgx.Rows is dangerous, as doing so keeps a lock on the
underlying connection. This means that the connecter can self-starve if
it gets partway through a Rows and decides it needs to fetch more data
from the database. To fix this, this commit materializes all pgx.Rows
into a []timescaleRow as soon as possible.
  • Loading branch information
JLockerman committed Aug 13, 2020
1 parent a0f03c9 commit 1e1f231
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 102 deletions.
40 changes: 11 additions & 29 deletions pkg/pgmodel/query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,33 +172,19 @@ func (c *clauseBuilder) build() ([]string, []interface{}) {
return c.clauses, c.args
}

func buildSeriesSet(rows []pgx.Rows, sortSeries bool, querier *pgxQuerier) (storage.SeriesSet, storage.Warnings, error) {
return &pgxSeriesSet{
rows: rows,
querier: querier,
}, nil, nil
}

func buildTimeSeries(rows pgx.Rows, q *pgxQuerier) ([]*prompb.TimeSeries, error) {
results := make([]*prompb.TimeSeries, 0)

for rows.Next() {
var (
labelIDs []int64
timestamps []time.Time
values []float64
)
err := rows.Scan(&labelIDs, &timestamps, &values)
func buildTimeSeries(rows []timescaleRow, q *pgxQuerier) ([]*prompb.TimeSeries, error) {
results := make([]*prompb.TimeSeries, 0, len(rows))

if err != nil {
return nil, err
for _, row := range rows {
if row.err != nil {
return nil, row.err
}

if len(timestamps) != len(values) {
if len(row.times.Elements) != len(row.values.Elements) {
return nil, fmt.Errorf("query returned a mismatch in timestamps and values")
}

promLabels, err := q.getPrompbLabelsForIds(labelIDs)
promLabels, err := q.getPrompbLabelsForIds(row.labelIds)
if err != nil {
return nil, err
}
Expand All @@ -209,23 +195,19 @@ func buildTimeSeries(rows pgx.Rows, q *pgxQuerier) ([]*prompb.TimeSeries, error)

result := &prompb.TimeSeries{
Labels: promLabels,
Samples: make([]prompb.Sample, 0, len(timestamps)),
Samples: make([]prompb.Sample, 0, len(row.times.Elements)),
}

for i := range timestamps {
for i := range row.times.Elements {
result.Samples = append(result.Samples, prompb.Sample{
Timestamp: toMilis(timestamps[i]),
Value: values[i],
Timestamp: timestamptzToMs(row.times.Elements[i]),
Value: row.values.Elements[i].Float,
})
}

results = append(results, result)
}

if rows.Err() != nil {
return nil, rows.Err()
}

return results, nil
}

Expand Down
64 changes: 28 additions & 36 deletions pkg/pgmodel/series_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package pgmodel

import (
"fmt"
"math"
"sort"

"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
Expand All @@ -26,58 +24,62 @@ var (
// pgxSeriesSet implements storage.SeriesSet.
type pgxSeriesSet struct {
rowIdx int
rows []pgx.Rows
rows []timescaleRow
err error
querier labelQuerier
}

// pgxSeriesSet must implement storage.SeriesSet
var _ storage.SeriesSet = (*pgxSeriesSet)(nil)

func buildSeriesSet(rows []timescaleRow, querier labelQuerier) (storage.SeriesSet, storage.Warnings, error) {
return &pgxSeriesSet{
rows: rows,
querier: querier,
rowIdx: -1,
}, nil, nil
}

// Next forwards the internal cursor to next storage.Series
func (p *pgxSeriesSet) Next() bool {
if p.rowIdx >= len(p.rows) {
return false
}
for !p.rows[p.rowIdx].Next() {
if p.err == nil {
p.err = p.rows[p.rowIdx].Err()
}
p.rows[p.rowIdx].Close()
p.rowIdx++
if p.rowIdx >= len(p.rows) {
return false
}
p.rowIdx += 1
if p.rowIdx >= len(p.rows) {
return false
}
if p.err == nil {
p.err = p.rows[p.rowIdx].err
}
return true
}

// At returns the current storage.Series. It expects to get rows to contain
// four arrays in binary format which it attempts to deserialize into specific types.
// It also expects that the first two and second two arrays are the same length.
// At returns the current storage.Series.
func (p *pgxSeriesSet) At() storage.Series {
if p.rowIdx >= len(p.rows) {
return nil
}

// Setting invalid data until we confirm that all data is valid.
p.err = errInvalidData
row := &p.rows[p.rowIdx]

ps := &pgxSeries{}
var labelIds []int64
if err := p.rows[p.rowIdx].Scan(&labelIds, &ps.times, &ps.values); err != nil {
log.Error("err", err)
if row.err != nil {
return nil
}

if len(ps.times.Elements) != len(ps.values.Elements) {
if len(row.times.Elements) != len(row.values.Elements) {
p.err = errInvalidData
return nil
}

ps := &pgxSeries{
times: row.times,
values: row.values,
}

// this should pretty much always be non-empty due to __name__, but it
// costs little to check here
if len(labelIds) != 0 {
lls, err := p.querier.getLabelsForIds(labelIds)
if len(row.labelIds) != 0 {
lls, err := p.querier.getLabelsForIds(row.labelIds)
if err != nil {
log.Error("err", err)
return nil
Expand All @@ -86,7 +88,6 @@ func (p *pgxSeriesSet) At() storage.Series {
ps.labels = lls
}

p.err = nil
return ps
}

Expand Down Expand Up @@ -148,16 +149,7 @@ func (p *pgxSeriesIterator) Seek(t int64) bool {

// getTs returns a Unix timestamp in milliseconds.
func (p *pgxSeriesIterator) getTs() int64 {
v := p.times.Elements[p.cur]

switch v.InfinityModifier {
case pgtype.NegativeInfinity:
return math.MinInt64
case pgtype.Infinity:
return math.MaxInt64
default:
return v.Time.UnixNano() / 1e6
}
return timestamptzToMs(p.times.Elements[p.cur])
}

func (p *pgxSeriesIterator) getVal() float64 {
Expand Down
43 changes: 32 additions & 11 deletions pkg/pgmodel/series_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/jackc/pgconn"
"github.com/jackc/pgproto3/v2"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"github.com/prometheus/prometheus/pkg/labels"
)

Expand Down Expand Up @@ -118,6 +117,8 @@ type seriesSetRow struct {
values []pgtype.Float8
}

var arbitraryErr = fmt.Errorf("arbitrary err")

func TestPgxSeriesSet(t *testing.T) {
testCases := []struct {
name string
Expand All @@ -131,9 +132,9 @@ func TestPgxSeriesSet(t *testing.T) {
}{
{
name: "invalid row",
rowErr: fmt.Errorf("arbitrary err"),
rowErr: arbitraryErr,
input: [][]seriesSetRow{{seriesSetRow{}}},
err: errInvalidData,
err: arbitraryErr,
rowCount: 1,
},
{
Expand Down Expand Up @@ -239,7 +240,7 @@ func TestPgxSeriesSet(t *testing.T) {
c.input = [][]seriesSetRow{{
genSeries(labels, c.ts, c.vs)}}
}
p := pgxSeriesSet{rows: genPgxRows(c.input, c.rowErr), querier: mapQuerier{labelMapping}}
p, _, _ := buildSeriesSet(genPgxRows(c.input, c.rowErr), mapQuerier{labelMapping})

for c.rowCount > 0 {
c.rowCount--
Expand Down Expand Up @@ -393,19 +394,39 @@ func genRows(count int) [][][]byte {
return result
}

func genPgxRows(m [][]seriesSetRow, err error) []pgx.Rows {
result := make([]pgx.Rows, len(m))

for i := range result {
result[i] = &mockPgxRows{
results: m[i],
err: err,
func genPgxRows(m [][]seriesSetRow, err error) []timescaleRow {
var result []timescaleRow

for _, mm := range m {
for _, r := range mm {
result = append(result, timescaleRow{
labelIds: r.labels,
times: toTimestampTzArray(r.timestamps),
values: toFloat8Array(r.values),
err: err,
})
}
}

return result
}

func toTimestampTzArray(times []pgtype.Timestamptz) pgtype.TimestamptzArray {
return pgtype.TimestamptzArray{
Elements: times,
Dimensions: nil,
Status: pgtype.Present,
}
}

func toFloat8Array(values []pgtype.Float8) pgtype.Float8Array {
return pgtype.Float8Array{
Elements: values,
Dimensions: nil,
Status: pgtype.Present,
}
}

func genSeries(labels []int64, ts []pgtype.Timestamptz, vs []pgtype.Float8) seriesSetRow {

for i := range ts {
Expand Down

0 comments on commit 1e1f231

Please sign in to comment.