Skip to content

Commit

Permalink
[receiver/sqlquery] Fix memory leak and failing tests (#31785)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
This PR includes two changes that are dependent on each other.

1. Fix test failing in
#31778.
Explanation given
[here.](#31778 (comment))
All changes in `integration_test.go` are related to this.
2. When the test was fixed, `goleak` started failing. The logs receiver
opens a DB connection when it's started, but shutdown does not close the
DB. This DB needs to be closed during shutdown to avoid a leaked
goroutine. All changes outside of `integration_test.go` are for this.
3. Since the memory leak changes were modifying errors, I moved from
using `multierr.append` to `errors.Join` as well.

**Link to tracking Issue:** <Issue number if applicable>
Resolves #31782
Related to
#31778

**Testing:** <Describe what testing was performed and which tests were
added.>
Tests are passing
  • Loading branch information
crobert-1 committed Mar 16, 2024
1 parent d82d6f5 commit 794cd24
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 17 deletions.
27 changes: 27 additions & 0 deletions .chloggen/sqlquery_shutdown.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: sqlqueryreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix memory leak on shutdown for log telemetry

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31782]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
2 changes: 1 addition & 1 deletion receiver/sqlqueryreceiver/go.mod
Expand Up @@ -20,7 +20,6 @@ require (
go.opentelemetry.io/otel/metric v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
go.uber.org/goleak v1.3.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
)

Expand Down Expand Up @@ -141,6 +140,7 @@ require (
go.opentelemetry.io/otel/exporters/prometheus v0.46.0 // indirect
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.24.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/exp v0.0.0-20240205201215-2c58cdc269a3 // indirect
golang.org/x/mod v0.14.0 // indirect
Expand Down
16 changes: 9 additions & 7 deletions receiver/sqlqueryreceiver/integration_test.go
Expand Up @@ -24,6 +24,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receivertest"
"go.uber.org/zap"

Expand All @@ -48,7 +49,8 @@ func TestPostgresIntegrationLogsTrackingWithoutStorage(t *testing.T) {
}()

// Start the SQL Query receiver.
receiver, config, consumer := createTestLogsReceiverForPostgres(t, externalPort)
receiverCreateSettings := receivertest.NewNopCreateSettings()
receiver, config, consumer := createTestLogsReceiverForPostgres(t, externalPort, receiverCreateSettings)
config.CollectionInterval = time.Second
config.Queries = []sqlquery.Query{
{
Expand Down Expand Up @@ -84,7 +86,7 @@ func TestPostgresIntegrationLogsTrackingWithoutStorage(t *testing.T) {
require.NoError(t, err)

// Start new SQL Query receiver with the same configuration.
receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort)
receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort, receiverCreateSettings)
config.CollectionInterval = time.Second
config.Queries = []sqlquery.Query{
{
Expand Down Expand Up @@ -134,7 +136,8 @@ func TestPostgresIntegrationLogsTrackingWithStorage(t *testing.T) {
storageExtension := storagetest.NewFileBackedStorageExtension("test", storageDir)

// create SQL Query receiver configured with the File Storage extension
receiver, config, consumer := createTestLogsReceiverForPostgres(t, externalPort)
receiverCreateSettings := receivertest.NewNopCreateSettings()
receiver, config, consumer := createTestLogsReceiverForPostgres(t, externalPort, receiverCreateSettings)
config.CollectionInterval = time.Second
config.StorageID = &storageExtension.ID
config.Queries = []sqlquery.Query{
Expand Down Expand Up @@ -176,7 +179,7 @@ func TestPostgresIntegrationLogsTrackingWithStorage(t *testing.T) {
testAllSimpleLogs(t, consumer.AllLogs())

// start the SQL Query receiver again
receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort)
receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort, receiverCreateSettings)
config.CollectionInterval = time.Second
config.StorageID = &storageExtension.ID
config.Queries = []sqlquery.Query{
Expand Down Expand Up @@ -209,7 +212,7 @@ func TestPostgresIntegrationLogsTrackingWithStorage(t *testing.T) {
insertPostgresSimpleLogs(t, dbContainer, initialLogCount, newLogCount)

// start the SQL Query receiver again
receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort)
receiver, config, consumer = createTestLogsReceiverForPostgres(t, externalPort, receiverCreateSettings)
config.CollectionInterval = time.Second
config.StorageID = &storageExtension.ID
config.Queries = []sqlquery.Query{
Expand Down Expand Up @@ -276,15 +279,14 @@ func startPostgresDbContainer(t *testing.T, externalPort string) testcontainers.
return container
}

func createTestLogsReceiverForPostgres(t *testing.T, externalPort string) (*logsReceiver, *Config, *consumertest.LogsSink) {
func createTestLogsReceiverForPostgres(t *testing.T, externalPort string, receiverCreateSettings receiver.CreateSettings) (*logsReceiver, *Config, *consumertest.LogsSink) {
factory := NewFactory()
config := factory.CreateDefaultConfig().(*Config)
config.CollectionInterval = time.Second
config.Driver = "postgres"
config.DataSource = fmt.Sprintf("host=localhost port=%s user=otel password=otel sslmode=disable", externalPort)

consumer := &consumertest.LogsSink{}
receiverCreateSettings := receivertest.NewNopCreateSettings()
receiverCreateSettings.Logger = zap.NewExample()
receiver, err := factory.CreateLogsReceiver(
context.Background(),
Expand Down
23 changes: 14 additions & 9 deletions receiver/sqlqueryreceiver/logs_receiver.go
Expand Up @@ -6,6 +6,7 @@ package sqlqueryreceiver // import "github.com/open-telemetry/opentelemetry-coll
import (
"context"
"database/sql"
"errors"
"fmt"
"time"

Expand All @@ -16,7 +17,6 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery"
Expand Down Expand Up @@ -174,21 +174,21 @@ func (receiver *logsReceiver) Shutdown(ctx context.Context) error {
return nil
}

var errs []error
receiver.settings.Logger.Debug("stopping...")
receiver.stopCollecting()
for _, queryReceiver := range receiver.queryReceivers {
queryReceiver.shutdown(ctx)
errs = append(errs, queryReceiver.shutdown(ctx))
}

var errors error
if receiver.storageClient != nil {
errors = multierr.Append(errors, receiver.storageClient.Close(ctx))
errs = append(errs, receiver.storageClient.Close(ctx))
}

receiver.isStarted = false
receiver.settings.Logger.Debug("stopped.")

return errors
return errors.Join(errs...)
}

func (receiver *logsReceiver) stopCollecting() {
Expand Down Expand Up @@ -286,19 +286,19 @@ func (queryReceiver *logsQueryReceiver) collect(ctx context.Context) (plog.Logs,
return logs, fmt.Errorf("error getting rows: %w", err)
}

var errs error
var errs []error
scopeLogs := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()
for logsConfigIndex, logsConfig := range queryReceiver.query.Logs {
for _, row := range rows {
logRecord := scopeLogs.AppendEmpty()
rowToLog(row, logsConfig, logRecord)
logRecord.SetObservedTimestamp(observedAt)
if logsConfigIndex == 0 {
errs = multierr.Append(errs, queryReceiver.storeTrackingValue(ctx, row))
errs = append(errs, queryReceiver.storeTrackingValue(ctx, row))
}
}
}
return logs, nil
return logs, errors.Join(errs...)
}

func (queryReceiver *logsQueryReceiver) storeTrackingValue(ctx context.Context, row sqlquery.StringMap) error {
Expand All @@ -319,5 +319,10 @@ func rowToLog(row sqlquery.StringMap, config sqlquery.LogsCfg, logRecord plog.Lo
logRecord.Body().SetStr(row[config.BodyColumn])
}

func (queryReceiver *logsQueryReceiver) shutdown(_ context.Context) {
func (queryReceiver *logsQueryReceiver) shutdown(_ context.Context) error {
if queryReceiver.db == nil {
return nil
}

return queryReceiver.db.Close()
}

0 comments on commit 794cd24

Please sign in to comment.