Skip to content

Commit

Permalink
Rearrange egressip internals, add duplication tests
Browse files Browse the repository at this point in the history
There should never be multiple HostSubnets or multiple NetNamespaces
claiming the same egress IP, but if there are, we need to track them
carefully so we don't get out of sync with reality after things are
fixed.
  • Loading branch information
danwinship committed Mar 5, 2018
1 parent b10a138 commit a21509b
Show file tree
Hide file tree
Showing 2 changed files with 295 additions and 124 deletions.
290 changes: 166 additions & 124 deletions pkg/network/node/egressip.go
Expand Up @@ -20,22 +20,25 @@ import (
)

type nodeEgress struct {
nodeIP string

// requestedIPs are the EgressIPs listed on the node's HostSubnet
nodeIP string
requestedIPs sets.String
// assignedIPs are the IPs actually in use on the node
assignedIPs sets.String
}

type namespaceEgress struct {
vnid uint32

// requestedIP is the egress IP it wants (NetNamespace.EgressIPs[0])
vnid uint32
requestedIP string
// assignedIP is an egress IP actually in use on nodeIP
assignedIP string
nodeIP string
}

// egressIPInfo tracks everything known about a single egress IP: which nodes
// and namespaces currently claim it, and what state has actually been applied
// for it. Instances are created on demand by ensureEgressIPInfo and stored in
// egressIPWatcher.egressIPs keyed by ip.
type egressIPInfo struct {
// ip is the egress IP address this record describes.
ip string

// nodes and namespaces list every claimant of ip. More than one entry in
// either slice is a misconfiguration; addNode/addNamespace report it but
// still record the claimant so state can be resynced once it is fixed.
nodes []*nodeEgress
namespaces []*namespaceEgress

// assignedNodeIP is the node IP the egress IP is currently assigned to, or ""
// if it is not active (or the local assignment failed).
assignedNodeIP string
// assignedIPTablesMark is the mark computed by getMarkForVNID when the IP
// was assigned; it is passed back to releaseEgressIP on release.
assignedIPTablesMark string
// assignedVNID is the VNID whose egress rules currently reference this IP;
// 0 means none.
assignedVNID uint32
// blockedVNIDs records the VNIDs whose egress was set to dropped because
// several namespaces claim this IP at once.
blockedVNIDs map[uint32]bool
}

type egressIPWatcher struct {
Expand All @@ -48,13 +51,9 @@ type egressIPWatcher struct {
networkInformers networkinformers.SharedInformerFactory
iptables *NodeIPTables

// from HostSubnets
nodesByNodeIP map[string]*nodeEgress
nodesByEgressIP map[string]*nodeEgress

// From NetNamespaces
namespacesByVNID map[uint32]*namespaceEgress
namespacesByEgressIP map[string]*namespaceEgress
nodesByNodeIP map[string]*nodeEgress
namespacesByVNID map[uint32]*namespaceEgress
egressIPs map[string]*egressIPInfo

localEgressLink netlink.Link
localEgressNet *net.IPNet
Expand All @@ -67,11 +66,9 @@ func newEgressIPWatcher(oc *ovsController, localIP string, masqueradeBit *int32)
oc: oc,
localIP: localIP,

nodesByNodeIP: make(map[string]*nodeEgress),
nodesByEgressIP: make(map[string]*nodeEgress),

namespacesByVNID: make(map[uint32]*namespaceEgress),
namespacesByEgressIP: make(map[string]*namespaceEgress),
nodesByNodeIP: make(map[string]*nodeEgress),
namespacesByVNID: make(map[uint32]*namespaceEgress),
egressIPs: make(map[string]*egressIPInfo),
}
if masqueradeBit != nil {
eip.masqueradeBit = 1 << uint32(*masqueradeBit)
Expand Down Expand Up @@ -106,6 +103,47 @@ func getMarkForVNID(vnid, masqueradeBit uint32) string {
return fmt.Sprintf("0x%08x", vnid)
}

// ensureEgressIPInfo returns the tracking record for egressIP, creating and
// registering an empty one in eip.egressIPs if none exists yet.
func (eip *egressIPWatcher) ensureEgressIPInfo(egressIP string) *egressIPInfo {
	if info := eip.egressIPs[egressIP]; info != nil {
		return info
	}

	info := &egressIPInfo{ip: egressIP}
	eip.egressIPs[egressIP] = info
	return info
}

// addNode records node as a claimant of this egress IP. If another node
// already claims it, the conflict is reported through utilruntime.HandleError,
// but the new node is still appended so the duplicate can be tracked.
func (eg *egressIPInfo) addNode(node *nodeEgress) {
	if len(eg.nodes) > 0 {
		firstClaimant := eg.nodes[0]
		utilruntime.HandleError(fmt.Errorf("Multiple nodes claiming EgressIP %q (nodes %q, %q)", eg.ip, node.nodeIP, firstClaimant.nodeIP))
	}
	eg.nodes = append(eg.nodes, node)
}

// deleteNode removes node from the list of nodes claiming this egress IP.
// Entries are matched by pointer identity and only the first match is
// removed; if node is not in the list the call is a no-op.
func (eg *egressIPInfo) deleteNode(node *nodeEgress) {
	for i := range eg.nodes {
		if eg.nodes[i] != node {
			continue
		}

		last := len(eg.nodes) - 1
		copy(eg.nodes[i:], eg.nodes[i+1:])
		// Clear the vacated tail slot so the backing array does not keep a
		// stale *nodeEgress reachable after the slice has been shortened.
		eg.nodes[last] = nil
		eg.nodes = eg.nodes[:last]
		return
	}
}

// addNamespace records ns as a claimant of this egress IP. If another
// namespace already claims it, the conflict is reported through
// utilruntime.HandleError, but ns is still appended so the duplicate can be
// tracked.
func (eg *egressIPInfo) addNamespace(ns *namespaceEgress) {
	if len(eg.namespaces) > 0 {
		firstClaimant := eg.namespaces[0]
		utilruntime.HandleError(fmt.Errorf("Multiple namespaces claiming EgressIP %q (NetIDs %d, %d)", eg.ip, ns.vnid, firstClaimant.vnid))
	}
	eg.namespaces = append(eg.namespaces, ns)
}

// deleteNamespace removes ns from the list of namespaces claiming this egress
// IP. Entries are matched by pointer identity and only the first match is
// removed; if ns is not in the list the call is a no-op.
func (eg *egressIPInfo) deleteNamespace(ns *namespaceEgress) {
	for i := range eg.namespaces {
		if eg.namespaces[i] != ns {
			continue
		}

		last := len(eg.namespaces) - 1
		copy(eg.namespaces[i:], eg.namespaces[i+1:])
		// Clear the vacated tail slot so the backing array does not keep a
		// stale *namespaceEgress reachable after the slice has been shortened.
		eg.namespaces[last] = nil
		eg.namespaces = eg.namespaces[:last]
		return
	}
}

func (eip *egressIPWatcher) watchHostSubnets() {
funcs := common.InformerFuncs(&networkapi.HostSubnet{}, eip.handleAddOrUpdateHostSubnet, eip.handleDeleteHostSubnet)
eip.networkInformers.Network().InternalVersion().HostSubnets().Informer().AddEventHandler(funcs)
Expand Down Expand Up @@ -137,7 +175,6 @@ func (eip *egressIPWatcher) updateNodeEgress(nodeIP string, nodeEgressIPs []stri
node = &nodeEgress{
nodeIP: nodeIP,
requestedIPs: sets.NewString(),
assignedIPs: sets.NewString(),
}
eip.nodesByNodeIP[nodeIP] = node
} else if len(nodeEgressIPs) == 0 {
Expand All @@ -148,89 +185,19 @@ func (eip *egressIPWatcher) updateNodeEgress(nodeIP string, nodeEgressIPs []stri

// Process new EgressIPs
for _, ip := range node.requestedIPs.Difference(oldRequestedIPs).UnsortedList() {
if oldNode := eip.nodesByEgressIP[ip]; oldNode != nil {
utilruntime.HandleError(fmt.Errorf("Multiple nodes claiming EgressIP %q (nodes %q, %q)", ip, node.nodeIP, oldNode.nodeIP))
continue
}

eip.nodesByEgressIP[ip] = node
eip.maybeAddEgressIP(ip)
eg := eip.ensureEgressIPInfo(ip)
eg.addNode(node)
eip.syncEgressIP(eg)
}

// Process removed EgressIPs
for _, ip := range oldRequestedIPs.Difference(node.requestedIPs).UnsortedList() {
if oldNode := eip.nodesByEgressIP[ip]; oldNode != node {
// User removed a duplicate EgressIP
eg := eip.egressIPs[ip]
if eg == nil {
continue
}

eip.deleteEgressIP(ip)
delete(eip.nodesByEgressIP, ip)
}
}

// maybeAddEgressIP tries to put egressIP into service. It is a no-op unless
// some namespace has claimed the IP. When a node also claims the IP and has
// not yet recorded it as assigned, the IP is recorded on that node and, if the
// node is the local one, assigned via assignEgressIP. Finally the namespace's
// egress rules are updated if its assigned IP or node IP changed.
func (eip *egressIPWatcher) maybeAddEgressIP(egressIP string) {
node := eip.nodesByEgressIP[egressIP]
ns := eip.namespacesByEgressIP[egressIP]
if ns == nil {
// No namespace wants this IP, so there is nothing to set up.
return
}

mark := getMarkForVNID(ns.vnid, eip.masqueradeBit)
// nodeIP stays "" when no node claims the IP, when the node already lists the
// IP as assigned, or when assigning it locally fails.
nodeIP := ""

if node != nil && !node.assignedIPs.Has(egressIP) {
node.assignedIPs.Insert(egressIP)
nodeIP = node.nodeIP
if node.nodeIP == eip.localIP {
if err := eip.assignEgressIP(egressIP, mark); err != nil {
utilruntime.HandleError(fmt.Errorf("Error assigning Egress IP %q: %v", egressIP, err))
nodeIP = ""
}
}
}

// Only touch the namespace's egress rules when its recorded state changes.
if ns.assignedIP != egressIP || ns.nodeIP != nodeIP {
ns.assignedIP = egressIP
ns.nodeIP = nodeIP

err := eip.oc.SetNamespaceEgressViaEgressIP(ns.vnid, ns.nodeIP, mark)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Error updating Namespace egress rules: %v", err))
}
}
}

func (eip *egressIPWatcher) deleteEgressIP(egressIP string) {
node := eip.nodesByEgressIP[egressIP]
ns := eip.namespacesByEgressIP[egressIP]
if node == nil || ns == nil {
return
}

mark := getMarkForVNID(ns.vnid, eip.masqueradeBit)
if node.nodeIP == eip.localIP {
if err := eip.releaseEgressIP(egressIP, mark); err != nil {
utilruntime.HandleError(fmt.Errorf("Error releasing Egress IP %q: %v", egressIP, err))
}
node.assignedIPs.Delete(egressIP)
}

if ns.assignedIP == egressIP {
ns.assignedIP = ""
ns.nodeIP = ""
}

var err error
if ns.requestedIP == "" {
// Namespace no longer wants EgressIP
err = eip.oc.SetNamespaceEgressNormal(ns.vnid)
} else {
// Namespace still wants EgressIP but no node provides it
err = eip.oc.SetNamespaceEgressDropped(ns.vnid)
}
if err != nil {
utilruntime.HandleError(fmt.Errorf("Error updating Namespace egress rules: %v", err))
eg.deleteNode(node)
eip.syncEgressIP(eg)
}
}

Expand Down Expand Up @@ -266,45 +233,120 @@ func (eip *egressIPWatcher) updateNamespaceEgress(vnid uint32, egressIP string)

ns := eip.namespacesByVNID[vnid]
if ns == nil {
if egressIP == "" {
return
}
ns = &namespaceEgress{vnid: vnid}
eip.namespacesByVNID[vnid] = ns
} else if egressIP == "" {
delete(eip.namespacesByVNID, vnid)
}

if ns.requestedIP == egressIP {
return
}
if oldNS := eip.namespacesByEgressIP[egressIP]; oldNS != nil {
utilruntime.HandleError(fmt.Errorf("Multiple NetNamespaces claiming EgressIP %q (NetIDs %d, %d)", egressIP, ns.vnid, oldNS.vnid))
return
}

if ns.assignedIP != "" {
oldEgressIP := ns.assignedIP
eip.deleteEgressIP(oldEgressIP)
delete(eip.namespacesByEgressIP, oldEgressIP)
ns.assignedIP = ""
ns.nodeIP = ""
if ns.requestedIP != "" {
eg := eip.egressIPs[ns.requestedIP]
if eg != nil {
eg.deleteNamespace(ns)
eip.syncEgressIP(eg)
}
}

ns.requestedIP = egressIP
eip.namespacesByEgressIP[egressIP] = ns
eip.maybeAddEgressIP(egressIP)
if egressIP == "" {
return
}

eg := eip.ensureEgressIPInfo(egressIP)
eg.addNamespace(ns)
eip.syncEgressIP(eg)
}

func (eip *egressIPWatcher) deleteNamespaceEgress(vnid uint32) {
eip.Lock()
defer eip.Unlock()
eip.updateNamespaceEgress(vnid, "")
}

ns := eip.namespacesByVNID[vnid]
if ns == nil {
return
// syncEgressIP reconciles the applied state for eg with its current set of
// claimants. The node-assignment state is synced first; whether the assigned
// node IP changed is then fed into the OVS sync.
func (eip *egressIPWatcher) syncEgressIP(eg *egressIPInfo) {
	// The inner call is evaluated before the outer one, so the ordering of the
	// two sync steps is preserved.
	eip.syncEgressOVSState(eg, eip.syncEgressIPTablesState(eg))
}

// syncEgressIPTablesState brings eg.assignedNodeIP and eg.assignedIPTablesMark
// in line with the current claimants, assigning or releasing the egress IP on
// the local node when required. It returns true if an assignment or release
// was attempted (including an assignment that failed), false if nothing needed
// to change.
//
// An egress IP is considered active only when exactly one node and exactly one
// namespace claim it; only an active egress IP has an assigned node IP.
func (eip *egressIPWatcher) syncEgressIPTablesState(eg *egressIPInfo) bool {
	active := len(eg.nodes) == 1 && len(eg.namespaces) == 1

	switch {
	case active && eg.assignedNodeIP != eg.nodes[0].nodeIP:
		// Newly active, or not yet recorded against this node.
		eg.assignedNodeIP = eg.nodes[0].nodeIP
		eg.assignedIPTablesMark = getMarkForVNID(eg.namespaces[0].vnid, eip.masqueradeBit)
		if eg.assignedNodeIP != eip.localIP {
			return true
		}
		if err := eip.assignEgressIP(eg.ip, eg.assignedIPTablesMark); err != nil {
			utilruntime.HandleError(fmt.Errorf("Error assigning Egress IP %q: %v", eg.ip, err))
			// Forget the node so a later sync retries the assignment.
			eg.assignedNodeIP = ""
		}
		return true

	case !active && eg.assignedNodeIP != "":
		// No longer active: release it if it was ours, then clear the record.
		if eg.assignedNodeIP == eip.localIP {
			if err := eip.releaseEgressIP(eg.ip, eg.assignedIPTablesMark); err != nil {
				utilruntime.HandleError(fmt.Errorf("Error releasing Egress IP %q: %v", eg.ip, err))
			}
		}
		eg.assignedNodeIP = ""
		eg.assignedIPTablesMark = ""
		return true
	}

	return false
}

func (eip *egressIPWatcher) syncEgressOVSState(eg *egressIPInfo, assignedNodeIPChanged bool) {
var blockedVNIDs map[uint32]bool

// If multiple namespaces are assigned to the same EgressIP, we need to block
// outgoing traffic from all of them.
if len(eg.namespaces) > 1 {
eg.assignedVNID = 0
blockedVNIDs = make(map[uint32]bool)
for _, ns := range eg.namespaces {
blockedVNIDs[ns.vnid] = true
if !eg.blockedVNIDs[ns.vnid] {
err := eip.oc.SetNamespaceEgressDropped(ns.vnid)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Error updating Namespace egress rules: %v", err))
}
}
}
}

// If we have, or had, a single egress namespace, then update the OVS flows if
// something has changed
var err error
if len(eg.namespaces) == 1 && (eg.assignedVNID != eg.namespaces[0].vnid || assignedNodeIPChanged) {
eg.assignedVNID = eg.namespaces[0].vnid
delete(eg.blockedVNIDs, eg.assignedVNID)
err = eip.oc.SetNamespaceEgressViaEgressIP(eg.assignedVNID, eg.assignedNodeIP, getMarkForVNID(eg.assignedVNID, eip.masqueradeBit))
} else if len(eg.namespaces) == 0 && eg.assignedVNID != 0 {
err = eip.oc.SetNamespaceEgressNormal(eg.assignedVNID)
eg.assignedVNID = 0
}
if err != nil {
utilruntime.HandleError(fmt.Errorf("Error updating Namespace egress rules: %v", err))
}

if ns.assignedIP != "" {
ns.requestedIP = ""
egressIP := ns.assignedIP
eip.deleteEgressIP(egressIP)
delete(eip.namespacesByEgressIP, egressIP)
// If we previously had blocked VNIDs, we need to unblock any that have been removed
// from the duplicates list
for vnid := range eg.blockedVNIDs {
if !blockedVNIDs[vnid] {
err := eip.oc.SetNamespaceEgressNormal(vnid)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Error updating Namespace egress rules: %v", err))
}
}
}
delete(eip.namespacesByVNID, vnid)
eg.blockedVNIDs = blockedVNIDs
}

func (eip *egressIPWatcher) assignEgressIP(egressIP, mark string) error {
Expand Down

0 comments on commit a21509b

Please sign in to comment.