GC pod ips #35572

Merged
1 commit merged on Oct 28, 2016
108 changes: 103 additions & 5 deletions pkg/kubelet/network/kubenet/kubenet_linux.go
@@ -21,6 +21,7 @@ package kubenet
import (
"fmt"
"net"
"path/filepath"
"strings"
"sync"
"syscall"
@@ -69,6 +70,10 @@ const (

// ebtables Chain to store dedup rules
dedupChain = utilebtables.Chain("KUBE-DEDUP")

// defaultIPAMDir is the default location for the checkpoint files stored by host-local ipam
// https://github.com/containernetworking/cni/tree/master/plugins/ipam/host-local#backends
defaultIPAMDir = "/var/lib/cni/networks"
)

// CNI plugins required by kubenet in /opt/cni/bin or vendor directory
@@ -434,6 +439,16 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k
// Not a hard error or warning
glog.V(4).Infof("Failed to clean up %s/%s after SetUpPod failure: %v", namespace, name, err)
}

// TODO: Remove this hack once we've figured out how to retrieve the netns
// of an exited container. Currently, restarting docker will leak a bunch of
// ips. This will exhaust available ip space unless we cleanup old ips. At the
// same time we don't want to try GC'ing them periodically as that could lead
// to a performance regression in starting pods. So on each setup failure, try
// GC on the assumption that the kubelet is going to retry pod creation, and
// when it does, there will be ips.
plugin.ipamGarbageCollection()

return err
}

@@ -572,20 +587,32 @@ func (plugin *kubenetNetworkPlugin) checkCNIPluginInDir(dir string) bool {
return true
}

// Returns a list of pods running or ready to run on this node and each pod's IP address.
// Assumes PodSpecs retrieved from the runtime include the name and ID of containers in
// each pod.
func (plugin *kubenetNetworkPlugin) getActivePods() ([]*hostport.ActivePod, error) {
// getNonExitedPods returns a list of pods that have at least one running container.
func (plugin *kubenetNetworkPlugin) getNonExitedPods() ([]*kubecontainer.Pod, error) {
ret := []*kubecontainer.Pod{}
pods, err := plugin.host.GetRuntime().GetPods(true)
if err != nil {
return nil, fmt.Errorf("Failed to retrieve pods from runtime: %v", err)
}
activePods := make([]*hostport.ActivePod, 0)
for _, p := range pods {
if podIsExited(p) {
continue
}
ret = append(ret, p)
}
return ret, nil
}

// Returns a list of pods running or ready to run on this node and each pod's IP address.
// Assumes PodSpecs retrieved from the runtime include the name and ID of containers in
// each pod.
func (plugin *kubenetNetworkPlugin) getActivePods() ([]*hostport.ActivePod, error) {
pods, err := plugin.getNonExitedPods()
if err != nil {
return nil, err
}
activePods := make([]*hostport.ActivePod, 0)
for _, p := range pods {
containerID, err := plugin.host.GetRuntime().GetPodContainerID(p)
if err != nil {
continue
@@ -608,6 +635,77 @@ func (plugin *kubenetNetworkPlugin) getActivePods() ([]*hostport.ActivePod, erro
return activePods, nil
}

// ipamGarbageCollection will release unused IPs.
// kubenet uses the CNI bridge plugin, which stores allocated ips on file. Each
// file created under defaultIPAMDir has the format: ip/container-hash. So this
// routine looks for hashes that are not reported by the currently running docker,
// and invokes DelNetwork on each one. Note that this will only work for the
// current CNI bridge plugin, because we have no way of finding the NetNs.
func (plugin *kubenetNetworkPlugin) ipamGarbageCollection() {
glog.V(2).Infof("Starting IP garbage collection")

ipamDir := filepath.Join(defaultIPAMDir, KubenetPluginName)
files, err := ioutil.ReadDir(ipamDir)
if err != nil {
glog.Errorf("Failed to list files in %q: %v", ipamDir, err)
return
}

// gather containerIDs for allocated ips
ipContainerIdMap := make(map[string]string)
for _, file := range files {
// skip non-checkpoint files
if ip := net.ParseIP(file.Name()); ip == nil {
continue
}

content, err := ioutil.ReadFile(filepath.Join(ipamDir, file.Name()))
if err != nil {
glog.Errorf("Failed to read file %v: %v", file, err)
}
ipContainerIdMap[file.Name()] = strings.TrimSpace(string(content))
}

// gather infra container IDs of current running Pods
runningContainerIDs := utilsets.String{}
pods, err := plugin.getNonExitedPods()
if err != nil {
glog.Errorf("Failed to get pods: %v", err)
return
}
for _, pod := range pods {
containerID, err := plugin.host.GetRuntime().GetPodContainerID(pod)
if err != nil {
glog.Warningf("Failed to get infra containerID of %q/%q: %v", pod.Namespace, pod.Name, err)
continue
}

runningContainerIDs.Insert(strings.TrimSpace(containerID.ID))
Contributor
What if the infra container has already terminated? You probably want to check the container state before inserting.

Contributor Author
This is a race we can't easily avoid. If the container exits between getNonExitedPods and this line, assume we get a teardown. It's more important that we detect the garbage in the ip dir.

Contributor
My comment wasn't about the race condition. getNonExitedPods returns pods with at least one running container, which may include a pod with a running user container and a dead infra container. I don't see the state of the infra container being checked anywhere.

Contributor Author
Is that a problem?

Contributor
Other than the IPs used by those infra containers not being recycled, there is no problem.

Contributor Author
And that pod will get cleaned up the normal way (teardown)? There's no way we restarted an old container, because we must have tried the infra container first and failed, so this must be a crashing infra container of a current user pod that will naturally die at some point in the future. No?

}

// release leaked ips
for ip, containerID := range ipContainerIdMap {
// if the container is not running, release IP
if runningContainerIDs.Has(containerID) {
continue
}
// CNI requires all config to be presented, although only containerID is needed in this case
rt := &libcni.RuntimeConf{
ContainerID: containerID,
IfName: network.DefaultInterfaceName,
// TODO: How do we find the NetNs of an exited container? docker inspect
// doesn't show us the pid, so we probably need to checkpoint
NetNS: "",
}

glog.V(2).Infof("Releasing IP %q allocated to %q.", ip, containerID)
// CNI bridge plugin should try to release IP and then return
if err := plugin.cniConfig.DelNetwork(plugin.netConfig, rt); err != nil {
glog.Errorf("Error while releasing IP: %v", err)
}
}
}

// podIsExited returns true if the pod is exited (all containers inside are exited).
func podIsExited(p *kubecontainer.Pod) bool {
for _, c := range p.Containers {
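
For readers unfamiliar with the host-local backend referenced in the comments above, the checkpoint layout that ipamGarbageCollection walks is: one file per allocated IP under /var/lib/cni/networks/kubenet (defaultIPAMDir joined with the plugin name), where each file's content is the ID of the container that owns that address. The sketch below is an illustrative, standalone version of that scan, not code from this PR: findLeakedIPs, the running set, and the hard-coded container ID are all hypothetical. The real implementation above builds the running set from the kubelet's runtime (getNonExitedPods plus GetPodContainerID) and then calls DelNetwork on each leaked entry.

package main

import (
	"fmt"
	"io/ioutil"
	"net"
	"path/filepath"
	"strings"
)

// findLeakedIPs mirrors the scan in ipamGarbageCollection: every file under dir
// whose name parses as an IP is a host-local checkpoint, and its content is the
// ID of the container that owns the address. Any entry whose owner is not in the
// running set is treated as a leaked allocation.
func findLeakedIPs(dir string, running map[string]bool) (map[string]string, error) {
	leaked := map[string]string{}
	files, err := ioutil.ReadDir(dir)
	if err != nil {
		return nil, err
	}
	for _, f := range files {
		if net.ParseIP(f.Name()) == nil {
			// File name is not an IP, so it is not a checkpoint; skip it.
			continue
		}
		content, err := ioutil.ReadFile(filepath.Join(dir, f.Name()))
		if err != nil {
			continue
		}
		owner := strings.TrimSpace(string(content))
		if !running[owner] {
			leaked[f.Name()] = owner
		}
	}
	return leaked, nil
}

func main() {
	// In the kubelet this set comes from the container runtime; it is hard-coded
	// here purely for illustration.
	running := map[string]bool{"0123456789ab": true}
	leaked, err := findLeakedIPs("/var/lib/cni/networks/kubenet", running)
	if err != nil {
		fmt.Println("scan failed:", err)
		return
	}
	for ip, id := range leaked {
		fmt.Printf("IP %s appears to be leaked by container %s\n", ip, id)
	}
}
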
2 changes: 2 additions & 0 deletions test/e2e_node/BUILD
@@ -66,6 +66,7 @@ go_test(
"memory_eviction_test.go",
"mirror_pod_test.go",
"resource_usage_test.go",
"restart_test.go",
"runtime_conformance_test.go",
"summary_test.go",
],
@@ -95,6 +96,7 @@ go_test(
"//test/e2e/common:go_default_library",
"//test/e2e/framework:go_default_library",
"//test/e2e_node/services:go_default_library",
"//test/utils:go_default_library",
"//vendor:github.com/davecgh/go-spew/spew",
"//vendor:github.com/golang/glog",
"//vendor:github.com/onsi/ginkgo",
117 changes: 117 additions & 0 deletions test/e2e_node/restart_test.go
@@ -0,0 +1,117 @@
/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e_node

import (
"k8s.io/kubernetes/test/e2e/framework"
"time"

"fmt"
. "github.com/onsi/ginkgo"
"k8s.io/kubernetes/pkg/api"
testutils "k8s.io/kubernetes/test/utils"
"os/exec"
)

// waitForPods polls for up to timeout, waiting for at least pod_count pods to be running and ready.
// It returns the pods that were running when it stopped, whether or not pod_count was reached.
func waitForPods(f *framework.Framework, pod_count int, timeout time.Duration) (runningPods []*api.Pod) {
for start := time.Now(); time.Since(start) < timeout; time.Sleep(10 * time.Second) {
podList, err := f.PodClient().List(api.ListOptions{})
if err != nil {
framework.Logf("Failed to list pods on node: %v", err)
continue
}

runningPods = []*api.Pod{}
for _, pod := range podList.Items {
if r, err := testutils.PodRunningReady(&pod); err != nil || !r {
continue
}
runningPods = append(runningPods, &pod)
}
framework.Logf("Running pod count %d", len(runningPods))
if len(runningPods) >= pod_count {
break
}
}
return runningPods
}

var _ = framework.KubeDescribe("Restart [Serial] [Slow] [Disruptive]", func() {
const (
// Saturate the node. It's not necessary that all these pods enter
// Running/Ready, because we don't know the number of cores in the
// test node or default limits applied (if any). It is essential
// that no containers end up terminated. 100 was chosen because
// it's the max pods per node.
podCount = 100
podCreationInterval = 100 * time.Millisecond
recoverTimeout = 5 * time.Minute
startTimeout = 3 * time.Minute
// restartCount is chosen so that, even with only minPods running, we exhaust the
// default allocation of a /24 (6 restarts * 50 pods = 300 allocations > 254 usable IPs).
minPods = 50
restartCount = 6
)

f := framework.NewDefaultFramework("restart-test")
Context("Docker Daemon", func() {
Context("Network", func() {
It("should recover from ip leak", func() {

pods := newTestPods(podCount, framework.GetPauseImageNameForHostArch(), "restart-docker-test")
By(fmt.Sprintf("Trying to create %d pods on node", len(pods)))
createBatchPodWithRateControl(f, pods, podCreationInterval)
defer deletePodsSync(f, pods)

// Give the node some time to stabilize, assume pods that enter RunningReady within
// startTimeout fit on the node and the node is now saturated.
runningPods := waitForPods(f, podCount, startTimeout)
if len(runningPods) < minPods {
framework.Failf("Failed to start %d pods, cannot test that restarting docker doesn't leak IPs", minPods)
}

for i := 0; i < restartCount; i += 1 {
By(fmt.Sprintf("Restarting Docker Daemon iteration %d", i))

// TODO: Find a uniform way to deal with systemctl/initctl/service operations. #34494
if stdout, err := exec.Command("sudo", "systemctl", "restart", "docker").CombinedOutput(); err != nil {
framework.Logf("Failed to trigger docker restart with systemd/systemctl: %v, stdout: %q", err, string(stdout))
if stdout, err = exec.Command("sudo", "service", "docker", "restart").CombinedOutput(); err != nil {
framework.Failf("Failed to trigger docker restart with upstart/service: %v, stdout: %q", err, string(stdout))
}
}
time.Sleep(20 * time.Second)
}

By("Checking currently Running/Ready pods")
postRestartRunningPods := waitForPods(f, len(runningPods), recoverTimeout)
if len(postRestartRunningPods) == 0 {
framework.Failf("Failed to start *any* pods after docker restart, this might indicate an IP leak")
}
By("Confirm no containers have terminated")
for _, pod := range postRestartRunningPods {
Contributor
Shouldn't there always be terminated containers because of the docker restart?

Contributor Author
Yeah, that's lastState; I'm checking state, which shouldn't be terminated at the end of this experiment (it is in step 3 below):

  1. Schedule new pods on a node whose IPs are exhausted; they end up with state waiting (containerCreating, no available ips).
  2. If a pod gets a chance to run, we have state running (startedAt blah).
  3. If docker gets bounced and we end up with no available ips, we have state terminated (finishedAt blah), Ready=false.
  4. Now GC runs and frees up some ips (in fact with 100 pods GC keeps running after the 3rd restart).
  5. And we get state running (startedAt blah) with lastState terminated (finishedAt blah), Ready=true.

If GC hadn't run, we would be stuck at (3): 100 containers with state terminated (finishedAt blah), Ready=false.

if c := testutils.TerminatedContainers(pod); len(c) != 0 {
framework.Failf("Pod %q has failed containers %+v after docker restart, this might indicate an IP leak", pod.Name, c)
}
}
By(fmt.Sprintf("Docker restart test passed with %d pods", len(postRestartRunningPods)))
})
})
})
})
1 change: 1 addition & 0 deletions test/test_owners.csv
@@ -343,6 +343,7 @@ ResourceQuota should create a ResourceQuota and capture the life of a service.,t
ResourceQuota should create a ResourceQuota and ensure its status is promptly calculated.,krousey,1
ResourceQuota should verify ResourceQuota with best effort scope.,mml,1
ResourceQuota should verify ResourceQuota with terminating scopes.,ncdc,1
Restart Docker Daemon Network should recover from ip leak,bprashanth,0
Restart should restart all nodes and ensure all nodes and pods recover,andyzheng0831,1
RethinkDB should create and stop rethinkdb servers,mwielgus,1
SSH should SSH to all nodes and run commands,quinton-hoole,0
17 changes: 17 additions & 0 deletions test/utils/conditions.go
@@ -82,6 +82,23 @@ func FailedContainers(pod *api.Pod) map[string]ContainerFailures {
return states
}

// TerminatedContainers inspects all containers in a pod and returns a map
// of "container name: termination reason", for all currently terminated
// containers.
func TerminatedContainers(pod *api.Pod) map[string]string {
states := make(map[string]string)
statuses := pod.Status.ContainerStatuses
if len(statuses) == 0 {
return states
}
for _, status := range statuses {
if status.State.Terminated != nil {
states[status.Name] = status.State.Terminated.Reason
}
}
return states
}

// PodNotReady checks whether pod p has a ready condition of status false.
func PodNotReady(p *api.Pod) (bool, error) {
// Check the ready condition is false.