Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug 2091157: [release-4.10] Free IPs and delete resources for completed pods #1152

Merged
merged 7 commits into from
Jun 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion go-controller/hybrid-overlay/pkg/controller/node_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,13 @@ func (n *NodeController) AddPod(pod *kapi.Pod) error {
if !util.PodWantsNetwork(pod) {
return nil
}
if util.PodCompleted(pod) {
klog.Infof("Cleaning up hybrid overlay pod %s/%s because it has completed", pod.Namespace, pod.Name)
return n.DeletePod(pod)
}
podIPs, podMAC, err := getPodDetails(pod)
if err != nil {
klog.V(5).Infof("Cleaning up hybrid overlay pod %s/%s because %v", pod.Namespace, pod.Name, err)
klog.Warningf("Cleaning up hybrid overlay pod %s/%s because it has no OVN annotation %v", pod.Namespace, pod.Name, err)
return n.DeletePod(pod)
}

Expand Down
5 changes: 4 additions & 1 deletion go-controller/pkg/libovsdbops/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,11 @@ func DeleteNATsFromRouterOps(nbClient libovsdbclient.Client, ops []libovsdb.Oper
}

routerNats, err := getRouterNATs(nbClient, router)
if err == libovsdbclient.ErrNotFound {
return ops, nil
}
if err != nil {
return ops, err
return ops, fmt.Errorf("unable to get NAT entries for router %+v: %w", router, err)
}

natUUIDs := make([]string, 0, len(nats))
Expand Down
37 changes: 22 additions & 15 deletions go-controller/pkg/ovn/egressgw.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,23 +222,10 @@ func (oc *Controller) addGWRoutesForNamespace(namespace string, egress gatewayIn
if err != nil {
return fmt.Errorf("failed to get all the pods (%v)", err)
}
// TODO (trozet): use the go bindings here and batch commands
for _, pod := range existingPods {
podNsName := ktypes.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
if config.Gateway.DisableSNATMultipleGWs {
logicalPort := util.GetLogicalPortName(pod.Namespace, pod.Name)
portInfo, err := oc.logicalPortCache.get(logicalPort)
if err != nil {
klog.Warningf("Unable to get port %s in cache for SNAT rule removal", logicalPort)
} else {
// delete all perPodSNATs (if this pod was controlled by egressIP controller, it will stop working since
// a pod cannot be used for multiple-external-gateways and egressIPs at the same time)
if err = deletePerPodGRSNAT(oc.nbClient, pod.Spec.NodeName, []*net.IPNet{}, portInfo.ips); err != nil {
klog.Error(err.Error())
}
}
if util.PodCompleted(pod) || !util.PodWantsNetwork(pod) {
continue
}

podIPs := make([]*net.IPNet, 0)
for _, podIP := range pod.Status.PodIPs {
cidr := podIP.IP + GetIPFullMask(podIP.IP)
Expand All @@ -248,6 +235,18 @@ func (oc *Controller) addGWRoutesForNamespace(namespace string, egress gatewayIn
}
podIPs = append(podIPs, ipNet)
}
if len(podIPs) == 0 {
klog.Warningf("Will not add gateway routes pod %s/%s. IPs not found!", pod.Namespace, pod.Name)
continue
}
if config.Gateway.DisableSNATMultipleGWs {
// delete all perPodSNATs (if this pod was controlled by egressIP controller, it will stop working since
// a pod cannot be used for multiple-external-gateways and egressIPs at the same time)
if err = deletePerPodGRSNAT(oc.nbClient, pod.Spec.NodeName, []*net.IPNet{}, podIPs); err != nil {
klog.Error(err.Error())
}
}
podNsName := ktypes.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
if err := oc.addGWRoutesForPod([]*gatewayInfo{&egress}, podIPs, podNsName, pod.Spec.NodeName); err != nil {
return err
}
Expand Down Expand Up @@ -1149,6 +1148,10 @@ func (oc *Controller) buildClusterECMPCacheFromNamespaces(clusterRouteCache map[
}
for _, gwIP := range gwIPs {
for _, nsPod := range nsPods {
// ignore completed pods, host networked pods, pods not scheduled
if !util.PodWantsNetwork(nsPod) || util.PodCompleted(nsPod) || !util.PodScheduled(nsPod) {
continue
}
for _, podIP := range nsPod.Status.PodIPs {
if utilnet.IsIPv6(gwIP) != utilnet.IsIPv6String(podIP.IP) {
continue
Expand Down Expand Up @@ -1202,6 +1205,10 @@ func (oc *Controller) buildClusterECMPCacheFromPods(clusterRouteCache map[string
}
for _, gwIP := range gwIPs {
for _, nsPod := range nsPods {
// ignore completed pods, host networked pods, pods not scheduled
if !util.PodWantsNetwork(nsPod) || util.PodCompleted(nsPod) || !util.PodScheduled(nsPod) {
continue
}
for _, podIP := range nsPod.Status.PodIPs {
if utilnet.IsIPv6(gwIP) != utilnet.IsIPv6String(podIP.IP) {
continue
Expand Down
7 changes: 7 additions & 0 deletions go-controller/pkg/ovn/egressip.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ func (oc *Controller) reconcileEgressIP(old, new *egressipv1.EgressIP) (err erro
return err
}
}
if util.PodCompleted(pod) {
continue
}
if newPodSelector.Matches(podLabels) && !oldPodSelector.Matches(podLabels) {
if err := oc.addPodEgressIPAssignments(name, newEIP.Status.Items, pod); err != nil {
return err
Expand Down Expand Up @@ -1197,6 +1200,10 @@ func (oc *Controller) generateCacheForEgressIP(eIPs []interface{}) (map[string]e
continue
}
for _, pod := range pods {
if util.PodCompleted(pod) {
continue
}
// FIXME(trozet): potential race where pod is not yet added in the cache by the pod handler
logicalPort, err := oc.logicalPortCache.get(util.GetLogicalPortName(pod.Namespace, pod.Name))
if err != nil {
klog.Errorf("Error getting logical port %s, err: %v", util.GetLogicalPortName(pod.Namespace, pod.Name), err)
Expand Down
17 changes: 7 additions & 10 deletions go-controller/pkg/ovn/ipallocator/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,15 @@ import (
type Interface interface {
Allocate(net.IP) error
AllocateNext() (net.IP, error)
Release(net.IP) error
Release(net.IP)
ForEach(func(net.IP))
CIDR() net.IPNet

// For testing
Has(ip net.IP) bool
}

var (
ErrFull = errors.New("range is full")
ErrAllocated = errors.New("provided IP is already allocated")
ErrMismatchedNetwork = errors.New("the provided network does not match the current range")
ErrFull = errors.New("range is full")
ErrAllocated = errors.New("provided IP is already allocated")
)

type ErrNotInRange struct {
Expand Down Expand Up @@ -109,7 +106,7 @@ func NewAllocatorCIDRRange(cidr *net.IPNet, allocatorFactory allocator.Allocator
return &r, err
}

// Helper that wraps NewAllocatorCIDRRange, for creating a range backed by an in-memory store.
// NewCIDRRange is a helper that wraps NewAllocatorCIDRRange, for creating a range backed by an in-memory store.
func NewCIDRRange(cidr *net.IPNet) (*Range, error) {
return NewAllocatorCIDRRange(cidr, func(max int, rangeSpec string) (allocator.Interface, error) {
return allocator.NewAllocationMap(max, rangeSpec), nil
Expand Down Expand Up @@ -174,13 +171,13 @@ func (r *Range) AllocateNext() (net.IP, error) {
// Release releases the IP back to the pool. Releasing an
// unallocated IP or an IP out of the range is a no-op and
// returns no error.
func (r *Range) Release(ip net.IP) error {
func (r *Range) Release(ip net.IP) {
ok, offset := r.contains(ip)
if !ok {
return nil
return
}

return r.alloc.Release(offset)
r.alloc.Release(offset)
}

// ForEach calls the provided function for each allocated IP.
Expand Down
5 changes: 2 additions & 3 deletions go-controller/pkg/ovn/ipallocator/allocator/bitmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,17 +129,16 @@ func (r *AllocationBitmap) AllocateNext() (int, bool, error) {
// Release releases the item back to the pool. Releasing an
// unallocated item or an item out of the range is a no-op and
// returns no error.
func (r *AllocationBitmap) Release(offset int) error {
func (r *AllocationBitmap) Release(offset int) {
r.lock.Lock()
defer r.lock.Unlock()

if r.allocated.Bit(offset) == 0 {
return nil
return
}

r.allocated = r.allocated.SetBit(r.allocated, offset, 0)
r.count--
return nil
}

const (
Expand Down
16 changes: 4 additions & 12 deletions go-controller/pkg/ovn/ipallocator/allocator/bitmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ func TestRelease(t *testing.T) {
t.Errorf("expect offset %v allocated", offset)
}

if err := m.Release(offset); err != nil {
t.Errorf("unexpected error: %v", err)
}
m.Release(offset)

if m.Has(offset) {
t.Errorf("expect offset %v not allocated", offset)
Expand Down Expand Up @@ -196,9 +194,7 @@ func TestRoundRobinAllocationOrdering(t *testing.T) {
}

// Release one of the pre-allocated entries
if err := m.Release(0); err != nil {
t.Fatalf("unexpected error: %v", err)
}
m.Release(0)

// Next allocation should be after the most recently allocated entry,
// not one of the just-released ones
Expand Down Expand Up @@ -264,9 +260,7 @@ func TestRoundRobinRelease(t *testing.T) {
t.Fatalf("expect offset %d allocated", offset)
}

if err := m.Release(offset); err != nil {
t.Fatalf("unexpected error: %v", err)
}
m.Release(offset)

if m.Has(offset) {
t.Fatalf("expect offset %d not allocated", offset)
Expand All @@ -289,9 +283,7 @@ func TestRoundRobinWrapAround(t *testing.T) {
t.Fatalf("got offset %d but expected offset %d", offset, i)
}

if err := m.Release(offset); err != nil {
t.Fatalf("unexpected release error: %v", err)
}
m.Release(offset)
}
}

Expand Down
2 changes: 1 addition & 1 deletion go-controller/pkg/ovn/ipallocator/allocator/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package allocator
type Interface interface {
Allocate(int) (bool, error)
AllocateNext() (int, bool, error)
Release(int) error
Release(int)
ForEach(func(int))

// For testing
Expand Down
8 changes: 2 additions & 6 deletions go-controller/pkg/ovn/ipallocator/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,7 @@ func TestAllocate(t *testing.T) {
t.Fatal(err)
}
released := net.ParseIP(tc.released)
if err := r.Release(released); err != nil {
t.Fatal(err)
}
r.Release(released)
if f := r.Free(); f != 1 {
t.Errorf("Test %s unexpected free %d", tc.name, f)
}
Expand All @@ -131,9 +129,7 @@ func TestAllocate(t *testing.T) {
t.Errorf("Test %s unexpected %s : %s", tc.name, ip, released)
}

if err := r.Release(released); err != nil {
t.Fatal(err)
}
r.Release(released)
for _, outOfRange := range tc.outOfRange {
err = r.Allocate(net.ParseIP(outOfRange))
if _, ok := err.(*ErrNotInRange); !ok {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,25 @@ func (manager *LogicalSwitchManager) GetSwitchSubnets(nodeName string) []*net.IP
return nil
}

// AllocateUntilFull is used for unit testing only. It exhausts every IPAM that
// belongs to the given node by calling AllocateNext on it until the allocator
// reports ipam.ErrFull.
//
// An error is returned when the node is not present in the manager's cache,
// when the node has no IPAM, or when an allocator fails with anything other
// than ipam.ErrFull. The manager's read lock (RLock) is held for the whole call.
func (manager *LogicalSwitchManager) AllocateUntilFull(nodeName string) error {
	manager.RLock()
	defer manager.RUnlock()

	lsi, ok := manager.cache[nodeName]
	if !ok {
		return fmt.Errorf("unable to allocate ips, node: %s does not exist in logical switch manager", nodeName)
	}
	if len(lsi.ipams) == 0 {
		return fmt.Errorf("unable to allocate ips for node: %s. logical switch manager has no IPAM", nodeName)
	}

	// Drain each allocator on its own. Checking only the error of the last
	// allocator in the slice could leave earlier allocators partially free,
	// and looping until ErrFull without looking at other errors would never
	// terminate if an allocator kept failing for a different reason.
	for _, nodeIPAM := range lsi.ipams {
		for {
			_, err := nodeIPAM.AllocateNext()
			if err == ipam.ErrFull {
				break
			}
			if err != nil {
				return fmt.Errorf("unable to allocate ips for node: %s: %w", nodeName, err)
			}
		}
	}
	return nil
}

// AllocateIPs will block off IPs in the ipnets slice as already allocated
// for a given switch
func (manager *LogicalSwitchManager) AllocateIPs(nodeName string, ipnets []*net.IPNet) error {
Expand All @@ -196,10 +215,9 @@ func (manager *LogicalSwitchManager) AllocateIPs(nodeName string, ipnets []*net.
// iterate over range of already allocated indices and release
// ips allocated before the error occurred.
for relIdx, relIPNet := range allocated {
if relErr := lsi.ipams[relIdx].Release(relIPNet.IP); relErr != nil {
klog.Errorf("Error while releasing IP: %s, err: %v", relIPNet.IP, relErr)
} else {
klog.Warningf("Reserved IP: %s were released", relIPNet.IP.String())
lsi.ipams[relIdx].Release(relIPNet.IP)
if relIPNet.IP != nil {
klog.Warningf("Reserved IP: %s was released", relIPNet.IP.String())
}
}
}
Expand Down Expand Up @@ -252,11 +270,11 @@ func (manager *LogicalSwitchManager) AllocateNextIPs(nodeName string) ([]*net.IP
// iterate over range of already allocated indices and release
// ips allocated before the error occurred.
for relIdx, relIPNet := range ipnets {
if relErr := lsi.ipams[relIdx].Release(relIPNet.IP); relErr != nil {
klog.Errorf("Error while releasing IP: %s, err: %v", relIPNet.IP, relErr)
lsi.ipams[relIdx].Release(relIPNet.IP)
if relIPNet.IP != nil {
klog.Warningf("Reserved IP: %s was released", relIPNet.IP.String())
}
}
klog.Warningf("Allocated IPs: %s were released", util.JoinIPNetIPs(ipnets, " "))
}
}()

Expand Down Expand Up @@ -295,16 +313,48 @@ func (manager *LogicalSwitchManager) ReleaseIPs(nodeName string, ipnets []*net.I
for _, ipam := range lsi.ipams {
cidr := ipam.CIDR()
if cidr.Contains(ipnet.IP) {
if err := ipam.Release(ipnet.IP); err != nil {
return err
}
ipam.Release(ipnet.IP)
break
}
}
}
return nil
}

// ConditionalIPRelease reports whether an IP from ipnets may be released, as
// decided by predicate. It does not release anything itself.
//
// The IPs are scanned in order; for the first one that falls inside the CIDR of
// one of the node's IPAMs and is currently marked as allocated there, predicate
// is invoked and its result is returned verbatim. If nodeName is empty, ipnets
// is nil, the node is unknown, the node has no IPAM, or none of the IPs is
// allocated, it returns (false, nil) without calling predicate.
//
// The manager's read lock (RLock) is held for the whole call, including while
// predicate runs.
// NOTE(review): this is intended to keep allocator state stable while the
// predicate executes; only the read lock is visible here — confirm that every
// path mutating the allocators takes the write lock.
// TODO(trozet): add unit testing for this function
func (manager *LogicalSwitchManager) ConditionalIPRelease(nodeName string, ipnets []*net.IPNet, predicate func() (bool, error)) (bool, error) {
	manager.RLock()
	defer manager.RUnlock()

	if nodeName == "" || ipnets == nil {
		klog.V(5).Infof("Node name is empty or ip slice to release is nil")
		return false, nil
	}

	lsi, found := manager.cache[nodeName]
	if !found || len(lsi.ipams) == 0 {
		return false, nil
	}

	// Look for the first candidate IP that one of the node's allocators both
	// covers and currently holds; the predicate alone decides the answer.
	for _, candidate := range ipnets {
		for _, pool := range lsi.ipams {
			subnet := pool.CIDR()
			if !subnet.Contains(candidate.IP) || !pool.Has(candidate.IP) {
				continue
			}
			return predicate()
		}
	}

	return false, nil
}

// IP allocator manager for join switch's IPv4 and IPv6 subnets.
type JoinSwitchIPManager struct {
lsm *LogicalSwitchManager
Expand Down