Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
nits + comments + test fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
cevian committed Jul 1, 2021
1 parent 4bbb867 commit 135f3e2
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 41 deletions.
4 changes: 1 addition & 3 deletions pkg/api/query_test.go
Expand Up @@ -46,9 +46,7 @@ func (m mockSeriesSet) Warnings() storage.Warnings {
return nil
}

func (m mockSeriesSet) Close() {
return
}
func (m mockSeriesSet) Close() {}

type mockQuerier struct {
timeToSleepOnSelect time.Duration
Expand Down
30 changes: 13 additions & 17 deletions pkg/pgmodel/model/sql_test_utils.go
Expand Up @@ -127,7 +127,7 @@ func (r *SqlRecorder) checkQuery(sql string, args ...interface{}) (RowResults, e
if sql != row.Sql {
dmp := diffmatchpatch.New()
diffs := dmp.DiffMain(sql, row.Sql, false)
r.t.Errorf("@ %d unexpected query:\ngot:\n\t%s\nexpected:\n\t%s\ndiff:\n\t%v", idx, sql, row.Sql, dmp.DiffPrettyText(diffs))
r.t.Errorf("@ %d unexpected query:\ngot:\n\t'%s'\nexpected:\n\t'%s'\ndiff:\n\t%v", idx, sql, row.Sql, dmp.DiffPrettyText(diffs))
}

require.Equal(r.t, len(row.Args), len(args), "Args of different lengths @ %d %s", idx, sql)
Expand Down Expand Up @@ -279,28 +279,24 @@ func (m *MockRows) Scan(dest ...interface{}) error {
case []time.Time:
if d, ok := dest[i].(*[]time.Time); ok {
*d = s
} else if d, ok := dest[i].(*pgtype.TimestamptzArray); ok {
*d = pgtype.TimestamptzArray{
Elements: make([]pgtype.Timestamptz, len(s)),
}
for i := range s {
d.Elements[i] = pgtype.Timestamptz{
Time: s[i],
}
} else if d, ok := dest[i].(pgtype.Value); ok {
err := d.Set(s)
if err != nil {
return err
}
} else {
return fmt.Errorf("wrong value type []time.Time")
}
case []float64:
if d, ok := dest[i].(*[]float64); ok {
*d = s
} else if d, ok := dest[i].(*pgtype.Float8Array); ok {
*d = pgtype.Float8Array{
Elements: make([]pgtype.Float8, len(s)),
}
for i := range s {
d.Elements[i] = pgtype.Float8{
Float: s[i],
}
} else if d, ok := dest[i].(pgtype.Value); ok {
err := d.Set(s)
if err != nil {
return err
}
} else {
return fmt.Errorf("wrong value type []float64")
}
case []int64:
if d, ok := dest[i].(*[]int64); ok {
Expand Down
4 changes: 3 additions & 1 deletion pkg/pgmodel/querier/querier.go
Expand Up @@ -34,6 +34,8 @@ type QueryHints struct {
CurrentNode parser.Node
Lookback time.Duration
}

// SerieSet adds a Close method to storage.SeriesSet to provide a way to free memory
type SeriesSet interface {
storage.SeriesSet
Close()
Expand Down Expand Up @@ -335,7 +337,7 @@ func (errorSeriesSet) Next() bool { return false }
func (errorSeriesSet) At() storage.Series { return nil }
func (e errorSeriesSet) Err() error { return e.err }
func (e errorSeriesSet) Warnings() storage.Warnings { return nil }
func (e errorSeriesSet) Close() { return }
func (e errorSeriesSet) Close() {}

type labelQuerier interface {
LabelsForIdMap(idMap map[int64]labels.Label) (err error)
Expand Down
2 changes: 1 addition & 1 deletion pkg/pgmodel/querier/querier_sql_test.go
Expand Up @@ -636,7 +636,7 @@ func TestPGXQuerierQuery(t *testing.T) {
}
}
} else if !reflect.DeepEqual(result, c.result) {
t.Errorf("unexpected result:\ngot\n%#v\nwanted\n%+v", result, c.result)
t.Errorf("unexpected result:\ngot\n%+v\nwanted\n%+v", result, c.result)
}
})
}
Expand Down
14 changes: 9 additions & 5 deletions pkg/pgmodel/querier/query_builder.go
Expand Up @@ -352,8 +352,12 @@ func buildTimeSeries(rows []timescaleRow, lr lreader.LabelsReader) ([]*prompb.Ti
}

for i := 0; i < row.times.Len(); i++ {
ts, ok := row.times.At(i)
if !ok {
return nil, fmt.Errorf("invalid timestamp found")
}
result.Samples = append(result.Samples, prompb.Sample{
Timestamp: row.times.At(i),
Timestamp: ts,
Value: row.values.Elements[i].Float,
})
}
Expand Down Expand Up @@ -428,8 +432,8 @@ func buildTimeseriesByLabelClausesQuery(filter metricTimeRangeFilter, cases []st
strings.Join(cases, " AND "),
filter.startTime,
filter.endTime,
strings.Join(selectorClauses, ","),
strings.Join(selectors, ","),
strings.Join(selectorClauses, ", "),
strings.Join(selectors, ", "),
orderByClause,
)

Expand Down Expand Up @@ -487,7 +491,7 @@ func callAggregator(hints *storage.SelectHints, funcName string) (*aggregators,
valueClause: "prom_" + funcName + "($%d, $%d,$%d, $%d, time, value)",
valueParams: []interface{}{model.Time(hints.Start).Time(), model.Time(queryEnd).Time(), int64(stepDuration.Milliseconds()), int64(rangeDuration.Milliseconds())},
unOrdered: false,
tsSeries: NewRegularTimestampSeries(model.Time(queryStart).Time(), model.Time(queryEnd).Time(), stepDuration),
tsSeries: newRegularTimestampSeries(model.Time(queryStart).Time(), model.Time(queryEnd).Time(), stepDuration),
}
return &qf, nil
}
Expand Down Expand Up @@ -543,7 +547,7 @@ func getAggregators(hints *storage.SelectHints, qh *QueryHints, path []parser.No
valueClause: "vector_selector($%d, $%d,$%d, $%d, time, value)",
valueParams: []interface{}{qh.StartTime, qh.EndTime, hints.Step, qh.Lookback.Milliseconds()},
unOrdered: true,
tsSeries: NewRegularTimestampSeries(qh.StartTime, qh.EndTime, time.Duration(hints.Step)*time.Millisecond),
tsSeries: newRegularTimestampSeries(qh.StartTime, qh.EndTime, time.Duration(hints.Step)*time.Millisecond),
}
return &qf, qh.CurrentNode, nil
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/pgmodel/querier/row.go
Expand Up @@ -21,7 +21,7 @@ var tPool = sync.Pool{
},
}

//wrapper to have code to reuse existing array so that a pool is effective
//wrapper to allow DecodeBinary to reuse the existing array so that a pool is effective
type timestamptzArrayWrapper struct {
*pgtype.TimestamptzArray
}
Expand Down Expand Up @@ -75,7 +75,7 @@ func (dstwrapper *timestamptzArrayWrapper) DecodeBinary(ci *pgtype.ConnInfo, src
return nil
}

//wrapper to have code to reuse existing array so that a pool is effective
//wrapper to to allow DecodeBinary to reuse existing array so that a pool is effective
type float8ArrayWrapper struct {
*pgtype.Float8Array
}
Expand Down Expand Up @@ -158,13 +158,14 @@ func appendTsRows(out []timescaleRow, in pgxconn.PgxRows, tsSeries TimestampSeri
values.Elements = values.Elements[:0]
valuesWrapper := float8ArrayWrapper{values}

//if a timeseries isn't provided it will be fetched from the database
if tsSeries == nil {
times := tPool.Get().(*pgtype.TimestamptzArray)
times.Elements = times.Elements[:0]
timesWrapper := timestamptzArrayWrapper{times}
row.err = in.Scan(&row.labelIds, &timesWrapper, &valuesWrapper)
row.timeArrayOwnership = times
row.times = NewRowTimestampSeries(times)
row.times = newRowTimestampSeries(times)
} else {
row.err = in.Scan(&row.labelIds, &valuesWrapper)
row.times = tsSeries
Expand Down
7 changes: 4 additions & 3 deletions pkg/pgmodel/querier/series_set.go
Expand Up @@ -130,7 +130,6 @@ func (p *pgxSeriesSet) Close() {
for _, row := range p.rows {
row.Close()
}
return
}

// pgxSeries implements storage.Series.
Expand Down Expand Up @@ -183,7 +182,8 @@ func (p *pgxSeriesIterator) Seek(t int64) bool {

// getTs returns a Unix timestamp in milliseconds.
func (p *pgxSeriesIterator) getTs() int64 {
return p.times.At(p.cur)
ts, _ := p.times.At(p.cur)
return ts
}

func (p *pgxSeriesIterator) getVal() float64 {
Expand All @@ -205,7 +205,8 @@ func (p *pgxSeriesIterator) Next() bool {
if p.cur >= p.totalSamples {
return false
}
if p.values.Elements[p.cur].Status == pgtype.Present {
_, ok := p.times.At(p.cur)
if ok && p.values.Elements[p.cur].Status == pgtype.Present {
return true
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pgmodel/querier/series_set_test.go
Expand Up @@ -413,7 +413,7 @@ func genPgxRows(m [][]seriesSetRow, err error) []timescaleRow {
for _, r := range mm {
result = append(result, timescaleRow{
labelIds: r.labels,
times: NewRowTimestampSeries(toTimestampTzArray(r.timestamps)),
times: newRowTimestampSeries(toTimestampTzArray(r.timestamps)),
values: toFloat8Array(r.values),
err: err,
})
Expand Down
19 changes: 12 additions & 7 deletions pkg/pgmodel/querier/timestamp_series.go
Expand Up @@ -8,35 +8,40 @@ import (
"github.com/timescale/promscale/pkg/pgmodel/model"
)

// TimestampSeries represent an array of timestamps (model.Time/int64) that is 0-indexed.
type TimestampSeries interface {
At(index int) int64
//At returns the element at an index location, as well as a bool to indicate
//whether the value is valid (or NULL for example)
At(index int) (int64, bool)
Len() int
}

//rowTimestampSeries is a TimestampSeries based on data fetched from a database row
type rowTimestampSeries struct {
times *pgtype.TimestamptzArray
}

func NewRowTimestampSeries(times *pgtype.TimestamptzArray) *rowTimestampSeries {
func newRowTimestampSeries(times *pgtype.TimestamptzArray) *rowTimestampSeries {
return &rowTimestampSeries{times: times}
}

func (t *rowTimestampSeries) At(index int) int64 {
return model.TimestamptzToMs(t.times.Elements[index])
func (t *rowTimestampSeries) At(index int) (int64, bool) {
return model.TimestamptzToMs(t.times.Elements[index]), t.times.Elements[index].Status == pgtype.Present
}

func (t *rowTimestampSeries) Len() int {
return len(t.times.Elements)
}

// regularTimestampSeries represents a time-series that is regular (e.g. each timestamp is step duration ahead of the previous one)
type regularTimestampSeries struct {
start time.Time
end time.Time
step time.Duration
len int
}

func NewRegularTimestampSeries(start time.Time, end time.Time, step time.Duration) *regularTimestampSeries {
func newRegularTimestampSeries(start time.Time, end time.Time, step time.Duration) *regularTimestampSeries {
len := (end.Sub(start) / step) + 1
return &regularTimestampSeries{
start: start,
Expand All @@ -50,7 +55,7 @@ func (t *regularTimestampSeries) Len() int {
return t.len
}

func (t *regularTimestampSeries) At(index int) int64 {
func (t *regularTimestampSeries) At(index int) (int64, bool) {
time := t.start.Add(time.Duration(index) * t.step)
return int64(prommodel.TimeFromUnixNano(time.UnixNano()))
return int64(prommodel.TimeFromUnixNano(time.UnixNano())), true
}

0 comments on commit 135f3e2

Please sign in to comment.