Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node controller supports disabling node probes. #4808

Merged
merged 1 commit into from
Feb 27, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
nodeResources := &api.NodeResources{}

nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute)
nodeController.Run(5*time.Second, true)
nodeController.Run(5*time.Second, true, true)

// Kubelet (localhost)
testRootDir := makeTempDirOrDie("kubelet_integ_1.")
Expand Down
5 changes: 4 additions & 1 deletion cmd/kube-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type CMServer struct {
RegisterRetryCount int
MachineList util.StringList
SyncNodeList bool
SyncNodeStatus bool
PodEvictionTimeout time.Duration

// TODO: Discover these by pinging the host machines, and rip out these params.
Expand All @@ -75,6 +76,7 @@ func NewCMServer() *CMServer {
NodeMilliCPU: 1000,
NodeMemory: resource.MustParse("3Gi"),
SyncNodeList: true,
SyncNodeStatus: true,
KubeletConfig: client.KubeletConfig{
Port: ports.KubeletPort,
EnableHttps: false,
Expand All @@ -100,6 +102,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
"The number of retries for initial node registration. Retry interval equals node_sync_period.")
fs.Var(&s.MachineList, "machines", "List of machines to schedule onto, comma separated.")
fs.BoolVar(&s.SyncNodeList, "sync_nodes", s.SyncNodeList, "If true, and --cloud_provider is specified, sync nodes from the cloud provider. Default true.")
fs.BoolVar(&s.SyncNodeStatus, "sync_node_status", s.SyncNodeStatus, "Should node controler send probes to kubelets and update NodeStatus.")
// TODO: Discover these by pinging the host machines, and rip out these flags.
// TODO: in the meantime, use resource.QuantityFlag() instead of these
fs.Int64Var(&s.NodeMilliCPU, "node_milli_cpu", s.NodeMilliCPU, "The amount of MilliCPU provisioned on each node")
Expand Down Expand Up @@ -158,7 +161,7 @@ func (s *CMServer) Run(_ []string) error {

nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources,
kubeClient, kubeletClient, s.RegisterRetryCount, s.PodEvictionTimeout)
nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList)
nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList, s.SyncNodeStatus)

resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient)
resourceQuotaManager.Run(s.ResourceQuotaSyncPeriod)
Expand Down
2 changes: 1 addition & 1 deletion cmd/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
kubeClient := &client.HTTPKubeletClient{Client: http.DefaultClient, Port: ports.KubeletPort}

nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, kubeClient, 10, 5*time.Minute)
nodeController.Run(10*time.Second, true)
nodeController.Run(10*time.Second, true, true)

endpoints := service.NewEndpointController(cl)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
Expand Down
51 changes: 43 additions & 8 deletions pkg/cloudprovider/controller/nodecontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func NewNodeController(

// Run creates initial node list and start syncing instances from cloudprovider if any.
// It also starts syncing cluster node status.
func (s *NodeController) Run(period time.Duration, syncNodeList bool) {
func (s *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus bool) {
// Register intial set of nodes with their status set.
var nodes *api.NodeList
var err error
Expand All @@ -96,7 +96,6 @@ func (s *NodeController) Run(period time.Duration, syncNodeList bool) {
glog.Errorf("Error loading initial static nodes: %v", err)
}
}
nodes = s.DoChecks(nodes)
nodes, err = s.PopulateIPs(nodes)
if err != nil {
glog.Errorf("Error getting nodes ips: %v", err)
Expand All @@ -114,12 +113,21 @@ func (s *NodeController) Run(period time.Duration, syncNodeList bool) {
}, period)
}

// Start syncing node status.
go util.Forever(func() {
if err = s.SyncNodeStatus(); err != nil {
glog.Errorf("Error syncing status: %v", err)
}
}, period)
if syncNodeStatus {
// Start syncing node status.
go util.Forever(func() {
if err = s.SyncNodeStatus(); err != nil {
glog.Errorf("Error syncing status: %v", err)
}
}, period)
} else {
// Start checking node reachability and evicting timeouted pods.
go util.Forever(func() {
if err = s.EvictTimeoutedPods(); err != nil {
glog.Errorf("Error evicting timeouted pods: %v", err)
}
}, period)
}
}

// RegisterNodes registers the given list of nodes, it keeps retrying for `retryCount` times.
Expand Down Expand Up @@ -216,6 +224,33 @@ func (s *NodeController) SyncNodeStatus() error {
return nil
}

// EvictTimeoutedPods verifies if nodes are reachable by checking the time of last probe
// and deletes pods from not reachable nodes.
func (s *NodeController) EvictTimeoutedPods() error {
nodes, err := s.kubeClient.Nodes().List()
if err != nil {
return err
}
for _, node := range nodes.Items {
if util.Now().After(latestReadyTime(&node).Add(s.podEvictionTimeout)) {
s.deletePods(node.Name)
}
}
return nil
}

func latestReadyTime(node *api.Node) util.Time {
readyTime := node.ObjectMeta.CreationTimestamp
for _, condition := range node.Status.Conditions {
if condition.Type == api.NodeReady &&
condition.Status == api.ConditionFull &&
condition.LastProbeTime.After(readyTime.Time) {
readyTime = condition.LastProbeTime
}
}
return readyTime
}

// PopulateIPs queries IPs for given list of nodes.
func (s *NodeController) PopulateIPs(nodes *api.NodeList) (*api.NodeList, error) {
if s.isRunningCloudProvider() {
Expand Down
112 changes: 112 additions & 0 deletions pkg/cloudprovider/controller/nodecontroller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,118 @@ func TestSyncNodeStatusTransitionTime(t *testing.T) {
}
}

func TestEvictTimeoutedPods(t *testing.T) {
table := []struct {
fakeNodeHandler *FakeNodeHandler
expectedRequestCount int
expectedActions []client.FakeAction
}{
// Node created long time ago, with no status.
{
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "node0",
CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
},
},
Fake: client.Fake{
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
},
},
expectedRequestCount: 1, // List
expectedActions: []client.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}},
},
// Node created recently, with no status.
{
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "node0",
CreationTimestamp: util.Now(),
},
},
},
Fake: client.Fake{
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
},
},
expectedRequestCount: 1, // List
expectedActions: nil,
},
// Node created long time ago, with status updated long time ago.
{
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "node0",
CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionFull,
LastProbeTime: util.Date(2013, 1, 1, 0, 0, 0, 0, time.UTC),
},
},
},
},
},
Fake: client.Fake{
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
},
},
expectedRequestCount: 1, // List
expectedActions: []client.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}},
},
// Node created long time ago, with status updated recently.
{
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "node0",
CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionFull,
LastProbeTime: util.Now(),
},
},
},
},
},
Fake: client.Fake{
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
},
},
expectedRequestCount: 1, // List
expectedActions: nil,
},
}

for _, item := range table {
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute)
if err := nodeController.EvictTimeoutedPods(); err != nil {
t.Errorf("unexpected error: %v", err)
}
if item.expectedRequestCount != item.fakeNodeHandler.RequestCount {
t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
}
if !reflect.DeepEqual(item.expectedActions, item.fakeNodeHandler.Actions) {
t.Errorf("actions differs, expected %+v, got %+v", item.expectedActions, item.fakeNodeHandler.Actions)
}
}
}

func TestSyncNodeStatusDeletePods(t *testing.T) {
table := []struct {
fakeNodeHandler *FakeNodeHandler
Expand Down