Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch statefulset controller to shared informers #41482

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 2 additions & 3 deletions cmd/kube-controller-manager/app/apps.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,10 @@ func startStatefulSetController(ctx ControllerContext) (bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1beta1", Resource: "statefulsets"}] {
return false, nil
}
resyncPeriod := ResyncPeriod(&ctx.Options)()
go statefulset.NewStatefulSetController(
ctx.InformerFactory.Pods().Informer(),
ctx.NewInformerFactory.Core().V1().Pods(),
ctx.NewInformerFactory.Apps().V1beta1().StatefulSets(),
ctx.ClientBuilder.ClientOrDie("statefulset-controller"),
resyncPeriod,
).Run(1, ctx.Stop)
return true, nil
}
16 changes: 12 additions & 4 deletions pkg/controller/statefulset/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,20 @@ go_library(
"//pkg/api/v1/pod:go_default_library",
"//pkg/apis/apps/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/legacylisters:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/apps/v1beta1:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library",
"//pkg/client/listers/apps/v1beta1:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/client/retry:go_default_library",
"//pkg/controller:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/errors",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/client-go/kubernetes/typed/core/v1",
"//vendor:k8s.io/client-go/pkg/api",
"//vendor:k8s.io/client-go/pkg/api/v1",
"//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/tools/record",
Expand All @@ -56,17 +59,22 @@ go_test(
"//pkg/api/v1/pod:go_default_library",
"//pkg/apis/apps/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/apps/v1beta1:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library",
"//pkg/client/legacylisters:go_default_library",
"//pkg/client/listers/apps/v1beta1:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/controller:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/api/resource",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/labels",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/client-go/testing",
"//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/tools/record",
"//vendor:k8s.io/client-go/util/workqueue",
],
)

Expand Down
119 changes: 62 additions & 57 deletions pkg/controller/statefulset/stateful_pod_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
errorutils "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/client/retry"
)

// StatefulPodControlInterface defines the interface that StatefulSetController uses to create, update, and delete Pods,
Expand All @@ -52,15 +56,17 @@ type StatefulPodControlInterface interface {
UpdateStatefulSetReplicas(set *apps.StatefulSet, replicas int32) error
}

func NewRealStatefulPodControl(client clientset.Interface, recorder record.EventRecorder) StatefulPodControlInterface {
return &realStatefulPodControl{client, recorder}
func NewRealStatefulPodControl(client clientset.Interface, setLister appslisters.StatefulSetLister, podLister corelisters.PodLister, recorder record.EventRecorder) StatefulPodControlInterface {
return &realStatefulPodControl{client, setLister, podLister, recorder}
}

// realStatefulPodControl implements StatefulPodControlInterface using a clientset.Interface to communicate with the
// API server. The struct is package private as the internal details are irrelevant to importing packages.
type realStatefulPodControl struct {
client clientset.Interface
recorder record.EventRecorder
client clientset.Interface
setLister appslisters.StatefulSetLister
podLister corelisters.PodLister
recorder record.EventRecorder
}

func (spc *realStatefulPodControl) CreateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
Expand All @@ -80,54 +86,56 @@ func (spc *realStatefulPodControl) CreateStatefulPod(set *apps.StatefulSet, pod
}

func (spc *realStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
// we make a copy of the Pod on the stack and mutate the copy
// we copy back to pod to notify the caller of successful mutation
obj, err := api.Scheme.Copy(pod)
if err != nil {
return fmt.Errorf("unable to copy pod: %v", err)
}
podCopy := obj.(*v1.Pod)
for attempt := 0; attempt < maxUpdateRetries; attempt++ {
attemptedUpdate := false

err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
// assume the Pod is consistent
consistent := true
// if the Pod does not conform to it's identity, update the identity and dirty the Pod
if !identityMatches(set, podCopy) {
updateIdentity(set, podCopy)
// if the Pod does not conform to its identity, update the identity and dirty the Pod
if !identityMatches(set, pod) {
updateIdentity(set, pod)
consistent = false
}
// if the Pod does not conform to the StatefulSet's storage requirements, update the Pod's PVC's,
// dirty the Pod, and create any missing PVCs
if !storageMatches(set, podCopy) {
updateStorage(set, podCopy)
if !storageMatches(set, pod) {
updateStorage(set, pod)
consistent = false
if err := spc.createPersistentVolumeClaims(set, podCopy); err != nil {
if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
spc.recordPodEvent("update", set, pod, err)
return err
}
}
// if the Pod is not dirty do nothing
if consistent {
*pod = *podCopy
return nil
}

attemptedUpdate = true
// commit the update, retrying on conflicts
_, err = spc.client.Core().Pods(set.Namespace).Update(podCopy)
if !apierrors.IsConflict(err) {
if err == nil {
*pod = *podCopy
}
spc.recordPodEvent("update", set, pod, err)
return err
_, err := spc.client.Core().Pods(set.Namespace).Update(pod)
if err == nil {
return nil
}
conflicting, err := spc.client.Core().Pods(set.Namespace).Get(podCopy.Name, metav1.GetOptions{})
if err != nil {
spc.recordPodEvent("update", set, podCopy, err)
return err
updateErr := err

if updated, err := spc.podLister.Pods(set.Namespace).Get(pod.Name); err == nil {
// make a copy so we don't mutate the shared cache
if copy, err := api.Scheme.DeepCopy(updated); err == nil {
pod = copy.(*v1.Pod)
} else {
utilruntime.HandleError(fmt.Errorf("error copying updated Pod: %v", err))
}
} else {
utilruntime.HandleError(fmt.Errorf("error getting updated Pod %s/%s from lister: %v", set.Namespace, pod.Name, err))
}
*podCopy = *conflicting

return updateErr
})
if attemptedUpdate {
spc.recordPodEvent("update", set, pod, err)
}
spc.recordPodEvent("update", set, pod, updateConflictError)
return updateConflictError
return err
}

func (spc *realStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
Expand All @@ -137,31 +145,28 @@ func (spc *realStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod
}

func (spc *realStatefulPodControl) UpdateStatefulSetReplicas(set *apps.StatefulSet, replicas int32) error {
if set.Status.Replicas == replicas {
return nil
}
obj, err := api.Scheme.Copy(set)
if err != nil {
return fmt.Errorf("unable to copy set: %v", err)
}
setCopy := obj.(*apps.StatefulSet)
setCopy.Status.Replicas = replicas
for attempt := 0; attempt < maxUpdateRetries; attempt++ {
_, err := spc.client.Apps().StatefulSets(setCopy.Namespace).UpdateStatus(setCopy)
if !apierrors.IsConflict(err) {
if err == nil {
*set = *setCopy
}
return err
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
set.Status.Replicas = replicas
_, err := spc.client.Apps().StatefulSets(set.Namespace).UpdateStatus(set)
if err == nil {
return nil
}
conflicting, err := spc.client.Apps().StatefulSets(setCopy.Namespace).Get(setCopy.Name, metav1.GetOptions{})
if err != nil {
return err

updateErr := err

if updated, err := spc.setLister.StatefulSets(set.Namespace).Get(set.Name); err == nil {
// make a copy so we don't mutate the shared cache
if copy, err := api.Scheme.DeepCopy(updated); err == nil {
set = copy.(*apps.StatefulSet)
} else {
utilruntime.HandleError(fmt.Errorf("error copying updated StatefulSet: %v", err))
}
} else {
utilruntime.HandleError(fmt.Errorf("error getting updated StatefulSet %s/%s from lister: %v", set.Namespace, set.Name, err))
}
conflicting.Status.Replicas = setCopy.Status.Replicas
*setCopy = *conflicting
}
return updateConflictError

return updateErr
})
}

// recordPodEvent records an event for verb applied to a Pod in a StatefulSet. If err is nil the generated event will
Expand Down