Skip to content

Commit

Permalink
Merge pull request kube-vip#777 from thebsdbox/egress_fix
Browse files Browse the repository at this point in the history
Fixes conntrack deleting wrong connections and cleaning old SNAT rules
  • Loading branch information
thebsdbox committed Mar 10, 2024
2 parents 2c4ff69 + 76cf59c commit 1ea277f
Show file tree
Hide file tree
Showing 4 changed files with 254 additions and 11 deletions.
4 changes: 2 additions & 2 deletions pkg/manager/service_egress.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (sm *Manager) configureEgress(vipIP, podIP, destinationPorts, namespace str
}
}
//_ = i.DumpChain(vip.MangleChainName)
err = vip.DeleteExistingSessions(podIP, false)
err = vip.DeleteExistingSessions(podIP, false, destinationPorts, "")
if err != nil {
return err
}
Expand Down Expand Up @@ -237,7 +237,7 @@ func (sm *Manager) TeardownEgress(podIP, vipIP, destinationPorts, namespace stri
return fmt.Errorf("error changing iptables rules for egress [%s]", err)
}
}
err = vip.DeleteExistingSessions(podIP, false)
err = vip.DeleteExistingSessions(podIP, false, destinationPorts, "")
if err != nil {
return fmt.Errorf("error changing iptables rules for egress [%s]", err)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/manager/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,14 @@ func (sm *Manager) addService(svc *v1.Service) error {

// Check if we need to flush any conntrack connections (due to some dangling conntrack connections)
if svc.Annotations[flushContrack] == "true" {

log.Debugf("Flushing conntrack rules for service [%s]", svc.Name)
for _, serviceIP := range serviceIPs {
err = vip.DeleteExistingSessions(serviceIP, false)
err = vip.DeleteExistingSessions(serviceIP, false, svc.Annotations[egressDestinationPorts], svc.Annotations[egressSourcePorts])
if err != nil {
log.Errorf("Error flushing any remaining egress connections [%s]", err)
}
err = vip.DeleteExistingSessions(serviceIP, true)
err = vip.DeleteExistingSessions(serviceIP, true, svc.Annotations[egressDestinationPorts], svc.Annotations[egressSourcePorts])
if err != nil {
log.Errorf("Error flushing any remaining ingress connections [%s]", err)
}
Expand Down
144 changes: 144 additions & 0 deletions pkg/vip/const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package vip

// The below is taken from the net/internal package (why this is internal i'm not 100% sure)

// Protocol Numbers, Updated: 2017-10-13
const (
ProtocolIP = 0 // IPv4 encapsulation, pseudo protocol number
ProtocolHOPOPT = 0 // IPv6 Hop-by-Hop Option
ProtocolICMP = 1 // Internet Control Message
ProtocolIGMP = 2 // Internet Group Management
ProtocolGGP = 3 // Gateway-to-Gateway
ProtocolIPv4 = 4 // IPv4 encapsulation
ProtocolST = 5 // Stream
ProtocolTCP = 6 // Transmission Control
ProtocolCBT = 7 // CBT
ProtocolEGP = 8 // Exterior Gateway Protocol
ProtocolIGP = 9 // any private interior gateway (used by Cisco for their IGRP)
ProtocolBBNRCCMON = 10 // BBN RCC Monitoring
ProtocolNVPII = 11 // Network Voice Protocol
ProtocolPUP = 12 // PUP
ProtocolEMCON = 14 // EMCON
ProtocolXNET = 15 // Cross Net Debugger
ProtocolCHAOS = 16 // Chaos
ProtocolUDP = 17 // User Datagram
ProtocolMUX = 18 // Multiplexing
ProtocolDCNMEAS = 19 // DCN Measurement Subsystems
ProtocolHMP = 20 // Host Monitoring
ProtocolPRM = 21 // Packet Radio Measurement
ProtocolXNSIDP = 22 // XEROX NS IDP
ProtocolTRUNK1 = 23 // Trunk-1
ProtocolTRUNK2 = 24 // Trunk-2
ProtocolLEAF1 = 25 // Leaf-1
ProtocolLEAF2 = 26 // Leaf-2
ProtocolRDP = 27 // Reliable Data Protocol
ProtocolIRTP = 28 // Internet Reliable Transaction
ProtocolISOTP4 = 29 // ISO Transport Protocol Class 4
ProtocolNETBLT = 30 // Bulk Data Transfer Protocol
ProtocolMFENSP = 31 // MFE Network Services Protocol
ProtocolMERITINP = 32 // MERIT Internodal Protocol
ProtocolDCCP = 33 // Datagram Congestion Control Protocol
Protocol3PC = 34 // Third Party Connect Protocol
ProtocolIDPR = 35 // Inter-Domain Policy Routing Protocol
ProtocolXTP = 36 // XTP
ProtocolDDP = 37 // Datagram Delivery Protocol
ProtocolIDPRCMTP = 38 // IDPR Control Message Transport Proto
ProtocolTPPP = 39 // TP++ Transport Protocol
ProtocolIL = 40 // IL Transport Protocol
ProtocolIPv6 = 41 // IPv6 encapsulation
ProtocolSDRP = 42 // Source Demand Routing Protocol
ProtocolIPv6Route = 43 // Routing Header for IPv6
ProtocolIPv6Frag = 44 // Fragment Header for IPv6
ProtocolIDRP = 45 // Inter-Domain Routing Protocol
ProtocolRSVP = 46 // Reservation Protocol
ProtocolGRE = 47 // Generic Routing Encapsulation
ProtocolDSR = 48 // Dynamic Source Routing Protocol
ProtocolBNA = 49 // BNA
ProtocolESP = 50 // Encap Security Payload
ProtocolAH = 51 // Authentication Header
ProtocolINLSP = 52 // Integrated Net Layer Security TUBA
ProtocolNARP = 54 // NBMA Address Resolution Protocol
ProtocolMOBILE = 55 // IP Mobility
ProtocolTLSP = 56 // Transport Layer Security Protocol using Kryptonet key management
ProtocolSKIP = 57 // SKIP
ProtocolIPv6ICMP = 58 // ICMP for IPv6
ProtocolIPv6NoNxt = 59 // No Next Header for IPv6
ProtocolIPv6Opts = 60 // Destination Options for IPv6
ProtocolCFTP = 62 // CFTP
ProtocolSATEXPAK = 64 // SATNET and Backroom EXPAK
ProtocolKRYPTOLAN = 65 // Kryptolan
ProtocolRVD = 66 // MIT Remote Virtual Disk Protocol
ProtocolIPPC = 67 // Internet Pluribus Packet Core
ProtocolSATMON = 69 // SATNET Monitoring
ProtocolVISA = 70 // VISA Protocol
ProtocolIPCV = 71 // Internet Packet Core Utility
ProtocolCPNX = 72 // Computer Protocol Network Executive
ProtocolCPHB = 73 // Computer Protocol Heart Beat
ProtocolWSN = 74 // Wang Span Network
ProtocolPVP = 75 // Packet Video Protocol
ProtocolBRSATMON = 76 // Backroom SATNET Monitoring
ProtocolSUNND = 77 // SUN ND PROTOCOL-Temporary
ProtocolWBMON = 78 // WIDEBAND Monitoring
ProtocolWBEXPAK = 79 // WIDEBAND EXPAK
ProtocolISOIP = 80 // ISO Internet Protocol
ProtocolVMTP = 81 // VMTP
ProtocolSECUREVMTP = 82 // SECURE-VMTP
ProtocolVINES = 83 // VINES
ProtocolTTP = 84 // Transaction Transport Protocol
ProtocolIPTM = 84 // Internet Protocol Traffic Manager
ProtocolNSFNETIGP = 85 // NSFNET-IGP
ProtocolDGP = 86 // Dissimilar Gateway Protocol
ProtocolTCF = 87 // TCF
ProtocolEIGRP = 88 // EIGRP
ProtocolOSPFIGP = 89 // OSPFIGP
ProtocolSpriteRPC = 90 // Sprite RPC Protocol
ProtocolLARP = 91 // Locus Address Resolution Protocol
ProtocolMTP = 92 // Multicast Transport Protocol
ProtocolAX25 = 93 // AX.25 Frames
ProtocolIPIP = 94 // IP-within-IP Encapsulation Protocol
ProtocolSCCSP = 96 // Semaphore Communications Sec. Pro.
ProtocolETHERIP = 97 // Ethernet-within-IP Encapsulation
ProtocolENCAP = 98 // Encapsulation Header
ProtocolGMTP = 100 // GMTP
ProtocolIFMP = 101 // Ipsilon Flow Management Protocol
ProtocolPNNI = 102 // PNNI over IP
ProtocolPIM = 103 // Protocol Independent Multicast
ProtocolARIS = 104 // ARIS
ProtocolSCPS = 105 // SCPS
ProtocolQNX = 106 // QNX
ProtocolAN = 107 // Active Networks
ProtocolIPComp = 108 // IP Payload Compression Protocol
ProtocolSNP = 109 // Sitara Networks Protocol
ProtocolCompaqPeer = 110 // Compaq Peer Protocol
ProtocolIPXinIP = 111 // IPX in IP
ProtocolVRRP = 112 // Virtual Router Redundancy Protocol
ProtocolPGM = 113 // PGM Reliable Transport Protocol
ProtocolL2TP = 115 // Layer Two Tunneling Protocol
ProtocolDDX = 116 // D-II Data Exchange (DDX)
ProtocolIATP = 117 // Interactive Agent Transfer Protocol
ProtocolSTP = 118 // Schedule Transfer Protocol
ProtocolSRP = 119 // SpectraLink Radio Protocol
ProtocolUTI = 120 // UTI
ProtocolSMP = 121 // Simple Message Protocol
ProtocolPTP = 123 // Performance Transparency Protocol
ProtocolISIS = 124 // ISIS over IPv4
ProtocolFIRE = 125 // FIRE
ProtocolCRTP = 126 // Combat Radio Transport Protocol
ProtocolCRUDP = 127 // Combat Radio User Datagram
ProtocolSSCOPMCE = 128 // SSCOPMCE
ProtocolIPLT = 129 // IPLT
ProtocolSPS = 130 // Secure Packet Shield
ProtocolPIPE = 131 // Private IP Encapsulation within IP
ProtocolSCTP = 132 // Stream Control Transmission Protocol
ProtocolFC = 133 // Fibre Channel
ProtocolRSVPE2EIGNORE = 134 // RSVP-E2E-IGNORE
ProtocolMobilityHeader = 135 // Mobility Header
ProtocolUDPLite = 136 // UDPLite
ProtocolMPLSinIP = 137 // MPLS-in-IP
ProtocolMANET = 138 // MANET Protocols
ProtocolHIP = 139 // Host Identity Protocol
ProtocolShim6 = 140 // Shim6 Protocol
ProtocolWESP = 141 // Wrapped Encapsulating Security Payload
ProtocolROHC = 142 // Robust Header Compression
ProtocolReserved = 255 // Reserved
)
112 changes: 105 additions & 7 deletions pkg/vip/egress.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package vip

import (
"fmt"
"strconv"
"strings"

iptables "github.com/kube-vip/kube-vip/pkg/iptables"
Expand Down Expand Up @@ -149,6 +150,19 @@ func (e *Egress) InsertSourceNat(vip, podIP string) error {

func (e *Egress) InsertSourceNatForDestinationPort(vip, podIP, port, proto string) error {
log.Infof("[egress] Adding source nat from [%s] => [%s], with destination port [%s]", podIP, vip, port)
natRules, err := e.ipTablesClient.List("nat", "POSTROUTING")
if err != nil {
return err
}
foundNatRules := e.findExistingVIP(natRules, vip)
log.Warnf("[egress] Cleaning [%d] existing postrouting nat rules for vip [%s]", len(foundNatRules), vip)
for x := range foundNatRules {
err = e.ipTablesClient.Delete("nat", "POSTROUTING", foundNatRules[x][2:]...)
if err != nil {
log.Errorf("[egress] Error removing rule [%v]", err)
}
}

if exists, err := e.ipTablesClient.Exists("nat", "POSTROUTING", "-s", podIP+"/32", "-m", "mark", "--mark", "64/64", "-j", "SNAT", "--to-source", vip, "-p", proto, "--dport", port, "-m", "comment", "--comment", e.comment); err != nil {
return err
} else if exists {
Expand All @@ -160,7 +174,7 @@ func (e *Egress) InsertSourceNatForDestinationPort(vip, podIP, port, proto strin
return e.ipTablesClient.Insert("nat", "POSTROUTING", 1, "-s", podIP+"/32", "-m", "mark", "--mark", "64/64", "-j", "SNAT", "--to-source", vip, "-p", proto, "--dport", port, "-m", "comment", "--comment", e.comment)
}

func DeleteExistingSessions(sessionIP string, destination bool) error {
func DeleteExistingSessions(sessionIP string, destination bool, destinationPorts, srcPorts string) error {

nfct, err := ct.Open(&ct.Config{})
if err != nil {
Expand All @@ -173,14 +187,75 @@ func DeleteExistingSessions(sessionIP string, destination bool) error {
log.Errorf("could not dump sessions: %v", err)
return err
}
destPortProtocol := make(map[uint16]uint8)
srcPortProtocol := make(map[uint16]uint8)

if destinationPorts != "" {
fixedPorts := strings.Split(destinationPorts, ",")

for _, fixedPort := range fixedPorts {

data := strings.Split(fixedPort, ":")
if len(data) == 0 {
continue
}
port, err := strconv.ParseUint(data[1], 10, 16)
if err != nil {
return fmt.Errorf("[egress] error parsing annotaion [%s]", destinationPorts)
}
switch data[0] {
case strings.ToLower("udp"):
destPortProtocol[uint16(port)] = ProtocolUDP
case strings.ToLower("tcp"):
destPortProtocol[uint16(port)] = ProtocolTCP
case strings.ToLower("sctp"):
destPortProtocol[uint16(port)] = ProtocolSCTP
default:
log.Errorf("[egress] annotation protocol [%s] isn't supported", data[0])
}
}
}

if srcPorts != "" {
fixedPorts := strings.Split(srcPorts, ",")

for _, fixedPort := range fixedPorts {

data := strings.Split(fixedPort, ":")
if len(data) == 0 {
continue
}
port, err := strconv.ParseUint(data[1], 10, 16)
if err != nil {
return fmt.Errorf("[egress] error parsing annotaion [%s]", srcPorts)
}
switch data[0] {
case strings.ToLower("udp"):
srcPortProtocol[uint16(port)] = ProtocolUDP
case strings.ToLower("tcp"):
srcPortProtocol[uint16(port)] = ProtocolTCP
case strings.ToLower("sctp"):
srcPortProtocol[uint16(port)] = ProtocolSCTP
default:
log.Errorf("[egress] annotation protocol [%s] isn't supported", data[0])
}
}
}

// by default we only clear source (i.e. connections going from the vip (egress))
if !destination {
for _, session := range sessions {
//fmt.Printf("Looking for [%s] found [%s]\n", podIP, session.Origin.Dst.String())

//session.Origin.Proto
if session.Origin.Src.String() == sessionIP /*&& *session.Origin.Proto.DstPort == uint16(destinationPort)*/ {
//fmt.Printf("Source -> %s Destination -> %s:%d\n", session.Origin.Src.String(), session.Origin.Dst.String(), *session.Origin.Proto.DstPort)
err = nfct.Delete(ct.Conntrack, ct.IPv4, session)
if destinationPorts != "" {
proto := destPortProtocol[*session.Origin.Proto.DstPort]
if proto == *session.Origin.Proto.Number {
log.Infof("[egress] cleaning existing connection Source [%s] -> [%s:%d] proto: [%d] ", session.Origin.Src.String(), session.Origin.Dst.String(), *session.Origin.Proto.DstPort, *session.Origin.Proto.Number)
err = nfct.Delete(ct.Conntrack, ct.IPv4, session)
}
} else {
err = nfct.Delete(ct.Conntrack, ct.IPv4, session)
}
if err != nil {
log.Errorf("could not delete sessions: %v", err)
}
Expand All @@ -192,8 +267,15 @@ func DeleteExistingSessions(sessionIP string, destination bool) error {
//fmt.Printf("Looking for [%s] found [%s]\n", podIP, session.Origin.Dst.String())

if session.Origin.Dst.String() == sessionIP /*&& *session.Origin.Proto.DstPort == uint16(destinationPort)*/ {
//fmt.Printf("Source -> %s Destination -> %s:%d\n", session.Origin.Src.String(), session.Origin.Dst.String(), *session.Origin.Proto.DstPort)
err = nfct.Delete(ct.Conntrack, ct.IPv4, session)
if srcPorts != "" {
proto := srcPortProtocol[*session.Origin.Proto.DstPort]
if proto == *session.Origin.Proto.Number {
log.Infof("[egress] cleaning existing connection Source [%s] -> [%s:%d] proto: [%d] ", session.Origin.Src.String(), session.Origin.Dst.String(), *session.Origin.Proto.DstPort, *session.Origin.Proto.Number)
err = nfct.Delete(ct.Conntrack, ct.IPv4, session)
}
} else {
err = nfct.Delete(ct.Conntrack, ct.IPv4, session)
}
if err != nil {
log.Errorf("could not delete sessions: %v", err)
}
Expand Down Expand Up @@ -278,3 +360,19 @@ func (e *Egress) findRules(rules []string) [][]string {

return foundRules
}

func (e *Egress) findExistingVIP(rules []string, vip string) [][]string {
var foundRules [][]string

for i := range rules {
r := strings.Split(rules[i], " ")
for x := range r {
// Look for a vip already in a post Routing rule
if r[x] == vip {
foundRules = append(foundRules, r)
}
}
}

return foundRules
}

0 comments on commit 1ea277f

Please sign in to comment.