Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Add new metrics counters for db connection close events #5225

Merged
merged 13 commits into from Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 6 additions & 2 deletions support/db/main.go
Expand Up @@ -118,10 +118,14 @@ type Session struct {
// DB is the database connection that queries should be executed against.
DB *sqlx.DB

tx *sqlx.Tx
txOptions *sql.TxOptions
tx *sqlx.Tx
txOptions *sql.TxOptions
errorHandlers []ErrorHandlerFunc
}

// dbErr - the Postgres error
// ctx - the caller's context
type ErrorHandlerFunc func(dbErr error, ctx context.Context)
type SessionInterface interface {
BeginTx(ctx context.Context, opts *sql.TxOptions) error
Begin(ctx context.Context) error
Expand Down
72 changes: 65 additions & 7 deletions support/db/metrics.go
Expand Up @@ -3,11 +3,13 @@ package db
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"time"

"github.com/Masterminds/squirrel"
"github.com/lib/pq"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -58,6 +60,7 @@ type SessionWithMetrics struct {
maxLifetimeClosedCounter prometheus.CounterFunc
roundTripProbe *roundTripProbe
roundTripTimeSummary prometheus.Summary
abendCounter *prometheus.CounterVec
sreuland marked this conversation as resolved.
Show resolved Hide resolved
}

func RegisterMetrics(base *Session, namespace string, sub Subservice, registry *prometheus.Registry) SessionInterface {
Expand All @@ -66,6 +69,8 @@ func RegisterMetrics(base *Session, namespace string, sub Subservice, registry *
registry: registry,
}

base.AddErrorHandler(s.handleErrorEvent)

s.queryCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Expand Down Expand Up @@ -226,6 +231,18 @@ func RegisterMetrics(base *Session, namespace string, sub Subservice, registry *
)
registry.MustRegister(s.maxLifetimeClosedCounter)

s.abendCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: "db",
Name: "abend_total",
Help: "total number of abends, details are captured in labels",
ConstLabels: prometheus.Labels{"subservice": string(sub)},
},
[]string{"origin", "condition", "type"},
)
registry.MustRegister(s.abendCounter)

s.roundTripTimeSummary = prometheus.NewSummary(
prometheus.SummaryOpts{
Namespace: namespace,
Expand Down Expand Up @@ -262,15 +279,10 @@ func (s *SessionWithMetrics) Close() error {
s.registry.Unregister(s.maxIdleClosedCounter)
s.registry.Unregister(s.maxIdleTimeClosedCounter)
s.registry.Unregister(s.maxLifetimeClosedCounter)
s.registry.Unregister(s.abendCounter)
return s.SessionInterface.Close()
}

// TODO: Implement these
// func (s *SessionWithMetrics) BeginTx(ctx context.Context, opts *sql.TxOptions) error {
// func (s *SessionWithMetrics) Begin(ctx context.Context) error {
// func (s *SessionWithMetrics) Commit(ctx context.Context) error
// func (s *SessionWithMetrics) Rollback(ctx context.Context) error

func (s *SessionWithMetrics) TruncateTables(ctx context.Context, tables []string) (err error) {
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
s.queryDurationSummary.With(prometheus.Labels{
Expand Down Expand Up @@ -314,6 +326,7 @@ func (s *SessionWithMetrics) Clone() SessionInterface {
maxIdleClosedCounter: s.maxIdleClosedCounter,
maxIdleTimeClosedCounter: s.maxIdleTimeClosedCounter,
maxLifetimeClosedCounter: s.maxLifetimeClosedCounter,
abendCounter: s.abendCounter,
}
}

Expand Down Expand Up @@ -356,6 +369,52 @@ func getQueryType(ctx context.Context, query squirrel.Sqlizer) QueryType {
return UndefinedQueryType
}

// derive the db 'abend_total' metric from the err returned by libpq sdk
//
// dbErr - the error returned by any libpq method call
// ctx - the caller's context used on libpb method call
func (s *SessionWithMetrics) handleErrorEvent(dbErr error, ctx context.Context) {
if dbErr == nil || s.NoRows(dbErr) {
return
}

// default the metric to based just on top level libpq error
abendOrigin := "libpq"
abendType := "error"
abendCondition := "n/a"
var pgDbErrorCode string
var pqErr *pq.Error

// apply db server error info if it exists
// libpq only provides a pg.Error if a server trip was made, otherwise it may not be present
if errors.As(dbErr, &pqErr) {
pgDbErrorCode = string(pqErr.Code)
abendOrigin = "db"
abendCondition = pgDbErrorCode
}

// apply remaining overrides to metric, when these specific points exist
switch {
case errors.Is(ctx.Err(), context.Canceled):
abendOrigin = "client_context"
abendType = "cancel"
case errors.Is(ctx.Err(), context.DeadlineExceeded):
abendOrigin = "horizon_context"
abendType = "timeout"
case pgDbErrorCode == "57014":
// if getting here, no context deadline happened, but
// the db reported query_canceled, which leaves only the possibility of
// db-side statement timeout was triggered
abendType = "timeout"
}

s.abendCounter.With(prometheus.Labels{
"origin": abendOrigin,
"condition": abendCondition,
"type": abendType,
tamirms marked this conversation as resolved.
Show resolved Hide resolved
}).Inc()
}

func (s *SessionWithMetrics) Get(ctx context.Context, dest interface{}, query squirrel.Sqlizer) (err error) {
queryType := string(getQueryType(ctx, query))
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
Expand All @@ -373,7 +432,6 @@ func (s *SessionWithMetrics) Get(ctx context.Context, dest interface{}, query sq
"route": contextRoute(ctx),
}).Inc()
}()

err = s.SessionInterface.Get(ctx, dest, query)
return err
}
Expand Down
81 changes: 57 additions & 24 deletions support/db/session.go
Expand Up @@ -3,13 +3,15 @@ package db
import (
"context"
"database/sql"
go_errors "errors"
sreuland marked this conversation as resolved.
Show resolved Hide resolved
"fmt"
"reflect"
"strings"
"time"

sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/stellar/go/support/db/sqlutils"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
Expand All @@ -23,7 +25,7 @@ func (s *Session) Begin(ctx context.Context) error {

tx, err := s.DB.BeginTxx(ctx, nil)
if err != nil {
if knownErr := s.replaceWithKnownError(err, ctx); knownErr != nil {
if knownErr := s.handleError(err, ctx); knownErr != nil {
return knownErr
}

Expand All @@ -44,7 +46,7 @@ func (s *Session) BeginTx(ctx context.Context, opts *sql.TxOptions) error {

tx, err := s.DB.BeginTxx(ctx, opts)
if err != nil {
if knownErr := s.replaceWithKnownError(err, ctx); knownErr != nil {
if knownErr := s.handleError(err, ctx); knownErr != nil {
return knownErr
}

Expand Down Expand Up @@ -92,7 +94,7 @@ func (s *Session) Commit() error {
s.tx = nil
s.txOptions = nil

if knownErr := s.replaceWithKnownError(err, context.Background()); knownErr != nil {
if knownErr := s.handleError(err, context.Background()); knownErr != nil {
return knownErr
}
return err
Expand Down Expand Up @@ -146,7 +148,7 @@ func (s *Session) GetRaw(ctx context.Context, dest interface{}, query string, ar
return nil
}

if knownErr := s.replaceWithKnownError(err, ctx); knownErr != nil {
if knownErr := s.handleError(err, ctx); knownErr != nil {
return knownErr
}

Expand Down Expand Up @@ -215,7 +217,7 @@ func (s *Session) ExecRaw(ctx context.Context, query string, args ...interface{}
return result, nil
}

if knownErr := s.replaceWithKnownError(err, ctx); knownErr != nil {
if knownErr := s.handleError(err, ctx); knownErr != nil {
return nil, knownErr
}

Expand All @@ -232,29 +234,60 @@ func (s *Session) NoRows(err error) bool {
return err == sql.ErrNoRows
}

// replaceWithKnownError tries to replace Postgres error with package error.
// Returns a new error if the err is known.
func (s *Session) replaceWithKnownError(err error, ctx context.Context) error {
if err == nil {
func (s *Session) AddErrorHandler(handler ErrorHandlerFunc) {
s.errorHandlers = append(s.errorHandlers, handler)
}

// handleError does housekeeping on errors from db.
// dbErr - the libpq client error
// ctx - the calling context
//
// tries to replace dbErr with horizon package error, returns a new error if the err is known.
// invokes any additional error handlers that may have been
// added to the session, passing the caller's context
func (s *Session) handleError(dbErr error, ctx context.Context) error {
if dbErr == nil {
return nil
}

for _, handler := range s.errorHandlers {
handler(dbErr, ctx)
}

var abendDbErrorCode pq.ErrorCode
var pqErr *pq.Error

// if libpql sends to server, and then any server side error is reported,
// libpq passes back only an pq.ErrorCode from method call
// even if the caller context generates a cancel/deadline error during the server trip,
// libpq will only return an instance of pq.ErrorCode as a non-wrapped error
if go_errors.As(dbErr, &pqErr) {
abendDbErrorCode = pqErr.Code
}

switch {
sreuland marked this conversation as resolved.
Show resolved Hide resolved
case ctx.Err() == context.Canceled:
return ErrCancelled
case ctx.Err() == context.DeadlineExceeded:
// if libpq waits too long to obtain conn from pool, can get ctx timeout before server trip
return ErrTimeout
case strings.Contains(err.Error(), "pq: canceling statement due to user request"):
return ErrTimeout
case strings.Contains(err.Error(), "pq: canceling statement due to conflict with recovery"):
case strings.Contains(dbErr.Error(), "pq: canceling statement due to conflict with recovery"):
return ErrConflictWithRecovery
case strings.Contains(err.Error(), "driver: bad connection"):
case strings.Contains(dbErr.Error(), "driver: bad connection"):
return ErrBadConnection
case strings.Contains(err.Error(), "pq: canceling statement due to statement timeout"):
sreuland marked this conversation as resolved.
Show resolved Hide resolved
return ErrStatementTimeout
case strings.Contains(err.Error(), "transaction has already been committed or rolled back"):
case strings.Contains(dbErr.Error(), "transaction has already been committed or rolled back"):
return ErrAlreadyRolledback
case go_errors.Is(ctx.Err(), context.Canceled):
// when horizon's context is cancelled by it's upstream api client,
// it will propagate to here and libpq will emit a wrapped err that has the cancel err
return ErrCancelled
case go_errors.Is(ctx.Err(), context.DeadlineExceeded):
// when horizon's context times out(it's set to app connection-timeout),
// it will trigger libpq to emit a wrapped err that has the deadline err
return ErrTimeout
case abendDbErrorCode == "57014":
// https://www.postgresql.org/docs/12/errcodes-appendix.html, query_canceled
// this code can be generated for multiple cases,
// by libpq sending a signal to server when it experiences a context cancel/deadline
// or it could happen based on just server statement_timeout setting
// since we check the context cancel/deadline err state first, getting here means
// this can only be from a statement timeout
return ErrStatementTimeout
default:
return nil
}
Expand Down Expand Up @@ -284,7 +317,7 @@ func (s *Session) QueryRaw(ctx context.Context, query string, args ...interface{
return result, nil
}

if knownErr := s.replaceWithKnownError(err, ctx); knownErr != nil {
if knownErr := s.handleError(err, ctx); knownErr != nil {
return nil, knownErr
}

Expand Down Expand Up @@ -318,7 +351,7 @@ func (s *Session) Rollback() error {
s.tx = nil
s.txOptions = nil

if knownErr := s.replaceWithKnownError(err, context.Background()); knownErr != nil {
if knownErr := s.handleError(err, context.Background()); knownErr != nil {
return knownErr
}
return err
Expand Down Expand Up @@ -362,7 +395,7 @@ func (s *Session) SelectRaw(
return nil
}

if knownErr := s.replaceWithKnownError(err, ctx); knownErr != nil {
if knownErr := s.handleError(err, ctx); knownErr != nil {
return knownErr
}

Expand Down