Automated cherry pick of #31388 #35572 #35816

Merged
27 changes: 14 additions & 13 deletions pkg/kubelet/network/hostport/hostport.go
@@ -42,11 +42,11 @@ const (
)

type HostportHandler interface {
OpenPodHostportsAndSync(newPod *RunningPod, natInterfaceName string, runningPods []*RunningPod) error
SyncHostports(natInterfaceName string, runningPods []*RunningPod) error
OpenPodHostportsAndSync(newPod *ActivePod, natInterfaceName string, activePods []*ActivePod) error
SyncHostports(natInterfaceName string, activePods []*ActivePod) error
}

type RunningPod struct {
type ActivePod struct {
Pod *api.Pod
IP net.IP
}
@@ -131,9 +131,9 @@ func (h *handler) openHostports(pod *api.Pod) error {
// gatherAllHostports returns all hostports that should be presented on node,
// given the list of pods running on that node and ignoring host network
// pods (which don't need hostport <-> container port mapping).
func gatherAllHostports(runningPods []*RunningPod) (map[api.ContainerPort]targetPod, error) {
func gatherAllHostports(activePods []*ActivePod) (map[api.ContainerPort]targetPod, error) {
podHostportMap := make(map[api.ContainerPort]targetPod)
for _, r := range runningPods {
for _, r := range activePods {
if r.IP.To4() == nil {
return nil, fmt.Errorf("Invalid or missing pod %s IP", kubecontainer.GetPodFullName(r.Pod))
}
@@ -171,36 +171,37 @@ func hostportChainName(cp api.ContainerPort, podFullName string) utiliptables.Ch
}

// OpenPodHostportsAndSync opens hostports for a new pod, gathers all hostports on
// node, sets up iptables rules enable them. And finally clean up stale hostports
func (h *handler) OpenPodHostportsAndSync(newPod *RunningPod, natInterfaceName string, runningPods []*RunningPod) error {
// node, sets up iptables rules to enable them, and finally cleans up stale hostports.
// 'newPod' must also be present in 'activePods'.
func (h *handler) OpenPodHostportsAndSync(newPod *ActivePod, natInterfaceName string, activePods []*ActivePod) error {
// try to open pod host port if specified
if err := h.openHostports(newPod.Pod); err != nil {
return err
}

// Add the new pod to running pods if it's not running already (e.g. in rkt's case).
// Add the new pod to active pods if it's not present.
var found bool
for _, p := range runningPods {
for _, p := range activePods {
if p.Pod.UID == newPod.Pod.UID {
found = true
break
}
}
if !found {
runningPods = append(runningPods, newPod)
activePods = append(activePods, newPod)
}

return h.SyncHostports(natInterfaceName, runningPods)
return h.SyncHostports(natInterfaceName, activePods)
}

// SyncHostports gathers all hostports on the node, sets up iptables rules to enable them, and finally cleans up stale hostports
func (h *handler) SyncHostports(natInterfaceName string, runningPods []*RunningPod) error {
func (h *handler) SyncHostports(natInterfaceName string, activePods []*ActivePod) error {
start := time.Now()
defer func() {
glog.V(4).Infof("syncHostportsRules took %v", time.Since(start))
}()

containerPortMap, err := gatherAllHostports(runningPods)
containerPortMap, err := gatherAllHostports(activePods)
if err != nil {
return err
}
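For orientation, here is a minimal caller-side sketch of the renamed API, assuming only the types visible in this file. The concrete HostportHandler constructor is not part of this diff, so the handler is passed in as a parameter, and the "cbr0" NAT interface name is purely illustrative (kubenet passes its BridgeName, as shown further down).

package example // illustrative only

import (
	"net"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/kubelet/network/hostport"
)

// openHostportsForNewPod mirrors how a network plugin drives the renamed API:
// gather every active pod on the node, represent the new pod as an ActivePod,
// and hand both to OpenPodHostportsAndSync in a single call.
func openHostportsForNewPod(h hostport.HostportHandler, pod *api.Pod, podIP net.IP, activePods []*hostport.ActivePod) error {
	newPod := &hostport.ActivePod{Pod: pod, IP: podIP}
	// Per the new contract, newPod should also appear in activePods; the
	// handler tolerates its absence by appending it before syncing.
	return h.OpenPodHostportsAndSync(newPod, "cbr0", activePods)
}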
6 changes: 3 additions & 3 deletions pkg/kubelet/network/hostport/hostport_test.go
@@ -158,7 +158,7 @@ func TestOpenPodHostports(t *testing.T) {
},
}

runningPods := make([]*RunningPod, 0)
activePods := make([]*ActivePod, 0)

// Fill in any match rules missing chain names
for _, test := range tests {
@@ -179,13 +179,13 @@ func TestOpenPodHostports(t *testing.T) {
}
}
}
runningPods = append(runningPods, &RunningPod{
activePods = append(activePods, &ActivePod{
Pod: test.pod,
IP: net.ParseIP(test.ip),
})
}

err := h.OpenPodHostportsAndSync(&RunningPod{Pod: tests[0].pod, IP: net.ParseIP(tests[0].ip)}, "br0", runningPods)
err := h.OpenPodHostportsAndSync(&ActivePod{Pod: tests[0].pod, IP: net.ParseIP(tests[0].ip)}, "br0", activePods)
if err != nil {
t.Fatalf("Failed to OpenPodHostportsAndSync: %v", err)
}
8 changes: 4 additions & 4 deletions pkg/kubelet/network/hostport/testing/fake.go
@@ -29,12 +29,12 @@ func NewFakeHostportHandler() hostport.HostportHandler {
return &fakeHandler{}
}

func (h *fakeHandler) OpenPodHostportsAndSync(newPod *hostport.RunningPod, natInterfaceName string, runningPods []*hostport.RunningPod) error {
return h.SyncHostports(natInterfaceName, append(runningPods, newPod))
func (h *fakeHandler) OpenPodHostportsAndSync(newPod *hostport.ActivePod, natInterfaceName string, activePods []*hostport.ActivePod) error {
return h.SyncHostports(natInterfaceName, activePods)
}

func (h *fakeHandler) SyncHostports(natInterfaceName string, runningPods []*hostport.RunningPod) error {
for _, r := range runningPods {
func (h *fakeHandler) SyncHostports(natInterfaceName string, activePods []*hostport.ActivePod) error {
for _, r := range activePods {
if r.IP.To4() == nil {
return fmt.Errorf("Invalid or missing pod %s IP", kubecontainer.GetPodFullName(r.Pod))
}
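A sketch of how the fake might be exercised in a unit test, assuming only what is visible above: the fake simply resyncs and rejects any ActivePod whose IP is not IPv4. Package, file, and test names are illustrative.

package hostporttesting_test // illustrative name

import (
	"net"
	"testing"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/kubelet/network/hostport"
	hostporttest "k8s.io/kubernetes/pkg/kubelet/network/hostport/testing"
)

func TestFakeHandlerRejectsNonIPv4(t *testing.T) {
	h := hostporttest.NewFakeHostportHandler()
	pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "bar"}}

	// An IPv4 pod IP passes through the fake without error.
	good := []*hostport.ActivePod{{Pod: pod, IP: net.ParseIP("10.1.1.2")}}
	if err := h.SyncHostports("cbr0", good); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	// An IPv6 (or missing) IP trips the To4() check shown above.
	bad := []*hostport.ActivePod{{Pod: pod, IP: net.ParseIP("2001:db8::1")}}
	if err := h.SyncHostports("cbr0", bad); err == nil {
		t.Fatal("expected error for non-IPv4 pod IP")
	}
}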
142 changes: 128 additions & 14 deletions pkg/kubelet/network/kubenet/kubenet_linux.go
@@ -20,7 +20,9 @@ package kubenet

import (
"fmt"
"io/ioutil"
"net"
"path/filepath"
"strings"
"sync"
"syscall"
@@ -45,8 +47,9 @@
utilsets "k8s.io/kubernetes/pkg/util/sets"
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"

"k8s.io/kubernetes/pkg/kubelet/network/hostport"
"strconv"

"k8s.io/kubernetes/pkg/kubelet/network/hostport"
)

const (
@@ -67,6 +70,10 @@

// ebtables Chain to store dedup rules
dedupChain = utilebtables.Chain("KUBE-DEDUP")

// defaultIPAMDir is the default location for the checkpoint files stored by host-local ipam
// https://github.com/containernetworking/cni/tree/master/plugins/ipam/host-local#backends
defaultIPAMDir = "/var/lib/cni/networks"
)

// CNI plugins required by kubenet in /opt/cni/bin or vendor directory
@@ -394,13 +401,13 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube
plugin.podIPs[id] = ip4.String()

// Open any hostports the pod's containers want
runningPods, err := plugin.getRunningPods()
activePods, err := plugin.getActivePods()
if err != nil {
return err
}

newPod := &hostport.RunningPod{Pod: pod, IP: ip4}
if err := plugin.hostportHandler.OpenPodHostportsAndSync(newPod, BridgeName, runningPods); err != nil {
newPod := &hostport.ActivePod{Pod: pod, IP: ip4}
if err := plugin.hostportHandler.OpenPodHostportsAndSync(newPod, BridgeName, activePods); err != nil {
return err
}

@@ -432,6 +439,16 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k
// Not a hard error or warning
glog.V(4).Infof("Failed to clean up %s/%s after SetUpPod failure: %v", namespace, name, err)
}

// TODO: Remove this hack once we've figured out how to retrieve the netns
// of an exited container. Currently, restarting docker will leak a bunch of
// IPs. This will exhaust available IP space unless we clean up old IPs. At the
// same time we don't want to try GC'ing them periodically as that could lead
// to a performance regression in starting pods. So on each setup failure, try
// GC on the assumption that the kubelet is going to retry pod creation, and
// when it does, free IPs will be available.
plugin.ipamGarbageCollection()

return err
}

@@ -468,9 +485,9 @@ func (plugin *kubenetNetworkPlugin) teardown(namespace string, name string, id k
}
}

runningPods, err := plugin.getRunningPods()
activePods, err := plugin.getActivePods()
if err == nil {
err = plugin.hostportHandler.SyncHostports(BridgeName, runningPods)
err = plugin.hostportHandler.SyncHostports(BridgeName, activePods)
}
if err != nil {
errList = append(errList, err)
@@ -571,15 +588,31 @@ func (plugin *kubenetNetworkPlugin) checkCNIPluginInDir(dir string) bool {
return true
}

// Returns a list of pods running on this node and each pod's IP address. Assumes
// PodSpecs retrieved from the runtime include the name and ID of containers in
// each pod.
func (plugin *kubenetNetworkPlugin) getRunningPods() ([]*hostport.RunningPod, error) {
pods, err := plugin.host.GetRuntime().GetPods(false)
// getNonExitedPods returns a list of pods that have at least one container that has not exited.
func (plugin *kubenetNetworkPlugin) getNonExitedPods() ([]*kubecontainer.Pod, error) {
ret := []*kubecontainer.Pod{}
pods, err := plugin.host.GetRuntime().GetPods(true)
if err != nil {
return nil, fmt.Errorf("Failed to retrieve pods from runtime: %v", err)
}
runningPods := make([]*hostport.RunningPod, 0)
for _, p := range pods {
if podIsExited(p) {
continue
}
ret = append(ret, p)
}
return ret, nil
}

// Returns a list of pods running or ready to run on this node and each pod's IP address.
// Assumes PodSpecs retrieved from the runtime include the name and ID of containers in
// each pod.
func (plugin *kubenetNetworkPlugin) getActivePods() ([]*hostport.ActivePod, error) {
pods, err := plugin.getNonExitedPods()
if err != nil {
return nil, err
}
activePods := make([]*hostport.ActivePod, 0)
for _, p := range pods {
containerID, err := plugin.host.GetRuntime().GetPodContainerID(p)
if err != nil {
@@ -594,13 +627,94 @@ func (plugin *kubenetNetworkPlugin) getRunningPods() ([]*hostport.RunningPod, er
continue
}
if pod, ok := plugin.host.GetPodByName(p.Namespace, p.Name); ok {
runningPods = append(runningPods, &hostport.RunningPod{
activePods = append(activePods, &hostport.ActivePod{
Pod: pod,
IP: podIP,
})
}
}
return runningPods, nil
return activePods, nil
}

// ipamGarbageCollection releases unused IPs.
// kubenet uses the CNI bridge plugin, which stores allocated IPs on disk. Each
// file created under defaultIPAMDir is named after an IP and holds a container hash. So this
// routine looks for hashes that are not reported by the currently running docker,
// and invokes DelNetwork on each one. Note that this will only work for the
// current CNI bridge plugin, because we have no way of finding the NetNs.
func (plugin *kubenetNetworkPlugin) ipamGarbageCollection() {
glog.V(2).Infof("Starting IP garbage collection")

ipamDir := filepath.Join(defaultIPAMDir, KubenetPluginName)
files, err := ioutil.ReadDir(ipamDir)
if err != nil {
glog.Errorf("Failed to list files in %q: %v", ipamDir, err)
return
}

// gather containerIDs for allocated ips
ipContainerIdMap := make(map[string]string)
for _, file := range files {
// skip non checkpoint file
if ip := net.ParseIP(file.Name()); ip == nil {
continue
}

content, err := ioutil.ReadFile(filepath.Join(ipamDir, file.Name()))
if err != nil {
glog.Errorf("Failed to read file %v: %v", file, err)
}
ipContainerIdMap[file.Name()] = strings.TrimSpace(string(content))
}

// gather infra container IDs of current running Pods
runningContainerIDs := utilsets.String{}
pods, err := plugin.getNonExitedPods()
if err != nil {
glog.Errorf("Failed to get pods: %v", err)
return
}
for _, pod := range pods {
containerID, err := plugin.host.GetRuntime().GetPodContainerID(pod)
if err != nil {
glog.Warningf("Failed to get infra containerID of %q/%q: %v", pod.Namespace, pod.Name, err)
continue
}

runningContainerIDs.Insert(strings.TrimSpace(containerID.ID))
}

// release leaked ips
for ip, containerID := range ipContainerIdMap {
// if the container is not running, release IP
if runningContainerIDs.Has(containerID) {
continue
}
// CNI requires all config fields to be present, although only containerID is needed in this case
rt := &libcni.RuntimeConf{
ContainerID: containerID,
IfName: network.DefaultInterfaceName,
// TODO: How do we find the NetNs of an exited container? docker inspect
// doesn't show us the pid, so we probably need to checkpoint
NetNS: "",
}

glog.V(2).Infof("Releasing IP %q allocated to %q.", ip, containerID)
// CNI bridge plugin should try to release IP and then return
if err := plugin.cniConfig.DelNetwork(plugin.netConfig, rt); err != nil {
glog.Errorf("Error while releasing IP: %v", err)
}
}
}

// podIsExited returns true if the pod is exited (all containers inside are exited).
func podIsExited(p *kubecontainer.Pod) bool {
for _, c := range p.Containers {
if c.State != kubecontainer.ContainerStateExited {
return false
}
}
return true
}

func (plugin *kubenetNetworkPlugin) buildCNIRuntimeConf(ifName string, id kubecontainer.ContainerID) (*libcni.RuntimeConf, error) {
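For readers unfamiliar with the host-local IPAM backend behind defaultIPAMDir, here is a standalone sketch of the checkpoint layout that ipamGarbageCollection scans: one file per allocated IP, named after the IP, whose contents are the ID of the container holding it. The directory path assumes KubenetPluginName resolves to "kubenet"; everything else is illustrative and uses only the standard library.

package main

import (
	"fmt"
	"io/ioutil"
	"net"
	"path/filepath"
	"strings"
)

// listAllocatedIPs reproduces the first half of ipamGarbageCollection: walk the
// checkpoint directory, treat every file whose name parses as an IP as a
// checkpoint, and map that IP to the container ID stored inside the file.
func listAllocatedIPs(ipamDir string) (map[string]string, error) {
	files, err := ioutil.ReadDir(ipamDir)
	if err != nil {
		return nil, err
	}
	ipToContainer := make(map[string]string)
	for _, f := range files {
		if net.ParseIP(f.Name()) == nil {
			continue // skip anything whose name is not an IP (not a checkpoint)
		}
		content, err := ioutil.ReadFile(filepath.Join(ipamDir, f.Name()))
		if err != nil {
			return nil, err
		}
		ipToContainer[f.Name()] = strings.TrimSpace(string(content))
	}
	return ipToContainer, nil
}

func main() {
	// defaultIPAMDir joined with the plugin name, as in ipamGarbageCollection.
	allocations, err := listAllocatedIPs("/var/lib/cni/networks/kubenet")
	if err != nil {
		fmt.Println(err)
		return
	}
	for ip, containerID := range allocations {
		fmt.Printf("%s -> %s\n", ip, containerID)
	}
}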