
Fix to handle hash collisions correctly for DaemonSets #66476

Merged
10 changes: 5 additions & 5 deletions pkg/controller/daemon/update.go
@@ -330,16 +330,16 @@ func (dsc *DaemonSetsController) snapshot(ds *apps.DaemonSet, revision int64) (*
 	}
 
 	history, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Create(history)
-	if errors.IsAlreadyExists(err) {
+	if outerErr := err; errors.IsAlreadyExists(outerErr) {
 		// TODO: Is it okay to get from historyLister?
 		existedHistory, getErr := dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Get(name, metav1.GetOptions{})
 		if getErr != nil {
 			return nil, getErr
 		}
 		// Check if we already created it
-		done, err := Match(ds, existedHistory)
-		if err != nil {
-			return nil, err
+		done, matchErr := Match(ds, existedHistory)
Review comment (Member):
I feel like this is bound to happen again. Can you do something like

if outerErr := err; errors.IsAlreadyExists(outerErr) {
  ...
  return outerErr
}

+		if matchErr != nil {
+			return nil, matchErr
 		}
 		if done {
 			return existedHistory, nil
@@ -360,7 +360,7 @@ func (dsc *DaemonSetsController) snapshot(ds *apps.DaemonSet, revision int64) (*
 			return nil, updateErr
 		}
 		glog.V(2).Infof("Found a hash collision for DaemonSet %q - bumping collisionCount to %d to resolve it", ds.Name, *currDS.Status.CollisionCount)
-		return nil, err
+		return nil, outerErr
 	}
 	return history, err
 }
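The root cause was Go variable shadowing: inside the IsAlreadyExists branch, "done, err := Match(...)" redeclared err, so the final "return nil, err" returned the (usually nil) Match error instead of the original AlreadyExists error, and the collision handling never fired. A minimal, self-contained sketch of the pattern — match and errAlreadyExists are hypothetical stand-ins for Match and the API error, not the actual code:

package main

import (
	"errors"
	"fmt"
)

var errAlreadyExists = errors.New("already exists") // stand-in for the AlreadyExists API error

func match() (bool, error) { return false, nil } // stand-in for Match(ds, existedHistory)

func buggy() error {
	err := errAlreadyExists
	if errors.Is(err, errAlreadyExists) {
		done, err := match() // ':=' declares a new err that shadows the outer one
		if err != nil {
			return err
		}
		if !done {
			return err // shadowed err is nil here; the AlreadyExists error is lost
		}
	}
	return nil
}

func fixed() error {
	err := errAlreadyExists
	if outerErr := err; errors.Is(outerErr, errAlreadyExists) {
		done, matchErr := match() // distinct names, no shadowing
		if matchErr != nil {
			return matchErr
		}
		if !done {
			return outerErr // the original error survives
		}
	}
	return nil
}

func main() {
	fmt.Println(buggy()) // <nil>
	fmt.Println(fixed()) // already exists
}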
2 changes: 2 additions & 0 deletions test/integration/daemonset/BUILD
@@ -16,12 +16,14 @@ go_test(
     deps = [
         "//pkg/api/legacyscheme:go_default_library",
         "//pkg/api/v1/pod:go_default_library",
+        "//pkg/controller:go_default_library",
         "//pkg/controller/daemon:go_default_library",
         "//pkg/features:go_default_library",
         "//pkg/scheduler:go_default_library",
         "//pkg/scheduler/algorithm:go_default_library",
         "//pkg/scheduler/algorithmprovider:go_default_library",
         "//pkg/scheduler/factory:go_default_library",
+        "//pkg/util/labels:go_default_library",
         "//pkg/util/metrics:go_default_library",
         "//staging/src/k8s.io/api/apps/v1:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
157 changes: 157 additions & 0 deletions test/integration/daemonset/daemonset_test.go
@@ -41,13 +41,15 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/legacyscheme"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/daemon"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
"k8s.io/kubernetes/pkg/scheduler/factory"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
"k8s.io/kubernetes/pkg/util/metrics"
"k8s.io/kubernetes/test/integration/framework"
)
@@ -372,6 +374,52 @@ func waitForPodsCreated(podInformer cache.SharedIndexInformer, num int) error {
})
}

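// waitForDaemonSetAndControllerRevisionCreated polls until the named DaemonSet
// exists and at least one ControllerRevision owned by it has been created.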
func waitForDaemonSetAndControllerRevisionCreated(c clientset.Interface, name string, namespace string) error {
return wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) {
ds, err := c.AppsV1().DaemonSets(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return false, err
}
if ds == nil {
return false, nil
}

revs, err := c.AppsV1().ControllerRevisions(namespace).List(metav1.ListOptions{})
if err != nil {
return false, err
}
if len(revs.Items) == 0 {
return false, nil
}

for _, rev := range revs.Items {
for _, oref := range rev.OwnerReferences {
if oref.Kind == "DaemonSet" && oref.UID == ds.UID {
return true, nil
}
}
}
return false, nil
})
}

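// hashAndNameForDaemonSet computes the pod template hash for the DaemonSet and
// the ControllerRevision name derived from it (ds.Name + "-" + hash).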
func hashAndNameForDaemonSet(ds *apps.DaemonSet) (string, string) {
hash := fmt.Sprint(controller.ComputeHash(&ds.Spec.Template, ds.Status.CollisionCount))
name := ds.Name + "-" + hash
return hash, name
}

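// validateDaemonSetCollisionCount fails the test unless the DaemonSet's
// status.collisionCount equals expCount.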
func validateDaemonSetCollisionCount(dsClient appstyped.DaemonSetInterface, dsName string, expCount int32, t *testing.T) {
ds, err := dsClient.Get(dsName, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to look up DaemonSet: %v", err)
}
collisionCount := ds.Status.CollisionCount
if *collisionCount != expCount {
t.Fatalf("Expected collisionCount to be %d, but found %d", expCount, *collisionCount)
}
}

func validateDaemonSetStatus(
dsClient appstyped.DaemonSetInterface,
dsName string,
@@ -740,3 +788,112 @@ func TestInsufficientCapacityNodeWhenScheduleDaemonSetPodsEnabled(t *testing.T)
validateDaemonSetStatus(dsClient, ds.Name, 1, t)
})
}

// TestLaunchWithHashCollision tests that a DaemonSet can be updated even if there is a
// hash collision with an existing ControllerRevision.
func TestLaunchWithHashCollision(t *testing.T) {
server, closeFn, dc, informers, clientset := setup(t)
defer closeFn()
ns := framework.CreateTestingNamespace("one-node-daemonset-test", server, t)
defer framework.DeleteTestingNamespace(ns, server, t)

dsClient := clientset.AppsV1().DaemonSets(ns.Name)
podInformer := informers.Core().V1().Pods().Informer()
nodeClient := clientset.CoreV1().Nodes()

stopCh := make(chan struct{})
defer close(stopCh)

informers.Start(stopCh)
go dc.Run(1, stopCh)

setupScheduler(t, clientset, informers, stopCh)

// Create single node
_, err := nodeClient.Create(newNode("single-node", nil))
if err != nil {
t.Fatalf("Failed to create node: %v", err)
}

// Create new DaemonSet with RollingUpdate strategy
orgDs := newDaemonSet("foo", ns.Name)
oneIntString := intstr.FromInt(1)
orgDs.Spec.UpdateStrategy = apps.DaemonSetUpdateStrategy{
Type: apps.RollingUpdateDaemonSetStrategyType,
RollingUpdate: &apps.RollingUpdateDaemonSet{
MaxUnavailable: &oneIntString,
},
}
ds, err := dsClient.Create(orgDs)
if err != nil {
t.Fatalf("Failed to create DaemonSet: %v", err)
}

// Wait for the DaemonSet to be created before proceeding
err = waitForDaemonSetAndControllerRevisionCreated(clientset, ds.Name, ds.Namespace)
if err != nil {
t.Fatalf("Failed to create DeamonSet: %v", err)
}

ds, err = dsClient.Get(ds.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get DaemonSet: %v", err)
}
var orgCollisionCount int32
if ds.Status.CollisionCount != nil {
orgCollisionCount = *ds.Status.CollisionCount
}
Review comment (Member):
Move this down to L846, after the DS's CR is created and a fresh DS is fetched back from the server. This is because the collision count might change after CR creation.


// Look up the ControllerRevision for the DaemonSet
_, name := hashAndNameForDaemonSet(ds)
revision, err := clientset.AppsV1().ControllerRevisions(ds.Namespace).Get(name, metav1.GetOptions{})
Review comment (Member):
It's possible that the CR hasn't been created yet after the DS is created, so wait for the CR to be created to avoid possible test flakes.

if err != nil || revision == nil {
t.Fatalf("Failed to look up ControllerRevision: %v", err)
}

// Create a "fake" ControllerRevision that we know will create a hash collision when we make
// the next update
one := int64(1)
ds.Spec.Template.Spec.TerminationGracePeriodSeconds = &one

newHash, newName := hashAndNameForDaemonSet(ds)
newRevision := &apps.ControllerRevision{
ObjectMeta: metav1.ObjectMeta{
Name: newName,
Namespace: ds.Namespace,
Labels: labelsutil.CloneAndAddLabel(ds.Spec.Template.Labels, apps.DefaultDaemonSetUniqueLabelKey, newHash),
Annotations: ds.Annotations,
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(ds, apps.SchemeGroupVersion.WithKind("DaemonSet"))},
},
Data: revision.Data,
Revision: revision.Revision + 1,
}
_, err = clientset.AppsV1().ControllerRevisions(ds.Namespace).Create(newRevision)
if err != nil {
t.Fatalf("Failed to create ControllerRevision: %v", err)
}

Review comment (Member):
nit: save ds's collision count here for comparison later. The count should be count+1 at the end of this test.

// Make an update of the DaemonSet which we know will create a hash collision when
// the next ControllerRevision is created.
_, err = dsClient.Update(ds)
if err != nil {
t.Fatalf("Failed to update DaemonSet: %v", err)
}

// Wait for any pod with the latest Spec to exist
err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) {
objects := podInformer.GetIndexer().List()
for _, object := range objects {
pod := object.(*v1.Pod)
if *pod.Spec.TerminationGracePeriodSeconds == *ds.Spec.Template.Spec.TerminationGracePeriodSeconds {
return true, nil
}
}
return false, nil
})
if err != nil {
t.Fatalf("Failed to wait for Pods with the latest Spec to be created: %v", err)
}

validateDaemonSetCollisionCount(dsClient, ds.Name, orgCollisionCount+1, t)
}
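For reference, the mechanism the test exercises works because the collision count is mixed into the pod template hash, so bumping it yields a different hash and a different ControllerRevision name. A simplified, hypothetical stand-in for controller.ComputeHash (the real helper FNV-hashes the full pod template; computeHashSketch below only illustrates the mechanism):

package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
)

// computeHashSketch hashes the template, then mixes in the collision count so
// that each bump changes the result and therefore the revision name.
func computeHashSketch(template string, collisionCount *int32) string {
	h := fnv.New32a()
	h.Write([]byte(template))
	if collisionCount != nil {
		b := make([]byte, 4)
		binary.LittleEndian.PutUint32(b, uint32(*collisionCount))
		h.Write(b)
	}
	return fmt.Sprint(h.Sum32())
}

func main() {
	zero, one := int32(0), int32(1)
	// Same template, different collision counts => different hashes, hence
	// different ControllerRevision names and no collision on the next create.
	fmt.Println("foo-" + computeHashSketch("template", &zero))
	fmt.Println("foo-" + computeHashSketch("template", &one))
}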