Skip to content

Commit

Permalink
Google Cloud Spanner Receiver gracefully handles unreadable databases (
Browse files Browse the repository at this point in the history
…#26732)

…

**Description:** 
Fixing a bug. Google Cloud Spanner Receiver currently generates an
exception and exits if it attempts to read data from a database that
doesn't exist. However it's normal for a single receiver to poll
multiple databases, so this is not graceful failure. This PR makes a
change to gracefully generate an error in case of an unreadable missing
database and then continue reading other databases..

**Issue:**
[14624](#14624)

**Changelog:**
- `DatabaseReader.Read` returns a vector of `metadata.MetricsDatapoint`
and a `multierr.Errors` object instead of the `result` vector and a
single `Error`.
- `DatabaseReader.Read` appends any errors encountered to the
`multierr.Errors` object instead of returning `(nil, Error)`.
- `DatabaseReader.Read` adds a `NewPartialScrapeError` to
`multierr.Errors` object if errors were encountered while reading and
some results were still populated into the `result` vector
- `ProjectReader.Read` returns a vector of `metadata.MetricsDatapoint`
and a `Errors` object instead of the `result` vector and `nil`.
- `ProjectReader.Read` combines all `multierr.Errors` objects into a
single `Error` before returning them as `err`.
- `ProjectReader.Read` returns a `NewPartialScrapeError` instead of
`err` if errors were encountered while reading and some results were
still populated into the `result` vector.
- `googleCloudSpannerReceiver.Scrape` returns a vector of
`metadata.MetricsDatapoint` and a `Error` object instead of just the
`result` vector.
- All methods mentioned above continue processing metrics after an error
is encountered.

---------

Co-authored-by: Pablo Baeyens <pablo.baeyens@datadoghq.com>
  • Loading branch information
spidercensus and mx-psi committed Dec 5, 2023
1 parent a18e686 commit b0947a2
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 27 deletions.
27 changes: 27 additions & 0 deletions .chloggen/spidercensus_fix-googlecloudspanner.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: googlecloudspannerreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Google Cloud Spanner Receiver currently generates an exception and exits if it attempts to read data from a database that doesn't exist. However it's normal for a single receiver to poll multiple databases, so this is not graceful failure. This PR makes a change to gracefully generate an error in case of an unreadable missing database and then continue reading other databases.."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26732]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 1 addition & 1 deletion receiver/googlecloudspannerreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
go.opentelemetry.io/collector/consumer v0.90.2-0.20231201205146-6e2fdc755b34
go.opentelemetry.io/collector/pdata v1.0.1-0.20231201205146-6e2fdc755b34
go.opentelemetry.io/collector/receiver v0.90.2-0.20231201205146-6e2fdc755b34
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.26.0
google.golang.org/api v0.151.0
google.golang.org/grpc v1.59.0
Expand Down Expand Up @@ -56,7 +57,6 @@ require (
go.opentelemetry.io/otel v1.21.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.15.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/oauth2 v0.14.0 // indirect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"

"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/datasource"
Expand Down Expand Up @@ -61,26 +62,41 @@ func (databaseReader *DatabaseReader) Name() string {
}

func (databaseReader *DatabaseReader) Shutdown() {
databaseReader.logger.Debug("Closing connection to database",
zap.String("database", databaseReader.database.DatabaseID().ID()))
databaseReader.logger.Debug(
"Closing connection to database",
zap.String("database", databaseReader.database.DatabaseID().ID()),
)
databaseReader.database.Client().Close()
}

func (databaseReader *DatabaseReader) Read(ctx context.Context) ([]*metadata.MetricsDataPoint, error) {
databaseReader.logger.Debug("Executing read method for database",
zap.String("database", databaseReader.database.DatabaseID().ID()))
databaseReader.logger.Debug(
"Executing read method for database",
zap.String("database", databaseReader.database.DatabaseID().ID()),
)

var result []*metadata.MetricsDataPoint
var (
result []*metadata.MetricsDataPoint
err error
)

for _, reader := range databaseReader.readers {
dataPoints, err := reader.Read(ctx)
if err != nil {
return nil, fmt.Errorf("cannot read data for data points databaseReader %q because of an error: %w",
reader.Name(), err)
}

dataPoints, readErr := reader.Read(ctx)
result = append(result, dataPoints...)
if readErr != nil {
err = multierr.Append(
err,
fmt.Errorf("cannot read data for data points databaseReader %q because of an error: %w", reader.Name(), readErr),
)
}
}
if err != nil {
databaseReader.logger.Warn(
"Errors encountered while reading database",
zap.String("database", databaseReader.database.DatabaseID().ID()),
zap.Int("error_count", len(multierr.Errors(err))),
)
}

return result, nil
return result, err
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"strings"

"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/metadata"
Expand All @@ -33,18 +34,21 @@ func (projectReader *ProjectReader) Shutdown() {
}

func (projectReader *ProjectReader) Read(ctx context.Context) ([]*metadata.MetricsDataPoint, error) {
var result []*metadata.MetricsDataPoint
var (
result []*metadata.MetricsDataPoint
err error
)

for _, databaseReader := range projectReader.databaseReaders {
dataPoints, err := databaseReader.Read(ctx)
if err != nil {
return nil, err
dataPoints, readErr := databaseReader.Read(ctx)
if readErr == nil {
result = append(result, dataPoints...)
} else {
err = multierr.Append(err, readErr)
}

result = append(result, dataPoints...)
}

return result, nil
return result, err
}

func (projectReader *ProjectReader) Name() string {
Expand Down
24 changes: 18 additions & 6 deletions receiver/googlecloudspannerreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/scrapererror"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver/internal/datasource"
Expand Down Expand Up @@ -41,18 +43,28 @@ func newGoogleCloudSpannerReceiver(logger *zap.Logger, config *Config) *googleCl
}

func (r *googleCloudSpannerReceiver) Scrape(ctx context.Context) (pmetric.Metrics, error) {
var allMetricsDataPoints []*metadata.MetricsDataPoint
var (
allMetricsDataPoints []*metadata.MetricsDataPoint
err error
)

for _, projectReader := range r.projectReaders {
dataPoints, err := projectReader.Read(ctx)
if err != nil {
return pmetric.Metrics{}, err
dataPoints, readErr := projectReader.Read(ctx)
allMetricsDataPoints = append(allMetricsDataPoints, dataPoints...)
if readErr != nil {
err = multierr.Append(err, readErr)
}
}

allMetricsDataPoints = append(allMetricsDataPoints, dataPoints...)
metrics, buildErr := r.metricsBuilder.Build(allMetricsDataPoints)
if buildErr != nil {
err = multierr.Append(err, buildErr)
}

return r.metricsBuilder.Build(allMetricsDataPoints)
if err != nil && metrics.DataPointCount() > 0 {
err = scrapererror.NewPartialScrapeError(err, len(multierr.Errors(err)))
}
return metrics, err
}

func (r *googleCloudSpannerReceiver) Start(ctx context.Context, _ component.Host) error {
Expand Down
2 changes: 1 addition & 1 deletion receiver/googlecloudspannerreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func newMetricsBuilder(throwErrorOnShutdown bool) metadata.MetricsBuilder {
}

func (b *metricsBuilder) Build([]*metadata.MetricsDataPoint) (pmetric.Metrics, error) {
return pmetric.Metrics{}, nil
return pmetric.NewMetrics(), nil
}

func (b *metricsBuilder) Shutdown() error {
Expand Down

0 comments on commit b0947a2

Please sign in to comment.