Skip to content

Commit

Permalink
Align api calls timeouts cronjob ip reconciler
Browse files Browse the repository at this point in the history
Parent timeout context of 30s was removed. All listing operations
used by the cronjob reconciler has 30s as timeout.

Fixes #389

Signed-off-by: Marcelo Guerrero <marguerr@redhat.com>
  • Loading branch information
mlguerrero12 committed Jun 19, 2024
1 parent 638d58d commit fc56b7a
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 97 deletions.
20 changes: 7 additions & 13 deletions cmd/whereabouts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ func AllocateAndReleaseAddressesTest(ipRange string, gw string, kubeconfigPath s
wbClient := *kubernetes.NewKubernetesClient(
fake.NewSimpleClientset(
ipPool(conf.IPRanges[0].Range, podNamespace, ipamNetworkName)),
fakek8sclient.NewSimpleClientset(),
0)
fakek8sclient.NewSimpleClientset())

for i := 0; i < len(expectedAddresses); i++ {
name := fmt.Sprintf("%s-%d", podName, i)
Expand Down Expand Up @@ -164,8 +163,7 @@ var _ = Describe("Whereabouts operations", func() {
fake.NewSimpleClientset(
ipPool(ipamConf.IPRanges[0].Range, podNamespace, ipamNetworkName, []whereaboutstypes.IPReservation{
{PodRef: ipamConf.GetPodRef(), IfName: ifname, IP: net.ParseIP(expectedAddress)}, {PodRef: "test"}}...)),
fakek8sclient.NewSimpleClientset(),
0)
fakek8sclient.NewSimpleClientset())

cniConf, err := newCNINetConf(cniVersion, ipamConf)
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -928,8 +926,7 @@ var _ = Describe("Whereabouts operations", func() {
wbClient := *kubernetes.NewKubernetesClient(
fake.NewSimpleClientset(
ipPool(ipamConf.IPRanges[0].Range, podNamespace, ipamConf.NetworkName)),
fakek8sclient.NewSimpleClientset(),
0)
fakek8sclient.NewSimpleClientset())

// allocate 8 IPs (192.168.1.5 - 192.168.1.12); the entirety of the pool defined above
for i := 0; i < 8; i++ {
Expand Down Expand Up @@ -1000,8 +997,7 @@ var _ = Describe("Whereabouts operations", func() {
wbClient := *kubernetes.NewKubernetesClient(
fake.NewSimpleClientset(
ipPool(firstRange, podNamespace, ""), ipPool(secondRange, podNamespace, "")),
fakek8sclient.NewSimpleClientset(),
0)
fakek8sclient.NewSimpleClientset())

// ----------------------------- range 1

Expand Down Expand Up @@ -1124,8 +1120,7 @@ var _ = Describe("Whereabouts operations", func() {
wbClient := *kubernetes.NewKubernetesClient(
fake.NewSimpleClientset(
ipPool(firstRange, podNamespace, ""), ipPool(secondRange, podNamespace, "")),
fakek8sclient.NewSimpleClientset(),
0)
fakek8sclient.NewSimpleClientset())

// ----------------------------- range 1

Expand Down Expand Up @@ -1248,8 +1243,7 @@ var _ = Describe("Whereabouts operations", func() {
wbClient := *kubernetes.NewKubernetesClient(
fake.NewSimpleClientset(
ipPool(firstRange, podNamespace, ""), ipPool(secondRange, podNamespace, "")),
fakek8sclient.NewSimpleClientset(),
0)
fakek8sclient.NewSimpleClientset())

// ----------------------------- range 1

Expand Down Expand Up @@ -1373,7 +1367,7 @@ func newK8sIPAM(containerID, ifName string, ipamConf *whereaboutstypes.IPAMConfi
if err != nil {
return nil
}
k8sIPAM.Client = *kubernetes.NewKubernetesClient(wbClient, k8sCoreClient, 0)
k8sIPAM.Client = *kubernetes.NewKubernetesClient(wbClient, k8sCoreClient)
return k8sIPAM
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controlloop/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (pc *PodController) garbageCollectPodIPs(pod *v1.Pod) error {
if allocation.PodRef == podID(podNamespace, podName) {
logging.Verbosef("stale allocation to cleanup: %+v", allocation)

client := *wbclient.NewKubernetesClient(nil, pc.k8sClient, 0)
client := *wbclient.NewKubernetesClient(nil, pc.k8sClient)
wbClient := &wbclient.KubernetesIPAM{
Client: client,
Config: *ipamConfig,
Expand Down
15 changes: 3 additions & 12 deletions pkg/reconciler/ip.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,20 @@
package reconciler

import (
"context"
"time"

"github.com/k8snetworkplumbingwg/whereabouts/pkg/logging"
)

const (
defaultReconcilerTimeout = 30
)

func ReconcileIPs(errorChan chan error) {
logging.Verbosef("starting reconciler run")
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(defaultReconcilerTimeout*time.Second))
defer cancel()

ipReconcileLoop, err := NewReconcileLooper(ctx, defaultReconcilerTimeout)
ipReconcileLoop, err := NewReconcileLooper()
if err != nil {
_ = logging.Errorf("failed to create the reconcile looper: %v", err)
errorChan <- err
return
}

cleanedUpIps, err := ipReconcileLoop.ReconcileIPPools(ctx)
cleanedUpIps, err := ipReconcileLoop.ReconcileIPPools()
if err != nil {
_ = logging.Errorf("failed to clean up IP for allocations: %v", err)
errorChan <- err
Expand All @@ -36,7 +27,7 @@ func ReconcileIPs(errorChan chan error) {
logging.Debugf("no IP addresses to cleanup")
}

if err := ipReconcileLoop.ReconcileOverlappingIPAddresses(ctx); err != nil {
if err := ipReconcileLoop.ReconcileOverlappingIPAddresses(); err != nil {
errorChan <- err
return
}
Expand Down
35 changes: 17 additions & 18 deletions pkg/reconciler/ip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ var _ = Describe("Whereabouts IP reconciler", func() {
namespace = "default"
networkName = "net1"
podName = "pod1"
timeout = 10
)

var (
Expand Down Expand Up @@ -75,16 +74,16 @@ var _ = Describe("Whereabouts IP reconciler", func() {
Context("reconciling the IPPool", func() {
BeforeEach(func() {
var err error
reconcileLooper, err = NewReconcileLooperWithClient(context.TODO(), kubernetes.NewKubernetesClient(wbClient, k8sClientSet, timeout), timeout)
reconcileLooper, err = NewReconcileLooperWithClient(kubernetes.NewKubernetesClient(wbClient, k8sClientSet))
Expect(err).NotTo(HaveOccurred())
})

It("should report the deleted IP reservation", func() {
Expect(reconcileLooper.ReconcileIPPools(context.TODO())).To(Equal([]net.IP{net.ParseIP("10.10.10.1")}))
Expect(reconcileLooper.ReconcileIPPools()).To(Equal([]net.IP{net.ParseIP("10.10.10.1")}))
})

It("the pool's orphaned IP should be deleted after the reconcile loop", func() {
_, err := reconcileLooper.ReconcileIPPools(context.TODO())
_, err := reconcileLooper.ReconcileIPPools()
Expect(err).NotTo(HaveOccurred())
poolAfterCleanup, err := wbClient.WhereaboutsV1alpha1().IPPools(namespace).Get(context.TODO(), pool.GetName(), metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -139,18 +138,18 @@ var _ = Describe("Whereabouts IP reconciler", func() {
Context("reconciling the IPPool", func() {
BeforeEach(func() {
var err error
reconcileLooper, err = NewReconcileLooperWithClient(context.TODO(), kubernetes.NewKubernetesClient(wbClient, k8sClientSet, timeout), timeout)
reconcileLooper, err = NewReconcileLooperWithClient(kubernetes.NewKubernetesClient(wbClient, k8sClientSet))
Expect(err).NotTo(HaveOccurred())
})

It("should report the dead pod's IP address as deleted", func() {
deletedIPAddrs, err := reconcileLooper.ReconcileIPPools(context.TODO())
deletedIPAddrs, err := reconcileLooper.ReconcileIPPools()
Expect(err).NotTo(HaveOccurred())
Expect(deletedIPAddrs).To(Equal([]net.IP{net.ParseIP("10.10.10.1")}))
})

It("the IPPool should have only the IP reservation of the live pod", func() {
deletedIPAddrs, err := reconcileLooper.ReconcileIPPools(context.TODO())
deletedIPAddrs, err := reconcileLooper.ReconcileIPPools()
Expect(err).NotTo(HaveOccurred())
Expect(deletedIPAddrs).NotTo(BeEmpty())

Expand Down Expand Up @@ -190,11 +189,11 @@ var _ = Describe("Whereabouts IP reconciler", func() {

By("initializing the reconciler")
var err error
reconcileLooper, err = NewReconcileLooperWithClient(context.TODO(), kubernetes.NewKubernetesClient(wbClient, k8sClientSet, timeout), timeout)
reconcileLooper, err = NewReconcileLooperWithClient(kubernetes.NewKubernetesClient(wbClient, k8sClientSet))
Expect(err).NotTo(HaveOccurred())

By("reconciling and checking that the correct entry is deleted")
deletedIPAddrs, err := reconcileLooper.ReconcileIPPools(context.TODO())
deletedIPAddrs, err := reconcileLooper.ReconcileIPPools()
Expect(err).NotTo(HaveOccurred())
Expect(deletedIPAddrs).To(Equal([]net.IP{net.ParseIP("10.10.10.2")}))

Expand Down Expand Up @@ -272,9 +271,9 @@ var _ = Describe("Whereabouts IP reconciler", func() {

It("will delete an orphaned IP address", func() {
Expect(k8sClientSet.CoreV1().Pods(namespace).Delete(context.TODO(), pods[podIndexToRemove].Name, metav1.DeleteOptions{})).NotTo(HaveOccurred())
newReconciler, err := NewReconcileLooperWithClient(context.TODO(), kubernetes.NewKubernetesClient(wbClient, k8sClientSet, timeout), timeout)
newReconciler, err := NewReconcileLooperWithClient(kubernetes.NewKubernetesClient(wbClient, k8sClientSet))
Expect(err).NotTo(HaveOccurred())
Expect(newReconciler.ReconcileOverlappingIPAddresses(context.TODO())).To(Succeed())
Expect(newReconciler.ReconcileOverlappingIPAddresses()).To(Succeed())

expectedClusterWideIPs := 2
clusterWideIPAllocations, err := wbClient.WhereaboutsV1alpha1().OverlappingRangeIPReservations(namespace).List(context.TODO(), metav1.ListOptions{})
Expand Down Expand Up @@ -338,9 +337,9 @@ var _ = Describe("Whereabouts IP reconciler", func() {
})

It("will not delete an IP address that isn't orphaned after running reconciler", func() {
newReconciler, err := NewReconcileLooperWithClient(context.TODO(), kubernetes.NewKubernetesClient(wbClient, k8sClientSet, timeout), timeout)
newReconciler, err := NewReconcileLooperWithClient(kubernetes.NewKubernetesClient(wbClient, k8sClientSet))
Expect(err).NotTo(HaveOccurred())
Expect(newReconciler.ReconcileOverlappingIPAddresses(context.TODO())).To(Succeed())
Expect(newReconciler.ReconcileOverlappingIPAddresses()).To(Succeed())

expectedClusterWideIPs := 1
clusterWideIPAllocations, err := wbClient.WhereaboutsV1alpha1().OverlappingRangeIPReservations(namespace).List(context.TODO(), metav1.ListOptions{})
Expand Down Expand Up @@ -369,12 +368,12 @@ var _ = Describe("Whereabouts IP reconciler", func() {

pool = generateIPPoolSpec(ipRange, namespace, poolName, pod.Name)
wbClient = fakewbclient.NewSimpleClientset(pool)
reconcileLooper, err = NewReconcileLooperWithClient(context.TODO(), kubernetes.NewKubernetesClient(wbClient, k8sClientSet, timeout), timeout)
reconcileLooper, err = NewReconcileLooperWithClient(kubernetes.NewKubernetesClient(wbClient, k8sClientSet))
Expect(err).NotTo(HaveOccurred())
})

It("can be reconciled", func() {
Expect(reconcileLooper.ReconcileIPPools(context.TODO())).NotTo(BeEmpty())
Expect(reconcileLooper.ReconcileIPPools()).NotTo(BeEmpty())
})
})
})
Expand Down Expand Up @@ -410,7 +409,7 @@ var _ = Describe("IPReconciler", func() {
})

It("does not delete anything", func() {
reconciledIPs, err := ipReconciler.ReconcileIPPools(context.TODO())
reconciledIPs, err := ipReconciler.ReconcileIPPools()
Expect(err).NotTo(HaveOccurred())
Expect(reconciledIPs).To(BeEmpty())
})
Expand Down Expand Up @@ -438,7 +437,7 @@ var _ = Describe("IPReconciler", func() {
})

It("does delete the orphaned IP address", func() {
reconciledIPs, err := ipReconciler.ReconcileIPPools(context.TODO())
reconciledIPs, err := ipReconciler.ReconcileIPPools()
Expect(err).NotTo(HaveOccurred())
Expect(reconciledIPs).To(Equal([]net.IP{net.ParseIP(firstIPInRange)}))
})
Expand All @@ -458,7 +457,7 @@ var _ = Describe("IPReconciler", func() {
})

It("does delete *only the orphaned* the IP address", func() {
reconciledIPs, err := ipReconciler.ReconcileIPPools(context.TODO())
reconciledIPs, err := ipReconciler.ReconcileIPPools()
Expect(err).NotTo(HaveOccurred())
Expect(reconciledIPs).To(ConsistOf([]net.IP{net.ParseIP("192.168.14.2")}))
})
Expand Down
46 changes: 17 additions & 29 deletions pkg/reconciler/iploop.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,39 +21,29 @@ type ReconcileLooper struct {
liveWhereaboutsPods map[string]podWrapper
orphanedIPs []OrphanedIPReservations
orphanedClusterWideIPs []whereaboutsv1alpha1.OverlappingRangeIPReservation
requestTimeout int
}

type OrphanedIPReservations struct {
Pool storage.IPPool
Allocations []types.IPReservation
}

func NewReconcileLooperWithKubeconfig(ctx context.Context, kubeconfigPath string, timeout int) (*ReconcileLooper, error) {
logging.Debugf("NewReconcileLooper - Kubernetes config file located at: %s", kubeconfigPath)
k8sClient, err := kubernetes.NewClientViaKubeconfig(kubeconfigPath, time.Duration(timeout)*time.Second)
if err != nil {
return nil, logging.Errorf("failed to instantiate the Kubernetes client: %+v", err)
}
return NewReconcileLooperWithClient(ctx, k8sClient, timeout)
}

func NewReconcileLooper(ctx context.Context, timeout int) (*ReconcileLooper, error) {
func NewReconcileLooper() (*ReconcileLooper, error) {
logging.Debugf("NewReconcileLooper - inferred connection data")
k8sClient, err := kubernetes.NewClient(time.Duration(timeout) * time.Second)
k8sClient, err := kubernetes.NewClient()
if err != nil {
return nil, logging.Errorf("failed to instantiate the Kubernetes client: %+v", err)
}
return NewReconcileLooperWithClient(ctx, k8sClient, timeout)
return NewReconcileLooperWithClient(k8sClient)
}

func NewReconcileLooperWithClient(ctx context.Context, k8sClient *kubernetes.Client, timeout int) (*ReconcileLooper, error) {
ipPools, err := k8sClient.ListIPPools(ctx)
func NewReconcileLooperWithClient(k8sClient *kubernetes.Client) (*ReconcileLooper, error) {
ipPools, err := k8sClient.ListIPPools()
if err != nil {
return nil, logging.Errorf("failed to retrieve all IP pools: %v", err)
}

pods, err := k8sClient.ListPods(ctx)
pods, err := k8sClient.ListPods()
if err != nil {
return nil, err
}
Expand All @@ -62,14 +52,13 @@ func NewReconcileLooperWithClient(ctx context.Context, k8sClient *kubernetes.Cli
looper := &ReconcileLooper{
k8sClient: *k8sClient,
liveWhereaboutsPods: indexPods(pods, whereaboutsPodRefs),
requestTimeout: timeout,
}

if err := looper.findOrphanedIPsPerPool(ipPools); err != nil {
return nil, err
}

if err := looper.findClusterWideIPReservations(ctx); err != nil {
if err := looper.findClusterWideIPReservations(); err != nil {
return nil, err
}
return looper, nil
Expand Down Expand Up @@ -173,7 +162,7 @@ func composePodRef(pod v1.Pod) string {
return fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName())
}

func (rl ReconcileLooper) ReconcileIPPools(ctx context.Context) ([]net.IP, error) {
func (rl ReconcileLooper) ReconcileIPPools() ([]net.IP, error) {
findAllocationIndex := func(reservation types.IPReservation, reservations []types.IPReservation) int {
for idx, r := range reservations {
if r.PodRef == reservation.PodRef && r.IP.Equal(reservation.IP) {
Expand Down Expand Up @@ -206,21 +195,23 @@ func (rl ReconcileLooper) ReconcileIPPools(ctx context.Context) ([]net.IP, error

if len(cleanedUpIpsPerPool) != 0 {
logging.Debugf("Going to update the reserve list to: %+v", currentIPReservations)

ctx, cancel := context.WithTimeout(context.Background(), storage.RequestTimeout)
if err := orphanedIP.Pool.Update(ctx, currentIPReservations); err != nil {
cancel()
return nil, logging.Errorf("failed to update the reservation list: %v", err)
}

cancel()
totalCleanedUpIps = append(totalCleanedUpIps, cleanedUpIpsPerPool...)
}
}

return totalCleanedUpIps, nil
}

func (rl *ReconcileLooper) findClusterWideIPReservations(ctx context.Context) error {
ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Duration(rl.requestTimeout)*time.Second)
defer cancel()

clusterWideIPReservations, err := rl.k8sClient.ListOverlappingIPs(ctxWithTimeout)
func (rl *ReconcileLooper) findClusterWideIPReservations() error {
clusterWideIPReservations, err := rl.k8sClient.ListOverlappingIPs()
if err != nil {
return logging.Errorf("failed to list all OverLappingIPs: %v", err)
}
Expand All @@ -243,14 +234,11 @@ func (rl *ReconcileLooper) findClusterWideIPReservations(ctx context.Context) er
return nil
}

func (rl ReconcileLooper) ReconcileOverlappingIPAddresses(ctx context.Context) error {
func (rl ReconcileLooper) ReconcileOverlappingIPAddresses() error {
var failedReconciledClusterWideIPs []string

ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Duration(rl.requestTimeout)*time.Second)
defer cancel()

for _, overlappingIPStruct := range rl.orphanedClusterWideIPs {
if err := rl.k8sClient.DeleteOverlappingIP(ctxWithTimeout, &overlappingIPStruct); err != nil {
if err := rl.k8sClient.DeleteOverlappingIP(&overlappingIPStruct); err != nil {
logging.Errorf("failed to remove cluster wide IP: %s", overlappingIPStruct.GetName())
failedReconciledClusterWideIPs = append(failedReconciledClusterWideIPs, overlappingIPStruct.GetName())
continue
Expand Down
Loading

0 comments on commit fc56b7a

Please sign in to comment.