Skip to content

Commit

Permalink
Merge pull request #69595 from jiayingz/1.10-preempt-fix
Browse files Browse the repository at this point in the history
Always reconcile extended resource capacity after kubelet restart.
  • Loading branch information
k8s-ci-robot committed Oct 14, 2018
2 parents 555b4d0 + 5a6581f commit e6ab4ee
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 12 deletions.
36 changes: 25 additions & 11 deletions pkg/kubelet/kubelet_node_status.go
Expand Up @@ -66,6 +66,8 @@ func (kl *Kubelet) registerWithAPIServer() {
} }
step := 100 * time.Millisecond step := 100 * time.Millisecond


var previousNode *v1.Node = nil
var registered bool
for { for {
time.Sleep(step) time.Sleep(step)
step = step * 2 step = step * 2
Expand All @@ -78,9 +80,18 @@ func (kl *Kubelet) registerWithAPIServer() {
glog.Errorf("Unable to construct v1.Node object for kubelet: %v", err) glog.Errorf("Unable to construct v1.Node object for kubelet: %v", err)
continue continue
} }
if previousNode != nil {
for k := range previousNode.Status.Capacity {
if v1helper.IsExtendedResourceName(k) {
glog.Infof("Zero out resource %s capacity in new node.", k)
node.Status.Capacity[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
node.Status.Allocatable[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
}
}
}


glog.Infof("Attempting to register node %s", node.Name) glog.Infof("Attempting to register node %s", node.Name)
registered := kl.tryRegisterWithAPIServer(node) registered, previousNode = kl.tryRegisterWithAPIServer(node)
if registered { if registered {
glog.Infof("Successfully registered node %s", node.Name) glog.Infof("Successfully registered node %s", node.Name)
kl.registrationCompleted = true kl.registrationCompleted = true
Expand All @@ -96,49 +107,51 @@ func (kl *Kubelet) registerWithAPIServer() {
// persistent volumes for the node. If a node of the same name exists but has // persistent volumes for the node. If a node of the same name exists but has
// a different externalID value, it attempts to delete that node so that a // a different externalID value, it attempts to delete that node so that a
// later attempt can recreate it. // later attempt can recreate it.
func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool { func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) (bool, *v1.Node) {
_, err := kl.kubeClient.CoreV1().Nodes().Create(node) _, err := kl.kubeClient.CoreV1().Nodes().Create(node)
if err == nil { if err == nil {
return true return true, nil
} }


if !apierrors.IsAlreadyExists(err) { if !apierrors.IsAlreadyExists(err) {
glog.Errorf("Unable to register node %q with API server: %v", kl.nodeName, err) glog.Errorf("Unable to register node %q with API server: %v", kl.nodeName, err)
return false return false, nil
} }


existingNode, err := kl.kubeClient.CoreV1().Nodes().Get(string(kl.nodeName), metav1.GetOptions{}) existingNode, err := kl.kubeClient.CoreV1().Nodes().Get(string(kl.nodeName), metav1.GetOptions{})
if err != nil { if err != nil {
glog.Errorf("Unable to register node %q with API server: error getting existing node: %v", kl.nodeName, err) glog.Errorf("Unable to register node %q with API server: error getting existing node: %v", kl.nodeName, err)
return false return false, nil
} }
if existingNode == nil { if existingNode == nil {
glog.Errorf("Unable to register node %q with API server: no node instance returned", kl.nodeName) glog.Errorf("Unable to register node %q with API server: no node instance returned", kl.nodeName)
return false return false, nil
} }


originalNode := existingNode.DeepCopy() originalNode := existingNode.DeepCopy()
if originalNode == nil { if originalNode == nil {
glog.Errorf("Nil %q node object", kl.nodeName) glog.Errorf("Nil %q node object", kl.nodeName)
return false return false, nil
} }


exported_extended_resource := kl.reconcileExtendedResource(node, existingNode)

if existingNode.Spec.ExternalID == node.Spec.ExternalID { if existingNode.Spec.ExternalID == node.Spec.ExternalID {
glog.Infof("Node %s was previously registered", kl.nodeName) glog.Infof("Node %s was previously registered", kl.nodeName)
// Edge case: the node was previously registered; reconcile // Edge case: the node was previously registered; reconcile
// the value of the controller-managed attach-detach // the value of the controller-managed attach-detach
// annotation. // annotation.
requiresUpdate := kl.reconcileCMADAnnotationWithExistingNode(node, existingNode) requiresUpdate := kl.reconcileCMADAnnotationWithExistingNode(node, existingNode)
requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate
requiresUpdate = kl.reconcileExtendedResource(node, existingNode) || requiresUpdate requiresUpdate = exported_extended_resource || requiresUpdate
if requiresUpdate { if requiresUpdate {
if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil { if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil {
glog.Errorf("Unable to reconcile node %q with API server: error updating node: %v", kl.nodeName, err) glog.Errorf("Unable to reconcile node %q with API server: error updating node: %v", kl.nodeName, err)
return false return false, nil
} }
} }


return true return true, nil
} }


glog.Errorf("Previously node %q had externalID %q; now it is %q; will delete and recreate.", glog.Errorf("Previously node %q had externalID %q; now it is %q; will delete and recreate.",
Expand All @@ -150,14 +163,15 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
glog.Infof("Deleted old node object %q", kl.nodeName) glog.Infof("Deleted old node object %q", kl.nodeName)
} }


return false return false, existingNode
} }


// Zeros out extended resource capacity during reconciliation. // Zeros out extended resource capacity during reconciliation.
func (kl *Kubelet) reconcileExtendedResource(initialNode, node *v1.Node) bool { func (kl *Kubelet) reconcileExtendedResource(initialNode, node *v1.Node) bool {
requiresUpdate := false requiresUpdate := false
for k := range node.Status.Capacity { for k := range node.Status.Capacity {
if v1helper.IsExtendedResourceName(k) { if v1helper.IsExtendedResourceName(k) {
glog.Infof("Zero out resource %s capacity in existing node.", k)
node.Status.Capacity[k] = *resource.NewQuantity(int64(0), resource.DecimalSI) node.Status.Capacity[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
node.Status.Allocatable[k] = *resource.NewQuantity(int64(0), resource.DecimalSI) node.Status.Allocatable[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
requiresUpdate = true requiresUpdate = true
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/kubelet_node_status_test.go
Expand Up @@ -1202,7 +1202,7 @@ func TestTryRegisterWithApiServer(t *testing.T) {
return notImplemented(action) return notImplemented(action)
}) })


result := kubelet.tryRegisterWithAPIServer(tc.newNode) result, _ := kubelet.tryRegisterWithAPIServer(tc.newNode)
require.Equal(t, tc.expectedResult, result, "test [%s]", tc.name) require.Equal(t, tc.expectedResult, result, "test [%s]", tc.name)


actions := kubeClient.Actions() actions := kubeClient.Actions()
Expand Down

0 comments on commit e6ab4ee

Please sign in to comment.