diff --git a/contrib/mesos/pkg/controller/node/node.go b/contrib/mesos/pkg/controller/node/node.go index 496cf009b464a..1307cb98bab77 100644 --- a/contrib/mesos/pkg/controller/node/node.go +++ b/contrib/mesos/pkg/controller/node/node.go @@ -26,6 +26,7 @@ import ( log "github.com/golang/glog" "k8s.io/kubernetes/cmd/kubelet/app/options" "k8s.io/kubernetes/contrib/mesos/pkg/node" + "k8s.io/kubernetes/contrib/mesos/pkg/podutil" "k8s.io/kubernetes/contrib/mesos/pkg/runtime" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" @@ -48,26 +49,40 @@ type ( worker func() } + podIndex struct { + nodePods map[string]sets.String // map node to pod + } + Controller struct { - client *clientset.Clientset - nowFunc func() time.Time - monitors []*monitor - addFuncs []func(_ interface{}) - updateFuncs []func(_, _ interface{}) - deleteFuncs []func(_ interface{}) + client *clientset.Clientset + clockFunc func() time.Time + monitors []*monitor // Node framework and store nodeController *framework.Controller nodeStore cache.StoreToNodeLister + + // Pod framework and store + podController *framework.Controller + pods cache.Indexer } // Option is a functional option for Controller Option func(*Controller) ) -func NowFunc(f func() time.Time) Option { +// PodSpecNodeNameIndexFunc is a default index function that indexes based on an pod's node name +func PodSpecNodeNameIndexFunc(obj interface{}) ([]string, error) { + pod, ok := obj.(*api.Pod) + if !ok { + return []string{""}, fmt.Errorf("object was not a pod") + } + return []string{pod.Spec.NodeName}, nil +} + +func ClockFunc(f func() time.Time) Option { return func(c *Controller) { - c.nowFunc = f + c.clockFunc = f } } @@ -80,8 +95,8 @@ func SlaveStatusController() Option { func NewController(client *clientset.Clientset, relistPeriod time.Duration, options ...Option) *Controller { c := &Controller{ - client: client, - nowFunc: time.Now, + client: client, + clockFunc: time.Now, } // client is only optional for unit tests if client != nil { @@ -96,23 +111,21 @@ func NewController(client *clientset.Clientset, relistPeriod time.Duration, opti }, &api.Node{}, relistPeriod, - framework.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - for _, f := range c.addFuncs { - f(obj) - } - }, - UpdateFunc: func(oldObj, newObj interface{}) { - for _, f := range c.updateFuncs { - f(oldObj, newObj) - } + framework.ResourceEventHandlerFuncs{}, + ) + c.pods, c.podController = framework.NewIndexerInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (kuberuntime.Object, error) { + return c.client.Core().Pods(api.NamespaceAll).List(options) }, - DeleteFunc: func(obj interface{}) { - for _, f := range c.deleteFuncs { - f(obj) - } + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return c.client.Core().Pods(api.NamespaceAll).Watch(options) }, }, + &api.Pod{}, + relistPeriod, + framework.ResourceEventHandlerFuncs{}, + cache.Indexers{api.PodHostField: PodSpecNodeNameIndexFunc}, ) } // apply functional options @@ -132,64 +145,13 @@ func (c *Controller) Run(terminate <-chan struct{}) { <-terminate } -// watch for changes to RunningExecutorCondition -type mesosContainerController struct { - store cache.Store - gracePeriod time.Duration // nodes w/ RunningExecutorCondition updates older than lastHeartbeat+gracePeriod are considered stale -} - -func (c *Controller) newMesosContainerController() *mesosContainerController { - controller := &mesosContainerController{ - store: c.nodeStore.Store, - } -} - -func (mcc 
*mesosContainerController) checkCondition() { - // it's possible that some number of nodes are no longer part of the cluster. in this - // case k8s should take care of pod state cleanup in apiserver, so we don't worry about - // that here. - // we're primarily interested in cleaning up stale pod records from apiserver: - // - after a pod is bound, kubelet becomes the source of truth re: pod state - // - kubelet isn't guanteed to shut down cleanly - // - may leave lingering docker containers (solved by mesos orphan CT GC?) - // - may leave lingering pod records in apiserver (want to solve this here!) - // - our slaveStatusController simulates NodeReady condition for **all** known slaves - // - this prevents k8s pod GC/cleanup from properly identifying stale pod state for - // nodes that exist but aren't running kubelet - nodes := ssc.store.List() - for _, obj := range nodes { - n := obj.(*api.Node) - cond := getCondition(&n.Status, node.RunningExecutorCondition) - if cond != nil { - // 1) heartbeat is current --> uuid is possibly stale - // ?? kubelet cycled but we haven't observed a status update yet - // a) delete all pods on the node, except non-matching latest newer UUID;timestamp within grace period or else matching UUID's - // 2) heartbeat != current --> uuid is stale - // ?? kubelet is busy/deadlocked - // ?? kubelet is network-partitioned - // ?? kubelet died "uncleanly" - // ?? new kubelet launched (cycled/upgraded) but we haven't observed a status update yet - // a) delete all pods on the node, except non-matching latest newer UUID;timestamp within grace period - } else { - // 1) new slave on old node (node bounced)? start evicting pods - // ?? node bounced/cycled? - // .. if it was gone long enough k8s may have removed it - // .. it could come back before all pods assigned to it were deleted - // ?? kubelet launched but we haven't observed a status update yet - // a) delete all pods on the node, except newest UUID;timestamp within grace period - // 2) new slave on new node? .. no garbage, no kubelet - // ?? kubelet launched but we haven't observed a status update yet - // a) delete all pods on the node, except newest UUID;timestamp within grace period - } - } -} - type slaveStatusController struct { *monitor gracePeriod time.Duration - store cache.Store - nowFunc func() time.Time + nodeCache cache.Store + clockFunc func() time.Time nodes unversionedcore.NodesGetter + podsForNode func(string) ([]interface{}, error) // field for easier unit testing } func (c *Controller) newSlaveStatusController() *slaveStatusController { @@ -198,14 +160,18 @@ func (c *Controller) newSlaveStatusController() *slaveStatusController { monitor: &monitor{ heartbeatPeriod: kubecfg.NodeStatusUpdateFrequency.Duration, }, - nowFunc: c.nowFunc, - store: c.nodeStore.Store, + clockFunc: c.clockFunc, + nodeCache: c.nodeStore.Store, } // optional for easier unit testing if c.client != nil { controller.nodes = c.client.Core() } + controller.podsForNode = func(nodeName string) ([]interface{}, error) { + key := &api.Pod{Spec: api.PodSpec{NodeName: nodeName}} + return c.pods.Index(api.PodHostField, key) + } // avoid flapping by waiting at least twice the kubetlet update frequency, i.e. // give the kubelet the chance twice to update the heartbeat. This is necessary @@ -228,25 +194,141 @@ func (ssc *slaveStatusController) checkStatus() { // update status for nodes which do not have a kubelet running and // which are still existing as slave. 
This status update must be done // before the node controller counts down the NodeMonitorGracePeriod - nodes := ssc.store.List() + nodes := ssc.nodeCache.List() for _, n := range nodes { node := n.(*api.Node) - if !slavesWithoutKubelet.Has(node.Spec.ExternalID) { - // let the kubelet do its job updating the status, or the - // node controller will remove this node if the node does not even - // exist anymore + + updated, err := ssc.checkWithoutKubelet(node, slavesWithoutKubelet) + if err != nil { + log.Errorf("Error updating node status: %v", err) + } + if updated { continue } - err := ssc.updateStatus(node) + _, err = ssc.checkStalePods(node) if err != nil { - log.Errorf("Error updating node status: %v", err) + log.Errorf("failed stale pod check: %v", err) + } + } +} + +// it's possible that some number of nodes are no longer part of the cluster. in this +// case k8s should take care of pod state cleanup in apiserver, so we don't worry about +// that here. +// we're primarily interested in cleaning up stale pod records from apiserver: +// - after a pod is bound, kubelet becomes the source of truth re: pod state +// - kubelet isn't guaranteed to shut down cleanly +// - may leave lingering docker containers (solved by mesos orphan CT GC?) +// - may leave lingering pod records in apiserver (want to solve this here!) +// - our slaveStatusController simulates NodeReady condition for **all** known slaves +// - this prevents k8s pod GC/cleanup from properly identifying stale pod state for +// nodes that exist but aren't running kubelet +func (ssc *slaveStatusController) checkStalePods(n *api.Node) (bool, error) { + cond := getCondition(&n.Status, node.RunningExecutorCondition) + if cond != nil && cond.Status == api.ConditionTrue { + // 1) heartbeat is current --> uuid is possibly stale + // ?? kubelet cycled but we haven't observed a status update yet + // a) delete all pods on the node, except non-matching latest newer UUID;timestamp within grace period or else matching UUIDs + parsed, ok := node.ParseConditionReason(*cond) + if !ok { + log.Warningf("unable to parse RunningExecutorCondition reason %q", cond.Reason) + return false, nil + } + + pods, err := ssc.podsForNode(n.Name) + if err != nil { + return false, err + } + + type nonmatch struct { + *api.Pod + containerTimestamp time.Time + } + var ( + execContainerID = parsed.(string) + removePods []*api.Pod + matchingPods = make([]*api.Pod, 0, len(pods)) // the most likely case + matchingTimestamp time.Time + newestNonMatch time.Time + nonmatchPods []nonmatch + ) + for _, obj := range pods { + pod := obj.(*api.Pod) + podContainer, timestamp, ok := podutil.UUIDAnnotation(pod) + if !ok { + // bad pod! missing properly formatted annotation + log.Warningf("pod missing mesos container UUID annotation?! %v/%v", pod.Namespace, pod.Name) + removePods = append(removePods, pod) + } else if podContainer == execContainerID { + matchingPods = append(matchingPods, pod) + matchingTimestamp = timestamp + } else if timestamp.After(cond.LastTransitionTime.Time) { + nonmatchPods = append(nonmatchPods, nonmatch{pod, timestamp}) + if timestamp.After(newestNonMatch) { + newestNonMatch = timestamp + } + } else { + // container IDs don't match, and this is old; delete it!
+ removePods = append(removePods, pod) + } + } + if len(matchingPods) > 0 { + foundNewerNonMatching := false + for _, pod := range nonmatchPods { + // only keep the newest non-matching pods + // else, check that this is actually newer than matchingPods + if pod.containerTimestamp.Before(newestNonMatch) { + removePods = append(removePods, pod.Pod) + } else if pod.containerTimestamp.After(matchingTimestamp) { + foundNewerNonMatching = true + } + } + if foundNewerNonMatching { + // matchingPods are stale, remove them + removePods = append(removePods, matchingPods...) + } } + ssc.deletePods(removePods) + + // we probably can't take action on the following, because this func is only + // invoked IF the heartbeat is current!!! + // 2) heartbeat != current --> uuid is stale + // ?? kubelet is busy/deadlocked + // ?? kubelet is network-partitioned + // ?? kubelet died "uncleanly" + // ?? new kubelet launched (cycled/upgraded) but we haven't observed a status update yet + // a) delete all pods on the node, except non-matching latest newer UUID;timestamp within grace period + } else { + // 1) new slave on old node (node bounced)? start evicting pods + // ?? node bounced/cycled? + // .. if it was gone long enough k8s may have removed it + // .. it could come back before all pods assigned to it were deleted + // ?? kubelet launched but we haven't observed a status update yet + // a) delete all pods on the node, except newest UUID;timestamp within grace period + // 2) new slave on new node? .. no garbage, no kubelet + // ?? kubelet launched but we haven't observed a status update yet + // a) delete all pods on the node, except newest UUID;timestamp within grace period } + return false, nil } -func (ssc *slaveStatusController) updateStatus(n *api.Node) error { +func (ssc *slaveStatusController) deletePods(_ []*api.Pod) { + // TODO(jdef) implement me +} + +func (ssc *slaveStatusController) checkWithoutKubelet(node *api.Node, slavesWithoutKubelet sets.String) (bool, error) { + if !slavesWithoutKubelet.Has(node.Spec.ExternalID) { + // let the kubelet do its job updating the status, or the + // node controller will remove this node if the node does not even + // exist anymore + return false, nil + } + return true, ssc.setWithoutKubeletStatus(node) +} + +func (ssc *slaveStatusController) setWithoutKubeletStatus(n *api.Node) error { for i := 0; i < nodeStatusUpdateRetry; i++ { if err := ssc.tryUpdateStatus(n); err != nil && !errors.IsConflict(err) { log.Errorf("Error updating node status, will retry: %v", err) @@ -262,7 +344,7 @@ func (ssc *slaveStatusController) updateStatus(n *api.Node) error { // at all. 
func (ssc *slaveStatusController) nodeWithUpdatedStatus(n *api.Node) (*api.Node, bool, error) { readyCondition := getCondition(&n.Status, api.NodeReady) - currentTime := unversioned.NewTime(ssc.nowFunc()) + currentTime := unversioned.NewTime(ssc.clockFunc()) if readyCondition != nil && !currentTime.After(readyCondition.LastHeartbeatTime.Add(ssc.gracePeriod)) { return n, false, nil @@ -281,28 +363,37 @@ func (ssc *slaveStatusController) nodeWithUpdatedStatus(n *api.Node) (*api.Node, Message: node.SlaveReadyMessage, LastHeartbeatTime: currentTime, } + newRunningExecutorCondition := api.NodeCondition{ + Type: node.RunningExecutorCondition, + Status: api.ConditionFalse, + LastHeartbeatTime: currentTime, + } - found := false - for i := range n.Status.Conditions { - c := &n.Status.Conditions[i] - if c.Type == api.NodeReady { - if c.Status == newNodeReadyCondition.Status { - newNodeReadyCondition.LastTransitionTime = c.LastTransitionTime - } else { - newNodeReadyCondition.LastTransitionTime = currentTime + updateCondition(n, newNodeReadyCondition, newRunningExecutorCondition) + return n, true, nil +} + +// updateCondition assumes that the LastHeartbeatTime has been updated +func updateCondition(n *api.Node, conditions ...api.NodeCondition) { +outerLoop: + for _, targetCondition := range conditions { + for i := range n.Status.Conditions { + c := &n.Status.Conditions[i] + if c.Type == targetCondition.Type { + if c.Status == targetCondition.Status { + // preserve last transition time while the status is unchanged + targetCondition.LastTransitionTime = c.LastTransitionTime + } else { + targetCondition.LastTransitionTime = targetCondition.LastHeartbeatTime + } + n.Status.Conditions[i] = targetCondition + continue outerLoop } - n.Status.Conditions[i] = newNodeReadyCondition - found = true - break } - } - if !found { - newNodeReadyCondition.LastTransitionTime = currentTime - n.Status.Conditions = append(n.Status.Conditions, newNodeReadyCondition) + targetCondition.LastTransitionTime = targetCondition.LastHeartbeatTime + n.Status.Conditions = append(n.Status.Conditions, targetCondition) } - - return n, true, nil } // tryUpdateStatus updates the status of the given node and tries to persist that diff --git a/contrib/mesos/pkg/executor/service/service.go b/contrib/mesos/pkg/executor/service/service.go index ab4bd3af79860..b9edd8fce7c95 100644 --- a/contrib/mesos/pkg/executor/service/service.go +++ b/contrib/mesos/pkg/executor/service/service.go @@ -132,21 +132,17 @@ func (s *KubeletExecutorServer) runExecutor( }), } if s.containerID != "" { - // tag static pods with annotation that reflects the executor container UUID and the - // birth-time of the executor. useful for GC. it's important that this same UUID;ts - // is applied to ALL pods created by this executor. see contrib/mesos/pkg/controller/node - timestamp := time.Now().UTC().Format(time.RFC3339) - uuidAnnotator := podutil.FilterFunc(func(pod *api.Pod) (bool, error) { - podutil.Annotate(&pod.ObjectMeta, map[string]string{ - meta.TimestampedExecutorContainerUUID: s.containerID + ";" + timestamp, + var ( + // tag static pods with annotation that reflects the executor container UUID and the + // birth-time of the executor. useful for GC. it's important that this same UUID;ts + // is applied to ALL pods created by this executor. 
see contrib/mesos/pkg/controller/node + uuidAnnotator = podutil.UUIDAnnotator(s.containerID, time.Now()) + + // tag all pod containers with the containerID so that they can be properly GC'd by Mesos + userContainerEnv = podutil.Environment([]api.EnvVar{ + {Name: envContainerID, Value: s.containerID}, }) - return true, nil - }) - - // tag all pod containers with the containerID so that they can be properly GC'd by Mesos - userContainerEnv := podutil.Environment([]api.EnvVar{ - {Name: envContainerID, Value: s.containerID}, - }) + ) staticPodFilters = append(staticPodFilters, uuidAnnotator, userContainerEnv) // annotate non-static pods with a timestamped executor container UUID as well. we need to diff --git a/contrib/mesos/pkg/node/condition.go b/contrib/mesos/pkg/node/condition.go index ef5387f93dad7..51f3e0a72f0d7 100644 --- a/contrib/mesos/pkg/node/condition.go +++ b/contrib/mesos/pkg/node/condition.go @@ -17,6 +17,8 @@ limitations under the License. package node import ( + "strings" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/util" @@ -31,6 +33,20 @@ const ( RunningExecutorMessage = "node is running k8sm executor" ) +const uuidPrefix = "uuid_" // prefix for Condition.Reason of RunningExecutorCondition + +func ParseConditionReason(cond api.NodeCondition) (interface{}, bool) { + switch cond.Type { + case RunningExecutorCondition: + if strings.HasPrefix(cond.Reason, uuidPrefix) && len(cond.Reason) > len(uuidPrefix) { + return cond.Reason[len(uuidPrefix):], true + } + default: + // noop + } + return nil, false +} + // SetRunningExecutorCondition serves to associate an executor heartbeat w/ the // node (we only allow a single executor instance running per node anyway). The Reason // field is populated with `uuid={mesosContainerID}` in order to allow an external @@ -42,7 +58,7 @@ func SetRunningExecutorCondition(mesosContainerID string, clock util.Clock) func nodeCondition *api.NodeCondition ) - reasonCode := "uuid_" + mesosContainerID + reasonCode := uuidPrefix + mesosContainerID for i := range node.Status.Conditions { if node.Status.Conditions[i].Type == RunningExecutorCondition { nodeCondition = &node.Status.Conditions[i] diff --git a/contrib/mesos/pkg/podutil/helpers.go b/contrib/mesos/pkg/podutil/helpers.go new file mode 100644 index 0000000000000..b0d9729acca2a --- /dev/null +++ b/contrib/mesos/pkg/podutil/helpers.go @@ -0,0 +1,58 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package podutil + +import ( + "strings" + "time" + + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" + "k8s.io/kubernetes/pkg/api" +) + +func UUIDAnnotator(containerID string, timestamp time.Time) FilterFunc { + value := containerID + ";" + timestamp.UTC().Format(time.RFC3339) + return FilterFunc(func(pod *api.Pod) (bool, error) { + Annotate(&pod.ObjectMeta, map[string]string{ + meta.TimestampedExecutorContainerUUID: value, + }) + return true, nil + }) +} + +func UUIDAnnotation(pod *api.Pod) (containerID string, timestamp time.Time, ok bool) { + blob, found := pod.Annotations[meta.TimestampedExecutorContainerUUID] + if !found { + return // missing annotation + } + i := strings.Index(blob, ";") + if i < 1 { + return // missing containerID + } + if i >= len(blob)-1 { + return // missing timestamp + } + t, err := time.Parse(time.RFC3339, blob[i+1:]) + if err != nil { + return // bad timestamp! + } + + containerID = blob[:i] + timestamp = t + ok = true + return +}
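Note on the UUID annotation format: the stale-pod GC in this patch keys off a single annotation value of the form `<containerID>;<RFC3339 timestamp>`, written by `podutil.UUIDAnnotator` under `meta.TimestampedExecutorContainerUUID` and split apart again by `podutil.UUIDAnnotation`. Below is a minimal, stdlib-only sketch of that round trip; the `buildUUIDValue`/`parseUUIDValue` helpers are illustrative only and not part of this patch. A pod whose annotation fails these checks is treated as unannotated, which is why `checkStalePods` schedules it for removal.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// buildUUIDValue mirrors what UUIDAnnotator stores: "<containerID>;<RFC3339 timestamp>".
func buildUUIDValue(containerID string, ts time.Time) string {
	return containerID + ";" + ts.UTC().Format(time.RFC3339)
}

// parseUUIDValue mirrors the checks in UUIDAnnotation: both halves must be present
// and the timestamp must parse as RFC3339, otherwise ok stays false.
func parseUUIDValue(blob string) (containerID string, ts time.Time, ok bool) {
	i := strings.Index(blob, ";")
	if i < 1 || i >= len(blob)-1 {
		return // missing containerID or timestamp
	}
	t, err := time.Parse(time.RFC3339, blob[i+1:])
	if err != nil {
		return // bad timestamp
	}
	return blob[:i], t, true
}

func main() {
	annotation := buildUUIDValue("8ecf6325-d4b9-4b82-9da2-a6e1e4e267f5", time.Now())
	id, ts, ok := parseUUIDValue(annotation)
	fmt.Printf("annotation=%q containerID=%q timestamp=%s ok=%v\n", annotation, id, ts, ok)
}
```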