Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
REFACTOR: only pass SampleInfoIterator when needed
Browse files Browse the repository at this point in the history
We don't need to create a SampleInfoIterator until we're actually
passing data to pgx, until that point a regular slice will do.
  • Loading branch information
JLockerman committed Apr 8, 2020
1 parent b505854 commit 257bdb4
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 80 deletions.
61 changes: 4 additions & 57 deletions pkg/pgmodel/ingestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package pgmodel
import (
"fmt"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
)

Expand All @@ -24,7 +23,7 @@ type SeriesID int64

// Inserter is responsible for inserting label, series and data into the storage.
type Inserter interface {
InsertNewData(newSeries []SeriesWithCallback, rows map[string]*SampleInfoIterator) (uint64, error)
InsertNewData(newSeries []SeriesWithCallback, rows map[string][]*samplesInfo) (uint64, error)
Close()
}

Expand All @@ -44,54 +43,6 @@ type samplesInfo struct {
samples []prompb.Sample
}

// SampleInfoIterator is an iterator over a collection of sampleInfos that returns
// data in the format expected for the data table row.
type SampleInfoIterator struct {
sampleInfos []*samplesInfo
sampleInfoIndex int
sampleIndex int
}

// NewSampleInfoIterator is the constructor
func NewSampleInfoIterator() *SampleInfoIterator {
return &SampleInfoIterator{sampleInfos: make([]*samplesInfo, 0), sampleIndex: -1, sampleInfoIndex: 0}
}

//Append adds a sample info to the back of the iterator
func (t *SampleInfoIterator) Append(s *samplesInfo) {
t.sampleInfos = append(t.sampleInfos, s)
}

// Next returns true if there is another row and makes the next row data
// available to Values(). When there are no more rows available or an error
// has occurred it returns false.
func (t *SampleInfoIterator) Next() bool {
t.sampleIndex++
if t.sampleInfoIndex < len(t.sampleInfos) && t.sampleIndex >= len(t.sampleInfos[t.sampleInfoIndex].samples) {
t.sampleInfoIndex++
t.sampleIndex = 0
}
return t.sampleInfoIndex < len(t.sampleInfos)
}

// Values returns the values for the current row
func (t *SampleInfoIterator) Values() ([]interface{}, error) {
info := t.sampleInfos[t.sampleInfoIndex]
sample := info.samples[t.sampleIndex]
row := []interface{}{
model.Time(sample.Timestamp).Time(),
sample.Value,
info.seriesID,
}
return row, nil
}

// Err returns any error that has been encountered by the CopyFromSource. If
// this is not nil *Conn.CopyFrom will abort the copy.
func (t *SampleInfoIterator) Err() error {
return nil
}

// DBIngestor ingest the TimeSeries data into Timescale database.
type DBIngestor struct {
cache Cache
Expand All @@ -113,9 +64,9 @@ func (i *DBIngestor) Ingest(tts []prompb.TimeSeries) (uint64, error) {
return rowsInserted, err
}

func (i *DBIngestor) parseData(tts []prompb.TimeSeries) ([]SeriesWithCallback, map[string]*SampleInfoIterator, int, error) {
func (i *DBIngestor) parseData(tts []prompb.TimeSeries) ([]SeriesWithCallback, map[string][]*samplesInfo, int, error) {
var seriesToInsert []SeriesWithCallback
dataSamples := make(map[string]*SampleInfoIterator)
dataSamples := make(map[string][]*samplesInfo, 0)
rows := 0

for _, t := range tts {
Expand Down Expand Up @@ -158,11 +109,7 @@ func (i *DBIngestor) parseData(tts []prompb.TimeSeries) ([]SeriesWithCallback, m
})
}

if _, ok := dataSamples[metricName]; !ok {
dataSamples[metricName] = NewSampleInfoIterator()
}

dataSamples[metricName].Append(&sample)
dataSamples[metricName] = append(dataSamples[metricName], &sample)
}

return seriesToInsert, dataSamples, rows, nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/pgmodel/ingestor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (m *mockCache) SetSeries(lset Labels, id SeriesID) error {

type mockInserter struct {
insertedSeries []SeriesWithCallback
insertedData []map[string]*SampleInfoIterator
insertedData []map[string][]*samplesInfo
insertSeriesErr error
insertDataErr error
}
Expand All @@ -45,7 +45,7 @@ func (m *mockInserter) Close() {

}

func (m *mockInserter) InsertNewData(newSeries []SeriesWithCallback, rows map[string]*SampleInfoIterator) (uint64, error) {
func (m *mockInserter) InsertNewData(newSeries []SeriesWithCallback, rows map[string][]*samplesInfo) (uint64, error) {

err := m.InsertSeries(newSeries)
if err != nil {
Expand All @@ -66,11 +66,11 @@ func (m *mockInserter) InsertSeries(seriesToInsert []SeriesWithCallback) error {
return m.insertSeriesErr
}

func (m *mockInserter) InsertData(rows map[string]*SampleInfoIterator) (uint64, error) {
func (m *mockInserter) InsertData(rows map[string][]*samplesInfo) (uint64, error) {
m.insertedData = append(m.insertedData, rows)
ret := 0
for _, data := range rows {
for _, si := range data.sampleInfos {
for _, si := range data {
ret += len(si.samples)
}
}
Expand Down
63 changes: 56 additions & 7 deletions pkg/pgmodel/pgx.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/prompb"
)
Expand Down Expand Up @@ -138,6 +139,54 @@ func (p *pgxConnImpl) SendBatch(ctx context.Context, b pgxBatch) (pgx.BatchResul
return conn.SendBatch(ctx, b.(*pgx.Batch)), nil
}

// SampleInfoIterator is an iterator over a collection of sampleInfos that returns
// data in the format expected for the data table row.
type SampleInfoIterator struct {
sampleInfos []*samplesInfo
sampleInfoIndex int
sampleIndex int
}

// NewSampleInfoIterator is the constructor
func NewSampleInfoIterator() *SampleInfoIterator {
return &SampleInfoIterator{sampleInfos: make([]*samplesInfo, 0), sampleIndex: -1, sampleInfoIndex: 0}
}

//Append adds a sample info to the back of the iterator
func (t *SampleInfoIterator) Append(s *samplesInfo) {
t.sampleInfos = append(t.sampleInfos, s)
}

// Next returns true if there is another row and makes the next row data
// available to Values(). When there are no more rows available or an error
// has occurred it returns false.
func (t *SampleInfoIterator) Next() bool {
t.sampleIndex++
if t.sampleInfoIndex < len(t.sampleInfos) && t.sampleIndex >= len(t.sampleInfos[t.sampleInfoIndex].samples) {
t.sampleInfoIndex++
t.sampleIndex = 0
}
return t.sampleInfoIndex < len(t.sampleInfos)
}

// Values returns the values for the current row
func (t *SampleInfoIterator) Values() ([]interface{}, error) {
info := t.sampleInfos[t.sampleInfoIndex]
sample := info.samples[t.sampleIndex]
row := []interface{}{
model.Time(sample.Timestamp).Time(),
sample.Value,
info.seriesID,
}
return row, nil
}

// Err returns any error that has been encountered by the CopyFromSource. If
// this is not nil *Conn.CopyFrom will abort the copy.
func (t *SampleInfoIterator) Err() error {
return nil
}

// NewPgxIngestorWithMetricCache returns a new Ingestor that uses connection pool and a metrics cache
// for caching metric table names.
func NewPgxIngestorWithMetricCache(c *pgxpool.Pool, cache MetricCache) *DBIngestor {
Expand Down Expand Up @@ -182,7 +231,7 @@ func (p *pgxInserter) Close() {
})
}

func (p *pgxInserter) InsertNewData(newSeries []SeriesWithCallback, rows map[string]*SampleInfoIterator) (uint64, error) {
func (p *pgxInserter) InsertNewData(newSeries []SeriesWithCallback, rows map[string][]*samplesInfo) (uint64, error) {
err := p.InsertSeries(newSeries)
if err != nil {
return 0, err
Expand Down Expand Up @@ -260,7 +309,7 @@ func (p *pgxInserter) InsertSeries(seriesToInsert []SeriesWithCallback) error {
}

type insertDataRequest struct {
data *SampleInfoIterator
data []*samplesInfo
finished *sync.WaitGroup
errChan chan error
}
Expand All @@ -270,7 +319,7 @@ type insertDataTask struct {
errChan chan error
}

func (p *pgxInserter) InsertData(rows map[string]*SampleInfoIterator) (uint64, error) {
func (p *pgxInserter) InsertData(rows map[string][]*samplesInfo) (uint64, error) {
// check that all the metrics are valid, we don't want to deadlock waiting
// on an Insert to a metric that does not exist
for metricName := range rows {
Expand All @@ -285,7 +334,7 @@ func (p *pgxInserter) InsertData(rows map[string]*SampleInfoIterator) (uint64, e
workFinished.Add(len(rows))
errChan := make(chan error, 1)
for metricName, data := range rows {
for _, si := range data.sampleInfos {
for _, si := range data {
numRows += uint64(len(si.samples))
}
p.insertMetricData(metricName, data, workFinished, errChan)
Expand All @@ -301,7 +350,7 @@ func (p *pgxInserter) InsertData(rows map[string]*SampleInfoIterator) (uint64, e
return numRows, err
}

func (p *pgxInserter) insertMetricData(metric string, data *SampleInfoIterator, finished *sync.WaitGroup, errChan chan error) {
func (p *pgxInserter) insertMetricData(metric string, data []*samplesInfo, finished *sync.WaitGroup, errChan chan error) {
inserter := p.getMetricTableInserter(metric)
inserter <- insertDataRequest{data: data, finished: finished, errChan: errChan}
}
Expand Down Expand Up @@ -391,14 +440,14 @@ func (p *pgxInserter) metricTableInsert(metric string, input chan insertDataRequ
return
}
needsResponse = append(needsResponse, insertDataTask{finished: req.finished, errChan: req.errChan})
batch.sampleInfos = append(batch.sampleInfos, req.data.sampleInfos...)
batch.sampleInfos = append(batch.sampleInfos, req.data...)

TryReadMore:
for len(batch.sampleInfos) < batchSize {
select {
case req := <-input:
needsResponse = append(needsResponse, insertDataTask{finished: req.finished, errChan: req.errChan})
batch.sampleInfos = append(batch.sampleInfos, req.data.sampleInfos...)
batch.sampleInfos = append(batch.sampleInfos, req.data...)
default:
break TryReadMore
}
Expand Down
22 changes: 10 additions & 12 deletions pkg/pgmodel/pgx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type mockPGXConn struct {
QueryErr map[int]error // Mapping query call to error response.
CopyFromTableName []pgx.Identifier
CopyFromColumns [][]string
CopyFromRowSource []pgx.CopyFromSource
CopyFromRowSource [][]*samplesInfo
CopyFromResult int64
CopyFromError error
CopyFromRowsRows [][]interface{}
Expand Down Expand Up @@ -77,9 +77,9 @@ func (m *mockPGXConn) CopyFrom(ctx context.Context, tableName pgx.Identifier, co
m.CopyFromTableName = append(m.CopyFromTableName, tableName)
m.CopyFromColumns = append(m.CopyFromColumns, columnNames)
src := rowSrc.(*SampleInfoIterator)
rows := SampleInfoIterator{sampleIndex: -1}
rows.sampleInfos = append(rows.sampleInfos, src.sampleInfos...)
m.CopyFromRowSource = append(m.CopyFromRowSource, &rows)
rows := make([]*samplesInfo, 0, len(src.sampleInfos))
rows = append(rows, src.sampleInfos...)
m.CopyFromRowSource = append(m.CopyFromRowSource, rows)
return m.CopyFromResult, m.CopyFromError
}

Expand Down Expand Up @@ -450,12 +450,12 @@ func TestPGXInserterInsertSeries(t *testing.T) {
}
}

func createRows(x int) map[string]*SampleInfoIterator {
func createRows(x int) map[string][]*samplesInfo {
return createRowsByMetric(x, 1)
}

func createRowsByMetric(x int, metricCount int) map[string]*SampleInfoIterator {
ret := make(map[string]*SampleInfoIterator)
func createRowsByMetric(x int, metricCount int) map[string][]*samplesInfo {
ret := make(map[string][]*samplesInfo, 0)
i := 0

metrics := make([]string, 0, metricCount)
Expand All @@ -469,10 +469,8 @@ func createRowsByMetric(x int, metricCount int) map[string]*SampleInfoIterator {

for i < x {
metricIndex := i % metricCount
if _, ok := ret[metrics[metricIndex]]; !ok {
ret[metrics[metricIndex]] = NewSampleInfoIterator()
}
ret[metrics[metricIndex]].Append(&samplesInfo{})

ret[metrics[metricIndex]] = append(ret[metrics[metricIndex]], &samplesInfo{})
i++
}
return ret
Expand All @@ -481,7 +479,7 @@ func createRowsByMetric(x int, metricCount int) map[string]*SampleInfoIterator {
func TestPGXInserterInsertData(t *testing.T) {
testCases := []struct {
name string
rows map[string]*SampleInfoIterator
rows map[string][]*samplesInfo
queryNoRows bool
queryErr map[int]error
copyFromResult int64
Expand Down

0 comments on commit 257bdb4

Please sign in to comment.