Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Fix bad check for error in MetricTableName
Browse files Browse the repository at this point in the history
Previously, bad error checking logic in MetricTableName could
have returned a blank table name on error which eventually
led to the following postgres error:

"ERROR: zero-length delimited identifier at or near \"\"\"\" (SQLSTATE 42601)"

As reported in issue #668

This fixes that and adds defensive checks to avoid blank table name.

It's not clear what the underlying error was though.
  • Loading branch information
cevian committed Jun 25, 2021
1 parent d07e837 commit 89df3c2
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 48 deletions.
29 changes: 27 additions & 2 deletions pkg/pgmodel/ingestor/ingestor_sql_test.go
Expand Up @@ -736,7 +736,8 @@ func TestPGXInserterInsertData(t *testing.T) {
},
},
{
name: "Metrics get error",
//cache errors get recovered from and the insert succeeds
name: "Metrics cache get error",
rows: map[string][]model.Samples{
"metric_0": {
model.NewPromSample(makeLabel(), make([]prompb.Sample, 1)),
Expand All @@ -746,6 +747,29 @@ func TestPGXInserterInsertData(t *testing.T) {
sqlQueries: []model.SqlQuery{
{Sql: "SELECT 'prom_api.label_array'::regtype::oid", Results: model.RowResults{{uint32(434)}}},
{Sql: "CALL _prom_catalog.finalize_metric_creation()"},
{
Sql: "SELECT table_name, possibly_new FROM _prom_catalog.get_or_create_metric_table_name($1)",
Args: []interface{}{"metric_0"},
Results: model.RowResults{{"metric_0", true}},
Err: error(nil),
},
{
Sql: "SELECT _prom_catalog.insert_metric_row($1, $2::TIMESTAMPTZ[], $3::DOUBLE PRECISION[], $4::BIGINT[])",
Args: []interface{}{
"metric_0",
[]time.Time{time.Unix(0, 0)},
[]float64{0},
[]int64{1},
},
Results: model.RowResults{{int64(1)}},
Err: error(nil),
},
{
Sql: "SELECT CASE current_epoch > $1::BIGINT + 1 WHEN true THEN _prom_catalog.epoch_abort($1) END FROM _prom_catalog.ids_epoch LIMIT 1",
Args: []interface{}{int64(1)},
Results: model.RowResults{{[]byte{}}},
Err: error(nil),
},
},
},
}
Expand Down Expand Up @@ -773,7 +797,8 @@ func TestPGXInserterInsertData(t *testing.T) {

switch {
case c.metricsGetErr != nil:
expErr = c.metricsGetErr
//cache errors recover
expErr = nil
case c.name == "Can't find/create table in DB":
expErr = pgmodelErrs.ErrMissingTableName
default:
Expand Down
54 changes: 47 additions & 7 deletions pkg/pgmodel/ingestor/metric_batcher.go
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/jackc/pgtype"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/pgmodel/cache"
"github.com/timescale/promscale/pkg/pgmodel/common/errors"
"github.com/timescale/promscale/pkg/pgmodel/common/schema"
Expand All @@ -18,7 +19,10 @@ import (
"github.com/timescale/promscale/pkg/pgxconn"
)

const seriesInsertSQL = "SELECT (_prom_catalog.get_or_create_series_id_for_label_array($1, l.elem)).series_id, l.nr FROM unnest($2::prom_api.label_array[]) WITH ORDINALITY l(elem, nr) ORDER BY l.elem"
const (
seriesInsertSQL = "SELECT (_prom_catalog.get_or_create_series_id_for_label_array($1, l.elem)).series_id, l.nr FROM unnest($2::prom_api.label_array[]) WITH ORDINALITY l(elem, nr) ORDER BY l.elem"
getCreateMetricsTableWithNewSQL = "SELECT table_name, possibly_new FROM " + schema.Catalog + ".get_or_create_metric_table_name($1)"
)

type metricBatcher struct {
conn pgxconn.PgxConn
Expand All @@ -29,15 +33,51 @@ type metricBatcher struct {
labelArrayOID uint32
}

func metricTableName(conn pgxconn.PgxConn, metric string) (string, bool, error) {
res, err := conn.Query(
context.Background(),
getCreateMetricsTableWithNewSQL,
metric,
)

if err != nil {
return "", true, fmt.Errorf("failed to get the table name for metric %s: %w", metric, err)
}

var tableName string
var possiblyNew bool
defer res.Close()
if !res.Next() {
if err := res.Err(); err != nil {
return "", true, fmt.Errorf("failed to get the table name for metric %s: %w", metric, err)
}
return "", true, errors.ErrMissingTableName
}

if err := res.Scan(&tableName, &possiblyNew); err != nil {
return "", true, fmt.Errorf("failed to get the table name for metric %s: %w", metric, err)
}

if err := res.Err(); err != nil {
return "", true, fmt.Errorf("failed to get the table name for metric %s: %w", metric, err)
}

if tableName == "" {
return "", true, fmt.Errorf("failed to get the table name for metric %s: empty table name returned", metric)
}

return tableName, possiblyNew, nil
}

// Create the metric table for the metric we handle, if it does not already
// exist. This only does the most critical part of metric table creation, the
// rest is handled by completeMetricTableCreation().
func initializeMetricBatcher(conn pgxconn.PgxConn, metricName string, completeMetricCreationSignal chan struct{}, metricTableNames cache.MetricCache) (tableName string, err error) {
tableName, err = metricTableNames.Get(metricName)
if err == errors.ErrEntryNotFound {
if err != nil || tableName == "" {
var possiblyNew bool
tableName, possiblyNew, err = model.MetricTableName(conn, metricName)
if err != nil {
tableName, possiblyNew, err = metricTableName(conn, metricName)
if err != nil || tableName == "" {
return "", err
}

Expand All @@ -51,8 +91,6 @@ func initializeMetricBatcher(conn pgxconn.PgxConn, metricName string, completeMe
default:
}
}
} else if err != nil {
return "", err
}
return tableName, err
}
Expand All @@ -72,7 +110,9 @@ func runMetricBatcher(conn pgxconn.PgxConn,
var err error
tableName, err = initializeMetricBatcher(conn, metricName, completeMetricCreationSignal, metricTableNames)
if err != nil {
firstReq.reportResult(fmt.Errorf("initializing the insert routine has failed with %w", err))
err := fmt.Errorf("initializing the insert routine for metric %v has failed with %w", metricName, err)
log.Error("msg", err)
firstReq.reportResult(err)
} else {
firstReqSet = true
break
Expand Down
78 changes: 78 additions & 0 deletions pkg/pgmodel/ingestor/metric_batcher_test.go
@@ -0,0 +1,78 @@
package ingestor

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
"github.com/timescale/promscale/pkg/pgmodel/model"
)

func TestMetricTableName(t *testing.T) {
testCases := []struct {
name string
tableName string
errExpected bool
sqlQueries []model.SqlQuery
}{
{
name: "no error",
tableName: "res1",
sqlQueries: []model.SqlQuery{
{
Sql: "SELECT table_name, possibly_new FROM _prom_catalog.get_or_create_metric_table_name($1)",
Args: []interface{}{"t1"},
Results: model.RowResults{{"res1", true}},
},
},
},
{
name: "no error2",
tableName: "res2",
sqlQueries: []model.SqlQuery{
{
Sql: "SELECT table_name, possibly_new FROM _prom_catalog.get_or_create_metric_table_name($1)",
Args: []interface{}{"t1"},
Results: model.RowResults{{"res2", true}},
},
},
},
{
name: "error",
tableName: "res1",
errExpected: true,
sqlQueries: []model.SqlQuery{
{
Sql: "SELECT table_name, possibly_new FROM _prom_catalog.get_or_create_metric_table_name($1)",
Args: []interface{}{"t1"},
Err: fmt.Errorf("test"),
},
},
},
{
name: "empty table name",
tableName: "res2",
errExpected: true,
sqlQueries: []model.SqlQuery{
{
Sql: "SELECT table_name, possibly_new FROM _prom_catalog.get_or_create_metric_table_name($1)",
Args: []interface{}{"t1"},
Results: model.RowResults{{"", true}},
},
},
},
}

for _, c := range testCases {
t.Run(c.name, func(t *testing.T) {
mock := model.NewSqlRecorder(c.sqlQueries, t)

name, _, err := metricTableName(mock, "t1")
require.Equal(t, c.errExpected, err != nil)

if err == nil {
require.Equal(t, c.tableName, name)
}
})
}
}
38 changes: 1 addition & 37 deletions pkg/pgmodel/model/interface.go
Expand Up @@ -5,21 +5,13 @@
package model

import (
"context"
"math"
"time"

"github.com/jackc/pgtype"
"github.com/timescale/promscale/pkg/pgmodel/common/errors"
"github.com/timescale/promscale/pkg/pgmodel/common/schema"

"github.com/timescale/promscale/pkg/pgxconn"
)

const (
MetricNameLabelName = "__name__"
getCreateMetricsTableWithNewSQL = "SELECT table_name, possibly_new FROM " + schema.Catalog + ".get_or_create_metric_table_name($1)"
)
const MetricNameLabelName = "__name__"

var (
MinTime = time.Unix(math.MinInt64/1000+62135596801, 0).UTC()
Expand All @@ -34,34 +26,6 @@ type Dispatcher interface {
Close()
}

func MetricTableName(conn pgxconn.PgxConn, metric string) (string, bool, error) {
res, err := conn.Query(
context.Background(),
getCreateMetricsTableWithNewSQL,
metric,
)

if err != nil {
return "", true, err
}

var tableName string
var possiblyNew bool
defer res.Close()
if !res.Next() {
if res.Err() != nil {
return "", true, err
}
return "", true, errors.ErrMissingTableName
}

if err := res.Scan(&tableName, &possiblyNew); err != nil {
return "", true, err
}

return tableName, possiblyNew, nil
}

func TimestamptzToMs(t pgtype.Timestamptz) int64 {
switch t.InfinityModifier {
case pgtype.NegativeInfinity:
Expand Down
4 changes: 2 additions & 2 deletions pkg/tests/end_to_end_tests/concurrent_sql_test.go
Expand Up @@ -23,11 +23,11 @@ import (
func testConcurrentMetricTable(t testing.TB, db *pgxpool.Pool, metricName string) int64 {
var id *int64
var name *string
err := db.QueryRow(context.Background(), "SELECT id, table_name FROM _prom_catalog.create_metric_table($1)", metricName).Scan(&id, &name)
err := db.QueryRow(context.Background(), "SELECT id, table_name FROM _prom_catalog.get_or_create_metric_table_name($1)", metricName).Scan(&id, &name)
if err != nil {
t.Fatal(err)
}
if id == nil || name == nil {
if id == nil || name == nil || *name == "" {
t.Fatalf("NULL found")
}
return *id
Expand Down

0 comments on commit 89df3c2

Please sign in to comment.