Skip to content

Commit

Permalink
kubelet: Prioritize certain pod status updates
Browse files Browse the repository at this point in the history
Some pod status transitions directly impact end-to-end user latency
in the Kubelet, such as pods going ready, going unready, or becoming
Succeeded or Failed.

Sort pending pod status updates so that the highest-priority ones are
sent first, minimizing that latency.
  • Loading branch information
smarterclayton committed Feb 3, 2022
1 parent 2b1d41f commit 6bb6cb3
Showing 1 changed file with 56 additions and 2 deletions.
58 changes: 56 additions & 2 deletions pkg/kubelet/status/status_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ import (
type versionedPodStatus struct {
// version is a monotonically increasing version number (per pod).
version uint64
// priority describes the relative importance of this update. Higher is more important.
priority int
// Pod name & namespace, for sending updates to API server.
podName string
podNamespace string
Expand Down Expand Up @@ -479,11 +481,17 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
return false // No new status.
}

priority := calculatePriority(&cachedStatus.status, &status)
newStatus := versionedPodStatus{
status: status,
version: cachedStatus.version + 1,
podName: pod.Name,
podNamespace: pod.Namespace,
priority: priority,
}

if cachedStatus.priority > newStatus.priority {
newStatus.priority = cachedStatus.priority
}

if cachedStatus.at.IsZero() {
Expand All @@ -500,6 +508,7 @@ func (m *manager) updateStatusInternal(pod *v1.Pod, status v1.PodStatus, forceUp
"podUID", pod.UID,
"statusVersion", newStatus.version,
"status", newStatus.status,
"priority", newStatus.priority,
)

select {
Expand All @@ -524,6 +533,50 @@ func updateLastTransitionTime(status, oldStatus *v1.PodStatus, conditionType v1.
condition.LastTransitionTime = lastTransitionTime
}

// highestPrioritySyncRequests sorts a slice of sync requests with highest priority first.
// If priority is identical, the oldest status date sorts first (a zero date is sorted
// after set dates). Requests that tie on both priority and date compare as equal.
type highestPrioritySyncRequests []podStatusSyncRequest

// Len returns the number of sync requests.
func (r highestPrioritySyncRequests) Len() int { return len(r) }

// Swap exchanges the sync requests at indexes i and j.
func (r highestPrioritySyncRequests) Swap(i, j int) { r[i], r[j] = r[j], r[i] }

// Less reports whether the request at index i must be processed before the
// request at index j. It is a strict ordering: Less(i, i) is always false and
// Less(i, j) and Less(j, i) are never both true.
func (r highestPrioritySyncRequests) Less(i, j int) bool {
	// Take pointers so each comparison does not copy the full status structs.
	a, b := &r[i].status, &r[j].status
	if a.priority != b.priority {
		return a.priority > b.priority
	}
	aSet, bSet := !a.at.IsZero(), !b.at.IsZero()
	if aSet != bSet {
		// Exactly one side has a date; the one that is set sorts first.
		return aSet
	}
	// Strict comparison: equal (or both zero) dates must report false in both
	// directions so the elements are treated as equal by the sort package.
	return a.at.Before(b.at)
}

// calculatePriority scores how urgently the transition from oldStatus to newStatus
// should be synced; a larger value means the update should be handled sooner.
// It returns 100 when the pod newly enters a terminal phase (Succeeded or Failed)
// or when its Ready condition flips between true and not-true, and 0 otherwise.
func calculatePriority(oldStatus, newStatus *v1.PodStatus) int {
	isTerminal := func(phase v1.PodPhase) bool {
		return phase == v1.PodSucceeded || phase == v1.PodFailed
	}
	// A transition into a terminal phase is high priority.
	if isTerminal(newStatus.Phase) && !isTerminal(oldStatus.Phase) {
		return 100
	}

	// A missing Ready condition is treated the same as a not-ready pod.
	isReady := func(status *v1.PodStatus) bool {
		_, condition := podutil.GetPodCondition(status, v1.PodReady)
		return condition != nil && condition.Status == v1.ConditionTrue
	}
	// Any change in readiness, in either direction, is high priority.
	if isReady(oldStatus) != isReady(newStatus) {
		return 100
	}

	return 0
}

// deletePodStatus simply removes the given pod from the status cache.
func (m *manager) deletePodStatus(uid types.UID) {
m.podStatusesLock.Lock()
Expand Down Expand Up @@ -595,6 +648,7 @@ func (m *manager) syncBatch(clean bool) {
}()

// process all pods in priority order
sort.Sort(highestPrioritySyncRequests(updatedStatuses))
var total, update, reconcile int
for _, updatedStatus := range updatedStatuses {
total++
Expand All @@ -619,7 +673,7 @@ func (m *manager) syncBatch(clean bool) {
continue
}

klog.V(3).InfoS("syncBatch will sync pod", "clean", clean, "podUID", updatedStatus.podUID, "mirrorUID", pod.UID, "reason", reason, "terminalStatus", terminalStatus)
klog.V(3).InfoS("syncBatch will sync pod", "clean", clean, "podUID", updatedStatus.podUID, "mirrorUID", pod.UID, "priority", updatedStatus.status.priority, "reason", reason, "terminalStatus", terminalStatus)
m.syncPod(updatedStatus.podUID, pod, updatedStatus.status, terminalStatus)
}
}
Expand Down Expand Up @@ -677,7 +731,7 @@ func (m *manager) syncPod(uid types.UID, pod *v1.Pod, status versionedPodStatus,
} else {
duration = time.Now().Sub(status.at).Truncate(time.Millisecond)
}
metrics.PodStatusSyncDuration.WithLabelValues(strconv.Itoa(0)).Observe(duration.Seconds())
metrics.PodStatusSyncDuration.WithLabelValues(strconv.Itoa(status.priority)).Observe(duration.Seconds())

m.rememberAPIStatus(kubetypes.MirrorPodUID(pod.UID), status.version)

Expand Down

0 comments on commit 6bb6cb3

Please sign in to comment.