
Add hooks for cluster health detection #28829

Merged
merged 1 commit on Jul 12, 2016
@@ -22,7 +22,6 @@ import (
"time"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/record"
@@ -36,6 +35,20 @@ import (
"github.com/golang/glog"
)

// ComputeZoneState computes the zoneState for a zone from a slice of NodeReady conditions for all Nodes in that zone.
func ComputeZoneState(nodeReadyConditions []*api.NodeCondition) zoneState {
seenReady := false
for i := range nodeReadyConditions {
if nodeReadyConditions[i] != nil && nodeReadyConditions[i].Status == api.ConditionTrue {
seenReady = true
}
}
if seenReady {
return stateNormal
}
return stateFullSegmentation
}

// cleanupOrphanedPods deletes pods that are bound to nodes that don't
// exist.
func cleanupOrphanedPods(pods []*api.Pod, nodeStore cache.Store, forcefulDeletePodFunc func(*api.Pod) error) {
@@ -124,16 +137,6 @@ func forcefullyDeleteNode(kubeClient clientset.Interface, nodeName string, force
return nil
}

// forceUpdateAllProbeTimes bumps all observed timestamps in saved nodeStatuses to now. This makes
// all eviction timers reset.
func forceUpdateAllProbeTimes(now unversioned.Time, statusData map[string]nodeStatusData) {
for k, v := range statusData {
v.probeTimestamp = now
v.readyTransitionTimestamp = now
statusData[k] = v
}
}

// maybeDeleteTerminatingPod non-gracefully deletes pods that are terminating
// that should not be gracefully terminated.
func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}, nodeStore cache.Store, forcefulDeletePodFunc func(*api.Pod) error) {
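For illustration only, and not part of this diff: a minimal sketch of how the new ComputeZoneState hook classifies a zone from its Nodes' Ready conditions. The helper name exampleZoneStates and the package clause are hypothetical; only ComputeZoneState, stateNormal, and stateFullSegmentation come from this PR.

package node

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
)

// exampleZoneStates is a hypothetical sketch: one Ready Node keeps a zone
// Normal, while a zone with no Ready Nodes is treated as fully segmented.
func exampleZoneStates() {
	ready := &api.NodeCondition{Type: api.NodeReady, Status: api.ConditionTrue}
	notReady := &api.NodeCondition{Type: api.NodeReady, Status: api.ConditionFalse}

	fmt.Println(ComputeZoneState([]*api.NodeCondition{notReady, ready})) // Normal
	fmt.Println(ComputeZoneState([]*api.NodeCondition{notReady}))        // FullSegmentation
}
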
156 changes: 92 additions & 64 deletions pkg/controller/node/nodecontroller.go
@@ -38,8 +38,8 @@ import (
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/metrics"
utilnode "k8s.io/kubernetes/pkg/util/node"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/system"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/version"
@@ -58,6 +58,14 @@ const (
nodeEvictionPeriod = 100 * time.Millisecond
)

type zoneState string

const (
stateNormal = zoneState("Normal")
stateFullSegmentation = zoneState("FullSegmentation")
statePartialSegmentation = zoneState("PartialSegmentation")
)

type nodeStatusData struct {
probeTimestamp unversioned.Time
readyTransitionTimestamp unversioned.Time
@@ -70,7 +78,7 @@ type NodeController struct {
clusterCIDR *net.IPNet
serviceCIDR *net.IPNet
deletingPodsRateLimiter flowcontrol.RateLimiter
knownNodeSet sets.String
knownNodeSet map[string]*api.Node
kubeClient clientset.Interface
// Method for easy mocking in unittest.
lookupIP func(host string) ([]net.IP, error)
@@ -124,11 +132,9 @@ type NodeController struct {

forcefullyDeletePod func(*api.Pod) error
nodeExistsInCloudProvider func(string) (bool, error)
computeZoneStateFunc func(nodeConditions []*api.NodeCondition) zoneState

// If in network segmentation mode NodeController won't evict Pods from unhealthy Nodes.
// It is enabled when all Nodes observed by the NodeController are NotReady and disabled
// when NC sees any healthy Node. This is a temporary fix for v1.3.
networkSegmentationMode bool
zoneStates map[string]zoneState
}

// NewNodeController returns a new node controller to sync instances from cloudprovider.
@@ -172,7 +178,7 @@ func NewNodeController(

nc := &NodeController{
cloud: cloud,
knownNodeSet: make(sets.String),
knownNodeSet: make(map[string]*api.Node),
kubeClient: kubeClient,
recorder: recorder,
podEvictionTimeout: podEvictionTimeout,
@@ -191,6 +197,8 @@
allocateNodeCIDRs: allocateNodeCIDRs,
forcefullyDeletePod: func(p *api.Pod) error { return forcefullyDeletePod(kubeClient, p) },
nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
computeZoneStateFunc: ComputeZoneState,
zoneStates: make(map[string]zoneState),
}

nc.podStore.Indexer, nc.podController = framework.NewIndexerInformer(
@@ -360,31 +368,22 @@ func (nc *NodeController) monitorNodeStatus() error {
if err != nil {
return err
}
for _, node := range nodes.Items {
if !nc.knownNodeSet.Has(node.Name) {
glog.V(1).Infof("NodeController observed a new Node: %#v", node)
recordNodeEvent(nc.recorder, node.Name, api.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", node.Name))
nc.cancelPodEviction(node.Name)
nc.knownNodeSet.Insert(node.Name)
}
added, deleted := nc.checkForNodeAddedDeleted(nodes)
for i := range added {
glog.V(1).Infof("NodeController observed a new Node: %#v", added[i].Name)
recordNodeEvent(nc.recorder, added[i].Name, api.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", added[i].Name))
nc.cancelPodEviction(added[i])
nc.knownNodeSet[added[i].Name] = added[i]
}
// If there's a difference between lengths of known Nodes and observed nodes
// we must have removed some Node.
if len(nc.knownNodeSet) != len(nodes.Items) {
observedSet := make(sets.String)
for _, node := range nodes.Items {
observedSet.Insert(node.Name)
}
deleted := nc.knownNodeSet.Difference(observedSet)
for nodeName := range deleted {
glog.V(1).Infof("NodeController observed a Node deletion: %v", nodeName)
recordNodeEvent(nc.recorder, nodeName, api.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", nodeName))
nc.evictPods(nodeName)
nc.knownNodeSet.Delete(nodeName)
}

for i := range deleted {
glog.V(1).Infof("NodeController observed a Node deletion: %v", deleted[i].Name)
recordNodeEvent(nc.recorder, deleted[i].Name, api.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", deleted[i].Name))
nc.evictPods(deleted[i])
delete(nc.knownNodeSet, deleted[i].Name)
}

seenReady := false
zoneToNodeConditions := map[string][]*api.NodeCondition{}
for i := range nodes.Items {
var gracePeriod time.Duration
var observedReadyCondition api.NodeCondition
@@ -407,29 +406,28 @@
"Skipping - no pods will be evicted.", node.Name)
continue
}
// We do not treat a master node as a part of the cluster for network segmentation checking.
if !system.IsMasterNode(node) {
zoneToNodeConditions[utilnode.GetZoneKey(node)] = append(zoneToNodeConditions[utilnode.GetZoneKey(node)], currentReadyCondition)
}

decisionTimestamp := nc.now()

if currentReadyCondition != nil {
// Check eviction timeout against decisionTimestamp
if observedReadyCondition.Status == api.ConditionFalse &&
decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
if nc.evictPods(node.Name) {
if nc.evictPods(node) {
glog.V(4).Infof("Evicting pods on node %s: %v is later than %v + %v", node.Name, decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout)
}
}
if observedReadyCondition.Status == api.ConditionUnknown &&
decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout)) {
if nc.evictPods(node.Name) {
if nc.evictPods(node) {
glog.V(4).Infof("Evicting pods on node %s: %v is later than %v + %v", node.Name, decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod)
}
}
if observedReadyCondition.Status == api.ConditionTrue {
// We do not treat a master node as a part of the cluster for network segmentation checking.
if !system.IsMasterNode(node) {
seenReady = true
}
if nc.cancelPodEviction(node.Name) {
if nc.cancelPodEviction(node) {
glog.V(2).Infof("Node %s is ready again, cancelled pod eviction", node.Name)
}
}
@@ -468,19 +466,35 @@ func (nc *NodeController) monitorNodeStatus() error {
}
}

// NC doesn't see any Ready Node. We assume that the network is segmented and Nodes cannot connect to the API server and
// update their statuses. NC enters network segmentation mode and cancels all evictions in progress.
if !seenReady {
nc.networkSegmentationMode = true
nc.stopAllPodEvictions()
glog.V(2).Info("NodeController is entering network segmentation mode.")
} else {
if nc.networkSegmentationMode {
forceUpdateAllProbeTimes(nc.now(), nc.nodeStatusMap)
nc.networkSegmentationMode = false
glog.V(2).Info("NodeController exited network segmentation mode.")
for k, v := range zoneToNodeConditions {
newState := nc.computeZoneStateFunc(v)
if newState == nc.zoneStates[k] {
continue
}
if newState == stateFullSegmentation {
glog.V(2).Infof("NodeController is entering network segmentation mode in zone %v.", k)
} else if newState == stateNormal {
glog.V(2).Infof("NodeController exited network segmentation mode in zone %v.", k)
}
for i := range nodes.Items {
if utilnode.GetZoneKey(&nodes.Items[i]) == k {
if newState == stateFullSegmentation {
// When a zone is fully segmented we stop eviction altogether.
nc.cancelPodEviction(&nodes.Items[i])
}
if newState == stateNormal && nc.zoneStates[k] == stateFullSegmentation {
// When exiting segmentation mode, update probe timestamps on all Nodes.
now := nc.now()
v := nc.nodeStatusMap[nodes.Items[i].Name]
v.probeTimestamp = now
v.readyTransitionTimestamp = now
nc.nodeStatusMap[nodes.Items[i].Name] = v
}
}
Member: Shouldn't you update zoneStates here?

Contributor (author): Yeah, it must have gotten lost in the last rewrite of this part.

}
nc.zoneStates[k] = newState
}

return nil
}

@@ -649,36 +663,50 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
return gracePeriod, observedReadyCondition, currentReadyCondition, err
}

func (nc *NodeController) checkForNodeAddedDeleted(nodes *api.NodeList) (added, deleted []*api.Node) {
for i := range nodes.Items {
if _, has := nc.knownNodeSet[nodes.Items[i].Name]; !has {
added = append(added, &nodes.Items[i])
}
}
// If there's a difference between lengths of known Nodes and observed nodes
// we must have removed some Node.
if len(nc.knownNodeSet)+len(added) != len(nodes.Items) {
knowSetCopy := map[string]*api.Node{}
for k, v := range nc.knownNodeSet {
knowSetCopy[k] = v
}
for i := range nodes.Items {
delete(knowSetCopy, nodes.Items[i].Name)
}
for i := range knowSetCopy {
deleted = append(deleted, knowSetCopy[i])
}
}
return
}

// cancelPodEviction removes any queued evictions, typically because the node is available again. It
// returns true if an eviction was queued.
func (nc *NodeController) cancelPodEviction(nodeName string) bool {
func (nc *NodeController) cancelPodEviction(node *api.Node) bool {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
wasDeleting := nc.podEvictor.Remove(nodeName)
wasTerminating := nc.terminationEvictor.Remove(nodeName)
wasDeleting := nc.podEvictor.Remove(node.Name)
wasTerminating := nc.terminationEvictor.Remove(node.Name)
if wasDeleting || wasTerminating {
glog.V(2).Infof("Cancelling pod Eviction on Node: %v", nodeName)
glog.V(2).Infof("Cancelling pod Eviction on Node: %v", node.Name)
return true
}
return false
}

// evictPods queues an eviction for the provided node, and returns false if the node is already
// queued for eviction.
func (nc *NodeController) evictPods(nodeName string) bool {
if nc.networkSegmentationMode {
func (nc *NodeController) evictPods(node *api.Node) bool {
if nc.zoneStates[utilnode.GetZoneKey(node)] == stateFullSegmentation {
return false
}
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
return nc.podEvictor.Add(nodeName)
}

// stopAllPodEvictions removes any queued evictions for all Nodes.
func (nc *NodeController) stopAllPodEvictions() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
glog.V(3).Infof("Cancelling all pod evictions.")
nc.podEvictor.Clear()
nc.terminationEvictor.Clear()
return nc.podEvictor.Add(node.Name)
}
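
As a hypothetical sketch (not part of this PR), the checkForNodeAddedDeleted helper added above could be exercised roughly as follows; the function exampleCheckForNodeAddedDeleted and the literal node names are made up for illustration.

package node

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
)

// exampleCheckForNodeAddedDeleted shows how checkForNodeAddedDeleted diffs the
// observed NodeList against the controller's knownNodeSet map.
func exampleCheckForNodeAddedDeleted() {
	known := &api.Node{ObjectMeta: api.ObjectMeta{Name: "node-a"}}
	nc := &NodeController{knownNodeSet: map[string]*api.Node{"node-a": known}}

	// "node-a" disappeared from the observed list and "node-b" is new.
	observed := &api.NodeList{Items: []api.Node{
		{ObjectMeta: api.ObjectMeta{Name: "node-b"}},
	}}

	added, deleted := nc.checkForNodeAddedDeleted(observed)
	fmt.Println(len(added), len(deleted)) // 1 1
}
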
22 changes: 22 additions & 0 deletions pkg/util/node/node.go
@@ -24,6 +24,7 @@ import (

"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
)

func GetHostname(hostnameOverride string) string {
@@ -59,3 +60,24 @@ func GetNodeHostIP(node *api.Node) (net.IP, error) {
}
return nil, fmt.Errorf("host IP unknown; known addresses: %v", addresses)
}

// Helper function that builds a string identifier that is unique per failure-zone
// Returns empty-string for no zone
func GetZoneKey(node *api.Node) string {
labels := node.Labels
if labels == nil {
return ""
}

region, _ := labels[unversioned.LabelZoneRegion]
failureDomain, _ := labels[unversioned.LabelZoneFailureDomain]

if region == "" && failureDomain == "" {
return ""
}

// We include the null character just in case region or failureDomain has a colon
// (We do assume there's no null characters in a region or failureDomain)
// As a nice side-benefit, the null character is not printed by fmt.Print or glog
return region + ":\x00:" + failureDomain
}
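
For illustration only (not part of this diff), a sketch of GetZoneKey on a Node carrying the standard region and failure-domain labels; the helper name and the example label values are made up.

package node

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/unversioned"
)

// exampleGetZoneKey is a hypothetical sketch of the helper moved into this package.
func exampleGetZoneKey() {
	n := &api.Node{ObjectMeta: api.ObjectMeta{
		Labels: map[string]string{
			unversioned.LabelZoneRegion:        "us-central1",
			unversioned.LabelZoneFailureDomain: "us-central1-a",
		},
	}}
	// The null byte keeps the key unambiguous even if a label value contains a colon.
	fmt.Printf("%q\n", GetZoneKey(n)) // "us-central1:\x00:us-central1-a"

	// A Node without zone labels yields the empty key, which callers skip.
	fmt.Println(GetZoneKey(&api.Node{}) == "") // true
}
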
26 changes: 3 additions & 23 deletions plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go
@@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/labels"
utilnode "k8s.io/kubernetes/pkg/util/node"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
@@ -54,27 +55,6 @@ func NewSelectorSpreadPriority(podLister algorithm.PodLister, serviceLister algo
return selectorSpread.CalculateSpreadPriority
}

// Helper function that builds a string identifier that is unique per failure-zone
// Returns empty-string for no zone
func getZoneKey(node *api.Node) string {
labels := node.Labels
if labels == nil {
return ""
}

region, _ := labels[unversioned.LabelZoneRegion]
failureDomain, _ := labels[unversioned.LabelZoneFailureDomain]

if region == "" && failureDomain == "" {
return ""
}

// We include the null character just in case region or failureDomain has a colon
// (We do assume there's no null characters in a region or failureDomain)
// As a nice side-benefit, the null character is not printed by fmt.Print or glog
return region + ":\x00:" + failureDomain
}

// CalculateSpreadPriority spreads pods across hosts and zones, considering pods belonging to the same service or replication controller.
// When a pod is scheduled, it looks for services or RCs that match the pod, then finds existing pods that match those selectors.
// It favors nodes that have fewer existing matching pods.
@@ -189,7 +169,7 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo ma
continue
}

zoneId := getZoneKey(node)
zoneId := utilnode.GetZoneKey(node)
if zoneId == "" {
continue
}
@@ -220,7 +200,7 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo ma

// If there is zone information present, incorporate it
if haveZones {
zoneId := getZoneKey(node)
zoneId := utilnode.GetZoneKey(node)
if zoneId != "" {
zoneScore := maxPriority * (float32(maxCountByZone-countsByZone[zoneId]) / float32(maxCountByZone))
fScore = (fScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)
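
Context, not part of this diff: the scoring above blends a node's host-level spread score with its zone-level score. As a worked sketch, assuming purely for illustration that zoneWeighting = 2.0/3.0, maxPriority = 10, a host score of 6, and a zone holding 1 of at most 4 matching pods:

package main

import "fmt"

// Hypothetical standalone arithmetic sketch of the zone blending; the constant
// values are assumptions for illustration, not taken from this diff.
func main() {
	const (
		maxPriority   = float32(10)
		zoneWeighting = float32(2.0 / 3.0)
	)
	maxCountByZone := 4
	countInZone := 1
	hostScore := float32(6)

	zoneScore := maxPriority * (float32(maxCountByZone-countInZone) / float32(maxCountByZone))
	blended := (hostScore * (1.0 - zoneWeighting)) + (zoneWeighting * zoneScore)
	fmt.Println(zoneScore, blended) // zoneScore is 7.5; the blend pulls the host score up toward it
}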