Probe database for connection limit, and make it configurable
This commit contains two changes:
  1. At startup, we now probe the database to determine the number of
     connections it can handle, in order to ensure we don't go over
     that limit.
  2. We add a command-line flag to override this setting for instances
     where our heuristic is incorrect.

This should make the connector a better-behaved client of the database,
making it much less likely to starve out other clients, or itself.
JLockerman committed Aug 18, 2020
1 parent 0a60ca8 commit a96d677
Showing 2 changed files with 74 additions and 33 deletions.
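To make the new heuristic concrete, here is a minimal Go sketch of the sizing arithmetic introduced in this commit. The inputs are illustrative assumptions, not values from the commit: an 8-proc connector against a database whose max_connections is 100, with the flag defaults added below.

```go
package main

import "fmt"

func main() {
	maxProcs := 8 // e.g. GOMAXPROCS on an 8-core machine (assumed)
	perProc := 4  // default of -db-writer-connection-concurrency
	dbMax := 100  // assumed result of "SHOW max_connections"

	maxConns := int(0.8 * float32(dbMax)) // use at most 80% of the DB's limit
	minConns := maxProcs                  // keep one warm connection per proc

	numCopiers := perProc * maxProcs
	if numCopiers+maxProcs > maxConns { // leave one connection per core spare
		numCopiers = maxConns / 2
	}

	fmt.Println(minConns, maxConns, numCopiers) // prints: 8 80 32
}
```

With these inputs the connector would open at most 80 pooled connections, keep 8 warm, and run 32 copier goroutines.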
pkg/pgclient/client.go: 90 changes (69 additions & 21 deletions)

@@ -8,6 +8,7 @@ import (
 
 	"github.com/prometheus/client_golang/prometheus"
 
+	pgx "github.com/jackc/pgx/v4"
 	"github.com/jackc/pgx/v4/pgxpool"
 
 	"github.com/timescale/timescale-prometheus/pkg/clockcache"
@@ -21,18 +22,20 @@ import (
 
 // Config for the database
 type Config struct {
-	host             string
-	port             int
-	user             string
-	password         string
-	database         string
-	sslMode          string
-	dbConnectRetries int
-	AsyncAcks        bool
-	ReportInterval   int
-	LabelsCacheSize  uint64
-	MetricsCacheSize uint64
-	SeriesCacheSize  uint64
+	host                    string
+	port                    int
+	user                    string
+	password                string
+	database                string
+	sslMode                 string
+	dbConnectRetries        int
+	AsyncAcks               bool
+	ReportInterval          int
+	LabelsCacheSize         uint64
+	MetricsCacheSize        uint64
+	SeriesCacheSize         uint64
+	WriteConnectionsPerProc int
+	MaxConnections          int
 }
 
 // ParseFlags parses the configuration flags specific to PostgreSQL and TimescaleDB
@@ -48,6 +51,8 @@ func ParseFlags(cfg *Config) *Config {
 	flag.IntVar(&cfg.ReportInterval, "tput-report", 0, "interval in seconds at which throughput should be reported")
 	flag.Uint64Var(&cfg.LabelsCacheSize, "labels-cache-size", 10000, "maximum number of labels to cache")
 	flag.Uint64Var(&cfg.MetricsCacheSize, "metrics-cache-size", pgmodel.DefaultMetricCacheSize, "maximum number of metric names to cache")
+	flag.IntVar(&cfg.WriteConnectionsPerProc, "db-writer-connection-concurrency", 4, "maximum number of database connections per go process writing to the database")
+	flag.IntVar(&cfg.MaxConnections, "db-connections-max", -1, "maximum connections that can be open at once, defaults to 80% of the max the DB can handle")
 	return cfg
 }
 
@@ -65,17 +70,14 @@ type Client struct {
 // NewClient creates a new PostgreSQL client
 func NewClient(cfg *Config, readHist prometheus.ObserverVec) (*Client, error) {
 	connectionStr := cfg.GetConnectionStr()
-
-	maxProcs := runtime.GOMAXPROCS(-1)
-	if maxProcs <= 0 {
-		maxProcs = runtime.NumCPU()
-	}
-	if maxProcs <= 0 {
-		maxProcs = 1
+	minConnections, maxConnections, numCopiers, err := cfg.GetNumConnections()
+	if err != nil {
+		log.Error("err configuring number of connections", util.MaskPassword(err.Error()))
+		return nil, err
 	}
-	connectionPool, err := pgxpool.Connect(context.Background(), connectionStr+fmt.Sprintf(" pool_max_conns=%d pool_min_conns=%d", maxProcs*pgmodel.ConnectionsPerProc, maxProcs))
+	connectionPool, err := pgxpool.Connect(context.Background(), connectionStr+fmt.Sprintf(" pool_max_conns=%d pool_min_conns=%d", maxConnections, minConnections))
 
-	log.Info("msg", util.MaskPassword(connectionStr))
+	log.Info("msg", util.MaskPassword(connectionStr), "numCopiers", numCopiers)
 
 	if err != nil {
 		log.Error("err creating connection pool for new client", util.MaskPassword(err.Error()))
@@ -88,6 +90,7 @@ func NewClient(cfg *Config, readHist prometheus.ObserverVec) (*Client, error) {
 		AsyncAcks:       cfg.AsyncAcks,
 		ReportInterval:  cfg.ReportInterval,
 		SeriesCacheSize: cfg.SeriesCacheSize,
+		NumCopiers:      numCopiers,
 	}
 	ingestor, err := pgmodel.NewPgxIngestorWithMetricCache(connectionPool, cache, &c)
 	if err != nil {
@@ -114,6 +117,51 @@ func (cfg *Config) GetConnectionStr() string {
 		cfg.host, cfg.port, cfg.user, cfg.database, cfg.password, cfg.sslMode)
 }
 
+func (cfg *Config) GetNumConnections() (min int, max int, numCopiers int, err error) {
+	maxProcs := runtime.GOMAXPROCS(-1)
+	if cfg.WriteConnectionsPerProc < 1 {
+		return 0, 0, 0, fmt.Errorf("invalid number of connections-per-proc %v, must be at least 1", cfg.WriteConnectionsPerProc)
+	}
+	perProc := cfg.WriteConnectionsPerProc
+	max = cfg.MaxConnections
+	if max < 1 {
+		conn, err := pgx.Connect(context.Background(), cfg.GetConnectionStr())
+		if err != nil {
+			return 0, 0, 0, err
+		}
+		defer func() { _ = conn.Close(context.Background()) }()
+		row := conn.QueryRow(context.Background(), "SHOW max_connections")
+		err = row.Scan(&max)
+		if err != nil {
+			return 0, 0, 0, err
+		}
+		if max <= 1 {
+			log.Warn("msg", "database can only handle 1 connection")
+			return 1, 1, 1, nil
+		}
+		// we try to use only 80% of the database's connections
+		max = int(0.8 * float32(max))
+	}
+
+	// we want to leave some connections for non-copier usages, so in the event
+	// there aren't enough connections available to satisfy our per-process
+	// preferences we'll scale down the number of copiers
+	min = maxProcs
+	if max <= min {
+		log.Warn("msg", fmt.Sprintf("database can only handle %v connections; connector has %v procs", max, maxProcs))
+		return 1, max, max / 2, nil
+	}
+
+	numCopiers = perProc * maxProcs
+	// we leave one connection per core for non-copier usages
+	if numCopiers+maxProcs > max {
+		log.Warn("msg", fmt.Sprintf("had to reduce the number of copiers due to connection limits: wanted %v, reduced to %v", numCopiers, max/2))
+		numCopiers = max / 2
+	}
+
+	return
+}
+
 // Close closes the client and performs cleanup
 func (c *Client) Close() {
 	c.ingestor.Close()
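The probe at the heart of GetNumConnections can also be exercised on its own. Below is a minimal standalone sketch using pgx v4, mirroring the commit's approach of scanning the text result of SHOW max_connections into an int; the connection parameters are placeholders, not values from the commit.

```go
package main

import (
	"context"
	"fmt"

	pgx "github.com/jackc/pgx/v4"
)

func main() {
	ctx := context.Background()
	// Placeholder connection string; substitute real credentials.
	conn, err := pgx.Connect(ctx, "host=localhost port=5432 user=postgres database=postgres")
	if err != nil {
		panic(err)
	}
	defer func() { _ = conn.Close(ctx) }()

	// SHOW returns text, but pgx converts it during Scan, as the commit does.
	var maxConns int
	if err := conn.QueryRow(ctx, "SHOW max_connections").Scan(&maxConns); err != nil {
		panic(err)
	}
	fmt.Println("database max_connections:", maxConns)
}
```

When the 80% heuristic misjudges, for example on a shared database with many other clients, operators can pin the limit with -db-connections-max instead of relying on the probe.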
pkg/pgmodel/sql_ingest.go: 17 changes (5 additions & 12 deletions)

@@ -6,7 +6,6 @@ package pgmodel
 import (
 	"context"
 	"fmt"
-	"runtime"
 	"sort"
 	"strings"
 	"sync"
@@ -31,6 +30,7 @@ type Cfg struct {
 	AsyncAcks       bool
 	ReportInterval  int
 	SeriesCacheSize uint64
+	NumCopiers      int
 }
 
 // NewPgxIngestorWithMetricCache returns a new Ingestor that uses connection pool and a metrics cache
@@ -55,21 +55,14 @@ func NewPgxIngestor(c *pgxpool.Pool) (*DBIngestor, error) {
 	return NewPgxIngestorWithMetricCache(c, cache, &Cfg{})
 }
 
-var ConnectionsPerProc = 5
-
 func newPgxInserter(conn pgxConn, cache MetricCache, cfg *Cfg) (*pgxInserter, error) {
 	cmc := make(chan struct{}, 1)
 
-	maxProcs := runtime.GOMAXPROCS(-1)
-	if maxProcs <= 0 {
-		maxProcs = runtime.NumCPU()
-	}
-	if maxProcs <= 0 {
-		maxProcs = 1
+	numCopiers := cfg.NumCopiers
+	if numCopiers < 1 {
+		log.Warn("msg", "num copiers less than 1, setting to 1")
+		numCopiers = 1
 	}
 
-	// we leave one connection per-core for other usages
-	numCopiers := maxProcs*ConnectionsPerProc - maxProcs
 	toCopiers := make(chan copyRequest, numCopiers)
 	for i := 0; i < numCopiers; i++ {
 		go runInserter(conn, toCopiers)
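For context, the copiers launched above follow the standard worker-pool shape: a fixed number of goroutines draining one buffered channel, so the copier count bounds how many connections the ingest path can hold at once. A self-contained sketch of that pattern follows; the names are illustrative stand-ins, not the connector's actual types.

```go
package main

import (
	"fmt"
	"sync"
)

// copyRequest stands in for the connector's per-metric copy work item.
type copyRequest struct{ metric string }

func main() {
	numCopiers := 4
	toCopiers := make(chan copyRequest, numCopiers)

	var wg sync.WaitGroup
	for i := 0; i < numCopiers; i++ {
		wg.Add(1)
		go func(id int) { // each copier holds at most one DB connection
			defer wg.Done()
			for req := range toCopiers {
				fmt.Println("copier", id, "handling", req.metric)
			}
		}(i)
	}

	for _, m := range []string{"cpu", "mem", "disk"} {
		toCopiers <- copyRequest{metric: m}
	}
	close(toCopiers)
	wg.Wait()
}
```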
