Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flush conntrack state for removed/changed UDP Services #22573

Merged
merged 2 commits into from
Apr 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions build/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -103,28 +103,28 @@ kube::build::get_docker_wrapped_binaries() {
kube-apiserver,busybox
kube-controller-manager,busybox
kube-scheduler,busybox
kube-proxy,gcr.io/google_containers/debian-iptables-amd64:v2
kube-proxy,gcr.io/google_containers/debian-iptables-amd64:v3
);;
"arm")
local targets=(
kube-apiserver,armel/busybox
kube-controller-manager,armel/busybox
kube-scheduler,armel/busybox
kube-proxy,gcr.io/google_containers/debian-iptables-arm:v2
kube-proxy,gcr.io/google_containers/debian-iptables-arm:v3
);;
"arm64")
local targets=(
kube-apiserver,aarch64/busybox
kube-controller-manager,aarch64/busybox
kube-scheduler,aarch64/busybox
kube-proxy,gcr.io/google_containers/debian-iptables-arm64:v2
kube-proxy,gcr.io/google_containers/debian-iptables-arm64:v3
);;
"ppc64le")
local targets=(
kube-apiserver,ppc64le/busybox
kube-controller-manager,ppc64le/busybox
kube-scheduler,ppc64le/busybox
kube-proxy,gcr.io/google_containers/debian-iptables-ppc64le:v2
kube-proxy,gcr.io/google_containers/debian-iptables-ppc64le:v3
);;
esac

Expand Down
1 change: 1 addition & 0 deletions build/debian-iptables/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ CROSS_BUILD_COPY qemu-ARCH-static /usr/bin/
# cleanup has no effect.
RUN DEBIAN_FRONTEND=noninteractive apt-get update \
&& DEBIAN_FRONTEND=noninteractive apt-get install -y iptables \
&& DEBIAN_FRONTEND=noninteractive apt-get install -y conntrack \
&& rm -rf /var/lib/apt/lists/*
2 changes: 1 addition & 1 deletion build/debian-iptables/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

REGISTRY?="gcr.io/google_containers"
IMAGE=debian-iptables
TAG=v2
TAG=v3
ARCH?=amd64
TEMP_DIR:=$(shell mktemp -d)

Expand Down
79 changes: 79 additions & 0 deletions pkg/proxy/iptables/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"k8s.io/kubernetes/pkg/types"
utilexec "k8s.io/kubernetes/pkg/util/exec"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/slice"
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
)
Expand Down Expand Up @@ -160,6 +161,7 @@ type Proxier struct {
iptables utiliptables.Interface
masqueradeAll bool
masqueradeMark string
exec utilexec.Interface
}

type localPort struct {
Expand Down Expand Up @@ -220,6 +222,7 @@ func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod
iptables: ipt,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
exec: exec,
}, nil
}

Expand Down Expand Up @@ -434,15 +437,21 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
}
}

staleUDPServices := sets.NewString()
// Remove services missing from the update.
for name := range proxier.serviceMap {
if !activeServices[name] {
glog.V(1).Infof("Removing service %q", name)
if proxier.serviceMap[name].protocol == api.ProtocolUDP {
staleUDPServices.Insert(proxier.serviceMap[name].clusterIP.String())
}
delete(proxier.serviceMap, name)
}
}

proxier.syncProxyRules()
proxier.deleteServiceConnections(staleUDPServices.List())

}

// OnEndpointsUpdate takes in a slice of updated endpoints.
Expand All @@ -457,6 +466,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
proxier.haveReceivedEndpointsUpdate = true

activeEndpoints := make(map[proxy.ServicePortName]bool) // use a map as a set
staleConnections := make(map[endpointServicePair]bool)

// Update endpoints for services.
for i := range allEndpoints {
Expand All @@ -480,7 +490,12 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: svcEndpoints.Namespace, Name: svcEndpoints.Name}, Port: portname}
curEndpoints := proxier.endpointsMap[svcPort]
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])

if len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
removedEndpoints := getRemovedEndpoints(curEndpoints, newEndpoints)
for _, ep := range removedEndpoints {
staleConnections[endpointServicePair{endpoint: ep, servicePortName: svcPort}] = true
}
glog.V(1).Infof("Setting endpoints for %q to %+v", svcPort, newEndpoints)
proxier.endpointsMap[svcPort] = newEndpoints
}
Expand All @@ -491,12 +506,18 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
// Remove endpoints missing from the update.
for name := range proxier.endpointsMap {
if !activeEndpoints[name] {
// record endpoints of unactive service to stale connections
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we handle the case : no endpoints deleted, but a service got deleted ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good Catch! Added logic to clean up stale service connections

for _, ep := range proxier.endpointsMap[name] {
staleConnections[endpointServicePair{endpoint: ep, servicePortName: name}] = true
}

glog.V(2).Infof("Removing endpoints for %q", name)
delete(proxier.endpointsMap, name)
}
}

proxier.syncProxyRules()
proxier.deleteEndpointConnections(staleConnections)
}

// used in OnEndpointsUpdate
Expand Down Expand Up @@ -552,6 +573,64 @@ func servicePortEndpointChainName(s proxy.ServicePortName, protocol string, endp
return utiliptables.Chain("KUBE-SEP-" + encoded[:16])
}

// getRemovedEndpoints returns the endpoint IPs that are missing in the new endpoints
func getRemovedEndpoints(curEndpoints, newEndpoints []string) []string {
return sets.NewString(curEndpoints...).Difference(sets.NewString(newEndpoints...)).List()
}

type endpointServicePair struct {
endpoint string
servicePortName proxy.ServicePortName
}

const noConnectionToDelete = "0 flow entries have been deleted"

// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
// risk sending more traffic to it, all of which will be lost (because UDP).
// This assumes the proxier mutex is held
func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServicePair]bool) {
for epSvcPair := range connectionMap {
if svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]; ok && svcInfo.protocol == api.ProtocolUDP {
endpointIP := strings.Split(epSvcPair.endpoint, ":")[0]
glog.V(2).Infof("Deleting connection tracking state for service IP %s, endpoint IP %s", svcInfo.clusterIP.String(), endpointIP)
err := proxier.execConntrackTool("-D", "--orig-dst", svcInfo.clusterIP.String(), "--dst-nat", endpointIP, "-p", "udp")
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
// is expensive to baby sit all udp connections to kubernetes services.
glog.Errorf("conntrack return with error: %v", err)
}
}
}
}

// deleteServiceConnection use conntrack-tool to delete UDP connection specified by service ip
func (proxier *Proxier) deleteServiceConnections(svcIPs []string) {
for _, ip := range svcIPs {
glog.V(2).Infof("Deleting connection tracking state for service IP %s", ip)
err := proxier.execConntrackTool("-D", "--orig-dst", ip, "-p", "udp")
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
// is expensive to baby sit all udp connections to kubernetes services.
glog.Errorf("conntrack return with error: %v", err)
}
}
}

//execConntrackTool executes conntrack tool using given paramters
func (proxier *Proxier) execConntrackTool(parameters ...string) error {
conntrackPath, err := proxier.exec.LookPath("conntrack")
if err != nil {
return fmt.Errorf("Error looking for path of conntrack: %v", err)
}
output, err := proxier.exec.Command(conntrackPath, parameters...).CombinedOutput()
if err != nil {
return fmt.Errorf("Conntrack command returned: %q, error message: %s", string(output), err)
}
return nil
}

// This is where all of the iptables-save/restore calls happen.
// The only other iptables rules are those that are setup in iptablesInit()
// assumes proxier.mu is held
Expand Down