Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Add tests for metadata ingestion/querying.
Browse files Browse the repository at this point in the history
Signed-off-by: Harkishen Singh <harkishensingh@hotmail.com>
  • Loading branch information
Harkishen-Singh committed Jun 18, 2021
1 parent c3d10f5 commit 7bcb492
Show file tree
Hide file tree
Showing 12 changed files with 456 additions and 65 deletions.
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -3,6 +3,7 @@ module github.com/timescale/promscale
go 1.15

require (
github.com/Azure/azure-sdk-for-go v52.5.0+incompatible
github.com/NYTimes/gziphandler v1.1.1
github.com/armon/go-metrics v0.3.3 // indirect
github.com/blang/semver/v4 v4.0.0
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/write_test.go
Expand Up @@ -353,7 +353,7 @@ type mockInserter struct {
err error
}

func (m *mockInserter) Ingest(r *prompb.WriteRequest) (uint64, int, error) {
func (m *mockInserter) Ingest(r *prompb.WriteRequest) (uint64, uint64, error) {
m.ts = r.Timeseries
return m.result, 0, m.err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pgclient/client.go
Expand Up @@ -190,7 +190,7 @@ func (c *Client) Ingestor() *ingestor.DBIngestor {
}

// Ingest writes the timeseries object into the DB
func (c *Client) Ingest(r *prompb.WriteRequest) (uint64, int, error) {
func (c *Client) Ingest(r *prompb.WriteRequest) (uint64, uint64, error) {
return c.ingestor.Ingest(r)
}

Expand Down
20 changes: 11 additions & 9 deletions pkg/pgmodel/ingestor/ingestor.go
Expand Up @@ -62,7 +62,7 @@ func NewPgxIngestorForTests(conn pgxconn.PgxConn, cfg *Cfg) (*DBIngestor, error)
// tts the []Timeseries to insert
// req the WriteRequest backing tts. It will be added to our WriteRequest
// pool when it is no longer needed.
func (ingestor *DBIngestor) Ingest(r *prompb.WriteRequest) (uint64, int, error) {
func (ingestor *DBIngestor) Ingest(r *prompb.WriteRequest) (numSamples uint64, numMetadata uint64, err error) {
numTs := len(r.Timeseries)
numMeta := len(r.Metadata)
// WriteRequests can contain pointers into the original buffer we deserialized
Expand All @@ -72,6 +72,10 @@ func (ingestor *DBIngestor) Ingest(r *prompb.WriteRequest) (uint64, int, error)
// In order for this to work correctly, any data we wish to keep using (e.g.
// samples) must no longer be reachable from req.
defer FinishWriteRequest(r)
// todo: replace with switch
if numTs == 0 && numMeta == 0 {
return 0, 0, nil
}
if numTs > 0 && numMeta == 0 {
// Write request contains only time-series.
n, err := ingestor.ingestSamples(r)
Expand All @@ -82,7 +86,6 @@ func (ingestor *DBIngestor) Ingest(r *prompb.WriteRequest) (uint64, int, error)
n, err := ingestor.ingestMetadata(r)
return 0, n, err
}
fmt.Println("multi case")
// Write request contains both samples and metadata, hence we ingest concurrently.
type result struct {
id int8
Expand All @@ -97,12 +100,11 @@ func (ingestor *DBIngestor) Ingest(r *prompb.WriteRequest) (uint64, int, error)
}()
go func() {
n, err := ingestor.ingestMetadata(r)
res <- result{2, uint64(n), err}
res <- result{2, n, err}
}()
var (
err error
samplesRowsInserted uint64
metadataRowsInserted int
metadataRowsInserted uint64
)
mergeErr := func(prevErr, err error, message string) error {
if prevErr != nil {
Expand All @@ -117,7 +119,7 @@ func (ingestor *DBIngestor) Ingest(r *prompb.WriteRequest) (uint64, int, error)
samplesRowsInserted = response.numRows
err = mergeErr(err, response.err, "ingesting samples")
case 2:
metadataRowsInserted = int(response.numRows)
metadataRowsInserted = response.numRows
err = mergeErr(err, response.err, "ingesting metadata")
}
}
Expand Down Expand Up @@ -159,7 +161,7 @@ func (ingestor *DBIngestor) ingestSamples(r *prompb.WriteRequest) (uint64, error
return samplesRowsInserted, errSamples
}

func (ingestor *DBIngestor) ingestMetadata(r *prompb.WriteRequest) (int, error) {
func (ingestor *DBIngestor) ingestMetadata(r *prompb.WriteRequest) (uint64, error) {
metadataRows := len(r.Metadata)
metadata := make([]model.Metadata, len(r.Metadata))
for i := 0; i < metadataRows; i++ {
Expand All @@ -171,11 +173,11 @@ func (ingestor *DBIngestor) ingestMetadata(r *prompb.WriteRequest) (int, error)
Help: tmp.Help,
}
}
_, errMetadata := ingestor.metadataDispatcher.Insert(metadata)
rowsInserted, errMetadata := ingestor.metadataDispatcher.Insert(metadata)
if errMetadata != nil {
return 0, errMetadata
}
return metadataRows, nil
return rowsInserted, nil
}

// Parts of metric creation not needed to insert data
Expand Down
2 changes: 1 addition & 1 deletion pkg/pgmodel/ingestor/ingestor_interface.go
Expand Up @@ -11,5 +11,5 @@ import "github.com/timescale/promscale/pkg/prompb"
type DBInserter interface {
// Ingest takes an array of TimeSeries and attepts to store it into the database.
// Returns the number of metrics ingested and any error encountered before finishing.
Ingest(*prompb.WriteRequest) (uint64, int, error)
Ingest(*prompb.WriteRequest) (uint64, uint64, error)
}
121 changes: 103 additions & 18 deletions pkg/pgmodel/ingestor/ingestor_test.go
Expand Up @@ -18,8 +18,10 @@ func TestDBIngestorIngest(t *testing.T) {
testCases := []struct {
name string
metrics []prompb.TimeSeries
count uint64
metadata []prompb.MetricMetadata
countSamples uint64
countSeries int
countMetadata uint64
insertSeriesErr error
insertDataErr error
getSeriesErr error
Expand All @@ -41,8 +43,45 @@ func TestDBIngestorIngest(t *testing.T) {
},
},
},
count: 1,
countSeries: 1,
countSamples: 1,
countSeries: 1,
},
{
name: "One metadata",
metrics: []prompb.TimeSeries{},
metadata: []prompb.MetricMetadata{
{
MetricFamilyName: "random_metric",
Unit: "units",
Type: 1,
Help: "random test metric",
},
},
countMetadata: 1,
},
{
name: "One metric & one metadata",
metrics: []prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: model.MetricNameLabelName, Value: "test"},
},
Samples: []prompb.Sample{
{Timestamp: 1, Value: 0.1},
},
},
},
metadata: []prompb.MetricMetadata{
{
MetricFamilyName: "random_metric",
Unit: "units",
Type: 1,
Help: "random test metric",
},
},
countMetadata: 1,
countSamples: 1,
countSeries: 1,
},
{
name: "One metric, no sample",
Expand Down Expand Up @@ -77,8 +116,48 @@ func TestDBIngestorIngest(t *testing.T) {
},
},
},
count: 2,
countSeries: 2,
countSamples: 2,
countSeries: 2,
},
{
name: "Two metrics & two metadata",
metrics: []prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: model.MetricNameLabelName, Value: "test"},
{Name: "foo", Value: "bar"},
},
Samples: []prompb.Sample{
{Timestamp: 1, Value: 0.1},
},
},
{
Labels: []prompb.Label{
{Name: model.MetricNameLabelName, Value: "test"},
{Name: "test", Value: "test"},
},
Samples: []prompb.Sample{
{Timestamp: 1, Value: 0.1},
},
},
},
metadata: []prompb.MetricMetadata{
{
MetricFamilyName: "random_metric",
Unit: "units",
Type: 1,
Help: "random test metric",
},
{
MetricFamilyName: "random_metric_2",
Unit: "units",
Type: 2,
Help: "random test metric 2",
},
},
countMetadata: 2,
countSamples: 2,
countSeries: 2,
},
{
name: "Two samples",
Expand All @@ -94,8 +173,8 @@ func TestDBIngestorIngest(t *testing.T) {
},
},
},
count: 2,
countSeries: 1,
countSamples: 2,
countSeries: 1,
},
{
name: "Two metrics, one series",
Expand All @@ -119,8 +198,8 @@ func TestDBIngestorIngest(t *testing.T) {
},
},
},
count: 2,
countSeries: 1,
countSamples: 2,
countSeries: 1,
},
{
name: "Insert series error",
Expand All @@ -135,7 +214,7 @@ func TestDBIngestorIngest(t *testing.T) {
},
},
},
count: 0,
countSamples: 0,
countSeries: 1,
insertSeriesErr: fmt.Errorf("some error"),
},
Expand All @@ -152,7 +231,7 @@ func TestDBIngestorIngest(t *testing.T) {
},
},
},
count: 0,
countSamples: 0,
countSeries: 1,
insertDataErr: fmt.Errorf("some error"),
},
Expand All @@ -165,8 +244,8 @@ func TestDBIngestorIngest(t *testing.T) {
},
},
},
count: 0,
countSeries: 0,
countSamples: 0,
countSeries: 0,
},
}

Expand All @@ -179,13 +258,15 @@ func TestDBIngestorIngest(t *testing.T) {
InsertedSeries: make(map[string]model.SeriesID),
}
i := DBIngestor{
samplesDispatcher: &inserter,
sCache: sCache,
samplesDispatcher: &inserter,
metadataDispatcher: &inserter,
sCache: sCache,
}

wr := NewWriteRequest()
wr.Timeseries = c.metrics
count, _, err := i.Ingest(wr)
wr.Metadata = c.metadata
countSamples, countMetadata, err := i.Ingest(wr)

if err != nil {
if c.insertSeriesErr != nil && err != c.insertSeriesErr {
Expand All @@ -211,8 +292,12 @@ func TestDBIngestorIngest(t *testing.T) {
}
}

if count != c.count {
t.Errorf("invalid number of metrics inserted: got %d, want %d\n", count, c.count)
if countSamples != c.countSamples {
t.Errorf("invalid number of metrics inserted: got %d, want %d\n", countSamples, c.countSamples)
}

if countMetadata != c.countMetadata {
t.Errorf("invalid number of metadata inserted: got %d, want %d\n", countMetadata, c.countMetadata)
}

if c.countSeries != len(inserter.InsertedSeries) {
Expand Down
1 change: 1 addition & 0 deletions pkg/pgmodel/ingestor/insert_ctx.go
Expand Up @@ -33,6 +33,7 @@ func FinishWriteRequest(wr *prompb.WriteRequest) {
ts.XXX_unrecognized = nil
}
wr.Timeseries = wr.Timeseries[:0]
wr.Metadata = wr.Metadata[:0]
wr.XXX_unrecognized = nil
wrPool.Put(wr)
}

0 comments on commit 7bcb492

Please sign in to comment.