Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Add database metrics into pgxconn package.
Browse files Browse the repository at this point in the history
Signed-off-by: Harkishen-Singh <harkishensingh@hotmail.com>

This commit updates the implementation of pgx in pkg/pgxconn
package and implements the following database metrics:
1. promscale_database_requests_total
2. promscale_database_request_errors_total
3. promscale_database_request_duration_seconds
  • Loading branch information
Harkishen-Singh committed Feb 28, 2022
1 parent 71acec0 commit e7eae31
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 8 deletions.
2 changes: 0 additions & 2 deletions go.sum
Expand Up @@ -1214,8 +1214,6 @@ github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08
github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM=
github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc=
github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs=
github.com/jackc/pgx/v4 v4.15.0 h1:B7dTkXsdILD3MF987WGGCcg+tvLW6bZJdEcqVFeU//w=
github.com/jackc/pgx/v4 v4.15.0/go.mod h1:D/zyOyXiaM1TmVWnOM18p0xdDtdakRBa0RsVGI3U3bw=
github.com/jackc/pgx/v4 v4.15.1-0.20220219175125-b6b24f9e8a5d h1:8r5Lurk3Ur6K9Tz94Zz3kekJLZ8iVAG/GWHNqtLlmnw=
github.com/jackc/pgx/v4 v4.15.1-0.20220219175125-b6b24f9e8a5d/go.mod h1:D/zyOyXiaM1TmVWnOM18p0xdDtdakRBa0RsVGI3U3bw=
github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
Expand Down
58 changes: 55 additions & 3 deletions pkg/pgclient/client.go
Expand Up @@ -8,10 +8,15 @@ import (
"context"
"fmt"
"net/url"
"os"
"time"

"github.com/google/uuid"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/collector/model/pdata"

"github.com/timescale/promscale/pkg/ha"
"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/pgmodel/cache"
Expand All @@ -25,7 +30,7 @@ import (
"github.com/timescale/promscale/pkg/query"
"github.com/timescale/promscale/pkg/telemetry"
"github.com/timescale/promscale/pkg/tenancy"
"go.opentelemetry.io/collector/model/pdata"
"github.com/timescale/promscale/pkg/util"
)

var PromscaleID uuid.UUID
Expand Down Expand Up @@ -55,6 +60,7 @@ type Client struct {
sigClose chan struct{}
haService *ha.Service
TelemetryEngine telemetry.Engine
stopHealthChecker context.CancelFunc
}

// NewClient creates a new PostgreSQL client
Expand Down Expand Up @@ -179,19 +185,22 @@ func NewClientWithPool(cfg *Config, numCopiers int, connPool *pgxpool.Pool, mt t
}
}

healthChecker := health.NewHealthChecker(dbConn)
healthCheckerCtx, stopHealthChecker := context.WithCancel(context.Background())
healthCheckRoutine(healthCheckerCtx, dbConn)

client := &Client{
Connection: dbConn,
QuerierConnection: dbQuerierConn,
ingestor: dbIngestor,
querier: dbQuerier,
healthCheck: healthChecker,
healthCheck: health.NewHealthChecker(dbConn),
queryable: queryable,
metricCache: metricsCache,
labelsCache: labelsCache,
seriesCache: seriesCache,
sigClose: sigClose,
TelemetryEngine: telemetryEngine,
stopHealthChecker: stopHealthChecker,
}

InitClientMetrics(client)
Expand All @@ -201,6 +210,9 @@ func NewClientWithPool(cfg *Config, numCopiers int, connPool *pgxpool.Pool, mt t
// Close closes the client and performs cleanup
func (c *Client) Close() {
log.Info("msg", "Shutting down Client")
if c.stopHealthChecker != nil {
c.stopHealthChecker()
}
if c.TelemetryEngine != nil {
c.TelemetryEngine.Stop()
}
Expand Down Expand Up @@ -297,3 +309,43 @@ func observeStatementCacheState(conn *pgx.Conn) bool {
statementCacheLen.Observe(float64(statementCacheSize))
return true
}

func healthCheckRoutine(ctx context.Context, conn pgxconn.PgxConn) {
r := prometheus.DefaultRegisterer
if env := os.Getenv("IS_TEST"); env == "true" {
r = prometheus.NewRegistry()
}
dbHealthChecks := prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: util.PromNamespace,
Subsystem: "database",
Name: "health_checks_total",
Help: "Total number of database health checks performed.",
},
)
dbHealthErrors := prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: util.PromNamespace,
Subsystem: "database",
Name: "health_check_errors_total",
Help: "Total number of database health check errors.",
},
)
r.MustRegister(dbHealthChecks, dbHealthErrors)
go func() {
check := time.NewTicker(time.Minute)
defer check.Stop()
connection := health.NewHealthChecker(conn)
for {
select {
case <-ctx.Done():
return
case <-check.C:
}
dbHealthChecks.Inc()
if err := connection(); err != nil {
dbHealthErrors.Inc()
}
}
}()
}
88 changes: 85 additions & 3 deletions pkg/pgxconn/pgx_conn.go
Expand Up @@ -13,9 +13,47 @@ import (
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/prometheus/client_golang/prometheus"

"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/util"
)

var (
requestTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: util.PromNamespace,
Subsystem: "database",
Name: "requests_total",
Help: "Total number of database requests.",
}, []string{"method"},
)
errorsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: util.PromNamespace,
Subsystem: "database",
Name: "request_errors_total",
Help: "Total number of database request errors.",
}, []string{"method"},
)
duration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: util.PromNamespace,
Subsystem: "database",
Name: "requests_duration_seconds",
Help: "Time taken to complete a database request.",
}, []string{"method"},
)
)

func init() {
prometheus.MustRegister(requestTotal, errorsTotal, duration)
}

func getLabelSet(method string) prometheus.Labels {
return prometheus.Labels{"method": method}
}

type PgxBatch interface {
Queue(query string, arguments ...interface{})
Len() int
Expand Down Expand Up @@ -109,15 +147,53 @@ func (p *connImpl) Close() {
}

func (p *connImpl) Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error) {
return p.Conn.Exec(ctx, sql, args...)
lset := getLabelSet("exec")
requestTotal.With(lset).Inc()
start := time.Now()
defer func() {
duration.With(lset).Observe(time.Since(start).Seconds())
}()
tag, err := p.Conn.Exec(ctx, sql, args...)
if err != nil {
errorsTotal.With(lset).Inc()
}
return tag, err
}

func (p *connImpl) Query(ctx context.Context, sql string, args ...interface{}) (PgxRows, error) {
return p.Conn.Query(ctx, sql, args...)
lset := getLabelSet("query")
requestTotal.With(lset).Inc()
start := time.Now()
defer func() {
duration.With(lset).Observe(time.Since(start).Seconds())
}()
rows, err := p.Conn.Query(ctx, sql, args...)
if err != nil {
errorsTotal.With(lset).Inc()
}
return rows, err
}

func (p *connImpl) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row {
return p.Conn.QueryRow(ctx, sql, args...)
lset := getLabelSet("query_row")
requestTotal.With(lset).Inc()
start := time.Now()
defer func() {
duration.With(lset).Observe(time.Since(start).Seconds())
}()
return &rowWrapper{p.Conn.QueryRow(ctx, sql, args...)}
}

type rowWrapper struct {
r pgx.Row
}

func (w *rowWrapper) Scan(dest ...interface{}) error {
err := w.r.Scan(dest...)
if err != nil {
errorsTotal.With(getLabelSet("query_row")).Inc()
}
return err
}

func (p *connImpl) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) {
Expand All @@ -133,6 +209,12 @@ func (p *connImpl) NewBatch() PgxBatch {
}

func (p *connImpl) SendBatch(ctx context.Context, b PgxBatch) (pgx.BatchResults, error) {
lset := getLabelSet("send_batch")
requestTotal.With(lset).Inc()
start := time.Now()
defer func() {
duration.With(lset).Observe(time.Since(start).Seconds())
}()
return p.Conn.SendBatch(ctx, b.(*pgx.Batch)), nil
}

Expand Down

0 comments on commit e7eae31

Please sign in to comment.