Skip to content

Commit

Permalink
[receiver/postgresql] Fix race condition when capturing errors (open-…
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski authored and Caleb-Hurshman committed Jul 6, 2023
1 parent 3344ecd commit da918e6
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 23 deletions.
20 changes: 20 additions & 0 deletions .chloggen/postgres-int-race.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern (e.g. filelogreceiver)
component: postgresqlreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix race condition when capturing errors from multiple requests simultaneously

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [23026]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
68 changes: 45 additions & 23 deletions receiver/postgresqlreceiver/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,28 @@ type postgreSQLScraper struct {
clientFactory postgreSQLClientFactory
mb *metadata.MetricsBuilder
}
// errsMux bundles a scrapererror.ScrapeErrors collector with an embedded
// sync.RWMutex. Its add, addPartial, and combine methods take the lock
// before touching errs, so errors can be recorded from several goroutines
// during a single scrape.
type errsMux struct {
// Embedded lock; the methods on errsMux call its Lock/Unlock.
sync.RWMutex
// Accumulated scrape errors; accessed through the errsMux methods.
errs scrapererror.ScrapeErrors
}

// add records err in the wrapped ScrapeErrors collector via its Add method.
// The embedded lock is held for the duration of the call and released on
// return, so concurrent callers are serialized.
func (e *errsMux) add(err error) {
	e.RWMutex.Lock()
	defer e.RWMutex.Unlock()

	e.errs.Add(err)
}

// addPartial records err in the wrapped ScrapeErrors collector via
// AddPartial, always passing a count of 1 (presumably the number of failed
// metrics; confirm against the scrapererror package). The embedded lock is
// held for the duration of the call and released on return.
func (e *errsMux) addPartial(err error) {
	e.RWMutex.Lock()
	defer e.RWMutex.Unlock()

	e.errs.AddPartial(1, err)
}

// combine returns the result of calling Combine on the wrapped ScrapeErrors
// collector. The embedded lock is held while Combine runs and released on
// return.
func (e *errsMux) combine() error {
	e.RWMutex.Lock()
	defer e.RWMutex.Unlock()

	combined := e.errs.Combine()
	return combined
}

type postgreSQLClientFactory interface {
getClient(c *Config, database string) (client, error)
Expand Down Expand Up @@ -83,7 +105,7 @@ func (p *postgreSQLScraper) scrape(ctx context.Context) (pmetric.Metrics, error)

now := pcommon.NewTimestampFromTime(time.Now())

var errs scrapererror.ScrapeErrors
var errs errsMux
r := &dbRetrieval{
activityMap: make(map[databaseName]int64),
dbSizeMap: make(map[databaseName]int64),
Expand All @@ -94,7 +116,7 @@ func (p *postgreSQLScraper) scrape(ctx context.Context) (pmetric.Metrics, error)
for _, database := range databases {
dbClient, err := p.clientFactory.getClient(p.config, database)
if err != nil {
errs.Add(err)
errs.add(err)
p.logger.Error("Failed to initialize connection to postgres", zap.String("database", database), zap.Error(err))
continue
}
Expand All @@ -111,15 +133,15 @@ func (p *postgreSQLScraper) scrape(ctx context.Context) (pmetric.Metrics, error)
p.collectReplicationStats(ctx, now, listClient, &errs)
p.collectMaxConnections(ctx, now, listClient, &errs)

return p.mb.Emit(), errs.Combine()
return p.mb.Emit(), errs.combine()
}

func (p *postgreSQLScraper) retrieveDBMetrics(
ctx context.Context,
listClient client,
databases []string,
r *dbRetrieval,
errs *scrapererror.ScrapeErrors,
errs *errsMux,
) {
wg := &sync.WaitGroup{}

Expand Down Expand Up @@ -147,15 +169,15 @@ func (p *postgreSQLScraper) recordDatabase(now pcommon.Timestamp, db string, r *
p.mb.EmitForResource(metadata.WithPostgresqlDatabaseName(db))
}

func (p *postgreSQLScraper) collectTables(ctx context.Context, now pcommon.Timestamp, dbClient client, db string, errs *scrapererror.ScrapeErrors) (numTables int64) {
func (p *postgreSQLScraper) collectTables(ctx context.Context, now pcommon.Timestamp, dbClient client, db string, errs *errsMux) (numTables int64) {
blockReads, err := dbClient.getBlocksReadByTable(ctx, db)
if err != nil {
errs.AddPartial(1, err)
errs.addPartial(err)
}

tableMetrics, err := dbClient.getDatabaseTableMetrics(ctx, db)
if err != nil {
errs.AddPartial(1, err)
errs.addPartial(err)
}

for tableKey, tm := range tableMetrics {
Expand Down Expand Up @@ -192,11 +214,11 @@ func (p *postgreSQLScraper) collectIndexes(
now pcommon.Timestamp,
client client,
database string,
errs *scrapererror.ScrapeErrors,
errs *errsMux,
) {
idxStats, err := client.getIndexStats(ctx, database)
if err != nil {
errs.AddPartial(1, err)
errs.addPartial(err)
return
}

Expand All @@ -215,11 +237,11 @@ func (p *postgreSQLScraper) collectBGWriterStats(
ctx context.Context,
now pcommon.Timestamp,
client client,
errs *scrapererror.ScrapeErrors,
errs *errsMux,
) {
bgStats, err := client.getBGWriterStats(ctx)
if err != nil {
errs.AddPartial(1, err)
errs.addPartial(err)
return
}

Expand All @@ -243,11 +265,11 @@ func (p *postgreSQLScraper) collectMaxConnections(
ctx context.Context,
now pcommon.Timestamp,
client client,
errs *scrapererror.ScrapeErrors,
errs *errsMux,
) {
mc, err := client.getMaxConnections(ctx)
if err != nil {
errs.AddPartial(1, err)
errs.addPartial(err)
return
}
p.mb.RecordPostgresqlConnectionMaxDataPoint(now, mc)
Expand All @@ -257,11 +279,11 @@ func (p *postgreSQLScraper) collectReplicationStats(
ctx context.Context,
now pcommon.Timestamp,
client client,
errs *scrapererror.ScrapeErrors,
errs *errsMux,
) {
rss, err := client.getReplicationStats(ctx)
if err != nil {
errs.AddPartial(1, err)
errs.addPartial(err)
return
}
for _, rs := range rss {
Expand All @@ -284,15 +306,15 @@ func (p *postgreSQLScraper) collectWalAge(
ctx context.Context,
now pcommon.Timestamp,
client client,
errs *scrapererror.ScrapeErrors,
errs *errsMux,
) {
walAge, err := client.getLatestWalAgeSeconds(ctx)
if errors.Is(err, errNoLastArchive) {
// return no error as there is no last archive to derive the value from
return
}
if err != nil {
errs.AddPartial(1, fmt.Errorf("unable to determine latest WAL age: %w", err))
errs.addPartial(fmt.Errorf("unable to determine latest WAL age: %w", err))
return
}
p.mb.RecordPostgresqlWalAgeDataPoint(now, walAge)
Expand All @@ -304,13 +326,13 @@ func (p *postgreSQLScraper) retrieveDatabaseStats(
client client,
databases []string,
r *dbRetrieval,
errors *scrapererror.ScrapeErrors,
errs *errsMux,
) {
defer wg.Done()
dbStats, err := client.getDatabaseStats(ctx, databases)
if err != nil {
p.logger.Error("Errors encountered while fetching commits and rollbacks", zap.Error(err))
errors.AddPartial(1, err)
errs.addPartial(err)
return
}
r.Lock()
Expand All @@ -324,13 +346,13 @@ func (p *postgreSQLScraper) retrieveDatabaseSize(
client client,
databases []string,
r *dbRetrieval,
errors *scrapererror.ScrapeErrors,
errs *errsMux,
) {
defer wg.Done()
databaseSizeMetrics, err := client.getDatabaseSize(ctx, databases)
if err != nil {
p.logger.Error("Errors encountered while fetching database size", zap.Error(err))
errors.AddPartial(1, err)
errs.addPartial(err)
return
}
r.Lock()
Expand All @@ -344,12 +366,12 @@ func (p *postgreSQLScraper) retrieveBackends(
client client,
databases []string,
r *dbRetrieval,
errors *scrapererror.ScrapeErrors,
errs *errsMux,
) {
defer wg.Done()
activityByDB, err := client.getBackends(ctx, databases)
if err != nil {
errors.AddPartial(1, err)
errs.addPartial(err)
return
}
r.Lock()
Expand Down

0 comments on commit da918e6

Please sign in to comment.