Move ActiveDeadlineSeconds implementation into a kubelet sync observer #24705

Merged

Changes from all commits
98 changes: 98 additions & 0 deletions pkg/kubelet/active_deadline.go
@@ -0,0 +1,98 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/record"
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
	"k8s.io/kubernetes/pkg/kubelet/status"
	"k8s.io/kubernetes/pkg/util"
)

const (
	reason  = "DeadlineExceeded"
	message = "Pod was active on the node longer than the specified deadline"
)

// activeDeadlineHandler knows how to enforce active deadlines on pods.
type activeDeadlineHandler struct {
	// the clock to use for deadline enforcement
	clock util.Clock
	// the provider of pod status
	podStatusProvider status.PodStatusProvider
	// the recorder to dispatch events when we identify a pod has exceeded its active deadline
	recorder record.EventRecorder
}

// newActiveDeadlineHandler returns an active deadline handler that can enforce pod active deadlines.
func newActiveDeadlineHandler(
	podStatusProvider status.PodStatusProvider,
	recorder record.EventRecorder,
	clock util.Clock,
) (*activeDeadlineHandler, error) {
	// check for all required fields
	if clock == nil || podStatusProvider == nil || recorder == nil {
		return nil, fmt.Errorf("required arguments must not be nil: %v, %v, %v", clock, podStatusProvider, recorder)
	}
	return &activeDeadlineHandler{
		clock:             clock,
		podStatusProvider: podStatusProvider,
		recorder:          recorder,
	}, nil
}

// ShouldSync returns true if the pod is past its active deadline.
func (m *activeDeadlineHandler) ShouldSync(pod *api.Pod) bool {
	return m.pastActiveDeadline(pod)
}

// ShouldEvict returns true if the pod is past its active deadline.
// It dispatches an event that the pod should be evicted if it is past its deadline.
func (m *activeDeadlineHandler) ShouldEvict(pod *api.Pod) lifecycle.ShouldEvictResponse {
	if !m.pastActiveDeadline(pod) {
		return lifecycle.ShouldEvictResponse{Evict: false}
	}
	m.recorder.Eventf(pod, api.EventTypeNormal, reason, message)
	return lifecycle.ShouldEvictResponse{Evict: true, Reason: reason, Message: message}
}

// pastActiveDeadline returns true if the pod has been active for more than its ActiveDeadlineSeconds.
func (m *activeDeadlineHandler) pastActiveDeadline(pod *api.Pod) bool {
	// no active deadline was specified
	if pod.Spec.ActiveDeadlineSeconds == nil {
		return false
	}
	// get the latest status to determine if it was started
	podStatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
	if !ok {
		podStatus = pod.Status
	}
	// we have no start time, so just return
	if podStatus.StartTime.IsZero() {
		return false
	}
	// determine if the deadline was exceeded
	start := podStatus.StartTime.Time
	duration := m.clock.Since(start)
	allowedDuration := time.Duration(*pod.Spec.ActiveDeadlineSeconds) * time.Second
	return duration >= allowedDuration
}
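
For reference, ShouldSync and ShouldEvict above implement the kubelet's pod lifecycle extension points. A rough sketch of those interfaces follows; the authoritative definitions live in pkg/kubelet/lifecycle and may differ in detail:

package lifecycle

import "k8s.io/kubernetes/pkg/api"

// PodSyncLoopHandler is consulted when the kubelet decides which pods need
// to be resynchronized in the sync loop.
type PodSyncLoopHandler interface {
	ShouldSync(pod *api.Pod) bool
}

// ShouldEvictResponse carries an eviction decision along with a reason and
// message that can be surfaced in the pod status.
type ShouldEvictResponse struct {
	Evict   bool
	Reason  string
	Message string
}

// PodSyncHandler is consulted during pod sync to decide whether the pod
// should instead be evicted (marked failed).
type PodSyncHandler interface {
	ShouldEvict(pod *api.Pod) ShouldEvictResponse
}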
95 changes: 95 additions & 0 deletions pkg/kubelet/active_deadline_test.go
@@ -0,0 +1,95 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
	"testing"
	"time"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/unversioned"
	"k8s.io/kubernetes/pkg/client/record"
	"k8s.io/kubernetes/pkg/types"
	"k8s.io/kubernetes/pkg/util"
)

// mockPodStatusProvider returns the status of the specified pod
type mockPodStatusProvider struct {
	pods []*api.Pod
}

// GetPodStatus returns the status of the pod with the matching uid (if found)
func (m *mockPodStatusProvider) GetPodStatus(uid types.UID) (api.PodStatus, bool) {
	for _, pod := range m.pods {
		if pod.UID == uid {
			return pod.Status, true
		}
	}
	return api.PodStatus{}, false
}

// TestActiveDeadlineHandler verifies the active deadline handler functions as expected.
func TestActiveDeadlineHandler(t *testing.T) {
	pods := newTestPods(4)
	fakeClock := util.NewFakeClock(time.Now())
	podStatusProvider := &mockPodStatusProvider{pods: pods}
	fakeRecorder := &record.FakeRecorder{}
	handler, err := newActiveDeadlineHandler(podStatusProvider, fakeRecorder, fakeClock)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	now := unversioned.Now()
	startTime := unversioned.NewTime(now.Time.Add(-1 * time.Minute))

	// this pod has exceeded its active deadline
	exceededActiveDeadlineSeconds := int64(30)
	pods[0].Status.StartTime = &startTime
	pods[0].Spec.ActiveDeadlineSeconds = &exceededActiveDeadlineSeconds

	// this pod has not exceeded its active deadline
	notYetActiveDeadlineSeconds := int64(120)
	pods[1].Status.StartTime = &startTime
	pods[1].Spec.ActiveDeadlineSeconds = &notYetActiveDeadlineSeconds

	// this pod has no deadline
	pods[2].Status.StartTime = &startTime
	pods[2].Spec.ActiveDeadlineSeconds = nil

	testCases := []struct {
		pod      *api.Pod
		expected bool
	}{{pods[0], true}, {pods[1], false}, {pods[2], false}, {pods[3], false}}

	for i, testCase := range testCases {
		if actual := handler.ShouldSync(testCase.pod); actual != testCase.expected {
			t.Errorf("[%d] ShouldSync expected %#v, got %#v", i, testCase.expected, actual)
		}
		actual := handler.ShouldEvict(testCase.pod)
		if actual.Evict != testCase.expected {
			t.Errorf("[%d] ShouldEvict.Evict expected %#v, got %#v", i, testCase.expected, actual.Evict)
		}
		if testCase.expected {
			if actual.Reason != reason {
				t.Errorf("[%d] ShouldEvict.Reason expected %#v, got %#v", i, reason, actual.Reason)
			}
			if actual.Message != message {
				t.Errorf("[%d] ShouldEvict.Message expected %#v, got %#v", i, message, actual.Message)
			}
		}
	}
}
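
Because the handler takes a util.Clock, a test can flip its decision deterministically by advancing a fake clock rather than sleeping. A minimal sketch, assuming util.FakeClock exposes a Step method for advancing time:

func TestDeadlineFlipsWithFakeClock(t *testing.T) {
	pods := newTestPods(1)
	fakeClock := util.NewFakeClock(time.Now())
	deadline := int64(30)
	start := unversioned.NewTime(fakeClock.Now())
	pods[0].Status.StartTime = &start
	pods[0].Spec.ActiveDeadlineSeconds = &deadline

	handler, err := newActiveDeadlineHandler(&mockPodStatusProvider{pods: pods}, &record.FakeRecorder{}, fakeClock)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	// the pod just started, so it is not past its deadline yet
	if handler.ShouldSync(pods[0]) {
		t.Errorf("pod should not be past its active deadline yet")
	}
	// advance the fake clock past the 30 second deadline (assumes FakeClock.Step)
	fakeClock.Step(31 * time.Second)
	if !handler.ShouldSync(pods[0]) {
		t.Errorf("pod should be past its active deadline after the clock advances")
	}
}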
46 changes: 8 additions & 38 deletions pkg/kubelet/kubelet.go
@@ -530,6 +530,14 @@ func NewMainKubelet(
	klet.evictionManager = evictionManager
	klet.AddPodAdmitHandler(evictionAdmitHandler)

	// enable active deadline handler
	activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, klet.recorder, klet.clock)
	if err != nil {
		return nil, err
	}
	klet.AddPodSyncLoopHandler(activeDeadlineHandler)
	klet.AddPodSyncHandler(activeDeadlineHandler)

	// apply functional Options
	for _, opt := range kubeOptions {
		opt(klet)
@@ -2076,29 +2084,8 @@ func (kl *Kubelet) cleanupBandwidthLimits(allPods []*api.Pod) error {
	return nil
}

// pastActiveDeadline returns true if the pod has been active for more than
// ActiveDeadlineSeconds.
func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
	if pod.Spec.ActiveDeadlineSeconds != nil {
		podStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
		if !ok {
			podStatus = pod.Status
		}
		if !podStatus.StartTime.IsZero() {
			startTime := podStatus.StartTime.Time
			duration := kl.clock.Since(startTime)
			allowedDuration := time.Duration(*pod.Spec.ActiveDeadlineSeconds) * time.Second
			if duration >= allowedDuration {
				return true
			}
		}
	}
	return false
}

// Get pods which should be resynchronized. Currently, the following pods should be resynchronized:
// * pod whose work is ready.
// * pod past the active deadline.
// * internal modules that request sync of a pod.
func (kl *Kubelet) getPodsToSync() []*api.Pod {
	allPods := kl.podManager.GetPods()
@@ -2109,12 +2096,6 @@ func (kl *Kubelet) getPodsToSync() []*api.Pod {
	}
	var podsToSync []*api.Pod
	for _, pod := range allPods {
		// TODO: move active deadline code into a sync/evict pattern
		if kl.pastActiveDeadline(pod) {
			// The pod has passed the active deadline
			podsToSync = append(podsToSync, pod)
			continue
		}
		if podUIDSet.Has(string(pod.UID)) {
			// The work of the pod is ready
			podsToSync = append(podsToSync, pod)
@@ -3546,17 +3527,6 @@ func (kl *Kubelet) generateAPIPodStatus(pod *api.Pod, podStatus *kubecontainer.P
		}
	}

	// TODO: Consider including the container information.
	// TODO: move this into a sync/evictor
	if kl.pastActiveDeadline(pod) {
		reason := "DeadlineExceeded"
		kl.recorder.Eventf(pod, api.EventTypeNormal, reason, "Pod was active on the node longer than specified deadline")
		return api.PodStatus{
			Phase:   api.PodFailed,
			Reason:  reason,
			Message: "Pod was active on the node longer than specified deadline"}
	}

	s := kl.convertStatusToAPIStatus(pod, podStatus)

	// Assume info is ready to process
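With the inline checks above removed, the deadline decision reaches pod status through the registered sync handlers instead. Conceptually, the consuming side does something like the following hypothetical sketch (the helper name and iteration are illustrative, not the kubelet's literal code):

package kubelet

import (
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
)

// evictionStatus is a hypothetical helper: it asks each registered
// PodSyncHandler whether the pod should be evicted and, if so, converts the
// response into a failed pod status, as the removed inline code used to do.
func evictionStatus(handlers []lifecycle.PodSyncHandler, pod *api.Pod) (api.PodStatus, bool) {
	for _, handler := range handlers {
		if result := handler.ShouldEvict(pod); result.Evict {
			return api.PodStatus{
				Phase:   api.PodFailed,
				Reason:  result.Reason,
				Message: result.Message,
			}, true
		}
	}
	return api.PodStatus{}, false
}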
38 changes: 10 additions & 28 deletions pkg/kubelet/kubelet_test.go
@@ -188,6 +188,7 @@ func newTestKubeletWithImageList(
	fakeRecorder := &record.FakeRecorder{}
	fakeKubeClient := &fake.Clientset{}
	kubelet := &Kubelet{}
	kubelet.recorder = fakeRecorder
	kubelet.kubeClient = fakeKubeClient
	kubelet.os = &containertest.FakeOS{}

@@ -305,6 +306,14 @@ func newTestKubeletWithImageList(
t.Fatalf("failed to initialize volume manager: %v", err)
}

// enable active deadline handler
activeDeadlineHandler, err := newActiveDeadlineHandler(kubelet.statusManager, kubelet.recorder, kubelet.clock)
if err != nil {
t.Fatalf("can't initialize active deadline handler: %v", err)
}
kubelet.AddPodSyncLoopHandler(activeDeadlineHandler)
kubelet.AddPodSyncHandler(activeDeadlineHandler)

return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient, fakeClock, nil, plug}
}

@@ -3891,33 +3900,6 @@ func TestMakePortMappings(t *testing.T) {
	}
}

func TestIsPodPastActiveDeadline(t *testing.T) {
	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
	kubelet := testKubelet.kubelet
	pods := newTestPods(5)

	exceededActiveDeadlineSeconds := int64(30)
	notYetActiveDeadlineSeconds := int64(120)
	now := unversioned.Now()
	startTime := unversioned.NewTime(now.Time.Add(-1 * time.Minute))
	pods[0].Status.StartTime = &startTime
	pods[0].Spec.ActiveDeadlineSeconds = &exceededActiveDeadlineSeconds
	pods[1].Status.StartTime = &startTime
	pods[1].Spec.ActiveDeadlineSeconds = &notYetActiveDeadlineSeconds
	tests := []struct {
		pod      *api.Pod
		expected bool
	}{{pods[0], true}, {pods[1], false}, {pods[2], false}, {pods[3], false}, {pods[4], false}}

	kubelet.podManager.SetPods(pods)
	for i, tt := range tests {
		actual := kubelet.pastActiveDeadline(tt.pod)
		if actual != tt.expected {
			t.Errorf("[%d] expected %#v, got %#v", i, tt.expected, actual)
		}
	}
}

func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
	fakeRuntime := testKubelet.fakeRuntime
@@ -3965,7 +3947,7 @@ func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
t.Errorf("expected to found status for pod %q", pods[0].UID)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q, ot %q.", api.PodFailed, status.Phase)
t.Fatalf("expected pod status %q, got %q.", api.PodFailed, status.Phase)
}
}

16 changes: 11 additions & 5 deletions pkg/kubelet/status/status_manager.go
@@ -66,16 +66,22 @@ type manager struct {
	apiStatusVersions map[types.UID]uint64
}

// status.Manager is the Source of truth for kubelet pod status, and should be kept up-to-date with
// PodStatusProvider knows how to provide status for a pod. It's intended to be used by other components
// that need to introspect status.
type PodStatusProvider interface {
	// GetPodStatus returns the cached status for the provided pod UID, as well as whether it
	// was a cache hit.
	GetPodStatus(uid types.UID) (api.PodStatus, bool)
}

// Manager is the Source of truth for kubelet pod status, and should be kept up-to-date with
// the latest api.PodStatus. It also syncs updates back to the API server.
type Manager interface {
	PodStatusProvider

	// Start the API server status sync loop.
	Start()

	// GetPodStatus returns the cached status for the provided pod UID, as well as whether it
	// was a cache hit.
	GetPodStatus(uid types.UID) (api.PodStatus, bool)

	// SetPodStatus updates the cached status for the given pod, and triggers a status update.
	SetPodStatus(pod *api.Pod, status api.PodStatus)

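
Extracting PodStatusProvider lets callers like the active deadline handler depend only on status reads instead of the full Manager. A minimal sketch of such a consumer (the helper is hypothetical, for illustration):

package status

import (
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/unversioned"
)

// startTimeOrDefault is a hypothetical helper that prefers the cached status
// from a PodStatusProvider and falls back to the status carried on the pod
// object, mirroring the fallback the active deadline handler uses.
func startTimeOrDefault(provider PodStatusProvider, pod *api.Pod) *unversioned.Time {
	status, ok := provider.GetPodStatus(pod.UID)
	if !ok {
		status = pod.Status
	}
	return status.StartTime
}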