From e7eae3149e92f104d2f096375762cbae4a5aef3a Mon Sep 17 00:00:00 2001 From: Harkishen-Singh Date: Tue, 22 Feb 2022 17:13:34 +0530 Subject: [PATCH] Add database metrics into pgxconn package. Signed-off-by: Harkishen-Singh This commit updates the implementation of pgx in pkg/pgxconn package and implements the following database metrics: 1. promscale_database_requests_total 2. promscale_database_request_errors_total 3. promscale_database_requests_duration_seconds --- go.sum | 2 - pkg/pgclient/client.go | 58 +++++++++++++++++++++++++-- pkg/pgxconn/pgx_conn.go | 88 +++++++++++++++++++++++++++++++++++++++-- 3 files changed, 140 insertions(+), 8 deletions(-) diff --git a/go.sum b/go.sum index 7f0911ca78..cae3a00ca3 100644 --- a/go.sum +++ b/go.sum @@ -1214,8 +1214,6 @@ github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08 github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM= github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc= github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs= -github.com/jackc/pgx/v4 v4.15.0 h1:B7dTkXsdILD3MF987WGGCcg+tvLW6bZJdEcqVFeU//w= -github.com/jackc/pgx/v4 v4.15.0/go.mod h1:D/zyOyXiaM1TmVWnOM18p0xdDtdakRBa0RsVGI3U3bw= github.com/jackc/pgx/v4 v4.15.1-0.20220219175125-b6b24f9e8a5d h1:8r5Lurk3Ur6K9Tz94Zz3kekJLZ8iVAG/GWHNqtLlmnw= github.com/jackc/pgx/v4 v4.15.1-0.20220219175125-b6b24f9e8a5d/go.mod h1:D/zyOyXiaM1TmVWnOM18p0xdDtdakRBa0RsVGI3U3bw= github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= diff --git a/pkg/pgclient/client.go b/pkg/pgclient/client.go index e36be50b72..95f4ab4148 100644 --- a/pkg/pgclient/client.go +++ b/pkg/pgclient/client.go @@ -8,10 +8,15 @@ import ( "context" "fmt" "net/url" + "os" + "time" "github.com/google/uuid" "github.com/jackc/pgx/v4" 
"github.com/jackc/pgx/v4/pgxpool" + "github.com/prometheus/client_golang/prometheus" + "go.opentelemetry.io/collector/model/pdata" + "github.com/timescale/promscale/pkg/ha" "github.com/timescale/promscale/pkg/log" "github.com/timescale/promscale/pkg/pgmodel/cache" @@ -25,7 +30,7 @@ import ( "github.com/timescale/promscale/pkg/query" "github.com/timescale/promscale/pkg/telemetry" "github.com/timescale/promscale/pkg/tenancy" - "go.opentelemetry.io/collector/model/pdata" + "github.com/timescale/promscale/pkg/util" ) var PromscaleID uuid.UUID @@ -55,6 +60,7 @@ type Client struct { sigClose chan struct{} haService *ha.Service TelemetryEngine telemetry.Engine + stopHealthChecker context.CancelFunc } // NewClient creates a new PostgreSQL client @@ -179,19 +185,22 @@ func NewClientWithPool(cfg *Config, numCopiers int, connPool *pgxpool.Pool, mt t } } - healthChecker := health.NewHealthChecker(dbConn) + healthCheckerCtx, stopHealthChecker := context.WithCancel(context.Background()) + healthCheckRoutine(healthCheckerCtx, dbConn) + client := &Client{ Connection: dbConn, QuerierConnection: dbQuerierConn, ingestor: dbIngestor, querier: dbQuerier, - healthCheck: healthChecker, + healthCheck: health.NewHealthChecker(dbConn), queryable: queryable, metricCache: metricsCache, labelsCache: labelsCache, seriesCache: seriesCache, sigClose: sigClose, TelemetryEngine: telemetryEngine, + stopHealthChecker: stopHealthChecker, } InitClientMetrics(client) @@ -201,6 +210,9 @@ func NewClientWithPool(cfg *Config, numCopiers int, connPool *pgxpool.Pool, mt t // Close closes the client and performs cleanup func (c *Client) Close() { log.Info("msg", "Shutting down Client") + if c.stopHealthChecker != nil { + c.stopHealthChecker() + } if c.TelemetryEngine != nil { c.TelemetryEngine.Stop() } @@ -297,3 +309,43 @@ func observeStatementCacheState(conn *pgx.Conn) bool { statementCacheLen.Observe(float64(statementCacheSize)) return true } + +func healthCheckRoutine(ctx context.Context, conn 
pgxconn.PgxConn) { + r := prometheus.DefaultRegisterer + if env := os.Getenv("IS_TEST"); env == "true" { + r = prometheus.NewRegistry() + } + dbHealthChecks := prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: util.PromNamespace, + Subsystem: "database", + Name: "health_checks_total", + Help: "Total number of database health checks performed.", + }, + ) + dbHealthErrors := prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: util.PromNamespace, + Subsystem: "database", + Name: "health_check_errors_total", + Help: "Total number of database health check errors.", + }, + ) + r.MustRegister(dbHealthChecks, dbHealthErrors) + go func() { + check := time.NewTicker(time.Minute) + defer check.Stop() + connection := health.NewHealthChecker(conn) + for { + select { + case <-ctx.Done(): + return + case <-check.C: + } + dbHealthChecks.Inc() + if err := connection(); err != nil { + dbHealthErrors.Inc() + } + } + }() +} diff --git a/pkg/pgxconn/pgx_conn.go b/pkg/pgxconn/pgx_conn.go index b22b1fbb4e..f8a64d4a50 100644 --- a/pkg/pgxconn/pgx_conn.go +++ b/pkg/pgxconn/pgx_conn.go @@ -13,9 +13,47 @@ import ( "github.com/jackc/pgconn" "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" + "github.com/prometheus/client_golang/prometheus" + "github.com/timescale/promscale/pkg/log" + "github.com/timescale/promscale/pkg/util" +) + +var ( + requestTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: util.PromNamespace, + Subsystem: "database", + Name: "requests_total", + Help: "Total number of database requests.", + }, []string{"method"}, + ) + errorsTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: util.PromNamespace, + Subsystem: "database", + Name: "request_errors_total", + Help: "Total number of database request errors.", + }, []string{"method"}, + ) + duration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: util.PromNamespace, + Subsystem: "database", + Name: "requests_duration_seconds", + 
Help: "Time taken to complete a database request.", + }, []string{"method"}, + ) ) +func init() { + prometheus.MustRegister(requestTotal, errorsTotal, duration) +} + +func getLabelSet(method string) prometheus.Labels { + return prometheus.Labels{"method": method} +} + type PgxBatch interface { Queue(query string, arguments ...interface{}) Len() int @@ -109,15 +147,53 @@ func (p *connImpl) Close() { } func (p *connImpl) Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error) { - return p.Conn.Exec(ctx, sql, args...) + lset := getLabelSet("exec") + requestTotal.With(lset).Inc() + start := time.Now() + defer func() { + duration.With(lset).Observe(time.Since(start).Seconds()) + }() + tag, err := p.Conn.Exec(ctx, sql, args...) + if err != nil { + errorsTotal.With(lset).Inc() + } + return tag, err } func (p *connImpl) Query(ctx context.Context, sql string, args ...interface{}) (PgxRows, error) { - return p.Conn.Query(ctx, sql, args...) + lset := getLabelSet("query") + requestTotal.With(lset).Inc() + start := time.Now() + defer func() { + duration.With(lset).Observe(time.Since(start).Seconds()) + }() + rows, err := p.Conn.Query(ctx, sql, args...) + if err != nil { + errorsTotal.With(lset).Inc() + } + return rows, err } func (p *connImpl) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row { - return p.Conn.QueryRow(ctx, sql, args...) + lset := getLabelSet("query_row") + requestTotal.With(lset).Inc() + start := time.Now() + defer func() { + duration.With(lset).Observe(time.Since(start).Seconds()) + }() + return &rowWrapper{p.Conn.QueryRow(ctx, sql, args...)} +} + +type rowWrapper struct { + r pgx.Row +} + +func (w *rowWrapper) Scan(dest ...interface{}) error { + err := w.r.Scan(dest...) 
+ if err != nil { + errorsTotal.With(getLabelSet("query_row")).Inc() + } + return err } func (p *connImpl) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) { @@ -133,6 +209,12 @@ func (p *connImpl) NewBatch() PgxBatch { } func (p *connImpl) SendBatch(ctx context.Context, b PgxBatch) (pgx.BatchResults, error) { + lset := getLabelSet("send_batch") + requestTotal.With(lset).Inc() + start := time.Now() + defer func() { + duration.With(lset).Observe(time.Since(start).Seconds()) + }() return p.Conn.SendBatch(ctx, b.(*pgx.Batch)), nil }