Skip to content

Commit

Permalink
[receiver/k8sobjects] Fix memory leak caused by pull mode's interval …
Browse files Browse the repository at this point in the history
…ticker (#31919)

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
The k8sobjects receiver was starting a ticker when in pull mode, that
would tick every set interval. This ticker needs to be stopped during
shutdown, so I added functionality to cancel the context during
shutdown, properly stopping the ticker.

The original intention here was to add `goleak`, but due to a bug in
`internal/k8stest` it's failing on the end to end test. This change
still fixes a bug, so I believe it's worth merging even without `goleak`
at this point. Note that this change does reduce the number and
frequency of goleak failures.

**Testing:** <Describe what testing was performed and which tests were
added.>
All existing tests are passing.
  • Loading branch information
crobert-1 committed Apr 2, 2024
1 parent 4e9baa7 commit 9d8b55c
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 5 deletions.
27 changes: 27 additions & 0 deletions .chloggen/goleak_k8sobjectsreceiver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: k8sobjectsreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix memory leak caused by the pull mode's interval ticker

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31919]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
24 changes: 19 additions & 5 deletions receiver/k8sobjectsreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type k8sobjectsreceiver struct {
consumer consumer.Logs
obsrecv *receiverhelper.ObsReport
mu sync.Mutex
cancel context.CancelFunc
}

func newReceiver(params receiver.CreateSettings, config *Config, consumer consumer.Logs) (receiver.Logs, error) {
Expand Down Expand Up @@ -72,14 +73,21 @@ func (kr *k8sobjectsreceiver) Start(ctx context.Context, _ component.Host) error
kr.client = client
kr.setting.Logger.Info("Object Receiver started")

cctx, cancel := context.WithCancel(ctx)
kr.cancel = cancel

for _, object := range kr.config.Objects {
kr.start(ctx, object)
kr.start(cctx, object)
}
return nil
}

func (kr *k8sobjectsreceiver) Shutdown(context.Context) error {
kr.setting.Logger.Info("Object Receiver stopped")
if kr.cancel != nil {
kr.cancel()
}

kr.mu.Lock()
for _, stopperChan := range kr.stopperChanList {
close(stopperChan)
Expand Down Expand Up @@ -118,7 +126,7 @@ func (kr *k8sobjectsreceiver) startPull(ctx context.Context, config *K8sObjectsC
kr.mu.Lock()
kr.stopperChanList = append(kr.stopperChanList, stopperChan)
kr.mu.Unlock()
ticker := newTicker(config.Interval)
ticker := newTicker(ctx, config.Interval)
listOption := metav1.ListOptions{
FieldSelector: config.FieldSelector,
LabelSelector: config.LabelSelector,
Expand Down Expand Up @@ -263,16 +271,22 @@ func getResourceVersion(ctx context.Context, config *K8sObjectsConfig, resource

// Start ticking immediately.
// Ref: https://stackoverflow.com/questions/32705582/how-to-get-time-tick-to-tick-immediately
func newTicker(repeat time.Duration) *time.Ticker {
func newTicker(ctx context.Context, repeat time.Duration) *time.Ticker {
ticker := time.NewTicker(repeat)
oc := ticker.C
nc := make(chan time.Time, 1)
go func() {
nc <- time.Now()
for tm := range oc {
nc <- tm
for {
select {
case tm := <-oc:
nc <- tm
case <-ctx.Done():
return
}
}
}()

ticker.C = nc
return ticker
}

0 comments on commit 9d8b55c

Please sign in to comment.