Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Fixup error handling of sql_reader
Browse files Browse the repository at this point in the history
Make sure errors are propagated or handled in all cases. Notably
appendTsRows() now bubbles up any error it may encounter.
  • Loading branch information
JLockerman committed Aug 17, 2020
1 parent f3c4a7f commit 0a60ca8
Showing 1 changed file with 17 additions and 7 deletions.
24 changes: 17 additions & 7 deletions pkg/pgmodel/sql_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (q *pgxQuerier) Query(query *prompb.Query) ([]*prompb.TimeSeries, error) {

results, err := buildTimeSeries(rows, q)

return results, nil
return results, err
}

func (q *pgxQuerier) LabelNames() ([]string, error) {
Expand Down Expand Up @@ -314,16 +314,23 @@ func (q *pgxQuerier) getResultRows(startTimestamp int64, endTimestamp int64, hin

batchResults, err := q.conn.SendBatch(context.Background(), batch)
defer batchResults.Close()
if err != nil {
return nil, nil, err
}

for i := 0; i < numQueries; i++ {
rows, err = batchResults.Query()
if err != nil {
rows.Close()
return nil, nil, err
}
results = appendTsRows(results, rows)
results, err = appendTsRows(results, rows)
// can't defer because we need to Close before the next loop iteration
rows.Close()
if err != nil {
rows.Close()
return nil, nil, err
}
}

return results, nil, nil
Expand Down Expand Up @@ -357,21 +364,24 @@ func (q *pgxQuerier) querySingleMetric(metric string, filter metricTimeRangeFilt
}

// TODO this allocation assumes we usually have 1 row, if not, refactor
tsRows := appendTsRows(make([]timescaleRow, 0, 1), rows)
return tsRows, topNode, nil
tsRows, err := appendTsRows(make([]timescaleRow, 0, 1), rows)
return tsRows, topNode, err
}

func appendTsRows(out []timescaleRow, in pgx.Rows) []timescaleRow {
func appendTsRows(out []timescaleRow, in pgx.Rows) ([]timescaleRow, error) {
if in.Err() != nil {
return out, in.Err()
}
for in.Next() {
var row timescaleRow
row.err = in.Scan(&row.labelIds, &row.times, &row.values)
out = append(out, row)
if row.err != nil {
log.Error("err", row.err)
return out
return out, row.err
}
}
return out
return out, in.Err()
}

func (q *pgxQuerier) getMetricTableName(metric string) (string, error) {
Expand Down

0 comments on commit 0a60ca8

Please sign in to comment.