Automated cherry pick of #62465 upstream release 1.9 (#62528)

Merged

Cherry pick of #62465 on release-1.9: NewPodStore now waits for its reflector's initial List() to complete before returning and reports failure as an error, replacing the framework-local podStoreForSelector helper; all callers are updated for the new (*PodStore, error) signature.
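For orientation, the declaration at the core of this cherry pick (test/utils/pod_store.go, shown in full below), before and after:

// Before: construction could not fail, and the store could be consumed
// before the reflector had completed its initial List.
func NewPodStore(c clientset.Interface, namespace string, label labels.Selector, field fields.Selector) *PodStore

// After: construction blocks until the reflector reports a synced
// resource version, and surfaces a timeout as an error.
func NewPodStore(c clientset.Interface, namespace string, label labels.Selector, field fields.Selector) (*PodStore, error)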
test/e2e/framework/rc_util.go: 2 additions, 1 deletion

@@ -25,6 +25,7 @@ import (
 
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
@@ -104,7 +105,7 @@ func ScaleRCByLabels(clientset clientset.Interface, internalClientset internalcl
 		return err
 	}
 	if replicas == 0 {
-		ps, err := podStoreForSelector(clientset, rc.Namespace, labels.SelectorFromSet(rc.Spec.Selector))
+		ps, err := testutils.NewPodStore(clientset, rc.Namespace, labels.SelectorFromSet(rc.Spec.Selector), fields.Everything())
 		if err != nil {
 			return err
 		}
test/e2e/framework/util.go: 8 additions, 18 deletions

@@ -2767,9 +2767,12 @@ func WaitForControlledPods(c clientset.Interface, ns, name string, kind schema.G
 
 // Returns true if all the specified pods are scheduled, else returns false.
 func podsWithLabelScheduled(c clientset.Interface, ns string, label labels.Selector) (bool, error) {
-	PodStore := testutil.NewPodStore(c, ns, label, fields.Everything())
-	defer PodStore.Stop()
-	pods := PodStore.List()
+	ps, err := testutil.NewPodStore(c, ns, label, fields.Everything())
+	if err != nil {
+		return false, err
+	}
+	defer ps.Stop()
+	pods := ps.List()
 	if len(pods) == 0 {
 		return false, nil
 	}
@@ -2955,7 +2958,7 @@ func DeleteResourceAndPods(clientset clientset.Interface, internalClientset inte
 		return err
 	}
 
-	ps, err := podStoreForSelector(clientset, ns, selector)
+	ps, err := testutil.NewPodStore(clientset, ns, selector, fields.Everything())
 	if err != nil {
 		return err
 	}
@@ -3009,7 +3012,7 @@ func DeleteResourceAndWaitForGC(c clientset.Interface, kind schema.GroupKind, ns
 		return err
 	}
 
-	ps, err := podStoreForSelector(c, ns, selector)
+	ps, err := testutil.NewPodStore(c, ns, selector, fields.Everything())
 	if err != nil {
 		return err
 	}
@@ -3060,19 +3063,6 @@ func DeleteResourceAndWaitForGC(c clientset.Interface, kind schema.GroupKind, ns
 	return nil
 }
 
-// podStoreForSelector creates a PodStore that monitors pods from given namespace matching given selector.
-// It waits until the reflector does a List() before returning.
-func podStoreForSelector(c clientset.Interface, ns string, selector labels.Selector) (*testutil.PodStore, error) {
-	ps := testutil.NewPodStore(c, ns, selector, fields.Everything())
-	err := wait.Poll(100*time.Millisecond, 2*time.Minute, func() (bool, error) {
-		if len(ps.Reflector.LastSyncResourceVersion()) != 0 {
-			return true, nil
-		}
-		return false, nil
-	})
-	return ps, err
-}
-
 // waitForPodsInactive waits until there are no active pods left in the PodStore.
 // This is to make a fair comparison of deletion time between DeleteRCAndPods
 // and DeleteRCAndWaitForGC, because the RC controller decreases status.replicas
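One behavioral detail of this move: the deleted helper polled with wait.Poll, which sleeps a full interval before its first check, while the replacement logic inside NewPodStore (see test/utils/pod_store.go below) uses wait.PollImmediate, which evaluates the condition up front. A minimal standalone sketch of the difference, using a condition that is already true (timings printed are illustrative only):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	alwaysTrue := func() (bool, error) { return true, nil }

	// wait.Poll sleeps one interval (here 100ms) before the first check,
	// so even an immediately-true condition costs one interval.
	start := time.Now()
	_ = wait.Poll(100*time.Millisecond, time.Second, alwaysTrue)
	fmt.Println("Poll:", time.Since(start))

	// wait.PollImmediate runs the condition once before sleeping,
	// so an already-synced reflector is detected without delay.
	start = time.Now()
	_ = wait.PollImmediate(100*time.Millisecond, time.Second, alwaysTrue)
	fmt.Println("PollImmediate:", time.Since(start))
}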
test/e2e/lifecycle/reboot.go: 5 additions, 1 deletion

@@ -219,7 +219,11 @@ func printStatusAndLogsForNotReadyPods(c clientset.Interface, ns string, podName
 func rebootNode(c clientset.Interface, provider, name, rebootCmd string) bool {
 	// Setup
 	ns := metav1.NamespaceSystem
-	ps := testutils.NewPodStore(c, ns, labels.Everything(), fields.OneTermEqualSelector(api.PodHostField, name))
+	ps, err := testutils.NewPodStore(c, ns, labels.Everything(), fields.OneTermEqualSelector(api.PodHostField, name))
+	if err != nil {
+		framework.Logf("Couldn't initialize pod store: %v", err)
+		return false
+	}
 	defer ps.Stop()
 
 	// Get the node initially.
test/e2e/lifecycle/restart.go: 2 additions, 1 deletion

@@ -76,8 +76,9 @@ var _ = SIGDescribe("Restart [Disruptive]", func() {
 		// This test requires the ability to restart all nodes, so the provider
 		// check must be identical to that call.
 		framework.SkipUnlessProviderIs("gce", "gke")
-		ps = testutils.NewPodStore(f.ClientSet, metav1.NamespaceSystem, labels.Everything(), fields.Everything())
 		var err error
+		ps, err = testutils.NewPodStore(f.ClientSet, metav1.NamespaceSystem, labels.Everything(), fields.Everything())
+		Expect(err).NotTo(HaveOccurred())
 		numNodes, err = framework.NumberOfRegisteredNodes(f.ClientSet)
 		Expect(err).NotTo(HaveOccurred())
 		systemNamespace = metav1.NamespaceSystem
test/e2e/scalability/density.go: 3 additions, 1 deletion

@@ -224,8 +224,10 @@ func density30AddonResourceVerifier(numNodes int) map[string]framework.ResourceC
 
 func logPodStartupStatus(c clientset.Interface, expectedPods int, observedLabels map[string]string, period time.Duration, stopCh chan struct{}) {
 	label := labels.SelectorFromSet(labels.Set(observedLabels))
-	podStore := testutils.NewPodStore(c, metav1.NamespaceAll, label, fields.Everything())
+	podStore, err := testutils.NewPodStore(c, metav1.NamespaceAll, label, fields.Everything())
+	framework.ExpectNoError(err)
 	defer podStore.Stop()
+
 	ticker := time.NewTicker(period)
 	defer ticker.Stop()
 	for {
test/utils/pod_store.go: 13 additions, 2 deletions

@@ -17,11 +17,14 @@ limitations under the License.
 package utils
 
 import (
+	"time"
+
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
@@ -34,7 +37,7 @@ type PodStore struct {
 	Reflector *cache.Reflector
 }
 
-func NewPodStore(c clientset.Interface, namespace string, label labels.Selector, field fields.Selector) *PodStore {
+func NewPodStore(c clientset.Interface, namespace string, label labels.Selector, field fields.Selector) (*PodStore, error) {
 	lw := &cache.ListWatch{
 		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
 			options.LabelSelector = label.String()
@@ -52,7 +55,15 @@ func NewPodStore(c clientset.Interface, namespace string, label labels.Selector,
 	stopCh := make(chan struct{})
 	reflector := cache.NewReflector(lw, &v1.Pod{}, store, 0)
 	go reflector.Run(stopCh)
-	return &PodStore{Store: store, stopCh: stopCh, Reflector: reflector}
+	if err := wait.PollImmediate(50*time.Millisecond, 2*time.Minute, func() (bool, error) {
+		if len(reflector.LastSyncResourceVersion()) != 0 {
+			return true, nil
+		}
+		return false, nil
+	}); err != nil {
+		return nil, err
+	}
+	return &PodStore{Store: store, stopCh: stopCh, Reflector: reflector}, nil
 }
 
 func (s *PodStore) List() []*v1.Pod {
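Taken together, callers now follow one pattern at every call site. A minimal caller-side sketch (the listSystemPods helper is hypothetical; the NewPodStore call, error check, Stop, and List are exactly the sequence this PR installs at each updated site):

package e2esketch

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	clientset "k8s.io/client-go/kubernetes"
	testutils "k8s.io/kubernetes/test/utils"
)

// listSystemPods builds a PodStore for kube-system and lists its pods.
// NewPodStore now returns only after the reflector's first List has
// succeeded, so an empty result really means no matching pods.
func listSystemPods(c clientset.Interface) error {
	ps, err := testutils.NewPodStore(c, metav1.NamespaceSystem, labels.Everything(), fields.Everything())
	if err != nil {
		return err // the reflector never synced within NewPodStore's 2-minute timeout
	}
	defer ps.Stop()
	for _, pod := range ps.List() {
		fmt.Printf("%s/%s: %s\n", pod.Namespace, pod.Name, pod.Status.Phase)
	}
	return nil
}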
test/utils/runners.go: 18 additions, 9 deletions

@@ -697,8 +697,11 @@ func (config *RCConfig) start() error {
 
 	label := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.Name}))
 
-	PodStore := NewPodStore(config.Client, config.Namespace, label, fields.Everything())
-	defer PodStore.Stop()
+	ps, err := NewPodStore(config.Client, config.Namespace, label, fields.Everything())
+	if err != nil {
+		return err
+	}
+	defer ps.Stop()
 
 	interval := config.PollInterval
 	if interval <= 0 {
@@ -714,7 +717,7 @@
 	for oldRunning != config.Replicas {
 		time.Sleep(interval)
 
-		pods := PodStore.List()
+		pods := ps.List()
 		startupStatus := ComputeRCStartupStatus(pods, config.Replicas)
 
 		pods = startupStatus.Created
@@ -814,11 +817,14 @@ func StartPods(c clientset.Interface, replicas int, namespace string, podNamePre
 // matching pod exists.
 func WaitForPodsWithLabelRunning(c clientset.Interface, ns string, label labels.Selector) error {
 	running := false
-	PodStore := NewPodStore(c, ns, label, fields.Everything())
-	defer PodStore.Stop()
+	ps, err := NewPodStore(c, ns, label, fields.Everything())
+	if err != nil {
+		return err
+	}
+	defer ps.Stop()
 waitLoop:
 	for start := time.Now(); time.Since(start) < 10*time.Minute; time.Sleep(5 * time.Second) {
-		pods := PodStore.List()
+		pods := ps.List()
 		if len(pods) == 0 {
 			continue waitLoop
 		}
@@ -1258,11 +1264,14 @@ func (config *DaemonConfig) Run() error {
 		timeout = 5 * time.Minute
 	}
 
-	podStore := NewPodStore(config.Client, config.Namespace, labels.SelectorFromSet(nameLabel), fields.Everything())
-	defer podStore.Stop()
+	ps, err := NewPodStore(config.Client, config.Namespace, labels.SelectorFromSet(nameLabel), fields.Everything())
+	if err != nil {
+		return err
+	}
+	defer ps.Stop()
 
 	err = wait.Poll(time.Second, timeout, func() (bool, error) {
-		pods := podStore.List()
+		pods := ps.List()
 
 		nodeHasDaemon := sets.NewString()
 		for _, pod := range pods {