diff --git a/.chloggen/goleak_googlepubsubrec.yaml b/.chloggen/goleak_googlepubsubrec.yaml new file mode 100644 index 0000000000000..4e994677e8965 --- /dev/null +++ b/.chloggen/goleak_googlepubsubrec.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: googlecloudpubsubreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix memory leak during shutdown + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [32361] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/googlecloudpubsubreceiver/generated_package_test.go b/receiver/googlecloudpubsubreceiver/generated_package_test.go index 9a70013ef2964..0bde1b757a429 100644 --- a/receiver/googlecloudpubsubreceiver/generated_package_test.go +++ b/receiver/googlecloudpubsubreceiver/generated_package_test.go @@ -4,8 +4,10 @@ package googlecloudpubsubreceiver import ( "testing" + + "go.uber.org/goleak" ) func TestMain(m *testing.M) { - // skipping goleak test as per metadata.yml configuration + goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) } diff --git a/receiver/googlecloudpubsubreceiver/metadata.yaml b/receiver/googlecloudpubsubreceiver/metadata.yaml index 9930f9763546a..f4517dffd812b 100644 --- a/receiver/googlecloudpubsubreceiver/metadata.yaml +++ b/receiver/googlecloudpubsubreceiver/metadata.yaml @@ -18,5 +18,8 @@ tests: skip_lifecycle: true skip_shutdown: true goleak: - skip: true + skip: false + ignore: + # See https://github.com/census-instrumentation/opencensus-go/issues/1191 for more information. + top: go.opencensus.io/stats/view.(*worker).start diff --git a/receiver/googlecloudpubsubreceiver/receiver.go b/receiver/googlecloudpubsubreceiver/receiver.go index 02f9c1964843c..504546201e2e0 100644 --- a/receiver/googlecloudpubsubreceiver/receiver.go +++ b/receiver/googlecloudpubsubreceiver/receiver.go @@ -26,7 +26,9 @@ import ( "go.uber.org/zap" "google.golang.org/api/option" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal" ) @@ -76,7 +78,7 @@ func (receiver *pubsubReceiver) generateClientOptions() (copts []option.ClientOp if receiver.userAgent != "" { dialOpts = append(dialOpts, grpc.WithUserAgent(receiver.userAgent)) } - conn, _ := grpc.Dial(receiver.config.Endpoint, append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))...) + conn, _ := grpc.NewClient(receiver.config.Endpoint, append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))...) copts = append(copts, option.WithGRPCConn(conn)) } else { copts = append(copts, option.WithEndpoint(receiver.config.Endpoint)) @@ -113,13 +115,21 @@ func (receiver *pubsubReceiver) Start(ctx context.Context, _ component.Host) err } func (receiver *pubsubReceiver) Shutdown(_ context.Context) error { + var err error + if receiver.client != nil { + // A canceled code means the client connection is already closed, + // Shutdown shouldn't return an error in that case. + if closeErr := receiver.client.Close(); status.Code(closeErr) != codes.Canceled { + err = closeErr + } + } if receiver.handler == nil { - return nil + return err } receiver.logger.Info("Stopping Google Pubsub receiver") receiver.handler.CancelNow() receiver.logger.Info("Stopped Google Pubsub receiver") - return nil + return err } func (receiver *pubsubReceiver) handleLogStrings(ctx context.Context, message *pubsubpb.ReceivedMessage) error { diff --git a/receiver/googlecloudpubsubreceiver/receiver_test.go b/receiver/googlecloudpubsubreceiver/receiver_test.go index b355a38182371..8e428d63d705c 100644 --- a/receiver/googlecloudpubsubreceiver/receiver_test.go +++ b/receiver/googlecloudpubsubreceiver/receiver_test.go @@ -153,6 +153,6 @@ func TestReceiver(t *testing.T) { return len(logSink.AllLogs()) == 1 }, time.Second, 10*time.Millisecond) - assert.Nil(t, receiver.Shutdown(ctx)) - assert.Nil(t, receiver.Shutdown(ctx)) + assert.NoError(t, receiver.Shutdown(ctx)) + assert.NoError(t, receiver.Shutdown(ctx)) }