Skip to content

Commit

Permalink
Merge pull request kubernetes#110182 from kerthcet/cleanup/remove-pot…
Browse files Browse the repository at this point in the history
…ential-goroutine-leak-in-metric-recorder

Remove potential goroutine leak in testing framework
  • Loading branch information
k8s-ci-robot committed Aug 6, 2022
2 parents 64ed914 + 97e3e50 commit 985c920
Show file tree
Hide file tree
Showing 19 changed files with 189 additions and 113 deletions.
6 changes: 4 additions & 2 deletions pkg/scheduler/extender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,10 @@ func TestSchedulerWithExtenders(t *testing.T) {
for _, name := range test.nodes {
cache.AddNode(createNode(name))
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fwk, err := st.NewFramework(
test.registerPlugins, "",
test.registerPlugins, "", ctx.Done(),
runtime.WithClientSet(client),
runtime.WithInformerFactory(informerFactory),
runtime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
Expand All @@ -304,7 +306,7 @@ func TestSchedulerWithExtenders(t *testing.T) {
emptySnapshot,
schedulerapi.DefaultPercentageOfNodesToScore)
podIgnored := &v1.Pod{}
result, err := scheduler.SchedulePod(context.Background(), fwk, framework.NewCycleState(), podIgnored)
result, err := scheduler.SchedulePod(ctx, fwk, framework.NewCycleState(), podIgnored)
if test.expectsErr {
if err == nil {
t.Errorf("Unexpected non-error, result %+v", result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
Expand Down Expand Up @@ -53,6 +52,9 @@ func TestDefaultBinder(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var gotBinding *v1.Binding
client := fake.NewSimpleClientset(testPod)
client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
Expand All @@ -66,12 +68,12 @@ func TestDefaultBinder(t *testing.T) {
return true, gotBinding, nil
})

fh, err := frameworkruntime.NewFramework(nil, nil, wait.NeverStop, frameworkruntime.WithClientSet(client))
fh, err := frameworkruntime.NewFramework(nil, nil, ctx.Done(), frameworkruntime.WithClientSet(client))
if err != nil {
t.Fatal(err)
}
binder := &DefaultBinder{handle: fh}
status := binder.Bind(context.Background(), nil, testPod, "foohost.kubernetes.mydomain.com")
status := binder.Bind(ctx, nil, testPod, "foohost.kubernetes.mydomain.com")
if got := status.AsError(); (tt.injectErr != nil) != (got != nil) {
t.Errorf("got error %q, want %q", got, tt.injectErr)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,9 @@ func TestPostFilter(t *testing.T) {
if tt.extender != nil {
extenders = append(extenders, tt.extender)
}
f, err := st.NewFramework(registeredPlugins, "",
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := st.NewFramework(registeredPlugins, "", ctx.Done(),
frameworkruntime.WithClientSet(cs),
frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
frameworkruntime.WithInformerFactory(informerFactory),
Expand All @@ -371,11 +373,11 @@ func TestPostFilter(t *testing.T) {

state := framework.NewCycleState()
// Ensure <state> is populated.
if _, status := f.RunPreFilterPlugins(context.Background(), state, tt.pod); !status.IsSuccess() {
if _, status := f.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() {
t.Errorf("Unexpected PreFilter Status: %v", status)
}

gotResult, gotStatus := p.PostFilter(context.TODO(), state, tt.pod, tt.filteredNodesStatuses)
gotResult, gotStatus := p.PostFilter(ctx, state, tt.pod, tt.filteredNodesStatuses)
// As we cannot compare two errors directly due to miss the equal method for how to compare two errors, so just need to compare the reasons.
if gotStatus.Code() == framework.Error {
if diff := cmp.Diff(tt.wantStatus.Reasons(), gotStatus.Reasons()); diff != "" {
Expand Down Expand Up @@ -1083,8 +1085,11 @@ func TestDryRunPreemption(t *testing.T) {
// or minCandidateNodesAbsolute. This is only done in a handful of tests.
parallelism = 1
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fwk, err := st.NewFramework(
registeredPlugins, "",
registeredPlugins, "", ctx.Done(),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithSnapshotSharedLister(snapshot),
frameworkruntime.WithInformerFactory(informerFactory),
Expand All @@ -1094,8 +1099,6 @@ func TestDryRunPreemption(t *testing.T) {
t.Fatal(err)
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())

Expand Down Expand Up @@ -1125,7 +1128,7 @@ func TestDryRunPreemption(t *testing.T) {
for cycle, pod := range tt.testPods {
state := framework.NewCycleState()
// Some tests rely on PreFilter plugin to compute its CycleState.
if _, status := fwk.RunPreFilterPlugins(context.Background(), state, pod); !status.IsSuccess() {
if _, status := fwk.RunPreFilterPlugins(ctx, state, pod); !status.IsSuccess() {
t.Errorf("cycle %d: Unexpected PreFilter Status: %v", cycle, status)
}
pe := preemption.Evaluator{
Expand All @@ -1137,7 +1140,7 @@ func TestDryRunPreemption(t *testing.T) {
Interface: pl,
}
offset, numCandidates := pl.GetOffsetAndNumCandidates(int32(len(nodeInfos)))
got, _, _ := pe.DryRunPreemption(context.Background(), pod, nodeInfos, tt.pdbs, offset, numCandidates)
got, _, _ := pe.DryRunPreemption(ctx, pod, nodeInfos, tt.pdbs, offset, numCandidates)
// Sort the values (inner victims) and the candidate itself (by its NominatedNodeName).
for i := range got {
victims := got[i].Victims().Pods
Expand Down Expand Up @@ -1334,13 +1337,16 @@ func TestSelectBestCandidate(t *testing.T) {
cs := clientsetfake.NewSimpleClientset(objs...)
informerFactory := informers.NewSharedInformerFactory(cs, 0)
snapshot := internalcache.NewSnapshot(tt.pods, nodes)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fwk, err := st.NewFramework(
[]st.RegisterPluginFunc{
tt.registerPlugin,
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
"",
ctx.Done(),
frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
frameworkruntime.WithSnapshotSharedLister(snapshot),
)
Expand All @@ -1350,7 +1356,7 @@ func TestSelectBestCandidate(t *testing.T) {

state := framework.NewCycleState()
// Some tests rely on PreFilter plugin to compute its CycleState.
if _, status := fwk.RunPreFilterPlugins(context.Background(), state, tt.pod); !status.IsSuccess() {
if _, status := fwk.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() {
t.Errorf("Unexpected PreFilter Status: %v", status)
}
nodeInfos, err := snapshot.NodeInfos().List()
Expand All @@ -1373,7 +1379,7 @@ func TestSelectBestCandidate(t *testing.T) {
Interface: pl,
}
offset, numCandidates := pl.GetOffsetAndNumCandidates(int32(len(nodeInfos)))
candidates, _, _ := pe.DryRunPreemption(context.Background(), tt.pod, nodeInfos, nil, offset, numCandidates)
candidates, _, _ := pe.DryRunPreemption(ctx, tt.pod, nodeInfos, nil, offset, numCandidates)
s := pe.SelectCandidate(candidates)
if s == nil || len(s.Name()) == 0 {
return
Expand Down Expand Up @@ -1445,7 +1451,9 @@ func TestPodEligibleToPreemptOthers(t *testing.T) {
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
}
f, err := st.NewFramework(registeredPlugins, "",
stopCh := make(chan struct{})
defer close(stopCh)
f, err := st.NewFramework(registeredPlugins, "", stopCh,
frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(test.pods, nodes)),
)
if err != nil {
Expand Down Expand Up @@ -1639,10 +1647,10 @@ func TestPreempt(t *testing.T) {
return true, nil, nil
})

stop := make(chan struct{})
defer close(stop)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cache := internalcache.New(time.Duration(0), stop)
cache := internalcache.New(time.Duration(0), ctx.Done())
for _, pod := range test.pods {
cache.AddPod(pod)
}
Expand Down Expand Up @@ -1678,6 +1686,7 @@ func TestPreempt(t *testing.T) {
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
"",
ctx.Done(),
frameworkruntime.WithClientSet(client),
frameworkruntime.WithEventRecorder(&events.FakeRecorder{}),
frameworkruntime.WithExtenders(extenders),
Expand All @@ -1691,7 +1700,7 @@ func TestPreempt(t *testing.T) {

state := framework.NewCycleState()
// Some tests rely on PreFilter plugin to compute its CycleState.
if _, s := fwk.RunPreFilterPlugins(context.Background(), state, test.pod); !s.IsSuccess() {
if _, s := fwk.RunPreFilterPlugins(ctx, state, test.pod); !s.IsSuccess() {
t.Errorf("Unexpected preFilterStatus: %v", s)
}
// Call preempt and check the expected results.
Expand All @@ -1710,7 +1719,7 @@ func TestPreempt(t *testing.T) {
State: state,
Interface: &pl,
}
res, status := pe.Preempt(context.Background(), test.pod, make(framework.NodeToStatusMap))
res, status := pe.Preempt(ctx, test.pod, make(framework.NodeToStatusMap))
if !status.IsSuccess() && !status.IsUnschedulable() {
t.Errorf("unexpected error in preemption: %v", status.AsError())
}
Expand Down Expand Up @@ -1746,7 +1755,7 @@ func TestPreempt(t *testing.T) {
}

// Call preempt again and make sure it doesn't preempt any more pods.
res, status = pe.Preempt(context.Background(), test.pod, make(framework.NodeToStatusMap))
res, status = pe.Preempt(ctx, test.pod, make(framework.NodeToStatusMap))
if !status.IsSuccess() && !status.IsUnschedulable() {
t.Errorf("unexpected error in preemption: %v", status.AsError())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
Expand Down Expand Up @@ -332,16 +331,18 @@ func TestImageLocalityPriority(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
snapshot := cache.NewSnapshot(nil, test.nodes)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

snapshot := cache.NewSnapshot(nil, test.nodes)
state := framework.NewCycleState()
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot))

p, _ := New(nil, fh)
var gotList framework.NodeScoreList
for _, n := range test.nodes {
nodeName := n.ObjectMeta.Name
score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.pod, nodeName)
score, status := p.(framework.ScorePlugin).Score(ctx, state, test.pod, nodeName)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
Expand Down Expand Up @@ -1133,30 +1132,33 @@ func TestNodeAffinityPriority(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

state := framework.NewCycleState()
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(cache.NewSnapshot(nil, test.nodes)))
fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(cache.NewSnapshot(nil, test.nodes)))
p, err := New(&test.args, fh)
if err != nil {
t.Fatalf("Creating plugin: %v", err)
}
var status *framework.Status
if !test.disablePreScore {
status = p.(framework.PreScorePlugin).PreScore(context.Background(), state, test.pod, test.nodes)
status = p.(framework.PreScorePlugin).PreScore(ctx, state, test.pod, test.nodes)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
}
var gotList framework.NodeScoreList
for _, n := range test.nodes {
nodeName := n.ObjectMeta.Name
score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.pod, nodeName)
score, status := p.(framework.ScorePlugin).Score(ctx, state, test.pod, nodeName)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
gotList = append(gotList, framework.NodeScore{Name: nodeName, Score: score})
}

status = p.(framework.ScorePlugin).ScoreExtensions().NormalizeScore(context.Background(), state, test.pod, gotList)
status = p.(framework.ScorePlugin).ScoreExtensions().NormalizeScore(ctx, state, test.pod, gotList)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
Expand Down Expand Up @@ -351,10 +350,12 @@ func TestNodeResourcesBalancedAllocation(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
snapshot := cache.NewSnapshot(test.pods, test.nodes)
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot))
p, _ := NewBalancedAllocation(&test.args, fh, feature.Features{})
for i := range test.nodes {
hostResult, err := p.(framework.ScorePlugin).Score(context.Background(), nil, test.pod, test.nodes[i].Name)
hostResult, err := p.(framework.ScorePlugin).Score(ctx, nil, test.pod, test.nodes[i].Name)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/scheduler/framework/plugins/noderesources/fit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
Expand Down Expand Up @@ -746,9 +745,12 @@ func TestFitScore(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

state := framework.NewCycleState()
snapshot := cache.NewSnapshot(test.existingPods, test.nodes)
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot))
args := test.nodeResourcesFitArgs
p, err := NewFit(&args, fh, plfeature.Features{})
if err != nil {
Expand All @@ -757,7 +759,7 @@ func TestFitScore(t *testing.T) {

var gotPriorities framework.NodeScoreList
for _, n := range test.nodes {
score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.requestedPod, n.Name)
score, status := p.(framework.ScorePlugin).Score(ctx, state, test.requestedPod, n.Name)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework"
plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
Expand Down Expand Up @@ -375,9 +374,12 @@ func TestLeastAllocatedScoringStrategy(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

state := framework.NewCycleState()
snapshot := cache.NewSnapshot(test.existingPods, test.nodes)
fh, _ := runtime.NewFramework(nil, nil, wait.NeverStop, runtime.WithSnapshotSharedLister(snapshot))
fh, _ := runtime.NewFramework(nil, nil, ctx.Done(), runtime.WithSnapshotSharedLister(snapshot))

p, err := NewFit(
&config.NodeResourcesFitArgs{
Expand All @@ -396,7 +398,7 @@ func TestLeastAllocatedScoringStrategy(t *testing.T) {

var gotScores framework.NodeScoreList
for _, n := range test.nodes {
score, status := p.(framework.ScorePlugin).Score(context.Background(), state, test.requestedPod, n.Name)
score, status := p.(framework.ScorePlugin).Score(ctx, state, test.requestedPod, n.Name)
if !status.IsSuccess() {
t.Errorf("unexpected error: %v", status)
}
Expand Down

0 comments on commit 985c920

Please sign in to comment.