Skip to content

Commit

Permalink
Merge pull request #107163 from cyclinder/fix_leak_goroutine
Browse files Browse the repository at this point in the history
fix goroutine leaks in TestConfigurationChannels
  • Loading branch information
k8s-ci-robot committed Jan 11, 2022
2 parents ca4af7a + 928e686 commit a0dfd95
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 31 deletions.
5 changes: 3 additions & 2 deletions pkg/kubelet/config/config.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package config

import (
"context"
"fmt"
"reflect"
"sync"
Expand Down Expand Up @@ -81,11 +82,11 @@ func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder)

// Channel creates or returns a config source channel. The channel
// only accepts PodUpdates
func (c *PodConfig) Channel(source string) chan<- interface{} {
func (c *PodConfig) Channel(ctx context.Context, source string) chan<- interface{} {
c.sourcesLock.Lock()
defer c.sourcesLock.Unlock()
c.sources.Insert(source)
return c.mux.Channel(source)
return c.mux.ChannelWithContext(ctx, source)
}

// SeenAllSources returns true if seenSources contains all sources in the
Expand Down
75 changes: 59 additions & 16 deletions pkg/kubelet/config/config_test.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package config

import (
"context"
"math/rand"
"reflect"
"sort"
Expand Down Expand Up @@ -85,10 +86,10 @@ func CreatePodUpdate(op kubetypes.PodOperation, source string, pods ...*v1.Pod)
return kubetypes.PodUpdate{Pods: pods, Op: op, Source: source}
}

func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) {
func createPodConfigTester(ctx context.Context, mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) {
eventBroadcaster := record.NewBroadcaster()
config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}))
channel := config.Channel(TestSource)
channel := config.Channel(ctx, TestSource)
ch := config.Updates()
return channel, ch, config
}
Expand Down Expand Up @@ -129,7 +130,10 @@ func expectNoPodUpdate(t *testing.T, ch <-chan kubetypes.PodUpdate) {
}

func TestNewPodAdded(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental)

// see an update
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
Expand All @@ -141,7 +145,10 @@ func TestNewPodAdded(t *testing.T) {
}

func TestNewPodAddedInvalidNamespace(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental)

// see an update
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", ""))
Expand All @@ -153,7 +160,10 @@ func TestNewPodAddedInvalidNamespace(t *testing.T) {
}

func TestNewPodAddedDefaultNamespace(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental)

// see an update
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default"))
Expand All @@ -165,7 +175,10 @@ func TestNewPodAddedDefaultNamespace(t *testing.T) {
}

func TestNewPodAddedDifferentNamespaces(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental)

// see an update
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default"))
Expand All @@ -182,7 +195,10 @@ func TestNewPodAddedDifferentNamespaces(t *testing.T) {
}

func TestInvalidPodFiltered(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)

// see an update
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
Expand All @@ -196,7 +212,10 @@ func TestInvalidPodFiltered(t *testing.T) {
}

func TestNewPodAddedSnapshotAndUpdates(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshotAndUpdates)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationSnapshotAndUpdates)

// see an set
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
Expand All @@ -214,7 +233,10 @@ func TestNewPodAddedSnapshotAndUpdates(t *testing.T) {
}

func TestNewPodAddedSnapshot(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshot)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationSnapshot)

// see an set
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
Expand All @@ -232,7 +254,10 @@ func TestNewPodAddedSnapshot(t *testing.T) {
}

func TestNewPodAddedUpdatedRemoved(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)

// should register an add
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
Expand All @@ -255,7 +280,10 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) {
}

func TestNewPodAddedDelete(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)

// should register an add
addedPod := CreateValidPod("foo", "new")
Expand All @@ -274,7 +302,10 @@ func TestNewPodAddedDelete(t *testing.T) {
}

func TestNewPodAddedUpdatedSet(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)

// should register an add
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"), CreateValidPod("foo2", "new"), CreateValidPod("foo3", "new"))
Expand All @@ -296,6 +327,9 @@ func TestNewPodAddedUpdatedSet(t *testing.T) {
}

func TestNewPodAddedSetReconciled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create and touch new test pods, return the new pods and touched pod. We should create new pod list
// before touching to avoid data race.
newTestPods := func(touchStatus, touchSpec bool) ([]*v1.Pod, *v1.Pod) {
Expand All @@ -318,7 +352,7 @@ func TestNewPodAddedSetReconciled(t *testing.T) {
} {
var podWithStatusChange *v1.Pod
pods, _ := newTestPods(false, false)
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)

// Use SET to initialize the config, especially initialize the source set
channel <- CreatePodUpdate(kubetypes.SET, TestSource, pods...)
Expand All @@ -341,6 +375,9 @@ func TestNewPodAddedSetReconciled(t *testing.T) {
}

func TestInitialEmptySet(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for _, test := range []struct {
mode PodConfigNotificationMode
op kubetypes.PodOperation
Expand All @@ -349,7 +386,7 @@ func TestInitialEmptySet(t *testing.T) {
{PodConfigNotificationSnapshot, kubetypes.SET},
{PodConfigNotificationSnapshotAndUpdates, kubetypes.SET},
} {
channel, ch, _ := createPodConfigTester(test.mode)
channel, ch, _ := createPodConfigTester(ctx, test.mode)

// should register an empty PodUpdate operation
podUpdate := CreatePodUpdate(kubetypes.SET, TestSource)
Expand All @@ -366,7 +403,10 @@ func TestInitialEmptySet(t *testing.T) {
}

func TestPodUpdateAnnotations(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)

pod := CreateValidPod("foo2", "new")
pod.Annotations = make(map[string]string)
Expand Down Expand Up @@ -395,7 +435,10 @@ func TestPodUpdateAnnotations(t *testing.T) {
}

func TestPodUpdateLabels(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)

pod := CreateValidPod("foo2", "new")
pod.Labels = make(map[string]string)
Expand Down
9 changes: 6 additions & 3 deletions pkg/kubelet/kubelet.go
Expand Up @@ -270,21 +270,24 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku
// source of all configuration
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

// TODO: it needs to be replaced by a proper context in the future
ctx := context.TODO()

// define file config source
if kubeCfg.StaticPodPath != "" {
klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath)
config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.FileSource))
}

// define url config source
if kubeCfg.StaticPodURL != "" {
klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader)
config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.HTTPSource))
}

if kubeDeps.KubeClient != nil {
klog.InfoS("Adding apiserver pod source")
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(kubetypes.ApiserverSource))
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(ctx, kubetypes.ApiserverSource))
}
return cfg, nil
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/util/config/config.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package config

import (
"context"
"sync"

"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -57,12 +58,12 @@ func NewMux(merger Merger) *Mux {
return mux
}

// Channel returns a channel where a configuration source
// ChannelWithContext returns a channel where a configuration source
// can send updates of new configurations. Multiple calls with the same
// source will return the same channel. This allows change and state based sources
// to use the same channel. Different source names however will be treated as a
// union.
func (m *Mux) Channel(source string) chan interface{} {
func (m *Mux) ChannelWithContext(ctx context.Context, source string) chan interface{} {
if len(source) == 0 {
panic("Channel given an empty name")
}
Expand All @@ -74,7 +75,8 @@ func (m *Mux) Channel(source string) chan interface{} {
}
newChannel := make(chan interface{})
m.sources[source] = newChannel
go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)

go wait.Until(func() { m.listen(source, newChannel) }, 0, ctx.Done())
return newChannel
}

Expand Down
27 changes: 20 additions & 7 deletions pkg/util/config/config_test.go
Expand Up @@ -17,17 +17,21 @@ limitations under the License.
package config

import (
"context"
"reflect"
"testing"
)

func TestConfigurationChannels(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mux := NewMux(nil)
channelOne := mux.Channel("one")
if channelOne != mux.Channel("one") {
channelOne := mux.ChannelWithContext(ctx, "one")
if channelOne != mux.ChannelWithContext(ctx, "one") {
t.Error("Didn't get the same muxuration channel back with the same name")
}
channelTwo := mux.Channel("two")
channelTwo := mux.ChannelWithContext(ctx, "two")
if channelOne == channelTwo {
t.Error("Got back the same muxuration channel for different names")
}
Expand All @@ -50,12 +54,18 @@ func (m MergeMock) Merge(source string, update interface{}) error {
}

func TestMergeInvoked(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

merger := MergeMock{"one", "test", t}
mux := NewMux(&merger)
mux.Channel("one") <- "test"
mux.ChannelWithContext(ctx, "one") <- "test"
}

func TestMergeFuncInvoked(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ch := make(chan bool)
mux := NewMux(MergeFunc(func(source string, update interface{}) error {
if source != "one" {
Expand All @@ -67,11 +77,14 @@ func TestMergeFuncInvoked(t *testing.T) {
ch <- true
return nil
}))
mux.Channel("one") <- "test"
mux.ChannelWithContext(ctx, "one") <- "test"
<-ch
}

func TestSimultaneousMerge(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ch := make(chan bool, 2)
mux := NewMux(MergeFunc(func(source string, update interface{}) error {
switch source {
Expand All @@ -89,8 +102,8 @@ func TestSimultaneousMerge(t *testing.T) {
ch <- true
return nil
}))
source := mux.Channel("one")
source2 := mux.Channel("two")
source := mux.ChannelWithContext(ctx, "one")
source2 := mux.ChannelWithContext(ctx, "two")
source <- "test"
source2 <- "test2"
<-ch
Expand Down

0 comments on commit a0dfd95

Please sign in to comment.