Skip to content

Commit

Permalink
Merge pull request #106583 from hzxuzhonghu/automated-cherry-pick-of-…
Browse files Browse the repository at this point in the history
…#104991-#105031-origin-release-1.21

Automated cherry pick of #104991: Fix workqueue memory leak
#105031: workqueue: fix leak in queue preventing objects from being
  • Loading branch information
k8s-ci-robot committed Jan 15, 2022
2 parents cb06707 + 32ab81f commit 9d7f909
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 1 deletion.
5 changes: 4 additions & 1 deletion staging/src/k8s.io/client-go/util/workqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,10 @@ func (q *Type) Get() (item interface{}, shutdown bool) {
return nil, true
}

item, q.queue = q.queue[0], q.queue[1:]
item = q.queue[0]
// The underlying array still exists and reference this object, so the object will not be garbage collected.
q.queue[0] = nil
q.queue = q.queue[1:]

q.metrics.get(item)

Expand Down
40 changes: 40 additions & 0 deletions staging/src/k8s.io/client-go/util/workqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ limitations under the License.
package workqueue_test

import (
"runtime"
"sync"
"sync/atomic"
"testing"
"time"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
)

Expand Down Expand Up @@ -159,3 +162,40 @@ func TestReinsert(t *testing.T) {
t.Errorf("Expected queue to be empty. Has %v items", a)
}
}

// TestGarbageCollection ensures that objects that are added then removed from the queue are
// able to be garbage collected.
func TestGarbageCollection(t *testing.T) {
type bigObject struct {
data []byte
}
leakQueue := workqueue.New()
t.Cleanup(func() {
// Make sure leakQueue doesn't go out of scope too early
runtime.KeepAlive(leakQueue)
})
c := &bigObject{data: []byte("hello")}
mustGarbageCollect(t, c)
leakQueue.Add(c)
o, _ := leakQueue.Get()
leakQueue.Done(o)
}

// mustGarbageCollect asserts than an object was garbage collected by the end of the test.
// The input must be a pointer to an object.
func mustGarbageCollect(t *testing.T, i interface{}) {
t.Helper()
var collected int32 = 0
runtime.SetFinalizer(i, func(x interface{}) {
atomic.StoreInt32(&collected, 1)
})
t.Cleanup(func() {
if err := wait.PollImmediate(time.Millisecond*100, wait.ForeverTestTimeout, func() (done bool, err error) {
// Trigger GC explicitly, otherwise we may need to wait a long time for it to run
runtime.GC()
return atomic.LoadInt32(&collected) == 1, nil
}); err != nil {
t.Errorf("object was not garbage collected")
}
})
}

0 comments on commit 9d7f909

Please sign in to comment.