Skip to content

Commit

Permalink
wait for container cleanup before deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
dashpole committed Aug 9, 2017
1 parent 9b6e6f5 commit 1d9e92a
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 75 deletions.
4 changes: 2 additions & 2 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Dep
klet.livenessManager,
containerRefManager,
machineInfo,
klet.podManager,
klet,
kubeDeps.OSInterface,
klet,
httpClient,
Expand Down Expand Up @@ -636,7 +636,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Dep
klet,
kubeDeps.Recorder,
containerRefManager,
klet.podManager,
klet,
klet.livenessManager,
httpClient,
klet.networkPlugin,
Expand Down
27 changes: 27 additions & 0 deletions pkg/kubelet/kubelet_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/envvars"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/kubernetes/pkg/kubelet/server/portforward"
remotecommandserver "k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
Expand Down Expand Up @@ -741,6 +742,22 @@ func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool {
return status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(status.ContainerStatuses))
}

// podIsDeleted returns true if the pod is deleted. For the pod to be deleted, either:
// 1. The pod object is deleted
// 2. The pod's status is evicted
// 3. The pod's deletion timestamp is set, and containers are not running
func (kl *Kubelet) IsPodDeleted(uid types.UID) bool {
pod, podFound := kl.podManager.GetPodByUID(uid)
if !podFound {
return true
}
status, statusFound := kl.statusManager.GetPodStatus(pod.UID)
if !statusFound {
status = pod.Status
}
return eviction.PodIsEvicted(status) || (pod.DeletionTimestamp != nil && notRunning(status.ContainerStatuses))
}

// PodResourcesAreReclaimed returns true if all required node-level resources that a pod was consuming have
// been reclaimed by the kubelet. Reclaiming resources is a prerequisite to deleting a pod from the API server.
func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool {
Expand All @@ -749,6 +766,16 @@ func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bo
glog.V(3).Infof("Pod %q is terminated, but some containers are still running", format.Pod(pod))
return false
}
// pod's containers should be deleted
runtimeStatus, err := kl.podCache.Get(pod.UID)
if err != nil {
glog.V(3).Infof("Pod %q is terminated, Error getting runtimeStatus from the podCache: %s", format.Pod(pod), err)
return false
}
if len(runtimeStatus.ContainerStatuses) > 0 {
glog.V(3).Infof("Pod %q is terminated, but some containers have not been cleaned up: %+v", format.Pod(pod), runtimeStatus.ContainerStatuses)
return false
}
if kl.podVolumesExist(pod.UID) && !kl.kubeletConfiguration.KeepTerminatedPodVolumes {
// We shouldnt delete pods whose volumes have not been cleaned up if we are not keeping terminated pod volumes
glog.V(3).Infof("Pod %q is terminated, but some volumes have not been cleaned up", format.Pod(pod))
Expand Down
19 changes: 10 additions & 9 deletions pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"time"

cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
Expand All @@ -43,17 +42,19 @@ func (f *fakeHTTP) Get(url string) (*http.Response, error) {
return nil, f.err
}

type fakePodGetter struct {
pods map[types.UID]*v1.Pod
type fakePodDeletionProvider struct {
pods map[types.UID]struct{}
}

func newFakePodGetter() *fakePodGetter {
return &fakePodGetter{make(map[types.UID]*v1.Pod)}
func newFakePodDeletionProvider() *fakePodDeletionProvider {
return &fakePodDeletionProvider{
pods: make(map[types.UID]struct{}),
}
}

func (f *fakePodGetter) GetPodByUID(uid types.UID) (*v1.Pod, bool) {
pod, found := f.pods[uid]
return pod, found
func (f *fakePodDeletionProvider) IsPodDeleted(uid types.UID) bool {
_, found := f.pods[uid]
return !found
}

func NewFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageService internalapi.ImageManagerService, machineInfo *cadvisorapi.MachineInfo, osInterface kubecontainer.OSInterface, runtimeHelper kubecontainer.RuntimeHelper, keyring credentialprovider.DockerKeyring) (*kubeGenericRuntimeManager, error) {
Expand All @@ -76,7 +77,7 @@ func NewFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageS
return nil, err
}

kubeRuntimeManager.containerGC = NewContainerGC(runtimeService, newFakePodGetter(), kubeRuntimeManager)
kubeRuntimeManager.containerGC = NewContainerGC(runtimeService, newFakePodDeletionProvider(), kubeRuntimeManager)
kubeRuntimeManager.runtimeName = typedVersion.RuntimeName
kubeRuntimeManager.imagePuller = images.NewImageManager(
kubecontainer.FilterEventRecorder(recorder),
Expand Down
15 changes: 2 additions & 13 deletions pkg/kubelet/kuberuntime/kuberuntime_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,13 +340,12 @@ func (m *kubeGenericRuntimeManager) makeMounts(opts *kubecontainer.RunContainerO
func (m *kubeGenericRuntimeManager) getKubeletContainers(allContainers bool) ([]*runtimeapi.Container, error) {
filter := &runtimeapi.ContainerFilter{}
if !allContainers {
runningState := runtimeapi.ContainerState_CONTAINER_RUNNING
filter.State = &runtimeapi.ContainerStateValue{
State: runningState,
State: runtimeapi.ContainerState_CONTAINER_RUNNING,
}
}

containers, err := m.getContainersHelper(filter)
containers, err := m.runtimeService.ListContainers(filter)
if err != nil {
glog.Errorf("getKubeletContainers failed: %v", err)
return nil, err
Expand All @@ -355,16 +354,6 @@ func (m *kubeGenericRuntimeManager) getKubeletContainers(allContainers bool) ([]
return containers, nil
}

// getContainers lists containers by filter.
func (m *kubeGenericRuntimeManager) getContainersHelper(filter *runtimeapi.ContainerFilter) ([]*runtimeapi.Container, error) {
resp, err := m.runtimeService.ListContainers(filter)
if err != nil {
return nil, err
}

return resp, err
}

// makeUID returns a randomly generated string.
func makeUID() string {
return fmt.Sprintf("%08x", rand.Uint32())
Expand Down
29 changes: 10 additions & 19 deletions pkg/kubelet/kuberuntime/kuberuntime_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,17 @@ import (

// containerGC is the manager of garbage collection.
type containerGC struct {
client internalapi.RuntimeService
manager *kubeGenericRuntimeManager
podGetter podGetter
client internalapi.RuntimeService
manager *kubeGenericRuntimeManager
podDeletionProvider podDeletionProvider
}

// NewContainerGC creates a new containerGC.
func NewContainerGC(client internalapi.RuntimeService, podGetter podGetter, manager *kubeGenericRuntimeManager) *containerGC {
func NewContainerGC(client internalapi.RuntimeService, podDeletionProvider podDeletionProvider, manager *kubeGenericRuntimeManager) *containerGC {
return &containerGC{
client: client,
manager: manager,
podGetter: podGetter,
client: client,
manager: manager,
podDeletionProvider: podDeletionProvider,
}
}

Expand All @@ -52,8 +52,6 @@ type containerGCInfo struct {
id string
// The name of the container.
name string
// The sandbox ID which this container belongs to
sandboxID string
// Creation time for the container.
createTime time.Time
}
Expand Down Expand Up @@ -159,12 +157,6 @@ func (cgc *containerGC) removeSandbox(sandboxID string) {
}
}

// isPodDeleted returns true if the pod is already deleted.
func (cgc *containerGC) isPodDeleted(podUID types.UID) bool {
_, found := cgc.podGetter.GetPodByUID(podUID)
return !found
}

// evictableContainers gets all containers that are evictable. Evictable containers are: not running
// and created more than MinAge ago.
func (cgc *containerGC) evictableContainers(minAge time.Duration) (containersByEvictUnit, error) {
Expand All @@ -191,7 +183,6 @@ func (cgc *containerGC) evictableContainers(minAge time.Duration) (containersByE
id: container.Id,
name: container.Metadata.Name,
createTime: createdAt,
sandboxID: container.PodSandboxId,
}
key := evictUnit{
uid: labeledInfo.PodUID,
Expand Down Expand Up @@ -219,7 +210,7 @@ func (cgc *containerGC) evictContainers(gcPolicy kubecontainer.ContainerGCPolicy
// Remove deleted pod containers if all sources are ready.
if allSourcesReady {
for key, unit := range evictUnits {
if cgc.isPodDeleted(key.uid) || evictNonDeletedPods {
if cgc.podDeletionProvider.IsPodDeleted(key.uid) || evictNonDeletedPods {
cgc.removeOldestN(unit, len(unit)) // Remove all.
delete(evictUnits, key)
}
Expand Down Expand Up @@ -307,7 +298,7 @@ func (cgc *containerGC) evictSandboxes(evictNonDeletedPods bool) error {
}

for podUID, sandboxes := range sandboxesByPod {
if cgc.isPodDeleted(podUID) || evictNonDeletedPods {
if cgc.podDeletionProvider.IsPodDeleted(podUID) || evictNonDeletedPods {
// Remove all evictable sandboxes if the pod has been removed.
// Note that the latest dead sandbox is also removed if there is
// already an active one.
Expand All @@ -333,7 +324,7 @@ func (cgc *containerGC) evictPodLogsDirectories(allSourcesReady bool) error {
for _, dir := range dirs {
name := dir.Name()
podUID := types.UID(name)
if !cgc.isPodDeleted(podUID) {
if !cgc.podDeletionProvider.IsPodDeleted(podUID) {
continue
}
err := osInterface.RemoveAll(filepath.Join(podLogsRootDirectory, name))
Expand Down
14 changes: 7 additions & 7 deletions pkg/kubelet/kuberuntime/kuberuntime_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ func TestSandboxGC(t *testing.T) {
fakeRuntime, _, m, err := createTestRuntimeManager()
assert.NoError(t, err)

fakePodGetter := m.containerGC.podGetter.(*fakePodGetter)
podDeletionProvider := m.containerGC.podDeletionProvider.(*fakePodDeletionProvider)
makeGCSandbox := func(pod *v1.Pod, attempt uint32, state runtimeapi.PodSandboxState, withPodGetter bool, createdAt int64) sandboxTemplate {
if withPodGetter {
// initialize the pod getter
fakePodGetter.pods[pod.UID] = pod
podDeletionProvider.pods[pod.UID] = struct{}{}
}
return sandboxTemplate{
pod: pod,
Expand Down Expand Up @@ -162,13 +162,13 @@ func TestContainerGC(t *testing.T) {
fakeRuntime, _, m, err := createTestRuntimeManager()
assert.NoError(t, err)

fakePodGetter := m.containerGC.podGetter.(*fakePodGetter)
podDeletionProvider := m.containerGC.podDeletionProvider.(*fakePodDeletionProvider)
makeGCContainer := func(podName, containerName string, attempt int, createdAt int64, state runtimeapi.ContainerState) containerTemplate {
container := makeTestContainer(containerName, "test-image")
pod := makeTestPod(podName, "test-ns", podName, []v1.Container{container})
if podName != "deleted" {
// initialize the pod getter, explicitly exclude deleted pod
fakePodGetter.pods[pod.UID] = pod
podDeletionProvider.pods[pod.UID] = struct{}{}
}
return containerTemplate{
pod: pod,
Expand Down Expand Up @@ -361,11 +361,11 @@ func TestPodLogDirectoryGC(t *testing.T) {
_, _, m, err := createTestRuntimeManager()
assert.NoError(t, err)
fakeOS := m.osInterface.(*containertest.FakeOS)
fakePodGetter := m.containerGC.podGetter.(*fakePodGetter)
podDeletionProvider := m.containerGC.podDeletionProvider.(*fakePodDeletionProvider)

// pod log directories without corresponding pods should be removed.
fakePodGetter.pods["123"] = makeTestPod("foo1", "new", "123", nil)
fakePodGetter.pods["456"] = makeTestPod("foo2", "new", "456", nil)
podDeletionProvider.pods["123"] = struct{}{}
podDeletionProvider.pods["456"] = struct{}{}
files := []string{"123", "456", "789", "012"}
removed := []string{filepath.Join(podLogsRootDirectory, "789"), filepath.Join(podLogsRootDirectory, "012")}

Expand Down
10 changes: 5 additions & 5 deletions pkg/kubelet/kuberuntime/kuberuntime_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ var (
ErrVersionNotSupported = errors.New("Runtime api version is not supported")
)

// A subset of the pod.Manager interface extracted for garbage collection purposes.
type podGetter interface {
GetPodByUID(kubetypes.UID) (*v1.Pod, bool)
// podDeletionProvider can determine if a pod is deleted
type podDeletionProvider interface {
IsPodDeleted(kubetypes.UID) bool
}

type kubeGenericRuntimeManager struct {
Expand Down Expand Up @@ -119,7 +119,7 @@ func NewKubeGenericRuntimeManager(
livenessManager proberesults.Manager,
containerRefManager *kubecontainer.RefManager,
machineInfo *cadvisorapi.MachineInfo,
podGetter podGetter,
podDeletionProvider podDeletionProvider,
osInterface kubecontainer.OSInterface,
runtimeHelper kubecontainer.RuntimeHelper,
httpClient types.HttpGetter,
Expand Down Expand Up @@ -182,7 +182,7 @@ func NewKubeGenericRuntimeManager(
imagePullQPS,
imagePullBurst)
kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(httpClient, kubeRuntimeManager, kubeRuntimeManager)
kubeRuntimeManager.containerGC = NewContainerGC(runtimeService, podGetter, kubeRuntimeManager)
kubeRuntimeManager.containerGC = NewContainerGC(runtimeService, podDeletionProvider, kubeRuntimeManager)

kubeRuntimeManager.versionCache = cache.NewObjectCache(
func() (interface{}, error) {
Expand Down
17 changes: 9 additions & 8 deletions pkg/kubelet/rkt/fake_rkt_interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
rktapi "github.com/coreos/rkt/api/v1alpha"
"golang.org/x/net/context"
"google.golang.org/grpc"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
kubetypes "k8s.io/apimachinery/pkg/types"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
Expand Down Expand Up @@ -179,17 +178,19 @@ func (f *fakeRktCli) Reset() {
f.err = nil
}

type fakePodGetter struct {
pods map[types.UID]*v1.Pod
type fakePodDeletionProvider struct {
pods map[types.UID]struct{}
}

func newFakePodGetter() *fakePodGetter {
return &fakePodGetter{pods: make(map[types.UID]*v1.Pod)}
func newFakePodDeletionProvider() *fakePodDeletionProvider {
return &fakePodDeletionProvider{
pods: make(map[types.UID]struct{}),
}
}

func (f fakePodGetter) GetPodByUID(uid types.UID) (*v1.Pod, bool) {
p, found := f.pods[uid]
return p, found
func (f *fakePodDeletionProvider) IsPodDeleted(uid types.UID) bool {
_, found := f.pods[uid]
return !found
}

type fakeUnitGetter struct {
Expand Down
15 changes: 7 additions & 8 deletions pkg/kubelet/rkt/rkt.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ type Runtime struct {
dockerKeyring credentialprovider.DockerKeyring

containerRefManager *kubecontainer.RefManager
podGetter podGetter
podDeletionProvider podDeletionProvider
runtimeHelper kubecontainer.RuntimeHelper
recorder record.EventRecorder
livenessManager proberesults.Manager
Expand Down Expand Up @@ -201,9 +201,9 @@ type podServiceDirective struct {
var _ kubecontainer.Runtime = &Runtime{}
var _ kubecontainer.DirectStreamingRuntime = &Runtime{}

// TODO(yifan): This duplicates the podGetter in dockertools.
type podGetter interface {
GetPodByUID(kubetypes.UID) (*v1.Pod, bool)
// podDeletionProvider can determine if a pod is deleted
type podDeletionProvider interface {
IsPodDeleted(kubetypes.UID) bool
}

// cliInterface wrapps the command line calls for testing purpose.
Expand All @@ -228,7 +228,7 @@ func New(
runtimeHelper kubecontainer.RuntimeHelper,
recorder record.EventRecorder,
containerRefManager *kubecontainer.RefManager,
podGetter podGetter,
podDeletionProvider podDeletionProvider,
livenessManager proberesults.Manager,
httpClient types.HttpGetter,
networkPlugin network.NetworkPlugin,
Expand Down Expand Up @@ -285,7 +285,7 @@ func New(
config: config,
dockerKeyring: credentialprovider.NewDockerKeyring(),
containerRefManager: containerRefManager,
podGetter: podGetter,
podDeletionProvider: podDeletionProvider,
runtimeHelper: runtimeHelper,
recorder: recorder,
livenessManager: livenessManager,
Expand Down Expand Up @@ -2020,8 +2020,7 @@ func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSo
removeCandidates = append(removeCandidates, pod)
continue
}
_, found := r.podGetter.GetPodByUID(uid)
if !found && allSourcesReady {
if r.podDeletionProvider.IsPodDeleted(uid) && allSourcesReady {
removeCandidates = append(removeCandidates, pod)
continue
}
Expand Down

0 comments on commit 1d9e92a

Please sign in to comment.