Add DisruptedPod map to PodDisruptionBudgetStatus
mwielgus committed Nov 6, 2016
1 parent b7512d9 commit 47a1458
Showing 5 changed files with 211 additions and 36 deletions.
13 changes: 13 additions & 0 deletions pkg/apis/policy/types.go
@@ -51,6 +51,19 @@ type PodDisruptionBudgetStatus struct {

// total number of pods counted by this disruption budget
ExpectedPods int32 `json:"expectedPods"`

// DisruptedPods contains information about pods whose eviction was
// processed by the API server eviction subresource handler but has not
// yet been observed by the PodDisruptionBudget controller.
// A pod will be in this map from the time the API server processes the
// eviction request until the PDB controller sees the pod as having been
// marked for deletion (or until a timeout expires). The key in the map is
// the name of the pod and the value is the time when the API server
// processed the eviction request. If the deletion did not happen and the
// pod is still there, it will be removed from the map automatically by the
// PodDisruptionBudget controller after some time.
// If everything goes smoothly, this map should be empty most of the time.
// A large number of entries in the map may indicate problems with pod deletions.
DisruptedPods map[string]unversioned.Time `json:"disruptedPods" protobuf:"bytes,5,rep,name=disruptedPods"`
}
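
The API server eviction handler that populates this map is not part of the files shown here. Purely as an illustration of the bookkeeping described in the comment above, a minimal sketch (markPodDisrupted is a hypothetical helper, not code from this commit):

package example // illustrative sketch, not part of this commit

import (
	"k8s.io/kubernetes/pkg/api/unversioned"
	"k8s.io/kubernetes/pkg/apis/policy"
)

// markPodDisrupted records that podName's eviction was just processed, mirroring the
// bookkeeping the eviction subresource handler is expected to perform on the PDB status.
func markPodDisrupted(pdb *policy.PodDisruptionBudget, podName string) {
	if pdb.Status.DisruptedPods == nil {
		pdb.Status.DisruptedPods = make(map[string]unversioned.Time)
	}
	// The value is the time the API server processed the eviction request; the PDB
	// controller clears the entry once the pod is marked for deletion, or after a timeout.
	pdb.Status.DisruptedPods[podName] = unversioned.Now()
}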

// +genclient=true
13 changes: 13 additions & 0 deletions pkg/apis/policy/v1beta1/types.go
@@ -49,6 +49,19 @@ type PodDisruptionBudgetStatus struct {

// total number of pods counted by this disruption budget
ExpectedPods int32 `json:"expectedPods" protobuf:"varint,4,opt,name=expectedPods"`

// DisruptedPods contains information about pods whose eviction was
// processed by the API server eviction subresource handler but has not
// yet been observed by the PodDisruptionBudget controller.
// A pod will be in this map from the time the API server processes the
// eviction request until the PDB controller sees the pod as having been
// marked for deletion (or until a timeout expires). The key in the map is
// the name of the pod and the value is the time when the API server
// processed the eviction request. If the deletion did not happen and the
// pod is still there, it will be removed from the map automatically by the
// PodDisruptionBudget controller after some time.
// If everything goes smoothly, this map should be empty most of the time.
// A large number of entries in the map may indicate problems with pod deletions.
DisruptedPods map[string]unversioned.Time `json:"disruptedPods" protobuf:"bytes,5,rep,name=disruptedPods"`
}
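
Since the same field is exposed through the versioned v1beta1 API, a cluster operator can watch it for evictions that never complete. A minimal sketch of such a check (reportStuckEvictions is a hypothetical helper, not part of this commit):

package example // illustrative sketch, not part of this commit

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/apis/policy/v1beta1"
)

// reportStuckEvictions prints DisruptedPods entries older than maxAge. A map that stays
// non-empty, or contains very old entries, suggests pod deletions are not completing.
func reportStuckEvictions(pdb *v1beta1.PodDisruptionBudget, maxAge time.Duration) {
	now := time.Now()
	for name, evictedAt := range pdb.Status.DisruptedPods {
		if now.Sub(evictedAt.Time) > maxAge {
			fmt.Printf("pod %s/%s: eviction processed at %s but the pod is still present\n",
				pdb.Namespace, name, evictedAt.Time.Format(time.RFC3339))
		}
	}
}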

// +genclient=true
117 changes: 106 additions & 11 deletions pkg/controller/disruption/disruption.go
@@ -18,6 +18,7 @@ package disruption

import (
"fmt"
"reflect"
"time"

"k8s.io/kubernetes/pkg/api"
@@ -43,6 +44,17 @@ import (

const statusUpdateRetries = 2

// DeletionTimeout sets the maximum time from the moment a pod is added to DisruptedPods in PDB.Status
// to the time when the pod is expected to be seen by the PDB controller as having been marked for deletion.
// If the pod was not marked for deletion during that time it is assumed that it won't be deleted at
// all and the corresponding entry can be removed from pdb.Status.DisruptedPods. It is assumed that
// pod/pdb apiserver-to-controller latency is relatively small (on the order of 1-2 seconds), so the
// value below should be more than enough.
// If the controller is running on a different node, it is important that the two nodes have
// synchronized clocks (via NTP, for example). Otherwise the PodDisruptionBudget controller may not
// provide enough protection against unwanted pod disruptions.
const DeletionTimeout = 2 * 60 * time.Second
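
As a concrete illustration of how this constant is used for rescheduling (a sketch, not code from this commit; recheckDelay is a hypothetical helper): an entry recorded 30 seconds ago keeps the pod counted as disrupted for another 90 seconds, and that remainder is also the delay after which the PDB is re-queued for a recheck.

package example // illustrative sketch, not part of this commit

import (
	"time"

	"k8s.io/kubernetes/pkg/api/unversioned"
)

const deletionTimeout = 2 * 60 * time.Second // mirrors DeletionTimeout above

// recheckDelay returns how long to wait before re-examining a PDB whose DisruptedPods
// entry was recorded at evictedAt: the part of the timeout that has not yet elapsed.
func recheckDelay(evictedAt unversioned.Time, now time.Time) time.Duration {
	return evictedAt.Time.Add(deletionTimeout).Sub(now)
}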

type updater func(*policy.PodDisruptionBudget) error

type DisruptionController struct {
@@ -68,7 +80,8 @@ type DisruptionController struct {
dLister cache.StoreToDeploymentLister

// PodDisruptionBudget keys that need to be synced.
queue workqueue.RateLimitingInterface
queue workqueue.RateLimitingInterface
recheckQueue workqueue.DelayingInterface

broadcaster record.EventBroadcaster
recorder record.EventRecorder
@@ -92,6 +105,7 @@ func NewDisruptionController(podInformer cache.SharedIndexInformer, kubeClient i
kubeClient: kubeClient,
podController: podInformer.GetController(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "disruption"),
recheckQueue: workqueue.NewNamedDelayingQueue("disruption-recheck"),
broadcaster: record.NewBroadcaster(),
}
dc.recorder = dc.broadcaster.NewRecorder(api.EventSource{Component: "controllermanager"})
@@ -270,6 +284,8 @@ func (dc *DisruptionController) Run(stopCh <-chan struct{}) {
go dc.rsController.Run(stopCh)
go dc.dController.Run(stopCh)
go wait.Until(dc.worker, time.Second, stopCh)
go wait.Until(dc.recheckWorker, time.Second, stopCh)

<-stopCh
glog.V(0).Infof("Shutting down disruption controller")
}
@@ -355,6 +371,15 @@ func (dc *DisruptionController) enqueuePdb(pdb *policy.PodDisruptionBudget) {
dc.queue.Add(key)
}

func (dc *DisruptionController) enqueuePdbForRecheck(pdb *policy.PodDisruptionBudget, delay time.Duration) {
key, err := controller.KeyFunc(pdb)
if err != nil {
glog.Errorf("Cound't get key for PodDisruptionBudget object %+v: %v", pdb, err)
return
}
dc.recheckQueue.AddAfter(key, delay)
}

func (dc *DisruptionController) getPdbForPod(pod *api.Pod) *policy.PodDisruptionBudget {
// GetPodPodDisruptionBudgets returns an error only if no
// PodDisruptionBudgets are found. We don't return that as an error to the
@@ -417,6 +442,21 @@ func (dc *DisruptionController) processNextWorkItem() bool {
return true
}

func (dc *DisruptionController) recheckWorker() {
for dc.processNextRecheckWorkItem() {
}
}

func (dc *DisruptionController) processNextRecheckWorkItem() bool {
dKey, quit := dc.recheckQueue.Get()
if quit {
return false
}
defer dc.recheckQueue.Done(dKey)
dc.queue.AddRateLimited(dKey)
return true
}
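
The recheck queue is simply a delaying buffer in front of the main rate-limited queue: keys parked with AddAfter reappear on the main queue once their delay expires. A standalone sketch of that pattern (funnelRechecks is illustrative, not code from this commit):

package example // illustrative sketch, not part of this commit

import (
	"k8s.io/kubernetes/pkg/util/workqueue"
)

// funnelRechecks drains a delaying queue into a rate-limited queue, which is all that
// recheckWorker/processNextRecheckWorkItem do above.
func funnelRechecks(recheckQueue workqueue.DelayingInterface, mainQueue workqueue.RateLimitingInterface) {
	for {
		key, quit := recheckQueue.Get()
		if quit {
			return
		}
		mainQueue.AddRateLimited(key)
		recheckQueue.Done(key)
	}
}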

func (dc *DisruptionController) sync(key string) error {
startTime := time.Now()
defer func() {
@@ -452,9 +492,17 @@ func (dc *DisruptionController) trySync(pdb *policy.PodDisruptionBudget) error {
return err
}

currentHealthy := countHealthyPods(pods)
err = dc.updatePdbSpec(pdb, currentHealthy, desiredHealthy, expectedCount)
currentTime := time.Now()
disruptedPods, recheckTime := buildDisruptedPodMap(pods, pdb, currentTime)
currentHealthy := countHealthyPods(pods, disruptedPods, currentTime)
err = dc.updatePdbStatus(pdb, currentHealthy, desiredHealthy, expectedCount, disruptedPods)

if err == nil && recheckTime != nil {
// There is always at most one PDB waiting with a particular name in the queue,
// and each PDB in the queue is associated with the lowest timestamp
// that was supplied when a PDB with that name was added.
dc.enqueuePdbForRecheck(pdb, recheckTime.Sub(currentTime))
}
return err
}

@@ -527,20 +575,60 @@ func (dc *DisruptionController) getExpectedPodCount(pdb *policy.PodDisruptionBud
return
}

func countHealthyPods(pods []*api.Pod) (currentHealthy int32) {
func countHealthyPods(pods []*api.Pod, disruptedPods map[string]unversioned.Time, currentTime time.Time) (currentHealthy int32) {
Pod:
for _, pod := range pods {
for _, c := range pod.Status.Conditions {
if c.Type == api.PodReady && c.Status == api.ConditionTrue {
currentHealthy++
continue Pod
}
// Pod is being deleted.
if pod.DeletionTimestamp != nil {
continue
}
// Pod is expected to be deleted soon.
if disruptionTime, found := disruptedPods[pod.Name]; found && disruptionTime.Time.Add(DeletionTimeout).After(currentTime) {
continue
}
if api.IsPodReady(pod) {
currentHealthy++
continue Pod
}
}

return
}
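
To make the counting rule concrete: a ready pod is healthy unless it is already marked for deletion or has a still-fresh DisruptedPods entry. A test-style sketch of those three cases (illustrative only, not a test from this commit):

package disruption // illustrative test-style sketch, not part of this commit

import (
	"testing"
	"time"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/unversioned"
)

func readyPod(name string) *api.Pod {
	return &api.Pod{
		ObjectMeta: api.ObjectMeta{Name: name},
		Status: api.PodStatus{
			Conditions: []api.PodCondition{{Type: api.PodReady, Status: api.ConditionTrue}},
		},
	}
}

func TestCountHealthyPodsSketch(t *testing.T) {
	now := time.Now()

	healthy := readyPod("healthy")   // ready, no disruption entry -> counted
	evicted := readyPod("evicted")   // ready, but eviction processed recently -> not counted
	deleting := readyPod("deleting") // ready, but already marked for deletion -> not counted
	deleting.DeletionTimestamp = &unversioned.Time{Time: now}

	disrupted := map[string]unversioned.Time{"evicted": {Time: now}}

	if got := countHealthyPods([]*api.Pod{healthy, evicted, deleting}, disrupted, now); got != 1 {
		t.Errorf("expected 1 healthy pod, got %d", got)
	}
}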

// Builds a new DisruptedPods map, removing entries that refer to pods which no longer exist,
// are already being deleted, or were not deleted at all within DeletionTimeout. Also returns
// the time at which this check should be repeated, if any.
func buildDisruptedPodMap(pods []*api.Pod, pdb *policy.PodDisruptionBudget, currentTime time.Time) (map[string]unversioned.Time, *time.Time) {
disruptedPods := pdb.Status.DisruptedPods
result := make(map[string]unversioned.Time)
var recheckTime *time.Time

if disruptedPods == nil || len(disruptedPods) == 0 {
return result, recheckTime
}
for _, pod := range pods {
if pod.DeletionTimestamp != nil {
// Already being deleted.
continue
}
disruptionTime, found := disruptedPods[pod.Name]
if !found {
// Pod not on the list.
continue
}
expectedDeletion := disruptionTime.Time.Add(DeletionTimeout)
if expectedDeletion.Before(currentTime) {
glog.V(1).Infof("Pod %s/%s was expected to be deleted at %s but it wasn't, updating pdb %s/%s",
pod.Namespace, pod.Name, disruptionTime.String(), pdb.Namespace, pdb.Name)
} else {
if recheckTime == nil || expectedDeletion.Before(*recheckTime) {
recheckTime = &expectedDeletion
}
result[pod.Name] = disruptionTime
}
}
return result, recheckTime
}
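
A quick scenario for the pruning behavior (a test-style sketch, illustrative only, not a test from this commit): an entry inside the timeout window is kept and drives the recheck time, an expired entry is dropped and logged, and an entry whose pod is no longer listed simply never makes it into the new map.

package disruption // illustrative test-style sketch, not part of this commit

import (
	"testing"
	"time"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/unversioned"
	"k8s.io/kubernetes/pkg/apis/policy"
)

func TestBuildDisruptedPodMapSketch(t *testing.T) {
	now := time.Now()
	pdb := &policy.PodDisruptionBudget{
		Status: policy.PodDisruptionBudgetStatus{
			DisruptedPods: map[string]unversioned.Time{
				"fresh": {Time: now.Add(-30 * time.Second)}, // within DeletionTimeout -> kept
				"stale": {Time: now.Add(-5 * time.Minute)},  // past DeletionTimeout -> dropped and logged
				"gone":  {Time: now.Add(-10 * time.Second)}, // pod not in the list -> dropped
			},
		},
	}
	pods := []*api.Pod{
		{ObjectMeta: api.ObjectMeta{Name: "fresh"}},
		{ObjectMeta: api.ObjectMeta{Name: "stale"}},
	}

	result, recheck := buildDisruptedPodMap(pods, pdb, now)

	// Only "fresh" survives; the recheck is scheduled for its expected deletion time,
	// roughly 90 seconds from now (30s of the 120s timeout already elapsed).
	if len(result) != 1 || recheck == nil {
		t.Fatalf("unexpected result %v, recheck %v", result, recheck)
	}
}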

// failSafe is an attempt to at least update the PodDisruptionsAllowed field to
// 0 if everything else has failed. This is one place we
// implement the "fail open" part of the design since if we manage to update
@@ -557,7 +645,9 @@ func (dc *DisruptionController) failSafe(pdb *policy.PodDisruptionBudget) error
return dc.getUpdater()(&newPdb)
}

func (dc *DisruptionController) updatePdbSpec(pdb *policy.PodDisruptionBudget, currentHealthy, desiredHealthy, expectedCount int32) error {
func (dc *DisruptionController) updatePdbStatus(pdb *policy.PodDisruptionBudget, currentHealthy, desiredHealthy, expectedCount int32,
disruptedPods map[string]unversioned.Time) error {

// We require expectedCount to be > 0 so that PDBs which currently match no
// pods are in a safe state when their first pods appear but this controller
// has not updated their status yet. This isn't the only race, but it's a
@@ -567,7 +657,11 @@ func (dc *DisruptionController) updatePdbSpec(pdb *policy.PodDisruptionBudget, c
disruptionsAllowed = 0
}

if pdb.Status.CurrentHealthy == currentHealthy && pdb.Status.DesiredHealthy == desiredHealthy && pdb.Status.ExpectedPods == expectedCount && pdb.Status.PodDisruptionsAllowed == disruptionsAllowed {
if pdb.Status.CurrentHealthy == currentHealthy &&
pdb.Status.DesiredHealthy == desiredHealthy &&
pdb.Status.ExpectedPods == expectedCount &&
pdb.Status.PodDisruptionsAllowed == disruptionsAllowed &&
reflect.DeepEqual(pdb.Status.DisruptedPods, disruptedPods) {
return nil
}

@@ -582,6 +676,7 @@ func (dc *DisruptionController) updatePdbSpec(pdb *policy.PodDisruptionBudget, c
DesiredHealthy: desiredHealthy,
ExpectedPods: expectedCount,
PodDisruptionsAllowed: disruptionsAllowed,
DisruptedPods: disruptedPods,
}

return dc.getUpdater()(&newPdb)
