Skip to content

Commit

Permalink
Align api calls timeouts cronjob ip reconciler
Browse files Browse the repository at this point in the history
Fixes #389

Signed-off-by: Marcelo Guerrero <marguerr@redhat.com>
  • Loading branch information
mlguerrero12 committed Jun 14, 2024
1 parent 638d58d commit 7de75c3
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 97 deletions.
20 changes: 7 additions & 13 deletions cmd/whereabouts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ func AllocateAndReleaseAddressesTest(ipRange string, gw string, kubeconfigPath s
wbClient := *kubernetes.NewKubernetesClient(
fake.NewSimpleClientset(
ipPool(conf.IPRanges[0].Range, podNamespace, ipamNetworkName)),
fakek8sclient.NewSimpleClientset(),
0)
fakek8sclient.NewSimpleClientset())

for i := 0; i < len(expectedAddresses); i++ {
name := fmt.Sprintf("%s-%d", podName, i)
Expand Down Expand Up @@ -164,8 +163,7 @@ var _ = Describe("Whereabouts operations", func() {
fake.NewSimpleClientset(
ipPool(ipamConf.IPRanges[0].Range, podNamespace, ipamNetworkName, []whereaboutstypes.IPReservation{
{PodRef: ipamConf.GetPodRef(), IfName: ifname, IP: net.ParseIP(expectedAddress)}, {PodRef: "test"}}...)),
fakek8sclient.NewSimpleClientset(),
0)
fakek8sclient.NewSimpleClientset())

cniConf, err := newCNINetConf(cniVersion, ipamConf)
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -928,8 +926,7 @@ var _ = Describe("Whereabouts operations", func() {
wbClient := *kubernetes.NewKubernetesClient(
fake.NewSimpleClientset(
ipPool(ipamConf.IPRanges[0].Range, podNamespace, ipamConf.NetworkName)),
fakek8sclient.NewSimpleClientset(),
0)
fakek8sclient.NewSimpleClientset())

// allocate 8 IPs (192.168.1.5 - 192.168.1.12); the entirety of the pool defined above
for i := 0; i < 8; i++ {
Expand Down Expand Up @@ -1000,8 +997,7 @@ var _ = Describe("Whereabouts operations", func() {
wbClient := *kubernetes.NewKubernetesClient(
fake.NewSimpleClientset(
ipPool(firstRange, podNamespace, ""), ipPool(secondRange, podNamespace, "")),
fakek8sclient.NewSimpleClientset(),
0)
fakek8sclient.NewSimpleClientset())

// ----------------------------- range 1

Expand Down Expand Up @@ -1124,8 +1120,7 @@ var _ = Describe("Whereabouts operations", func() {
wbClient := *kubernetes.NewKubernetesClient(
fake.NewSimpleClientset(
ipPool(firstRange, podNamespace, ""), ipPool(secondRange, podNamespace, "")),
fakek8sclient.NewSimpleClientset(),
0)
fakek8sclient.NewSimpleClientset())

// ----------------------------- range 1

Expand Down Expand Up @@ -1248,8 +1243,7 @@ var _ = Describe("Whereabouts operations", func() {
wbClient := *kubernetes.NewKubernetesClient(
fake.NewSimpleClientset(
ipPool(firstRange, podNamespace, ""), ipPool(secondRange, podNamespace, "")),
fakek8sclient.NewSimpleClientset(),
0)
fakek8sclient.NewSimpleClientset())

// ----------------------------- range 1

Expand Down Expand Up @@ -1373,7 +1367,7 @@ func newK8sIPAM(containerID, ifName string, ipamConf *whereaboutstypes.IPAMConfi
if err != nil {
return nil
}
k8sIPAM.Client = *kubernetes.NewKubernetesClient(wbClient, k8sCoreClient, 0)
k8sIPAM.Client = *kubernetes.NewKubernetesClient(wbClient, k8sCoreClient)
return k8sIPAM
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controlloop/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (pc *PodController) garbageCollectPodIPs(pod *v1.Pod) error {
if allocation.PodRef == podID(podNamespace, podName) {
logging.Verbosef("stale allocation to cleanup: %+v", allocation)

client := *wbclient.NewKubernetesClient(nil, pc.k8sClient, 0)
client := *wbclient.NewKubernetesClient(nil, pc.k8sClient)
wbClient := &wbclient.KubernetesIPAM{
Client: client,
Config: *ipamConfig,
Expand Down
15 changes: 3 additions & 12 deletions pkg/reconciler/ip.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,20 @@
package reconciler

import (
"context"
"time"

"github.com/k8snetworkplumbingwg/whereabouts/pkg/logging"
)

const (
defaultReconcilerTimeout = 30
)

func ReconcileIPs(errorChan chan error) {
logging.Verbosef("starting reconciler run")
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(defaultReconcilerTimeout*time.Second))
defer cancel()

ipReconcileLoop, err := NewReconcileLooper(ctx, defaultReconcilerTimeout)
ipReconcileLoop, err := NewReconcileLooper()
if err != nil {
_ = logging.Errorf("failed to create the reconcile looper: %v", err)
errorChan <- err
return
}

cleanedUpIps, err := ipReconcileLoop.ReconcileIPPools(ctx)
cleanedUpIps, err := ipReconcileLoop.ReconcileIPPools()
if err != nil {
_ = logging.Errorf("failed to clean up IP for allocations: %v", err)
errorChan <- err
Expand All @@ -36,7 +27,7 @@ func ReconcileIPs(errorChan chan error) {
logging.Debugf("no IP addresses to cleanup")
}

if err := ipReconcileLoop.ReconcileOverlappingIPAddresses(ctx); err != nil {
if err := ipReconcileLoop.ReconcileOverlappingIPAddresses(); err != nil {
errorChan <- err
return
}
Expand Down
35 changes: 17 additions & 18 deletions pkg/reconciler/ip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ var _ = Describe("Whereabouts IP reconciler", func() {
namespace = "default"
networkName = "net1"
podName = "pod1"
timeout = 10
)

var (
Expand Down Expand Up @@ -75,16 +74,16 @@ var _ = Describe("Whereabouts IP reconciler", func() {
Context("reconciling the IPPool", func() {
BeforeEach(func() {
var err error
reconcileLooper, err = NewReconcileLooperWithClient(context.TODO(), kubernetes.NewKubernetesClient(wbClient, k8sClientSet, timeout), timeout)
reconcileLooper, err = NewReconcileLooperWithClient(kubernetes.NewKubernetesClient(wbClient, k8sClientSet))
Expect(err).NotTo(HaveOccurred())
})

It("should report the deleted IP reservation", func() {
Expect(reconcileLooper.ReconcileIPPools(context.TODO())).To(Equal([]net.IP{net.ParseIP("10.10.10.1")}))
Expect(reconcileLooper.ReconcileIPPools()).To(Equal([]net.IP{net.ParseIP("10.10.10.1")}))
})

It("the pool's orphaned IP should be deleted after the reconcile loop", func() {
_, err := reconcileLooper.ReconcileIPPools(context.TODO())
_, err := reconcileLooper.ReconcileIPPools()
Expect(err).NotTo(HaveOccurred())
poolAfterCleanup, err := wbClient.WhereaboutsV1alpha1().IPPools(namespace).Get(context.TODO(), pool.GetName(), metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -139,18 +138,18 @@ var _ = Describe("Whereabouts IP reconciler", func() {
Context("reconciling the IPPool", func() {
BeforeEach(func() {
var err error
reconcileLooper, err = NewReconcileLooperWithClient(context.TODO(), kubernetes.NewKubernetesClient(wbClient, k8sClientSet, timeout), timeout)
reconcileLooper, err = NewReconcileLooperWithClient(kubernetes.NewKubernetesClient(wbClient, k8sClientSet))
Expect(err).NotTo(HaveOccurred())
})

It("should report the dead pod's IP address as deleted", func() {
deletedIPAddrs, err := reconcileLooper.ReconcileIPPools(context.TODO())
deletedIPAddrs, err := reconcileLooper.ReconcileIPPools()
Expect(err).NotTo(HaveOccurred())
Expect(deletedIPAddrs).To(Equal([]net.IP{net.ParseIP("10.10.10.1")}))
})

It("the IPPool should have only the IP reservation of the live pod", func() {
deletedIPAddrs, err := reconcileLooper.ReconcileIPPools(context.TODO())
deletedIPAddrs, err := reconcileLooper.ReconcileIPPools()
Expect(err).NotTo(HaveOccurred())
Expect(deletedIPAddrs).NotTo(BeEmpty())

Expand Down Expand Up @@ -190,11 +189,11 @@ var _ = Describe("Whereabouts IP reconciler", func() {

By("initializing the reconciler")
var err error
reconcileLooper, err = NewReconcileLooperWithClient(context.TODO(), kubernetes.NewKubernetesClient(wbClient, k8sClientSet, timeout), timeout)
reconcileLooper, err = NewReconcileLooperWithClient(kubernetes.NewKubernetesClient(wbClient, k8sClientSet))
Expect(err).NotTo(HaveOccurred())

By("reconciling and checking that the correct entry is deleted")
deletedIPAddrs, err := reconcileLooper.ReconcileIPPools(context.TODO())
deletedIPAddrs, err := reconcileLooper.ReconcileIPPools()
Expect(err).NotTo(HaveOccurred())
Expect(deletedIPAddrs).To(Equal([]net.IP{net.ParseIP("10.10.10.2")}))

Expand Down Expand Up @@ -272,9 +271,9 @@ var _ = Describe("Whereabouts IP reconciler", func() {

It("will delete an orphaned IP address", func() {
Expect(k8sClientSet.CoreV1().Pods(namespace).Delete(context.TODO(), pods[podIndexToRemove].Name, metav1.DeleteOptions{})).NotTo(HaveOccurred())
newReconciler, err := NewReconcileLooperWithClient(context.TODO(), kubernetes.NewKubernetesClient(wbClient, k8sClientSet, timeout), timeout)
newReconciler, err := NewReconcileLooperWithClient(kubernetes.NewKubernetesClient(wbClient, k8sClientSet))
Expect(err).NotTo(HaveOccurred())
Expect(newReconciler.ReconcileOverlappingIPAddresses(context.TODO())).To(Succeed())
Expect(newReconciler.ReconcileOverlappingIPAddresses()).To(Succeed())

expectedClusterWideIPs := 2
clusterWideIPAllocations, err := wbClient.WhereaboutsV1alpha1().OverlappingRangeIPReservations(namespace).List(context.TODO(), metav1.ListOptions{})
Expand Down Expand Up @@ -338,9 +337,9 @@ var _ = Describe("Whereabouts IP reconciler", func() {
})

It("will not delete an IP address that isn't orphaned after running reconciler", func() {
newReconciler, err := NewReconcileLooperWithClient(context.TODO(), kubernetes.NewKubernetesClient(wbClient, k8sClientSet, timeout), timeout)
newReconciler, err := NewReconcileLooperWithClient(kubernetes.NewKubernetesClient(wbClient, k8sClientSet))
Expect(err).NotTo(HaveOccurred())
Expect(newReconciler.ReconcileOverlappingIPAddresses(context.TODO())).To(Succeed())
Expect(newReconciler.ReconcileOverlappingIPAddresses()).To(Succeed())

expectedClusterWideIPs := 1
clusterWideIPAllocations, err := wbClient.WhereaboutsV1alpha1().OverlappingRangeIPReservations(namespace).List(context.TODO(), metav1.ListOptions{})
Expand Down Expand Up @@ -369,12 +368,12 @@ var _ = Describe("Whereabouts IP reconciler", func() {

pool = generateIPPoolSpec(ipRange, namespace, poolName, pod.Name)
wbClient = fakewbclient.NewSimpleClientset(pool)
reconcileLooper, err = NewReconcileLooperWithClient(context.TODO(), kubernetes.NewKubernetesClient(wbClient, k8sClientSet, timeout), timeout)
reconcileLooper, err = NewReconcileLooperWithClient(kubernetes.NewKubernetesClient(wbClient, k8sClientSet))
Expect(err).NotTo(HaveOccurred())
})

It("can be reconciled", func() {
Expect(reconcileLooper.ReconcileIPPools(context.TODO())).NotTo(BeEmpty())
Expect(reconcileLooper.ReconcileIPPools()).NotTo(BeEmpty())
})
})
})
Expand Down Expand Up @@ -410,7 +409,7 @@ var _ = Describe("IPReconciler", func() {
})

It("does not delete anything", func() {
reconciledIPs, err := ipReconciler.ReconcileIPPools(context.TODO())
reconciledIPs, err := ipReconciler.ReconcileIPPools()
Expect(err).NotTo(HaveOccurred())
Expect(reconciledIPs).To(BeEmpty())
})
Expand Down Expand Up @@ -438,7 +437,7 @@ var _ = Describe("IPReconciler", func() {
})

It("does delete the orphaned IP address", func() {
reconciledIPs, err := ipReconciler.ReconcileIPPools(context.TODO())
reconciledIPs, err := ipReconciler.ReconcileIPPools()
Expect(err).NotTo(HaveOccurred())
Expect(reconciledIPs).To(Equal([]net.IP{net.ParseIP(firstIPInRange)}))
})
Expand All @@ -458,7 +457,7 @@ var _ = Describe("IPReconciler", func() {
})

It("does delete *only the orphaned* the IP address", func() {
reconciledIPs, err := ipReconciler.ReconcileIPPools(context.TODO())
reconciledIPs, err := ipReconciler.ReconcileIPPools()
Expect(err).NotTo(HaveOccurred())
Expect(reconciledIPs).To(ConsistOf([]net.IP{net.ParseIP("192.168.14.2")}))
})
Expand Down
46 changes: 17 additions & 29 deletions pkg/reconciler/iploop.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,39 +21,29 @@ type ReconcileLooper struct {
liveWhereaboutsPods map[string]podWrapper
orphanedIPs []OrphanedIPReservations
orphanedClusterWideIPs []whereaboutsv1alpha1.OverlappingRangeIPReservation
requestTimeout int
}

type OrphanedIPReservations struct {
Pool storage.IPPool
Allocations []types.IPReservation
}

func NewReconcileLooperWithKubeconfig(ctx context.Context, kubeconfigPath string, timeout int) (*ReconcileLooper, error) {
logging.Debugf("NewReconcileLooper - Kubernetes config file located at: %s", kubeconfigPath)
k8sClient, err := kubernetes.NewClientViaKubeconfig(kubeconfigPath, time.Duration(timeout)*time.Second)
if err != nil {
return nil, logging.Errorf("failed to instantiate the Kubernetes client: %+v", err)
}
return NewReconcileLooperWithClient(ctx, k8sClient, timeout)
}

func NewReconcileLooper(ctx context.Context, timeout int) (*ReconcileLooper, error) {
func NewReconcileLooper() (*ReconcileLooper, error) {
logging.Debugf("NewReconcileLooper - inferred connection data")
k8sClient, err := kubernetes.NewClient(time.Duration(timeout) * time.Second)
k8sClient, err := kubernetes.NewClient()
if err != nil {
return nil, logging.Errorf("failed to instantiate the Kubernetes client: %+v", err)
}
return NewReconcileLooperWithClient(ctx, k8sClient, timeout)
return NewReconcileLooperWithClient(k8sClient)
}

func NewReconcileLooperWithClient(ctx context.Context, k8sClient *kubernetes.Client, timeout int) (*ReconcileLooper, error) {
ipPools, err := k8sClient.ListIPPools(ctx)
func NewReconcileLooperWithClient(k8sClient *kubernetes.Client) (*ReconcileLooper, error) {
ipPools, err := k8sClient.ListIPPools()
if err != nil {
return nil, logging.Errorf("failed to retrieve all IP pools: %v", err)
}

pods, err := k8sClient.ListPods(ctx)
pods, err := k8sClient.ListPods()
if err != nil {
return nil, err
}
Expand All @@ -62,14 +52,13 @@ func NewReconcileLooperWithClient(ctx context.Context, k8sClient *kubernetes.Cli
looper := &ReconcileLooper{
k8sClient: *k8sClient,
liveWhereaboutsPods: indexPods(pods, whereaboutsPodRefs),
requestTimeout: timeout,
}

if err := looper.findOrphanedIPsPerPool(ipPools); err != nil {
return nil, err
}

if err := looper.findClusterWideIPReservations(ctx); err != nil {
if err := looper.findClusterWideIPReservations(); err != nil {
return nil, err
}
return looper, nil
Expand Down Expand Up @@ -173,7 +162,7 @@ func composePodRef(pod v1.Pod) string {
return fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName())
}

func (rl ReconcileLooper) ReconcileIPPools(ctx context.Context) ([]net.IP, error) {
func (rl ReconcileLooper) ReconcileIPPools() ([]net.IP, error) {
findAllocationIndex := func(reservation types.IPReservation, reservations []types.IPReservation) int {
for idx, r := range reservations {
if r.PodRef == reservation.PodRef && r.IP.Equal(reservation.IP) {
Expand Down Expand Up @@ -206,21 +195,23 @@ func (rl ReconcileLooper) ReconcileIPPools(ctx context.Context) ([]net.IP, error

if len(cleanedUpIpsPerPool) != 0 {
logging.Debugf("Going to update the reserve list to: %+v", currentIPReservations)

ctx, cancel := context.WithTimeout(context.Background(), storage.RequestTimeout)
if err := orphanedIP.Pool.Update(ctx, currentIPReservations); err != nil {
cancel()
return nil, logging.Errorf("failed to update the reservation list: %v", err)
}

cancel()
totalCleanedUpIps = append(totalCleanedUpIps, cleanedUpIpsPerPool...)
}
}

return totalCleanedUpIps, nil
}

func (rl *ReconcileLooper) findClusterWideIPReservations(ctx context.Context) error {
ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Duration(rl.requestTimeout)*time.Second)
defer cancel()

clusterWideIPReservations, err := rl.k8sClient.ListOverlappingIPs(ctxWithTimeout)
func (rl *ReconcileLooper) findClusterWideIPReservations() error {
clusterWideIPReservations, err := rl.k8sClient.ListOverlappingIPs()
if err != nil {
return logging.Errorf("failed to list all OverLappingIPs: %v", err)
}
Expand All @@ -243,14 +234,11 @@ func (rl *ReconcileLooper) findClusterWideIPReservations(ctx context.Context) er
return nil
}

func (rl ReconcileLooper) ReconcileOverlappingIPAddresses(ctx context.Context) error {
func (rl ReconcileLooper) ReconcileOverlappingIPAddresses() error {
var failedReconciledClusterWideIPs []string

ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Duration(rl.requestTimeout)*time.Second)
defer cancel()

for _, overlappingIPStruct := range rl.orphanedClusterWideIPs {
if err := rl.k8sClient.DeleteOverlappingIP(ctxWithTimeout, &overlappingIPStruct); err != nil {
if err := rl.k8sClient.DeleteOverlappingIP(&overlappingIPStruct); err != nil {
logging.Errorf("failed to remove cluster wide IP: %s", overlappingIPStruct.GetName())
failedReconciledClusterWideIPs = append(failedReconciledClusterWideIPs, overlappingIPStruct.GetName())
continue
Expand Down
Loading

0 comments on commit 7de75c3

Please sign in to comment.