Skip to content

Commit

Permalink
[receiver/mongodbatlas] Fix memory leak (#32206)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
The receiver was leaking goroutines by holding onto idle connections.
The solution is to reference the underlying `Transport` object, and call
`CloseIdleConnections` on it during shutdown.

This change also enables `goleak` on the MongoDB Atlas receiver to help
ensure no goroutines are being leaked.

**Link to tracking Issue:** <Issue number if applicable>
#30428

**Testing:** <Describe what testing was performed and which tests were
added.>
All existing tests are passing, as well as added `goleak` checks.
  • Loading branch information
crobert-1 committed Apr 15, 2024
1 parent 707236c commit d936776
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 12 deletions.
27 changes: 27 additions & 0 deletions .chloggen/goleak_mongodbatlas.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: mongodbatlasreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix memory leak by closing idle connections on shutdown

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32206]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
9 changes: 8 additions & 1 deletion receiver/mongodbatlasreceiver/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package mongodbatlasreceiver // import "github.com/open-telemetry/opentelemetry-
import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -34,6 +35,7 @@ type eventsClient interface {
GetProjectEvents(ctx context.Context, groupID string, opts *internal.GetEventsOptions) (ret []*mongodbatlas.Event, nextPage bool, err error)
GetOrganization(ctx context.Context, orgID string) (*mongodbatlas.Organization, error)
GetOrganizationEvents(ctx context.Context, orgID string, opts *internal.GetEventsOptions) (ret []*mongodbatlas.Event, nextPage bool, err error)
Shutdown() error
}

type eventsReceiver struct {
Expand Down Expand Up @@ -97,7 +99,12 @@ func (er *eventsReceiver) Shutdown(ctx context.Context) error {
er.logger.Debug("Shutting down events receiver")
er.cancel()
er.wg.Wait()
return er.checkpoint(ctx)

var err []error
err = append(err, er.client.Shutdown())
err = append(err, er.checkpoint(ctx))

return errors.Join(err...)
}

func (er *eventsReceiver) startPolling(ctx context.Context) error {
Expand Down
21 changes: 14 additions & 7 deletions receiver/mongodbatlasreceiver/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,16 +164,14 @@ func TestProjectGetFailure(t *testing.T) {
mClient := &mockEventsClient{}
mClient.On("GetProject", mock.Anything, "fake-project").Return(nil, fmt.Errorf("unable to get project: %d", http.StatusUnauthorized))
mClient.On("GetOrganization", mock.Anything, "fake-org").Return(nil, fmt.Errorf("unable to get org: %d", http.StatusUnauthorized))
mClient.setupMock(t)
r.client = mClient

err := r.Start(context.Background(), componenttest.NewNopHost(), storage.NewNopClient())
require.NoError(t, err)

require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost(), storage.NewNopClient()))
require.Never(t, func() bool {
return sink.LogRecordCount() > 0
}, 2*time.Second, 500*time.Millisecond)

err = r.Shutdown(context.Background())
require.NoError(t, err)
require.NoError(t, r.Shutdown(context.Background()))
}

type mockEventsClient struct {
Expand Down Expand Up @@ -216,7 +214,12 @@ func (mec *mockEventsClient) loadTestEvents(t *testing.T, filename string) []*mo

func (mec *mockEventsClient) GetProject(ctx context.Context, pID string) (*mongodbatlas.Project, error) {
args := mec.Called(ctx, pID)
return args.Get(0).(*mongodbatlas.Project), args.Error(1)
receivedProject := args.Get(0)
if receivedProject == nil {
return nil, args.Error(1)
}

return receivedProject.(*mongodbatlas.Project), args.Error(1)
}

func (mec *mockEventsClient) GetProjectEvents(ctx context.Context, pID string, opts *internal.GetEventsOptions) ([]*mongodbatlas.Event, bool, error) {
Expand All @@ -233,3 +236,7 @@ func (mec *mockEventsClient) GetOrganizationEvents(ctx context.Context, oID stri
args := mec.Called(ctx, oID, opts)
return args.Get(0).([]*mongodbatlas.Event), args.Bool(1), args.Error(2)
}

func (mec *mockEventsClient) Shutdown() error {
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func (rt *clientRoundTripper) RoundTrip(r *http.Request) (*http.Response, error)
type MongoDBAtlasClient struct {
log *zap.Logger
client *mongodbatlas.Client
transport *http.Transport
roundTripper *clientRoundTripper
}

Expand All @@ -133,18 +134,21 @@ func NewMongoDBAtlasClient(
backoffConfig configretry.BackOffConfig,
log *zap.Logger,
) *MongoDBAtlasClient {
t := digest.NewTransport(publicKey, privateKey)
defaultTransporter := &http.Transport{}
t := digest.NewTransportWithHTTPTransport(publicKey, privateKey, defaultTransporter)
roundTripper := newClientRoundTripper(t, log, backoffConfig)
tc := &http.Client{Transport: roundTripper}
client := mongodbatlas.NewClient(tc)
return &MongoDBAtlasClient{
log,
client,
defaultTransporter,
roundTripper,
}
}

func (s *MongoDBAtlasClient) Shutdown() error {
s.transport.CloseIdleConnections()
return s.roundTripper.Shutdown()
}

Expand Down
14 changes: 14 additions & 0 deletions receiver/mongodbatlasreceiver/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package mongodbatlasreceiver

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
10 changes: 7 additions & 3 deletions receiver/mongodbatlasreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@ func TestDefaultConfig(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
require.Equal(t, cfg.(*Config).ControllerConfig.CollectionInterval, 3*time.Minute)
recv, err := createMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, consumertest.NewNop())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

recv, err := createMetricsReceiver(ctx, receivertest.NewNopCreateSettings(), cfg, consumertest.NewNop())
require.NoError(t, err)
require.NotNil(t, recv, "receiver creation failed")

err = recv.Start(context.Background(), componenttest.NewNopHost())
err = recv.Start(ctx, componenttest.NewNopHost())
require.NoError(t, err)

err = recv.Shutdown(context.Background())
err = recv.Shutdown(ctx)
require.NoError(t, err)
}

Expand Down

0 comments on commit d936776

Please sign in to comment.