Skip to content

Commit

Permalink
Add connection pooling (#31127)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
Reuse of connections created per database (configured or discovered) vs
current behavior to create & close connection per database on each
scrape.

**Link to tracking Issue:** 

#30831

**Testing:** 
Updated unit & integration tests. Also, ran locally multiple scenario:
- no feature gate specified (default): current behavior maintained,
connections created/closed on each database per scrape
- feature gate connection pool enabled, no connection pool config
specified (default): reduction of the number of connections
created/closed
- feature gate connection pool enabled, connection pool config tweaked:
connections created on first scrape & closed when configured lifetime
reached or collector shutdown

**Documentation:**
- change log
- readme for the feature gate & related optional configurations linked
to this feature

**Note**
Checking internally for getting the CLA signed
  • Loading branch information
kevinnoel-be committed Mar 4, 2024
1 parent 15ceef1 commit a57bec6
Show file tree
Hide file tree
Showing 14 changed files with 3,442 additions and 62 deletions.
29 changes: 29 additions & 0 deletions .chloggen/receiver-postgresql-connection-pool.yaml
@@ -0,0 +1,29 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: postgresqlreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Add `receiver.postgresql.connectionPool` feature gate to reuse database connections"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30831]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The default implementation recreates and closes connections on each scrape per database configured/discovered.
This change offers a feature gated alternative to keep connections open. Also, it exposes connection configuration to control the behavior of the pool.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
30 changes: 30 additions & 0 deletions receiver/postgresqlreceiver/README.md
Expand Up @@ -74,6 +74,36 @@ receivers:

The full list of settings exposed for this receiver are documented [here](./config.go) with detailed sample configurations [here](./testdata/config.yaml). TLS config is documented further under the [opentelemetry collector's configtls package](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md).

## Connection pool feature

The feature gate `receiver.postgresql.connectionPool` allows to enable the creation & reuse of a pool per database for the connections instead of creating & closing on each scrape.
This is generally a useful optimization but also alleviates the volume of generated audit logs when the PostgreSQL instance is configured with `log_connections=on` and/or `log_disconnections=on`.

When this feature gate is enabled, the following optional settings are available nested under `connection_pool` to help configure the connection pools:

- `max_idle_time`: The maximum amount of time a connection may be idle before being closed.
- `max_lifetime`: The maximum amount of time a connection may be reused.
- `max_idle`: The maximum number of connections in the idle connection pool.
- `max_open`: The maximum number of open connections to the database.

Those settings and their defaults are further documented in the `sql/database` package [here](https://pkg.go.dev/database/sql#DB).

### Example Configuration

```yaml
receivers:
postgresql:
endpoint: localhost:5432
transport: tcp
username: otel
password: ${env:POSTGRESQL_PASSWORD}
connection_pool:
max_idle_time: 10m
max_lifetime: 0
max_idle: 2
max_open: 5
```

## Metrics

Details about the metrics produced by this receiver can be found in [metadata.yaml](./metadata.yaml)
40 changes: 15 additions & 25 deletions receiver/postgresqlreceiver/client.go
Expand Up @@ -12,7 +12,6 @@ import (
"strings"
"time"

"github.com/lib/pq"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/featuregate"
Expand Down Expand Up @@ -60,8 +59,8 @@ type client interface {
}

type postgreSQLClient struct {
client *sql.DB
database string
client *sql.DB
closeFn func() error
}

var _ client = (*postgreSQLClient)(nil)
Expand Down Expand Up @@ -102,41 +101,32 @@ func sslConnectionString(tls configtls.TLSClientSetting) string {
return conn
}

func newPostgreSQLClient(conf postgreSQLConfig) (*postgreSQLClient, error) {
func (c postgreSQLConfig) ConnectionString() (string, error) {
// postgres will assume the supplied user as the database name if none is provided,
// so we must specify a databse name even when we are just collecting the list of databases.
dbField := "dbname=postgres"
if conf.database != "" {
dbField = fmt.Sprintf("dbname=%s ", conf.database)
// so we must specify a database name even when we are just collecting the list of databases.
database := defaultPostgreSQLDatabase
if c.database != "" {
database = c.database
}

host, port, err := net.SplitHostPort(conf.address.Endpoint)
host, port, err := net.SplitHostPort(c.address.Endpoint)
if err != nil {
return nil, err
return "", err
}

if conf.address.Transport == "unix" {
if c.address.Transport == "unix" {
// lib/pg expects a unix socket host to start with a "/" and appends the appropriate .s.PGSQL.port internally
host = fmt.Sprintf("/%s", host)
}

connStr := fmt.Sprintf("port=%s host=%s user=%s password=%s %s %s", port, host, conf.username, conf.password, dbField, sslConnectionString(conf.tls))

conn, err := pq.NewConnector(connStr)
if err != nil {
return nil, err
}

db := sql.OpenDB(conn)

return &postgreSQLClient{
client: db,
database: conf.database,
}, nil
return fmt.Sprintf("port=%s host=%s user=%s password=%s dbname=%s %s", port, host, c.username, c.password, database, sslConnectionString(c.tls)), nil
}

func (c *postgreSQLClient) Close() error {
return c.client.Close()
if c.closeFn != nil {
return c.closeFn()
}
return nil
}

type databaseStats struct {
Expand Down
155 changes: 155 additions & 0 deletions receiver/postgresqlreceiver/client_factory.go
@@ -0,0 +1,155 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package postgresqlreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/postgresqlreceiver"

import (
"database/sql"
"sync"

"github.com/lib/pq"
"go.opentelemetry.io/collector/featuregate"
"go.uber.org/multierr"
)

const connectionPoolGateID = "receiver.postgresql.connectionPool"

var (
connectionPoolGate = featuregate.GlobalRegistry().MustRegister(
connectionPoolGateID,
featuregate.StageAlpha,
featuregate.WithRegisterDescription("Use of connection pooling"),
featuregate.WithRegisterFromVersion("0.96.0"),
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/30831"),
)
)

type postgreSQLClientFactory interface {
getClient(database string) (client, error)
close() error
}

// defaultClientFactory creates one PG connection per call
type defaultClientFactory struct {
baseConfig postgreSQLConfig
}

func newDefaultClientFactory(cfg *Config) *defaultClientFactory {
return &defaultClientFactory{
baseConfig: postgreSQLConfig{
username: cfg.Username,
password: string(cfg.Password),
address: cfg.AddrConfig,
tls: cfg.TLSClientSetting,
},
}
}

func (d *defaultClientFactory) getClient(database string) (client, error) {
db, err := getDB(d.baseConfig, database)
if err != nil {
return nil, err
}
return &postgreSQLClient{client: db, closeFn: db.Close}, nil
}

func (d *defaultClientFactory) close() error {
return nil
}

// poolClientFactory creates one PG connection per database, keeping a pool of connections
type poolClientFactory struct {
sync.Mutex
baseConfig postgreSQLConfig
poolConfig *ConnectionPool
pool map[string]*sql.DB
closed bool
}

func newPoolClientFactory(cfg *Config) *poolClientFactory {
poolCfg := cfg.ConnectionPool
return &poolClientFactory{
baseConfig: postgreSQLConfig{
username: cfg.Username,
password: string(cfg.Password),
address: cfg.AddrConfig,
tls: cfg.TLSClientSetting,
},
poolConfig: &poolCfg,
pool: make(map[string]*sql.DB),
closed: false,
}
}

func (p *poolClientFactory) getClient(database string) (client, error) {
p.Lock()
defer p.Unlock()
db, ok := p.pool[database]
if !ok {
var err error
db, err = getDB(p.baseConfig, database)
p.setPoolSettings(db)
if err != nil {
return nil, err
}
p.pool[database] = db
}
return &postgreSQLClient{client: db, closeFn: nil}, nil
}

func (p *poolClientFactory) close() error {
p.Lock()
defer p.Unlock()

if p.closed {
return nil
}

if p.pool != nil {
var err error
for _, db := range p.pool {
if closeErr := db.Close(); closeErr != nil {
err = multierr.Append(err, closeErr)
}
}
if err != nil {
return err
}
}

p.closed = true
return nil
}

func (p *poolClientFactory) setPoolSettings(db *sql.DB) {
if p.poolConfig == nil {
return
}
if p.poolConfig.MaxIdleTime != nil {
db.SetConnMaxIdleTime(*p.poolConfig.MaxIdleTime)
}
if p.poolConfig.MaxLifetime != nil {
db.SetConnMaxLifetime(*p.poolConfig.MaxLifetime)
}
if p.poolConfig.MaxIdle != nil {
db.SetMaxIdleConns(*p.poolConfig.MaxIdle)
}
if p.poolConfig.MaxOpen != nil {
db.SetMaxOpenConns(*p.poolConfig.MaxOpen)
}
}

func getDB(cfg postgreSQLConfig, database string) (*sql.DB, error) {
if database != "" {
cfg.database = database
}
connectionString, err := cfg.ConnectionString()
if err != nil {
return nil, err
}
conn, err := pq.NewConnector(connectionString)
if err != nil {
return nil, err
}
return sql.OpenDB(conn), nil
}
9 changes: 9 additions & 0 deletions receiver/postgresqlreceiver/config.go
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"net"
"time"

"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configopaque"
Expand Down Expand Up @@ -34,9 +35,17 @@ type Config struct {
ExcludeDatabases []string `mapstructure:"exclude_databases"`
confignet.AddrConfig `mapstructure:",squash"` // provides Endpoint and Transport
configtls.TLSClientSetting `mapstructure:"tls,omitempty"` // provides SSL details
ConnectionPool `mapstructure:"connection_pool,omitempty"`
metadata.MetricsBuilderConfig `mapstructure:",squash"`
}

type ConnectionPool struct {
MaxIdleTime *time.Duration `mapstructure:"max_idle_time,omitempty"`
MaxLifetime *time.Duration `mapstructure:"max_lifetime,omitempty"`
MaxIdle *int `mapstructure:"max_idle,omitempty"`
MaxOpen *int `mapstructure:"max_open,omitempty"`
}

func (cfg *Config) Validate() error {
var err error
if cfg.Username == "" {
Expand Down
36 changes: 32 additions & 4 deletions receiver/postgresqlreceiver/config_test.go
Expand Up @@ -109,14 +109,14 @@ func TestValidate(t *testing.T) {
}

func TestLoadConfig(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)
cm, confErr := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, confErr)

factory := NewFactory()
cfg := factory.CreateDefaultConfig()

t.Run("postgresql", func(t *testing.T) {
sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "").String())
t.Run("postgresql/minimal", func(t *testing.T) {
sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "minimal").String())
require.NoError(t, err)
require.NoError(t, component.UnmarshalConfig(sub, cfg))

Expand All @@ -128,6 +128,24 @@ func TestLoadConfig(t *testing.T) {
require.Equal(t, expected, cfg)
})

t.Run("postgresql/pool", func(t *testing.T) {
sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "pool").String())
require.NoError(t, err)
require.NoError(t, component.UnmarshalConfig(sub, cfg))

expected := factory.CreateDefaultConfig().(*Config)
expected.Endpoint = "localhost:5432"
expected.AddrConfig.Transport = "tcp"
expected.Username = "otel"
expected.Password = "${env:POSTGRESQL_PASSWORD}"
expected.ConnectionPool = ConnectionPool{
MaxIdleTime: ptr(30 * time.Second),
MaxIdle: ptr(5),
}

require.Equal(t, expected, cfg)
})

t.Run("postgresql/all", func(t *testing.T) {
sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "all").String())
require.NoError(t, err)
Expand All @@ -150,7 +168,17 @@ func TestLoadConfig(t *testing.T) {
KeyFile: "/home/otel/mypostgreskey.key",
},
}
expected.ConnectionPool = ConnectionPool{
MaxIdleTime: ptr(30 * time.Second),
MaxLifetime: ptr(time.Minute),
MaxIdle: ptr(5),
MaxOpen: ptr(10),
}

require.Equal(t, expected, cfg)
})
}

func ptr[T any](value T) *T {
return &value
}

0 comments on commit a57bec6

Please sign in to comment.