Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Log time consumed by SQL queries in debug level & drop HA debug logs …
Browse files Browse the repository at this point in the history
…noise
  • Loading branch information
VineethReddy02 committed Jun 22, 2021
1 parent d07e837 commit b52954f
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 16 deletions.
3 changes: 0 additions & 3 deletions pkg/ha/filter.go
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/prometheus/common/model"
"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/prompb"
)

Expand Down Expand Up @@ -60,7 +59,6 @@ func (h *Filter) Process(_ *http.Request, wr *prompb.WriteRequest) error {
if !minT.Before(leaseStart) {
if !allowInsert {
wr.Timeseries = wr.Timeseries[:0]
log.Debug("msg", "the samples aren't from the leader prom instance. skipping the insert", "replica", replicaName)
}
return nil
}
Expand All @@ -80,7 +78,6 @@ func (h *Filter) Process(_ *http.Request, wr *prompb.WriteRequest) error {
case !hasBackfill && !allowInsert:
// No data to insert.
wr.Timeseries = wr.Timeseries[:0]
log.Debug("msg", "the samples aren't from the leader prom instance. skipping the insert", "replica", replicaName)
default:
// This case covers the instance when we have backfill and data in current lease to ingest.
// The data has already been filtered out so there is nothing to do.
Expand Down
2 changes: 1 addition & 1 deletion pkg/pgclient/client.go
Expand Up @@ -9,7 +9,7 @@ import (
"fmt"
"net/url"

pgx "github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/timescale/promscale/pkg/ha"
"github.com/timescale/promscale/pkg/log"
Expand Down
3 changes: 1 addition & 2 deletions pkg/pgmodel/metadata/metadata.go
Expand Up @@ -8,7 +8,6 @@ import (
"context"
"fmt"

"github.com/jackc/pgx/v4"
"github.com/timescale/promscale/pkg/pgmodel/common/schema"
"github.com/timescale/promscale/pkg/pgmodel/model"
"github.com/timescale/promscale/pkg/pgxconn"
Expand All @@ -17,7 +16,7 @@ import (
// MetricQuery returns metadata corresponding to metric or metric_family.
func MetricQuery(conn pgxconn.PgxConn, metric string, limit int) (map[string][]model.Metadata, error) {
var (
rows pgx.Rows
rows pgxconn.PgxRows
err error
)
if metric != "" {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pgmodel/model/sql_test_utils.go
Expand Up @@ -70,7 +70,7 @@ func (r *SqlRecorder) Exec(ctx context.Context, sql string, arguments ...interfa
return results[0][0].(pgconn.CommandTag), err
}

func (r *SqlRecorder) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) {
func (r *SqlRecorder) Query(ctx context.Context, sql string, args ...interface{}) (pgxconn.PgxRows, error) {
r.lock.Lock()
defer r.lock.Unlock()
rows, err := r.checkQuery(sql, args...)
Expand Down
5 changes: 2 additions & 3 deletions pkg/pgmodel/querier/querier.go
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/jackc/pgconn"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
Expand Down Expand Up @@ -254,7 +253,7 @@ func (q *pgxQuerier) queryMultipleMetrics(filter metricTimeRangeFilter, cases []
defer batchResults.Close()

for i := 0; i < numQueries; i++ {
rows, err = batchResults.Query()
rows, err := batchResults.Query()
if err != nil {
rows.Close()
return nil, nil, err
Expand Down Expand Up @@ -325,7 +324,7 @@ func (q *pgxQuerier) queryMetricTableName(metric string) (string, error) {

// appendTsRows adds new results rows to already existing result rows and
// returns the as a result.
func appendTsRows(out []timescaleRow, in pgx.Rows) ([]timescaleRow, error) {
func appendTsRows(out []timescaleRow, in pgxconn.PgxRows) ([]timescaleRow, error) {
if in.Err() != nil {
return out, in.Err()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pgmodel/querier/query_builder.go
Expand Up @@ -423,7 +423,7 @@ func getAggregators(hints *storage.SelectHints, path []parser.Node) (*aggregator
return &qf, nil, nil
}

func GetSeriesPerMetric(rows pgx.Rows) ([]string, [][]pgmodel.SeriesID, error) {
func GetSeriesPerMetric(rows pgxconn.PgxRows) ([]string, [][]pgmodel.SeriesID, error) {
metrics := make([]string, 0)
series := make([][]pgmodel.SeriesID, 0)

Expand Down
62 changes: 57 additions & 5 deletions pkg/pgxconn/pgx_conn.go
Expand Up @@ -6,10 +6,13 @@ package pgxconn

import (
"context"
"fmt"
"time"

"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/timescale/promscale/pkg/log"
)

type PgxBatch interface {
Expand All @@ -19,14 +22,21 @@ type PgxBatch interface {
type PgxConn interface {
Close()
Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
Query(ctx context.Context, sql string, args ...interface{}) (PgxRows, error)
QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error)
CopyFromRows(rows [][]interface{}) pgx.CopyFromSource
NewBatch() PgxBatch
SendBatch(ctx context.Context, b PgxBatch) (pgx.BatchResults, error)
}

type PgxRows interface {
Next() bool
Scan(...interface{}) error
Err() error
Close()
}

func NewPgxConn(pool *pgxpool.Pool) PgxConn {
return &connImpl{
Conn: pool,
Expand All @@ -37,21 +47,63 @@ type connImpl struct {
Conn *pgxpool.Pool
}

type pgxRows struct {
pgx.Rows
sqlQuery string
args []interface{}
startTime time.Time
loggged bool
}

func (p *connImpl) Close() {
conn := p.Conn
p.Conn = nil
conn.Close()
}

func (p *connImpl) Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) {
return p.Conn.Exec(ctx, sql, arguments...)
func (p *pgxRows) Next() bool {
if !p.loggged {
p.loggged = true
LogQueryStats(p.sqlQuery, p.startTime, p.args)()
}
return p.Rows.Next()
}

func (p *pgxRows) Scan(dest ...interface{}) error {
return p.Rows.Scan(dest...)
}

func (p *pgxRows) Err() error {
return p.Rows.Err()
}

func (p *pgxRows) Close() {
p.Rows.Close()
}

// calc SQL query execution time
func LogQueryStats(sql string, startTime time.Time, args ...interface{}) func() {
if startTime.IsZero() {
startTime = time.Now()
}
return func() {
log.Debug("msg", fmt.Sprintf("time taken by SQL query: %s with args: %v is %v", sql, args, time.Since(startTime)))
}
}

func (p *connImpl) Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error) {
defer LogQueryStats(sql, time.Time{}, args...)()
return p.Conn.Exec(ctx, sql, args...)
}

func (p *connImpl) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) {
return p.Conn.Query(ctx, sql, args...)
func (p *connImpl) Query(ctx context.Context, sql string, args ...interface{}) (PgxRows, error) {
startTime := time.Now()
rows, err := p.Conn.Query(ctx, sql, args...)
return &pgxRows{Rows: rows, sqlQuery: sql, args: args, startTime: startTime}, err
}

func (p *connImpl) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row {
defer LogQueryStats(sql, time.Time{}, args...)()
return p.Conn.QueryRow(ctx, sql, args...)
}

Expand Down

0 comments on commit b52954f

Please sign in to comment.