Remove pods from failed node #4241

Merged Feb 10, 2015 (2 commits). The diff below shows changes from all commits.
5 changes: 3 additions & 2 deletions cmd/integration/integration.go
@@ -202,8 +202,9 @@ func startComponents(manifestURL string) (apiServerURL string) {
controllerManager.Run(10 * time.Minute)

nodeResources := &api.NodeResources{}
- nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{})
- nodeController.Run(5*time.Second, 10, true)
+ nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute)
+ nodeController.Run(5*time.Second, true)

// Kubelet (localhost)
testRootDir := makeTempDirOrDie("kubelet_integ_1.")
5 changes: 3 additions & 2 deletions cmd/kubernetes/kubernetes.go
@@ -123,8 +123,9 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
},
}
kubeClient := &client.HTTPKubeletClient{Client: http.DefaultClient, Port: ports.KubeletPort}
- nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, kubeClient)
- nodeController.Run(10*time.Second, 10, true)
+ nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, kubeClient, 10, 5*time.Minute)
+ nodeController.Run(10*time.Second, true)

endpoints := service.NewEndpointController(cl)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
1 change: 1 addition & 0 deletions pkg/api/types.go
@@ -774,6 +774,7 @@ const (
type NodeCondition struct {
Kind NodeConditionKind `json:"kind"`
Status NodeConditionStatus `json:"status"`
+ LastProbeTime util.Time `json:"lastProbeTime,omitempty"`
LastTransitionTime util.Time `json:"lastTransitionTime,omitempty"`
Reason string `json:"reason,omitempty"`
Message string `json:"message,omitempty"`
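For illustration only: a toy struct that mirrors the condition's JSON tags shows how the new field serializes and how omitempty drops it when unset. This sketch uses the standard library's time.Time as a stand-in for util.Time (which has its own marshalling), so the exact timestamp format differs from the real API.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Toy mirror of NodeCondition's JSON shape; *time.Time stands in for util.Time
// so that omitempty actually skips the unset LastTransitionTime field.
type NodeCondition struct {
	Kind               string     `json:"kind"`
	Status             string     `json:"status"`
	LastProbeTime      *time.Time `json:"lastProbeTime,omitempty"`
	LastTransitionTime *time.Time `json:"lastTransitionTime,omitempty"`
	Reason             string     `json:"reason,omitempty"`
}

func main() {
	now := time.Now()
	c := NodeCondition{Kind: "Ready", Status: "Full", LastProbeTime: &now}
	b, _ := json.Marshal(c)
	// Prints kind, status, and lastProbeTime; lastTransitionTime is omitted.
	fmt.Println(string(b))
}
```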
1 change: 1 addition & 0 deletions pkg/api/v1beta1/types.go
@@ -619,6 +619,7 @@ const (
type NodeCondition struct {
Kind NodeConditionKind `json:"kind" description:"kind of the condition, one of reachable, ready"`
Status NodeConditionStatus `json:"status" description:"status of the condition, one of full, none, unknown"`
+ LastProbeTime util.Time `json:"lastProbeTime,omitempty" description:"last time the condition was probed"`
LastTransitionTime util.Time `json:"lastTransitionTime,omitempty" description:"last time the condition transitioned from one status to another"`
Reason string `json:"reason,omitempty" description:"(brief) reason for the condition's last transition"`
Message string `json:"message,omitempty" description:"human readable message indicating details about last transition"`
1 change: 1 addition & 0 deletions pkg/api/v1beta2/types.go
@@ -583,6 +583,7 @@ const (
type NodeCondition struct {
Kind NodeConditionKind `json:"kind" description:"kind of the condition, one of reachable, ready"`
Status NodeConditionStatus `json:"status" description:"status of the condition, one of full, none, unknown"`
+ LastProbeTime util.Time `json:"lastProbeTime,omitempty" description:"last time the condition was probed"`
LastTransitionTime util.Time `json:"lastTransitionTime,omitempty" description:"last time the condition transitioned from one status to another"`
Reason string `json:"reason,omitempty" description:"(brief) reason for the condition's last transition"`
Message string `json:"message,omitempty" description:"human readable message indicating details about last transition"`
1 change: 1 addition & 0 deletions pkg/api/v1beta3/types.go
@@ -808,6 +808,7 @@ const (
type NodeCondition struct {
Kind NodeConditionKind `json:"kind"`
Status NodeConditionStatus `json:"status"`
+ LastProbeTime util.Time `json:"lastProbeTime,omitempty"`
LastTransitionTime util.Time `json:"lastTransitionTime,omitempty"`
Reason string `json:"reason,omitempty"`
Message string `json:"message,omitempty"`
108 changes: 74 additions & 34 deletions pkg/cloudprovider/controller/nodecontroller.go
@@ -20,7 +20,6 @@ import (
"errors"
"fmt"
"net"
"reflect"
"strings"
"sync"
"time"
@@ -29,6 +28,7 @@ import (
apierrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
@@ -41,12 +41,14 @@ var (
)

type NodeController struct {
- cloud cloudprovider.Interface
- matchRE string
- staticResources *api.NodeResources
- nodes []string
- kubeClient client.Interface
- kubeletClient client.KubeletHealthChecker
+ cloud cloudprovider.Interface
+ matchRE string
+ staticResources *api.NodeResources
+ nodes []string
+ kubeClient client.Interface
+ kubeletClient client.KubeletHealthChecker
+ registerRetryCount int
+ podEvictionTimeout time.Duration
}

// NewNodeController returns a new node controller to sync instances from cloudprovider.
@@ -58,20 +60,24 @@ func NewNodeController(
nodes []string,
staticResources *api.NodeResources,
kubeClient client.Interface,
- kubeletClient client.KubeletHealthChecker) *NodeController {
+ kubeletClient client.KubeletHealthChecker,
+ registerRetryCount int,
+ podEvictionTimeout time.Duration) *NodeController {
return &NodeController{
- cloud: cloud,
- matchRE: matchRE,
- nodes: nodes,
- staticResources: staticResources,
- kubeClient: kubeClient,
- kubeletClient: kubeletClient,
+ cloud: cloud,
+ matchRE: matchRE,
+ nodes: nodes,
+ staticResources: staticResources,
+ kubeClient: kubeClient,
+ kubeletClient: kubeletClient,
+ registerRetryCount: registerRetryCount,
+ podEvictionTimeout: podEvictionTimeout,
}
}

// Run creates the initial node list and starts syncing instances from the cloudprovider, if any.
// It also starts syncing cluster node status.
- func (s *NodeController) Run(period time.Duration, retryCount int, syncNodeList bool) {
+ func (s *NodeController) Run(period time.Duration, syncNodeList bool) {
// Register the initial set of nodes with their status set.
var nodes *api.NodeList
var err error
@@ -95,7 +101,7 @@ func (s *NodeController) Run(period time.Duration, retryCount int, syncNodeList
if err != nil {
glog.Errorf("Error getting nodes ips: %v", err)
}
- if err = s.RegisterNodes(nodes, retryCount, period); err != nil {
+ if err = s.RegisterNodes(nodes, s.registerRetryCount, period); err != nil {
glog.Errorf("Error registrying node list %+v: %v", nodes, err)
}

@@ -180,6 +186,7 @@ func (s *NodeController) SyncCloud() error {
if err != nil {
glog.Errorf("Delete node error: %s", nodeID)
}
+ s.deletePods(nodeID)
}

return nil
@@ -191,20 +198,15 @@ func (s *NodeController) SyncNodeStatus() error {
if err != nil {
return err
}
- oldNodes := make(map[string]api.Node)
- for _, node := range nodes.Items {
- oldNodes[node.Name] = node
- }
nodes = s.DoChecks(nodes)
nodes, err = s.PopulateIPs(nodes)
if err != nil {
return err
}
for _, node := range nodes.Items {
- if reflect.DeepEqual(node, oldNodes[node.Name]) {
- glog.V(2).Infof("skip updating node %v", node.Name)
- continue
- }
+ // We used to skip updating a node whose status hadn't changed, but that optimization is
+ // no longer useful now that we have per-probe status fields, e.g. 'LastProbeTime', which
+ // differ on every iteration of the sync loop.
glog.V(2).Infof("updating node %v", node.Name)
_, err = s.kubeClient.Nodes().Update(&node)
if err != nil {
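The removed short-circuit is worth a concrete illustration: once every probe stamps LastProbeTime, two successive snapshots of the same node never compare equal, so the reflect.DeepEqual guard could never fire. A standalone sketch with toy stand-ins for the real api types:

```go
package main

import (
	"fmt"
	"reflect"
	"time"
)

// Toy stand-ins for api.NodeCondition and api.Node; field names mirror the PR.
type NodeCondition struct {
	Kind          string
	Status        string
	LastProbeTime time.Time
}

type Node struct {
	Name       string
	Conditions []NodeCondition
}

func main() {
	probed := time.Now()
	old := Node{Name: "n1", Conditions: []NodeCondition{{Kind: "Ready", Status: "Full", LastProbeTime: probed}}}

	// Next sync loop: identical status, but a fresh probe timestamp.
	cur := Node{Name: "n1", Conditions: []NodeCondition{{Kind: "Ready", Status: "Full", LastProbeTime: probed.Add(5 * time.Second)}}}

	// Always false once LastProbeTime is stamped on every probe,
	// so the old skip-update branch would never be taken again.
	fmt.Println(reflect.DeepEqual(old, cur)) // false
}
```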
@@ -273,40 +275,78 @@ func (s *NodeController) DoCheck(node *api.Node) []api.NodeCondition {
oldReadyCondition := s.getCondition(node, api.NodeReady)
newReadyCondition := s.checkNodeReady(node)
if oldReadyCondition != nil && oldReadyCondition.Status == newReadyCondition.Status {
+ // If the node status hasn't changed, keep the transition time from last time.
newReadyCondition.LastTransitionTime = oldReadyCondition.LastTransitionTime
} else {
+ // Set the transition time to Now() if the node status changed or `oldReadyCondition` is
+ // nil, which happens only the first time the node is checked.
newReadyCondition.LastTransitionTime = util.Now()
}

+ if newReadyCondition.Status != api.ConditionFull {
+ // The node failed this probe; check whether its pods need to be deleted.
+ if newReadyCondition.LastProbeTime.After(newReadyCondition.LastTransitionTime.Add(s.podEvictionTimeout)) {
+ // As long as the node keeps failing, we keep calling deletePods to remove all of its
+ // pods. The node controller sync is not a closed loop: there is no feedback from other
+ // components about pod status, so repeatedly listing and deleting the pods is the
+ // saner way to check that they are all gone.
+ s.deletePods(node.Name)
+ }
+ }

conditions = append(conditions, *newReadyCondition)

return conditions
}
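To make the eviction condition concrete: pods are only deleted once the latest failed probe lands more than podEvictionTimeout after the last status transition. A minimal sketch of the timestamp arithmetic, using plain time.Time in place of util.Time:

```go
package main

import (
	"fmt"
	"time"
)

// shouldEvict mirrors the check in DoCheck: evict only when the latest failed
// probe is more than podEvictionTimeout past the last status transition.
func shouldEvict(lastProbe, lastTransition time.Time, podEvictionTimeout time.Duration) bool {
	return lastProbe.After(lastTransition.Add(podEvictionTimeout))
}

func main() {
	transition := time.Now()
	timeout := 5 * time.Minute

	fmt.Println(shouldEvict(transition.Add(1*time.Minute), transition, timeout)) // false: failing, but not long enough
	fmt.Println(shouldEvict(transition.Add(6*time.Minute), transition, timeout)) // true: failed past the timeout
}
```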

- // checkNodeReady checks the raw node ready condition, without the timestamp set.
+ // checkNodeReady checks the raw node ready condition, without the transition timestamp set.
func (s *NodeController) checkNodeReady(node *api.Node) *api.NodeCondition {
switch status, err := s.kubeletClient.HealthCheck(node.Name); {
case err != nil:
glog.V(2).Infof("NodeController: node %s health check error: %v", node.Name, err)
return &api.NodeCondition{
- Kind: api.NodeReady,
- Status: api.ConditionUnknown,
- Reason: fmt.Sprintf("Node health check error: %v", err),
+ Kind: api.NodeReady,
+ Status: api.ConditionUnknown,
+ Reason: fmt.Sprintf("Node health check error: %v", err),
+ LastProbeTime: util.Now(),
}
case status == probe.Failure:
return &api.NodeCondition{
- Kind: api.NodeReady,
- Status: api.ConditionNone,
- Reason: fmt.Sprintf("Node health check failed: kubelet /healthz endpoint returns not ok"),
+ Kind: api.NodeReady,
+ Status: api.ConditionNone,
+ Reason: fmt.Sprintf("Node health check failed: kubelet /healthz endpoint returns not ok"),
+ LastProbeTime: util.Now(),
}
default:
return &api.NodeCondition{
- Kind: api.NodeReady,
- Status: api.ConditionFull,
- Reason: fmt.Sprintf("Node health check succeeded: kubelet /healthz endpoint returns ok"),
+ Kind: api.NodeReady,
+ Status: api.ConditionFull,
+ Reason: fmt.Sprintf("Node health check succeeded: kubelet /healthz endpoint returns ok"),
+ LastProbeTime: util.Now(),
}
}
}

+ // deletePods deletes from the master all pods that are running on the given node.
+ func (s *NodeController) deletePods(nodeID string) error {
+ glog.V(2).Infof("Delete all pods from %v", nodeID)
+ // TODO: We don't yet have field selectors from the client; see issue #1362.
Review comment (Member):
The scheduler watches for pods that don't have the Host field set, so that field is supported as a selector. Could you use that? See:
https://github.com/GoogleCloudPlatform/kubernetes/blob/master/pkg/registry/pod/rest.go#L108

Reply (Contributor Author):
I think the link in your comment is about converting pod status to selectable fields, i.e. if we list pods with Pod.Host == "", we would need all the other fields to match against as well. It looks like an apiserver-side thing.

The scheduler seems to do its own listing:
https://github.com/GoogleCloudPlatform/kubernetes/blob/6d9845361f05eaddb350f7df59efe94ca9d63ec1/pkg/client/cache/listwatch.go#L41

But our standard client only supports label selectors:
https://github.com/GoogleCloudPlatform/kubernetes/blob/6d9845361f05eaddb350f7df59efe94ca9d63ec1/pkg/client/pods.go#L58

The code below therefore lists all pods and filters by host on the client side; a standalone sketch of that filter appears after the function.

+ pods, err := s.kubeClient.Pods(api.NamespaceAll).List(labels.Everything())
+ if err != nil {
+ return err
+ }
+ for _, pod := range pods.Items {
+ if pod.Status.Host != nodeID {
+ continue
+ }
+ glog.V(2).Infof("Delete pod %v", pod.Name)
+ if err := s.kubeClient.Pods(api.NamespaceAll).Delete(pod.Name); err != nil {
+ glog.Errorf("Error deleting pod %v: %v", pod.Name, err)
+ }
+ }
+
+ return nil
+ }
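A standalone sketch of the client-side filter discussed in the review thread, with hypothetical toy types in place of the real api.Pod and client (the actual code above does the same host comparison inline):

```go
package main

import "fmt"

// Toy stand-ins for the API types; the real code reads pod.Status.Host.
type PodStatus struct{ Host string }
type Pod struct {
	Name   string
	Status PodStatus
}

// podsOnNode is the client-side substitute for a field selector on Status.Host:
// list everything, then keep only the pods bound to the failed node.
func podsOnNode(pods []Pod, nodeID string) []Pod {
	var out []Pod
	for _, p := range pods {
		if p.Status.Host == nodeID {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	pods := []Pod{
		{Name: "a", Status: PodStatus{Host: "node-1"}},
		{Name: "b", Status: PodStatus{Host: "node-2"}},
	}
	fmt.Println(podsOnNode(pods, "node-1")) // only pod "a" remains
}
```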

// StaticNodes constructs and returns an api.NodeList for the static nodes. If an error
// occurs, an empty NodeList is returned along with a non-nil error.
func (s *NodeController) StaticNodes() (*api.NodeList, error) {