Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Use read-only ingestor for -readOnly mode.
Browse files Browse the repository at this point in the history
Signed-off-by: Harkishen-Singh <harkishensingh@hotmail.com>
  • Loading branch information
Harkishen-Singh committed Jul 13, 2022
1 parent a5c1a10 commit d833a01
Show file tree
Hide file tree
Showing 15 changed files with 147 additions and 133 deletions.
4 changes: 1 addition & 3 deletions go.mod
Expand Up @@ -40,6 +40,7 @@ require (
github.com/testcontainers/testcontainers-go v0.13.0
github.com/thanos-io/thanos v0.26.0
github.com/walle/targz v0.0.0-20140417120357-57fe4206da5a
go.opentelemetry.io/collector/model v0.50.0
go.opentelemetry.io/collector/pdata v0.54.0
go.opentelemetry.io/collector/semconv v0.54.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.32.0
Expand All @@ -63,8 +64,6 @@ require (
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/Microsoft/go-winio v0.5.1 // indirect
github.com/Microsoft/hcsshim v0.8.23 // indirect
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/apache/thrift v0.16.0 // indirect
github.com/armon/go-metrics v0.3.10 // indirect
Expand All @@ -83,7 +82,6 @@ require (
github.com/docker/go-units v0.4.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/frankban/quicktest v1.14.3 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
Expand Down
156 changes: 60 additions & 96 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/api/delete.go
Expand Up @@ -71,7 +71,7 @@ func deleteHandler(config *Config, client *pgclient.Client) http.HandlerFunc {
if client == nil {
continue
}
pgDelete := deletePkg.PgDelete{Conn: client.Connection()}
pgDelete := deletePkg.PgDelete{Conn: client.ReadOnlyConnection()}
touchedMetrics, deletedSeriesIDs, rowsDeleted, err := pgDelete.DeleteSeries(matchers, start, end)
if err != nil {
respondErrorWithMessage(w, http.StatusInternalServerError, err, "deleting_series",
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/metadata.go
Expand Up @@ -38,7 +38,7 @@ func metricMetadataHandler(client *pgclient.Client) http.HandlerFunc {
return
}
}
data, err := metadata.MetricQuery(client.Connection(), metric, int(limit))
data, err := metadata.MetricQuery(client.ReadOnlyConnection(), metric, int(limit))
if err != nil {
respondError(w, http.StatusInternalServerError, err, "fetching metric metadata")
return
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/router.go
Expand Up @@ -32,7 +32,7 @@ import (
func GenerateRouter(apiConf *Config, promqlConf *query.Config, client *pgclient.Client, query *jaegerQuery.Query, reload func() error) (*mux.Router, error) {
var writePreprocessors []parser.Preprocessor
if apiConf.HighAvailability {
service := ha.NewService(haClient.NewLeaseClient(client.Connection()))
service := ha.NewService(haClient.NewLeaseClient(client.ReadOnlyConnection()))
writePreprocessors = append(writePreprocessors, ha.NewFilter(service))
}
if apiConf.MultiTenancy != nil {
Expand Down Expand Up @@ -104,7 +104,7 @@ func GenerateRouter(apiConf *Config, promqlConf *query.Config, client *pgclient.
reloadHandler := timeHandler(metrics.HTTPRequestDuration, "/-/reload", Reload(reload, apiConf.AdminAPIEnabled))
router.Path("/-/reload").Methods(http.MethodPost).HandlerFunc(reloadHandler)

jaeger.ExtendQueryAPIs(router, client.Connection(), query)
jaeger.ExtendQueryAPIs(router, client.ReadOnlyConnection(), query)

debugProf := router.PathPrefix("/debug/pprof").Subrouter()
debugProf.Path("").Methods(http.MethodGet).HandlerFunc(pprof.Index)
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/write_test.go
Expand Up @@ -280,11 +280,11 @@ type mockInserter struct {
func (m *mockInserter) IngestTraces(_ context.Context, _ ptrace.Traces) error {
panic("not implemented") // TODO: Implement
}

func (m *mockInserter) Ingest(_ context.Context, r *prompb.WriteRequest) (uint64, uint64, error) {
m.ts = r.Timeseries
return uint64(m.result), 0, m.err
}
func (m *mockInserter) Close() {}

func getReader(s string) io.Reader {
var r io.Reader = strings.NewReader(s)
Expand Down
66 changes: 46 additions & 20 deletions pkg/pgclient/client.go
Expand Up @@ -11,9 +11,8 @@ import (

"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"go.opentelemetry.io/collector/pdata/ptrace"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/timescale/promscale/pkg/ha"
"github.com/timescale/promscale/pkg/log"
Expand All @@ -29,6 +28,16 @@ import (
"github.com/timescale/promscale/pkg/tenancy"
)

const (
// defaultConnFraction is multipled with total connections to get
// the max conns for a pool.
defaultConnFraction = 0.5

// defaultReaderFraction is the fraction of connections that should be
// assigned to the reader pool, if Promscale is not in read-only mode.
defaultReaderFraction = 0.3
)

// LockFunc does connect validation function, useful for things such as acquiring locks
// that should live the duration of the connection
type LockFunc = func(ctx context.Context, conn *pgx.Conn) error
Expand All @@ -37,33 +46,38 @@ type LockFunc = func(ctx context.Context, conn *pgx.Conn) error
type Client struct {
readerPool pgxconn.PgxConn
writerPool pgxconn.PgxConn
ingestor *ingestor.DBIngestor
ingestor ingestor.DBInserter
querier querier.Querier
promqlEngine *promql.Engine
healthCheck health.HealthCheckerFn
queryable promql.Queryable
metricCache cache.MetricCache
labelsCache cache.LabelsCache
seriesCache cache.SeriesCache
closePool bool
sigClose chan struct{}
haService *ha.Service
}

// NewClient creates a new PostgreSQL client
func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, schemaLocker LockFunc, readOnly bool) (*Client, error) {
var (
err error
writerPoolSize int
numCopiers int
writerPool *pgxpool.Pool
defaultReaderPool = 0.5
err error
writerPoolSize int
numCopiers int
totalConns int
writerPool *pgxpool.Pool
readerFraction = defaultConnFraction
writerFraction = defaultConnFraction
)
if !readOnly {
defaultReaderPool = 0.3 // Since defaultReaderPool + defaultWriterPool should be 0.8 or 80% of allowed database connections.
writerPoolSize, err = cfg.GetPoolSize("writer", 0.5, cfg.WriterPoolSize)
readerFraction = defaultReaderFraction // Since readerFraction + writerFraction should be 0.8 or 80% of allowed database connections.
writerPoolSize, err = cfg.GetPoolSize("writer", writerFraction, cfg.WriterPoolSize)
if err != nil {
return nil, fmt.Errorf("get writer pool size: %w", err)
}
totalConns += writerPoolSize

numCopiers, err = cfg.GetNumCopiers()
if err != nil {
return nil, fmt.Errorf("get num copiers: %w", err)
Expand All @@ -85,10 +99,16 @@ func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, sche
}
}

readerPoolSize, err := cfg.GetPoolSize("reader", defaultReaderPool, cfg.ReaderPoolSize)
readerPoolSize, err := cfg.GetPoolSize("reader", readerFraction, cfg.ReaderPoolSize)
if err != nil {
return nil, fmt.Errorf("get reader pool size: %w", err)
}
totalConns += readerPoolSize

if cfg.MaxConnections != defaultMaxConns && totalConns > cfg.MaxConnections {
return nil, fmt.Errorf("reader-pool (size=%d) + writer-pool (size=%d) more than db.connections-max (%d). Increase the db.connections-max or decrease the pool-sizes", readerPoolSize, writerPoolSize, cfg.MaxConnections)
}

readerPgConfig, err := cfg.getPgConfig(readerPoolSize)
if err != nil {
return nil, fmt.Errorf("get reader pg-config: %w", err)
Expand All @@ -111,8 +131,12 @@ func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, sche
if err != nil {
return nil, fmt.Errorf("err creating reader connection pool: %w", err)
}

return NewClientWithPool(r, cfg, numCopiers, writerPool, readerPool, mt, readOnly)
client, err := NewClientWithPool(r, cfg, numCopiers, writerPool, readerPool, mt, readOnly)
if err != nil {
return client, err
}
client.closePool = true
return client, err
}

func (cfg *Config) getPgConfig(poolSize int) (*pgxpool.Config, error) {
Expand Down Expand Up @@ -172,7 +196,7 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri
dbQuerier := querier.NewQuerier(readerConn, metricsCache, labelsReader, exemplarKeyPosCache, mt.ReadAuthorizer())
queryable := query.NewQueryable(dbQuerier, labelsReader)

var dbIngestor *ingestor.DBIngestor
dbIngestor := ingestor.DBInserter(ingestor.ReadOnlyIngestor{})
if !readOnly {
var err error
writerConn = pgxconn.NewPgxConn(writerPool)
Expand Down Expand Up @@ -224,18 +248,20 @@ func (c *Client) Close() {
c.ingestor.Close()
}
close(c.sigClose)
if c.writerPool != nil {
c.writerPool.Close()
}
if c.readerPool != nil {
c.readerPool.Close()
if c.closePool {
if c.writerPool != nil {
c.writerPool.Close()
}
if c.readerPool != nil {
c.readerPool.Close()
}
}
if c.haService != nil {
c.haService.Close()
}
}

func (c *Client) Ingestor() *ingestor.DBIngestor {
func (c *Client) Inserter() ingestor.DBInserter {
return c.ingestor
}

Expand Down
10 changes: 7 additions & 3 deletions pkg/pgclient/config.go
Expand Up @@ -53,6 +53,7 @@ const (
defaultDbStatementsCache = true
MinPoolSize = 2
defaultPoolSize = -1
defaultMaxConns = -1
)

var (
Expand Down Expand Up @@ -82,6 +83,8 @@ func ParseFlags(fs *flag.FlagSet, cfg *Config) *Config {
"allowed by the database.")
fs.IntVar(&cfg.ReaderPoolSize, "db.connections.reader-pool.size", defaultPoolSize, "Maximum size of the reader pool of database connections. This defaults to 30% of max_connections "+
"allowed by the database.")
fs.IntVar(&cfg.MaxConnections, "db.connections-max", defaultMaxConns, "Maximum number of connections to the database that should be opened at once. "+
"It defaults to 80% of the maximum connections that the database can handle. ")
fs.StringVar(&cfg.DbUri, "db.uri", defaultDBUri, "TimescaleDB/Vanilla Postgres DB URI. "+
"Example DB URI `postgres://postgres:password@localhost:5432/timescale?sslmode=require`")
fs.BoolVar(&cfg.EnableStatementsCache, "db.statements-cache", defaultDbStatementsCache, "Whether database connection pool should use cached prepared statements. "+
Expand Down Expand Up @@ -184,6 +187,10 @@ func (cfg *Config) maxConn() (int, error) {
}

func (cfg *Config) GetNumCopiers() (int, error) {
if cfg.WriteConnections > 0 {
return cfg.WriteConnections, nil
}

conn, err := pgx.Connect(context.Background(), cfg.GetConnectionStr())
if err != nil {
return 0, err
Expand All @@ -196,8 +203,5 @@ func (cfg *Config) GetNumCopiers() (int, error) {
return 0, fmt.Errorf("error fetching number of CPUs from extension: %w", err)
}

if cfg.WriteConnections > 0 {
numCopiers = cfg.WriteConnections
}
return numCopiers, nil
}
10 changes: 10 additions & 0 deletions pkg/pgmodel/ingestor/ingestor.go
Expand Up @@ -285,3 +285,13 @@ func (ingestor *DBIngestor) Close() {
ingestor.closed.Store(true)
ingestor.dispatcher.Close()
}

type ReadOnlyIngestor struct{}

func (ReadOnlyIngestor) Ingest(context.Context, *prompb.WriteRequest) (numInsertablesIngested uint64, numMetadataIngested uint64, err error) {
return 0, 0, fmt.Errorf("ingesting metric data not allowed in read-only mode")
}
func (ReadOnlyIngestor) IngestTraces(context.Context, ptrace.Traces) error {
return fmt.Errorf("ingesting traces not allowed in read-only mode")
}
func (ReadOnlyIngestor) Close() {}
1 change: 1 addition & 0 deletions pkg/pgmodel/ingestor/ingestor_interface.go
Expand Up @@ -18,4 +18,5 @@ type DBInserter interface {
// Returns the number of metrics ingested and any error encountered before finishing.
Ingest(context.Context, *prompb.WriteRequest) (uint64, uint64, error)
IngestTraces(context.Context, ptrace.Traces) error
Close()
}
8 changes: 6 additions & 2 deletions pkg/rules/adapters/ingest.go
Expand Up @@ -25,8 +25,12 @@ type ingestAdapter struct {
}

// NewIngestAdapter acts as an adapter to make Promscale's DBIngestor compatible with storage.Appendable
func NewIngestAdapter(ingestor *ingestor.DBIngestor) *ingestAdapter {
return &ingestAdapter{ingestor}
func NewIngestAdapter(inserter ingestor.DBInserter) (*ingestAdapter, error) {
dbIngestor, ok := inserter.(*ingestor.DBIngestor)
if !ok {
return nil, fmt.Errorf("unable to ingest: DBIngestor not found. Received %T", inserter)
}
return &ingestAdapter{dbIngestor}, nil
}

type appenderAdapter struct {
Expand Down
7 changes: 6 additions & 1 deletion pkg/rules/rules.go
Expand Up @@ -74,8 +74,13 @@ func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.C
return nil, nil, fmt.Errorf("parsing UI-URL: %w", err)
}

ingestAdapter, err := adapters.NewIngestAdapter(client.Inserter())
if err != nil {
return nil, nil, fmt.Errorf("error creating ingest adapter: %w", err)
}

rulesManager := prom_rules.NewManager(&prom_rules.ManagerOptions{
Appendable: adapters.NewIngestAdapter(client.Ingestor()),
Appendable: ingestAdapter,
Queryable: adapters.NewQueryAdapter(client.Queryable()),
Context: ctx,
ExternalURL: parsedUrl,
Expand Down
Expand Up @@ -314,15 +314,15 @@ func buildRouterWithAPIConfig(pool *pgxpool.Pool, cfg *api.Config) (*mux.Router,

pgClient, err := pgclient.NewClientWithPool(prometheus.NewRegistry(), conf, 1, pool, pool, tenancy.NewNoopAuthorizer(), cfg.ReadOnly)
if err != nil {
return nil, pgClient, fmt.Errorf("cannot run test, cannot instantiate pgClient")
return nil, pgClient, fmt.Errorf("cannot run test, cannot instantiate pgClient: %w", err)
}

qryCfg := defaultQueryConfig()
if err = pgClient.InitPromQLEngine(qryCfg); err != nil {
return nil, nil, fmt.Errorf("init promql engine: %w", err)
}

jaegerQuery := jaegerquery.New(pgClient.Connection(), &jaegerquery.DefaultConfig)
jaegerQuery := jaegerquery.New(pgClient.ReadOnlyConnection(), &jaegerquery.DefaultConfig)

router, err := api.GenerateRouter(cfg, qryCfg, pgClient, jaegerQuery, nil)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/tests/end_to_end_tests/rules_test.go
Expand Up @@ -48,7 +48,7 @@ func TestRecordingRulesEval(t *testing.T) {
})
require.NoError(t, err)

ingestor := pgClient.Ingestor()
ingestor := pgClient.Inserter()
ts := tsToSeconds(generateSmallTimeseries(), time.Second) // Converts ts of samples into seconds.
_, _, err = ingestor.Ingest(context.Background(), newWriteRequestWithTs(ts))
require.NoError(t, err)
Expand Down
2 changes: 2 additions & 0 deletions pkg/tests/end_to_end_tests/telemetry_test.go
Expand Up @@ -355,6 +355,8 @@ func TestPromQLBasedTelemetry(t *testing.T) {
SslMode: "allow",
MaxConnections: -1,
CacheConfig: cache.DefaultConfig,
WriterPoolSize: 2,
ReaderPoolSize: 2,
},
}
defer conn.Release()
Expand Down

0 comments on commit d833a01

Please sign in to comment.