Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract kubelet network code into its own file #28511

Merged
merged 1 commit into from
Jul 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
271 changes: 0 additions & 271 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,40 +545,6 @@ func NewMainKubelet(
return klet, nil
}

// effectiveHairpinMode determines the effective hairpin mode given the
// configured mode, container runtime, and whether cbr0 should be configured.
func effectiveHairpinMode(hairpinMode componentconfig.HairpinMode, containerRuntime string, configureCBR0 bool, networkPlugin string) (componentconfig.HairpinMode, error) {
// The hairpin mode setting doesn't matter if:
// - We're not using a bridge network. This is hard to check because we might
// be using a plugin. It matters if --configure-cbr0=true, and we currently
// don't pipe it down to any plugins.
// - It's set to hairpin-veth for a container runtime that doesn't know how
// to set the hairpin flag on the veth's of containers. Currently the
// docker runtime is the only one that understands this.
// - It's set to "none".
if hairpinMode == componentconfig.PromiscuousBridge || hairpinMode == componentconfig.HairpinVeth {
// Only on docker.
if containerRuntime != "docker" {
glog.Warningf("Hairpin mode set to %q but container runtime is %q, ignoring", hairpinMode, containerRuntime)
return componentconfig.HairpinNone, nil
}
if hairpinMode == componentconfig.PromiscuousBridge && !configureCBR0 && networkPlugin != "kubenet" {
// This is not a valid combination. Users might be using the
// default values (from before the hairpin-mode flag existed) and we
// should keep the old behavior.
glog.Warningf("Hairpin mode set to %q but configureCBR0 is false, falling back to %q", hairpinMode, componentconfig.HairpinVeth)
return componentconfig.HairpinVeth, nil
}
} else if hairpinMode == componentconfig.HairpinNone {
if configureCBR0 {
glog.Warningf("Hairpin mode set to %q and configureCBR0 is true, this might result in loss of hairpin packets", hairpinMode)
}
} else {
return "", fmt.Errorf("unknown value: %q", hairpinMode)
}
return hairpinMode, nil
}

type serviceLister interface {
List() (api.ServiceList, error)
}
Expand Down Expand Up @@ -853,39 +819,6 @@ type Kubelet struct {
enableControllerAttachDetach bool
}

// Validate given node IP belongs to the current host
func (kl *Kubelet) validateNodeIP() error {
if kl.nodeIP == nil {
return nil
}

// Honor IP limitations set in setNodeStatus()
if kl.nodeIP.IsLoopback() {
return fmt.Errorf("nodeIP can't be loopback address")
}
if kl.nodeIP.To4() == nil {
return fmt.Errorf("nodeIP must be IPv4 address")
}

addrs, err := net.InterfaceAddrs()
if err != nil {
return err
}
for _, addr := range addrs {
var ip net.IP
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
}
if ip != nil && ip.Equal(kl.nodeIP) {
return nil
}
}
return fmt.Errorf("Node IP: %q not found in the host's network interfaces", kl.nodeIP.String())
}

// dirExists returns true if the path exists and represents a directory.
func dirExists(path string) bool {
s, err := os.Stat(path)
Expand Down Expand Up @@ -1141,18 +1074,6 @@ func (kl *Kubelet) initialNodeStatus() (*api.Node, error) {
return node, nil
}

func (kl *Kubelet) providerRequiresNetworkingConfiguration() bool {
// TODO: We should have a mechanism to say whether native cloud provider
// is used or whether we are using overlay networking. We should return
// true for cloud providers if they implement Routes() interface and
// we are not using overlay networking.
if kl.cloud == nil || kl.cloud.ProviderName() != "gce" || kl.flannelExperimentalOverlay {
return false
}
_, supported := kl.cloud.Routes()
return supported
}

// registerWithApiserver registers the node with the cluster master. It is safe
// to call multiple times, but not concurrently (kl.registrationCompleted is
// not locked).
Expand Down Expand Up @@ -1718,61 +1639,6 @@ func (kl *Kubelet) GetClusterDNS(pod *api.Pod) ([]string, []string, error) {
return dns, dnsSearch, nil
}

// Returns the list of DNS servers and DNS search domains.
func (kl *Kubelet) parseResolvConf(reader io.Reader) (nameservers []string, searches []string, err error) {
var scrubber dnsScrubber
if kl.cloud != nil {
scrubber = kl.cloud
}
return parseResolvConf(reader, scrubber)
}

// A helper for testing.
type dnsScrubber interface {
ScrubDNS(nameservers, searches []string) (nsOut, srchOut []string)
}

// parseResolveConf reads a resolv.conf file from the given reader, and parses
// it into nameservers and searches, possibly returning an error. The given
// dnsScrubber allows cloud providers to post-process dns names.
// TODO: move to utility package
func parseResolvConf(reader io.Reader, dnsScrubber dnsScrubber) (nameservers []string, searches []string, err error) {
file, err := ioutil.ReadAll(reader)
if err != nil {
return nil, nil, err
}

// Lines of the form "nameserver 1.2.3.4" accumulate.
nameservers = []string{}

// Lines of the form "search example.com" overrule - last one wins.
searches = []string{}

lines := strings.Split(string(file), "\n")
for l := range lines {
trimmed := strings.TrimSpace(lines[l])
if strings.HasPrefix(trimmed, "#") {
continue
}
fields := strings.Fields(trimmed)
if len(fields) == 0 {
continue
}
if fields[0] == "nameserver" {
nameservers = append(nameservers, fields[1:]...)
}
if fields[0] == "search" {
searches = fields[1:]
}
}

// Give the cloud-provider a chance to post-process DNS settings.
if dnsScrubber != nil {
nameservers, searches = dnsScrubber.ScrubDNS(nameservers, searches)
}
return nameservers, searches, nil
}

// One of the following aruguements must be non-nil: runningPod, status.
// TODO: Modify containerRuntime.KillPod() to accept the right arguments.
func (kl *Kubelet) killPod(pod *api.Pod, runningPod *kubecontainer.Pod, status *kubecontainer.PodStatus, gracePeriodOverride *int64) error {
Expand Down Expand Up @@ -2039,51 +1905,6 @@ func (kl *Kubelet) cleanupOrphanedPodDirs(
return utilerrors.NewAggregate(errlist)
}

// cleanupBandwidthLimits updates the status of bandwidth-limited containers
// and ensures that only the the appropriate CIDRs are active on the node.
func (kl *Kubelet) cleanupBandwidthLimits(allPods []*api.Pod) error {
if kl.shaper == nil {
return nil
}
currentCIDRs, err := kl.shaper.GetCIDRs()
if err != nil {
return err
}
possibleCIDRs := sets.String{}
for ix := range allPods {
pod := allPods[ix]
ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations)
if err != nil {
return err
}
if ingress == nil && egress == nil {
glog.V(8).Infof("Not a bandwidth limited container...")
continue
}
status, found := kl.statusManager.GetPodStatus(pod.UID)
if !found {
// TODO(random-liu): Cleanup status get functions. (issue #20477)
s, err := kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
if err != nil {
return err
}
status = kl.generateAPIPodStatus(pod, s)
}
if status.Phase == api.PodRunning {
possibleCIDRs.Insert(fmt.Sprintf("%s/32", status.PodIP))
}
}
for _, cidr := range currentCIDRs {
if !possibleCIDRs.Has(cidr) {
glog.V(2).Infof("Removing CIDR: %s (%v)", cidr, possibleCIDRs)
if err := kl.shaper.Reset(cidr); err != nil {
return err
}
}
}
return nil
}

// Get pods which should be resynchronized. Currently, the following pod should be resynchronized:
// * pod whose work is ready.
// * internal modules that request sync of a pod.
Expand Down Expand Up @@ -2820,33 +2641,6 @@ func (kl *Kubelet) updateRuntimeUp() {
kl.runtimeState.setRuntimeSync(kl.clock.Now())
}

// TODO: remove when kubenet plugin is ready
// NOTE!!! if you make changes here, also make them to kubenet
func (kl *Kubelet) reconcileCBR0(podCIDR string) error {
if podCIDR == "" {
glog.V(5).Info("PodCIDR not set. Will not configure cbr0.")
return nil
}
glog.V(5).Infof("PodCIDR is set to %q", podCIDR)
_, cidr, err := net.ParseCIDR(podCIDR)
if err != nil {
return err
}
// Set cbr0 interface address to first address in IPNet
cidr.IP.To4()[3] += 1
if err := ensureCbr0(cidr, kl.hairpinMode == componentconfig.PromiscuousBridge, kl.babysitDaemons); err != nil {
return err
}
if kl.shapingEnabled() {
if kl.shaper == nil {
glog.V(5).Info("Shaper is nil, creating")
kl.shaper = bandwidth.NewTCShaper("cbr0")
}
return kl.shaper.ReconcileInterface()
}
return nil
}

// updateNodeStatus updates node status to master with retries.
func (kl *Kubelet) updateNodeStatus() error {
for i := 0; i < nodeStatusUpdateRetry; i++ {
Expand All @@ -2868,45 +2662,6 @@ func (kl *Kubelet) recordNodeStatusEvent(eventtype, event string) {
kl.recorder.Eventf(kl.nodeRef, eventtype, event, "Node %s status is now: %s", kl.nodeName, event)
}

// syncNetworkStatus updates the network state, ensuring that the network is
// configured correctly if the kubelet is set to configure cbr0:
// * handshake flannel helper if the flannel experimental overlay is being used.
// * ensure that iptables masq rules are setup
// * reconcile cbr0 with the pod CIDR
func (kl *Kubelet) syncNetworkStatus() {
var err error
if kl.configureCBR0 {
if kl.flannelExperimentalOverlay {
podCIDR, err := kl.flannelHelper.Handshake()
if err != nil {
glog.Infof("Flannel server handshake failed %v", err)
return
}
kl.updatePodCIDR(podCIDR)
}
if err := ensureIPTablesMasqRule(kl.nonMasqueradeCIDR); err != nil {
err = fmt.Errorf("Error on adding ip table rules: %v", err)
glog.Error(err)
kl.runtimeState.setNetworkState(err)
return
}
podCIDR := kl.runtimeState.podCIDR()
if len(podCIDR) == 0 {
err = fmt.Errorf("ConfigureCBR0 requested, but PodCIDR not set. Will not configure CBR0 right now")
glog.Warning(err)
} else if err = kl.reconcileCBR0(podCIDR); err != nil {
err = fmt.Errorf("Error configuring cbr0: %v", err)
glog.Error(err)
}
if err != nil {
kl.runtimeState.setNetworkState(err)
return
}
}

kl.runtimeState.setNetworkState(kl.networkPlugin.Status())
}

// Set addresses for the node.
func (kl *Kubelet) setNodeAddress(node *api.Node) error {
// Set addresses for the node.
Expand Down Expand Up @@ -3826,29 +3581,3 @@ func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.
func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port, kl.containerRuntime)
}

// updatePodCIDR updates the pod CIDR in the runtime state if it is different
// from the current CIDR.
func (kl *Kubelet) updatePodCIDR(cidr string) {
if kl.runtimeState.podCIDR() == cidr {
return
}

glog.Infof("Setting Pod CIDR: %v -> %v", kl.runtimeState.podCIDR(), cidr)
kl.runtimeState.setPodCIDR(cidr)

if kl.networkPlugin != nil {
details := make(map[string]interface{})
details[network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR] = cidr
kl.networkPlugin.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, details)
}
}

// shapingEnabled returns whether traffic shaping is enabled.
func (kl *Kubelet) shapingEnabled() bool {
// Disable shaping if a network plugin is defined and supports shaping
if kl.networkPlugin != nil && kl.networkPlugin.Capabilities().Has(network.NET_PLUGIN_CAPABILITY_SHAPING) {
return false
}
return true
}