From 097c74557b4d1f5569397ea5204549e8953af043 Mon Sep 17 00:00:00 2001 From: Curtis Robert Date: Wed, 8 May 2024 09:32:00 -0700 Subject: [PATCH] [receiver/googlecloudpubsubreceiver] Fix memory leak during shutdown (#32361) **Description:** This PR contains the following changes: 1. Add `Close` call to the receiver's GRPC client. Without this, goroutines were being leaked on shutdown. 2. Change `grpc.Dial` -> `grpc.NewClient`. They offer the same functionality, but `Dial` is being deprecated in favor of `NewClient`. 3. Enable `goleak` checks on this receiver to help ensure no goroutines are being leaked. 4. Change a couple of `assert.Nil` calls to `assert.NoError`. On failure, the output of `NoError` includes the error message, whereas `Nil` only prints the object's address, e.g. `&status.Error{s:(*status.Status)(0xc00007e158)}` **Link to tracking Issue:** #30438 **Testing:** All existing tests are passing, as well as the added goleak check. --- .chloggen/goleak_googlepubsubrec.yaml | 27 +++++++++++++++++++ .../generated_package_test.go | 4 ++- .../googlecloudpubsubreceiver/metadata.yaml | 5 +++- .../googlecloudpubsubreceiver/receiver.go | 16 ++++++++--- .../receiver_test.go | 4 +-- 5 files changed, 49 insertions(+), 7 deletions(-) create mode 100644 .chloggen/goleak_googlepubsubrec.yaml diff --git a/.chloggen/goleak_googlepubsubrec.yaml b/.chloggen/goleak_googlepubsubrec.yaml new file mode 100644 index 0000000000000..4e994677e8965 --- /dev/null +++ b/.chloggen/goleak_googlepubsubrec.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: googlecloudpubsubreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). 
+note: Fix memory leak during shutdown + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [32361] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/googlecloudpubsubreceiver/generated_package_test.go b/receiver/googlecloudpubsubreceiver/generated_package_test.go index 9a70013ef2964..0bde1b757a429 100644 --- a/receiver/googlecloudpubsubreceiver/generated_package_test.go +++ b/receiver/googlecloudpubsubreceiver/generated_package_test.go @@ -4,8 +4,10 @@ package googlecloudpubsubreceiver import ( "testing" + + "go.uber.org/goleak" ) func TestMain(m *testing.M) { - // skipping goleak test as per metadata.yml configuration + goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) } diff --git a/receiver/googlecloudpubsubreceiver/metadata.yaml b/receiver/googlecloudpubsubreceiver/metadata.yaml index 9930f9763546a..f4517dffd812b 100644 --- a/receiver/googlecloudpubsubreceiver/metadata.yaml +++ b/receiver/googlecloudpubsubreceiver/metadata.yaml @@ -18,5 +18,8 @@ tests: skip_lifecycle: true skip_shutdown: true goleak: - skip: true + skip: false + ignore: + # See https://github.com/census-instrumentation/opencensus-go/issues/1191 for more information. 
+ top: go.opencensus.io/stats/view.(*worker).start diff --git a/receiver/googlecloudpubsubreceiver/receiver.go b/receiver/googlecloudpubsubreceiver/receiver.go index 02f9c1964843c..504546201e2e0 100644 --- a/receiver/googlecloudpubsubreceiver/receiver.go +++ b/receiver/googlecloudpubsubreceiver/receiver.go @@ -26,7 +26,9 @@ import ( "go.uber.org/zap" "google.golang.org/api/option" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal" ) @@ -76,7 +78,7 @@ func (receiver *pubsubReceiver) generateClientOptions() (copts []option.ClientOp if receiver.userAgent != "" { dialOpts = append(dialOpts, grpc.WithUserAgent(receiver.userAgent)) } - conn, _ := grpc.Dial(receiver.config.Endpoint, append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))...) + conn, _ := grpc.NewClient(receiver.config.Endpoint, append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))...) copts = append(copts, option.WithGRPCConn(conn)) } else { copts = append(copts, option.WithEndpoint(receiver.config.Endpoint)) @@ -113,13 +115,21 @@ func (receiver *pubsubReceiver) Start(ctx context.Context, _ component.Host) err } func (receiver *pubsubReceiver) Shutdown(_ context.Context) error { + var err error + if receiver.client != nil { + // A canceled code means the client connection is already closed, + // Shutdown shouldn't return an error in that case. 
+ if closeErr := receiver.client.Close(); status.Code(closeErr) != codes.Canceled { + err = closeErr + } + } if receiver.handler == nil { - return nil + return err } receiver.logger.Info("Stopping Google Pubsub receiver") receiver.handler.CancelNow() receiver.logger.Info("Stopped Google Pubsub receiver") - return nil + return err } func (receiver *pubsubReceiver) handleLogStrings(ctx context.Context, message *pubsubpb.ReceivedMessage) error { diff --git a/receiver/googlecloudpubsubreceiver/receiver_test.go b/receiver/googlecloudpubsubreceiver/receiver_test.go index b355a38182371..8e428d63d705c 100644 --- a/receiver/googlecloudpubsubreceiver/receiver_test.go +++ b/receiver/googlecloudpubsubreceiver/receiver_test.go @@ -153,6 +153,6 @@ func TestReceiver(t *testing.T) { return len(logSink.AllLogs()) == 1 }, time.Second, 10*time.Millisecond) - assert.Nil(t, receiver.Shutdown(ctx)) - assert.Nil(t, receiver.Shutdown(ctx)) + assert.NoError(t, receiver.Shutdown(ctx)) + assert.NoError(t, receiver.Shutdown(ctx)) }