Define interfaces for kubelet pod admission and eviction #24344

Merged
48 changes: 47 additions & 1 deletion pkg/kubelet/kubelet.go
@@ -52,6 +52,7 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/envvars"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/pleg"
@@ -778,6 +779,16 @@ type Kubelet struct {

// handlers called during the tryUpdateNodeStatus cycle
setNodeStatusFuncs []func(*api.Node) error

// TODO: think about moving this to be centralized in PodWorkers in follow-on.
// the list of handlers to call during pod admission.
lifecycle.PodAdmitHandlers

// the list of handlers to call during pod sync loop.
lifecycle.PodSyncLoopHandlers

// the list of handlers to call during pod sync.
lifecycle.PodSyncHandlers
}

// Validate given node IP belongs to the current host
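
Note: the handler lists embedded in the Kubelet struct above are defined in pkg/kubelet/lifecycle. The following is a minimal sketch of those interfaces, reconstructed from how this diff uses them; exact field ordering, doc comments, and the Add helper shown at the end are assumptions based on the AddPodAdmitHandler/AddPodSyncLoopHandler/AddPodSyncHandler calls in the tests below.

package lifecycle

import "k8s.io/kubernetes/pkg/api"

// PodAdmitAttributes is the context passed to an admit handler.
type PodAdmitAttributes struct {
	Pod       *api.Pod   // the pod being evaluated for admission
	OtherPods []*api.Pod // all other pods bound to the kubelet
}

// PodAdmitResult reports an admission decision.
type PodAdmitResult struct {
	Admit   bool
	Reason  string
	Message string
}

// PodAdmitHandler is consulted during pod admission.
type PodAdmitHandler interface {
	Admit(attrs *PodAdmitAttributes) PodAdmitResult
}

// PodSyncLoopHandler is consulted on each sync loop iteration.
type PodSyncLoopHandler interface {
	ShouldSync(pod *api.Pod) bool
}

// ShouldEvictResponse reports whether a pod should be evicted.
type ShouldEvictResponse struct {
	Evict   bool
	Reason  string
	Message string
}

// PodSyncHandler is consulted during each pod sync.
type PodSyncHandler interface {
	ShouldEvict(pod *api.Pod) ShouldEvictResponse
}

// PodAdmitHandlers is a registerable list of admit handlers; the sync loop
// and sync handler lists presumably follow the same pattern.
type PodAdmitHandlers []PodAdmitHandler

func (h *PodAdmitHandlers) AddPodAdmitHandler(a PodAdmitHandler) {
	*h = append(*h, a)
}
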
@@ -1839,7 +1850,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, podStatus *kubecont
kl.statusManager.SetPodStatus(pod, apiPodStatus)

// Kill pod if it should not be running
- if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil {
+ if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil || apiPodStatus.Phase == api.PodFailed {
if err := kl.killPod(pod, nil, podStatus); err != nil {
utilruntime.HandleError(err)
}
@@ -2142,6 +2153,7 @@ func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
// Get pods which should be resynchronized. Currently, the following pods should be resynchronized:
// * pods whose work is ready.
// * pods past the active deadline.
// * pods for which an internal module has requested a sync.
func (kl *Kubelet) getPodsToSync() []*api.Pod {
allPods := kl.podManager.GetPods()
podUIDs := kl.workQueue.GetWork()
@@ -2151,6 +2163,7 @@ func (kl *Kubelet) getPodsToSync() []*api.Pod {
}
var podsToSync []*api.Pod
for _, pod := range allPods {
// TODO: move active deadline code into a sync/evict pattern
if kl.pastActiveDeadline(pod) {
// The pod has passed the active deadline
podsToSync = append(podsToSync, pod)
@@ -2159,6 +2172,13 @@ func (kl *Kubelet) getPodsToSync() []*api.Pod {
if podUIDSet.Has(string(pod.UID)) {
// The work of the pod is ready
podsToSync = append(podsToSync, pod)
continue
}
for _, podSyncLoopHandler := range kl.PodSyncLoopHandlers {
if podSyncLoopHandler.ShouldSync(pod) {
podsToSync = append(podsToSync, pod)
break
}
}
}
return podsToSync
@@ -2468,6 +2488,18 @@ func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, str
otherPods = append(otherPods, p)
}
}

// the kubelet will invoke each pod admit handler in sequence
// if any handler rejects, the pod is rejected.
// TODO: move predicate check into a pod admitter
// TODO: move out of disk check into a pod admitter
// TODO: out of resource eviction should have a pod admitter call-out
attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: otherPods}
for _, podAdmitHandler := range kl.PodAdmitHandlers {
if result := podAdmitHandler.Admit(attrs); !result.Admit {
return false, result.Reason, result.Message
}
}
nodeInfo := schedulercache.NewNodeInfo(otherPods...)
nodeInfo.SetNode(node)
fit, err := predicates.GeneralPredicates(pod, kl.nodeName, nodeInfo)
@@ -2495,6 +2527,7 @@ func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, str
glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), "predicate fails due to isOutOfDisk")
return false, "OutOfDisk", "cannot be started due to lack of disk space."
}

return true, "", ""
}
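
As an illustration of the call-out pattern (not part of this PR), the out-of-disk check named in the TODOs above could be expressed as an admit handler. The outOfDiskAdmitHandler type and its injected outOfDisk func are hypothetical:

// outOfDiskAdmitHandler is a hypothetical lifecycle.PodAdmitHandler that
// rejects every pod while the node reports it is out of disk.
type outOfDiskAdmitHandler struct {
	outOfDisk func() bool // assumed dependency injected by the kubelet
}

// Admit rejects the pod whenever the node is out of disk space.
func (h *outOfDiskAdmitHandler) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
	if h.outOfDisk() {
		return lifecycle.PodAdmitResult{
			Admit:   false,
			Reason:  "OutOfDisk",
			Message: "cannot be started due to lack of disk space.",
		}
	}
	return lifecycle.PodAdmitResult{Admit: true}
}

A handler like this would be registered once at kubelet construction via AddPodAdmitHandler, after which the inline isOutOfDisk branch above could be deleted.
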

@@ -3461,7 +3494,20 @@ func GetPhase(spec *api.PodSpec, info []api.ContainerStatus) api.PodPhase {
// internal pod status.
func (kl *Kubelet) generateAPIPodStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus) api.PodStatus {
glog.V(3).Infof("Generating status for %q", format.Pod(pod))

// check if an internal module has requested that the pod be evicted.
for _, podSyncHandler := range kl.PodSyncHandlers {
if result := podSyncHandler.ShouldEvict(pod); result.Evict {
return api.PodStatus{
Phase: api.PodFailed,
Reason: result.Reason,
Message: result.Message,
}
}
}
Contributor:
Another potential issue (without knowing how pod eviction is implemented) is that the pod workers still don't know that they need to kill the pod. Ignore me if this is not the right PR to discuss this.

Member Author:

For this PR, I think we should look at this as a pattern for modularizing the active deadline check. How low-resource eviction works can be discussed in that PR, since latency there is important.


// TODO: Consider including the container information.
// TODO: move this into a sync/evictor
if kl.pastActiveDeadline(pod) {
reason := "DeadlineExceeded"
kl.recorder.Eventf(pod, api.EventTypeNormal, reason, "Pod was active on the node longer than specified deadline")
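Following the sync/evictor TODO above and the author's reply, the active deadline check could itself be recast as a lifecycle.PodSyncHandler. A hypothetical sketch, not code from this PR (the activeDeadlineHandler type, its getPodStatus accessor, and the time/types/api/lifecycle imports are all assumptions):

// activeDeadlineHandler is a hypothetical lifecycle.PodSyncHandler that
// requests eviction of pods active longer than spec.ActiveDeadlineSeconds.
type activeDeadlineHandler struct {
	// assumed accessor for the pod's last known status,
	// e.g. backed by the kubelet's status manager
	getPodStatus func(uid types.UID) (api.PodStatus, bool)
}

// ShouldEvict returns an eviction response once the deadline has passed.
func (h *activeDeadlineHandler) ShouldEvict(pod *api.Pod) lifecycle.ShouldEvictResponse {
	if pod.Spec.ActiveDeadlineSeconds == nil {
		return lifecycle.ShouldEvictResponse{Evict: false}
	}
	status, ok := h.getPodStatus(pod.UID)
	if !ok || status.StartTime == nil {
		return lifecycle.ShouldEvictResponse{Evict: false}
	}
	deadline := time.Duration(*pod.Spec.ActiveDeadlineSeconds) * time.Second
	if time.Since(status.StartTime.Time) >= deadline {
		return lifecycle.ShouldEvictResponse{
			Evict:   true,
			Reason:  "DeadlineExceeded",
			Message: "Pod was active on the node longer than specified deadline",
		}
	}
	return lifecycle.ShouldEvictResponse{Evict: false}
}
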
189 changes: 189 additions & 0 deletions pkg/kubelet/kubelet_test.go
@@ -46,6 +46,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/network"
nettest "k8s.io/kubernetes/pkg/kubelet/network/testing"
"k8s.io/kubernetes/pkg/kubelet/pleg"
@@ -4693,3 +4694,191 @@ func TestGenerateAPIPodStatusWithReasonCache(t *testing.T) {
verifyStatus(t, i, apiStatus, test.expectedState, test.expectedLastTerminationState)
}
}

// testPodAdmitHandler is a lifecycle.PodAdmitHandler for testing.
type testPodAdmitHandler struct {
// list of pods to reject.
podsToReject []*api.Pod
}

// Admit rejects all pods in the podsToReject list with a matching UID.
func (a *testPodAdmitHandler) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
for _, podToReject := range a.podsToReject {
if podToReject.UID == attrs.Pod.UID {
return lifecycle.PodAdmitResult{Admit: false, Reason: "Rejected", Message: "Pod is rejected"}
}
}
return lifecycle.PodAdmitResult{Admit: true}
}

// Test verifies that the kubelet invokes an admission handler during HandlePodAdditions.
func TestHandlePodAdditionsInvokesPodAdmitHandlers(t *testing.T) {
testKubelet := newTestKubelet(t)
kl := testKubelet.kubelet
kl.nodeLister = testNodeLister{nodes: []api.Node{
{
ObjectMeta: api.ObjectMeta{Name: kl.nodeName},
Status: api.NodeStatus{
Allocatable: api.ResourceList{
api.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI),
},
},
},
}}
kl.nodeInfo = testNodeInfo{nodes: []api.Node{
{
ObjectMeta: api.ObjectMeta{Name: kl.nodeName},
Status: api.NodeStatus{
Allocatable: api.ResourceList{
api.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI),
},
},
},
}}
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil)
testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)

pods := []*api.Pod{
{
ObjectMeta: api.ObjectMeta{
UID: "123456789",
Name: "podA",
Namespace: "foo",
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "987654321",
Name: "podB",
Namespace: "foo",
},
},
}
podToReject := pods[0]
podToAdmit := pods[1]
podsToReject := []*api.Pod{podToReject}

kl.AddPodAdmitHandler(&testPodAdmitHandler{podsToReject: podsToReject})

kl.HandlePodAdditions(pods)
// Check pod status stored in the status map.
// podToReject should be Failed
status, found := kl.statusManager.GetPodStatus(podToReject.UID)
if !found {
t.Fatalf("status of pod %q is not found in the status map", podToReject.UID)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
}
// podToAdmit should be Pending
status, found = kl.statusManager.GetPodStatus(podToAdmit.UID)
if !found {
t.Fatalf("status of pod %q is not found in the status map", podToAdmit.UID)
}
if status.Phase != api.PodPending {
t.Fatalf("expected pod status %q. Got %q.", api.PodPending, status.Phase)
}
}

// testPodSyncLoopHandler is a lifecycle.PodSyncLoopHandler that is used for testing.
type testPodSyncLoopHandler struct {
// list of pods to sync
podsToSync []*api.Pod
}

// ShouldSync evaluates if the pod should be synced by the kubelet.
func (a *testPodSyncLoopHandler) ShouldSync(pod *api.Pod) bool {
for _, podToSync := range a.podsToSync {
if podToSync.UID == pod.UID {
return true
}
}
return false
}

// TestGetPodsToSyncInvokesPodSyncLoopHandlers ensures that the get pods to sync routine invokes the handler.
func TestGetPodsToSyncInvokesPodSyncLoopHandlers(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
pods := newTestPods(5)
podUIDs := []types.UID{}
for _, pod := range pods {
podUIDs = append(podUIDs, pod.UID)
}
podsToSync := []*api.Pod{pods[0]}
kubelet.AddPodSyncLoopHandler(&testPodSyncLoopHandler{podsToSync})

kubelet.podManager.SetPods(pods)

expectedPodsUID := []types.UID{pods[0].UID}

podsToSync = kubelet.getPodsToSync()

if len(podsToSync) == len(expectedPodsUID) {
var rightNum int
for _, podUID := range expectedPodsUID {
for _, podToSync := range podsToSync {
if podToSync.UID == podUID {
rightNum++
break
}
}
}
if rightNum != len(expectedPodsUID) {
// Collect the returned UIDs to report a useful error.
podsToSyncUID := []types.UID{}
for _, podToSync := range podsToSync {
podsToSyncUID = append(podsToSyncUID, podToSync.UID)
}
t.Errorf("expected pods %v to sync, got %v", expectedPodsUID, podsToSyncUID)
}

} else {
t.Errorf("expected %d pods to sync, got %d", 3, len(podsToSync))
}
}

// testPodSyncHandler is a lifecycle.PodSyncHandler that is used for testing.
type testPodSyncHandler struct {
// list of pods to evict.
podsToEvict []*api.Pod
// the reason for the eviction
reason string
// the message for the eviction
message string
}

// ShouldEvict evaluates if the pod should be evicted from the kubelet.
func (a *testPodSyncHandler) ShouldEvict(pod *api.Pod) lifecycle.ShouldEvictResponse {
for _, podToEvict := range a.podsToEvict {
if podToEvict.UID == pod.UID {
return lifecycle.ShouldEvictResponse{Evict: true, Reason: a.reason, Message: a.message}
}
}
return lifecycle.ShouldEvictResponse{Evict: false}
}

// TestGenerateAPIPodStatusInvokesPodSyncHandlers verifies that the sync handlers are invoked and the proper status is reported.
func TestGenerateAPIPodStatusInvokesPodSyncHandlers(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
pod := newTestPods(1)[0]
podsToEvict := []*api.Pod{pod}
kubelet.AddPodSyncHandler(&testPodSyncHandler{podsToEvict, "Evicted", "because"})
status := &kubecontainer.PodStatus{
ID: pod.UID,
Name: pod.Name,
Namespace: pod.Namespace,
}
apiStatus := kubelet.generateAPIPodStatus(pod, status)
if apiStatus.Phase != api.PodFailed {
t.Fatalf("Expected phase %v, but got %v", api.PodFailed, apiStatus.Phase)
}
if apiStatus.Reason != "Evicted" {
t.Fatalf("Expected reason %v, but got %v", "Evicted", apiStatus.Reason)
}
if apiStatus.Message != "because" {
t.Fatalf("Expected message %v, but got %v", "because", apiStatus.Message)
}
}
3 changes: 2 additions & 1 deletion pkg/kubelet/lifecycle/doc.go
@@ -14,5 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

- // Handlers for pod lifecycle events.
+ // Handlers for pod lifecycle events and interfaces to integrate
+ // with kubelet admission, synchronization, and eviction of pods.
package lifecycle