Skip to content

Commit

Permalink
kube: use paging when List-ing API objects directly
Browse files Browse the repository at this point in the history
Listing when there's a bunch of API objects puts lots of load
on the apiserver. Be nice and use paging.

Signed-off-by: Dan Williams <dcbw@redhat.com>
  • Loading branch information
dcbw authored and jcaamano committed Dec 4, 2023
1 parent ebd0c2a commit 7447607
Show file tree
Hide file tree
Showing 17 changed files with 156 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,8 @@ func (n *NodeController) initSelf(node *kapi.Node, nodeSubnet *net.IPNet) error
return fmt.Errorf("error in initializing/fetching nodes: %v", err)
}

for _, node := range nodes.Items {
for _, node := range nodes {
node := *node
// Add VXLAN tunnel to the remote nodes
if node.Status.NodeInfo.MachineID != n.machineID {
n.AddNode(&node)
Expand All @@ -374,7 +375,8 @@ func (n *NodeController) uninitSelf(node *kapi.Node) error {
}

// Delete VXLAN tunnel to the remote nodes
for _, node := range nodes.Items {
for _, node := range nodes {
node := *node
if node.Status.NodeInfo.MachineID != n.machineID {
n.DeleteNode(&node)
}
Expand Down
24 changes: 14 additions & 10 deletions go-controller/pkg/clustermanager/egressip_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,9 +699,10 @@ func (eIPC *egressIPClusterController) reconcileNonOVNNetworkEIPs(node *v1.Node)
if err != nil {
return fmt.Errorf("unable to list EgressIPs, err: %v", err)
}
reconcileEgressIPs := make([]*egressipv1.EgressIP, 0, len(egressIPs.Items))
reconcileEgressIPs := make([]*egressipv1.EgressIP, 0, len(egressIPs))
eIPC.allocator.Lock()
for _, egressIP := range egressIPs.Items {
for _, egressIP := range egressIPs {
egressIP := *egressIP
for _, status := range egressIP.Status.Items {
if status.Node == node.Name {
egressIPIP := net.ParseIP(status.EgressIP)
Expand Down Expand Up @@ -754,7 +755,8 @@ func (eIPC *egressIPClusterController) addEgressNode(nodeName string) error {
if err != nil {
return fmt.Errorf("unable to list EgressIPs, err: %v", err)
}
for _, egressIP := range egressIPs.Items {
for _, egressIP := range egressIPs {
egressIP := *egressIP
if len(egressIP.Spec.EgressIPs) != len(egressIP.Status.Items) {
// Send a "synthetic update" on all egress IPs which are not fully
// assigned, the reconciliation loop for WatchEgressIP will try to
Expand Down Expand Up @@ -795,7 +797,8 @@ func (eIPC *egressIPClusterController) deleteEgressNode(nodeName string) error {
if err != nil {
return fmt.Errorf("unable to list EgressIPs, err: %v", err)
}
for _, egressIP := range egressIPs.Items {
for _, egressIP := range egressIPs {
egressIP := *egressIP
for _, status := range egressIP.Status.Items {
if status.Node == nodeName {
// Send a "synthetic update" on all egress IPs which have an
Expand Down Expand Up @@ -1559,7 +1562,7 @@ func (eIPC *egressIPClusterController) reconcileCloudPrivateIPConfig(old, new *o
return err
}
for _, resyncEgressIP := range resyncEgressIPs {
if err := eIPC.reconcileEgressIP(nil, &resyncEgressIP); err != nil {
if err := eIPC.reconcileEgressIP(nil, resyncEgressIP); err != nil {
return fmt.Errorf("synthetic update for EgressIP: %s failed, err: %v", egressIP.Name, err)
}
}
Expand Down Expand Up @@ -1661,7 +1664,7 @@ func cloudPrivateIPConfigNameToIPString(name string) string {
// removePendingOps removes the existing pending CloudPrivateIPConfig operations
// from the cache and returns the EgressIP object which can be re-synced given
// the new assignment possibilities.
func (eIPC *egressIPClusterController) removePendingOpsAndGetResyncs(egressIPName, egressIP string) ([]egressipv1.EgressIP, error) {
func (eIPC *egressIPClusterController) removePendingOpsAndGetResyncs(egressIPName, egressIP string) ([]*egressipv1.EgressIP, error) {
eIPC.pendingCloudPrivateIPConfigsMutex.Lock()
defer eIPC.pendingCloudPrivateIPConfigsMutex.Unlock()
ops, pending := eIPC.pendingCloudPrivateIPConfigsOps[egressIPName]
Expand Down Expand Up @@ -1690,8 +1693,9 @@ func (eIPC *egressIPClusterController) removePendingOpsAndGetResyncs(egressIPNam
if err != nil {
return nil, fmt.Errorf("unable to list EgressIPs, err: %v", err)
}
resyncs := make([]egressipv1.EgressIP, 0, len(egressIPs.Items))
for _, egressIP := range egressIPs.Items {
resyncs := make([]*egressipv1.EgressIP, 0, len(egressIPs))
for _, egressIP := range egressIPs {
egressIP := *egressIP
// Do not process the egress IP object which owns the
// CloudPrivateIPConfig for which we are currently processing the
// deletion for.
Expand All @@ -1703,14 +1707,14 @@ func (eIPC *egressIPClusterController) removePendingOpsAndGetResyncs(egressIPNam
// If the EgressIP was never added to the pending cache to begin
// with, but has un-assigned egress IPs, try it.
if !pending && unassigned > 0 {
resyncs = append(resyncs, egressIP)
resyncs = append(resyncs, &egressIP)
continue
}
// If the EgressIP has pending operations, have a look at if the
// unassigned operations superseed the pending ones. It could be
// that it could only execute a couple of assignments at one point.
if pending && unassigned > len(ops) {
resyncs = append(resyncs, egressIP)
resyncs = append(resyncs, &egressIP)
}
}
return resyncs, nil
Expand Down
71 changes: 53 additions & 18 deletions go-controller/pkg/kube/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ import (
kapi "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes"
kv1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/pager"
"k8s.io/klog/v2"
anpclientset "sigs.k8s.io/network-policy-api/pkg/client/clientset/versioned"
)
Expand All @@ -31,8 +33,8 @@ type InterfaceOVN interface {
UpdateEgressIP(eIP *egressipv1.EgressIP) error
PatchEgressIP(name string, patchData []byte) error
GetEgressIP(name string) (*egressipv1.EgressIP, error)
GetEgressIPs() (*egressipv1.EgressIPList, error)
GetEgressFirewalls() (*egressfirewall.EgressFirewallList, error)
GetEgressIPs() ([]*egressipv1.EgressIP, error)
GetEgressFirewalls() ([]*egressfirewall.EgressFirewall, error)
CreateCloudPrivateIPConfig(cloudPrivateIPConfig *ocpcloudnetworkapi.CloudPrivateIPConfig) (*ocpcloudnetworkapi.CloudPrivateIPConfig, error)
UpdateCloudPrivateIPConfig(cloudPrivateIPConfig *ocpcloudnetworkapi.CloudPrivateIPConfig) (*ocpcloudnetworkapi.CloudPrivateIPConfig, error)
DeleteCloudPrivateIPConfig(name string) error
Expand All @@ -53,9 +55,9 @@ type Interface interface {
UpdateNodeStatus(node *kapi.Node) error
UpdatePodStatus(pod *kapi.Pod) error
GetAnnotationsOnPod(namespace, name string) (map[string]string, error)
GetNodes() (*kapi.NodeList, error)
GetNamespaces(labelSelector metav1.LabelSelector) (*kapi.NamespaceList, error)
GetPods(namespace string, labelSelector metav1.LabelSelector) (*kapi.PodList, error)
GetNodes() ([]*kapi.Node, error)
GetNamespaces(labelSelector metav1.LabelSelector) ([]*kapi.Namespace, error)
GetPods(namespace string, opts metav1.ListOptions) ([]*kapi.Pod, error)
GetPod(namespace, name string) (*kapi.Pod, error)
GetNode(name string) (*kapi.Node, error)
Events() kv1core.EventInterface
Expand Down Expand Up @@ -323,19 +325,31 @@ func (k *Kube) GetAnnotationsOnPod(namespace, name string) (map[string]string, e
}

// GetNamespaces returns the list of all Namespace objects matching the labelSelector
func (k *Kube) GetNamespaces(labelSelector metav1.LabelSelector) (*kapi.NamespaceList, error) {
return k.KClient.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{
func (k *Kube) GetNamespaces(labelSelector metav1.LabelSelector) ([]*kapi.Namespace, error) {
list := []*kapi.Namespace{}
err := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
return k.KClient.CoreV1().Namespaces().List(ctx, opts)
}).EachListItem(context.TODO(), metav1.ListOptions{
LabelSelector: labels.Set(labelSelector.MatchLabels).String(),
ResourceVersion: "0",
}, func(obj runtime.Object) error {
list = append(list, obj.(*kapi.Namespace))
return nil
})
return list, err
}

// GetPods returns the list of all Pod objects in a namespace matching the labelSelector
func (k *Kube) GetPods(namespace string, labelSelector metav1.LabelSelector) (*kapi.PodList, error) {
return k.KClient.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{
LabelSelector: labels.Set(labelSelector.MatchLabels).String(),
ResourceVersion: "0",
// GetPods returns the list of all Pod objects in a namespace matching the options
func (k *Kube) GetPods(namespace string, opts metav1.ListOptions) ([]*kapi.Pod, error) {
list := []*kapi.Pod{}
opts.ResourceVersion = "0"
err := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
return k.KClient.CoreV1().Pods(namespace).List(ctx, opts)
}).EachListItem(context.TODO(), opts, func(obj runtime.Object) error {
list = append(list, obj.(*kapi.Pod))
return nil
})
return list, err
}

// GetPod obtains the pod from kubernetes apiserver, given the name and namespace
Expand All @@ -344,10 +358,17 @@ func (k *Kube) GetPod(namespace, name string) (*kapi.Pod, error) {
}

// GetNodes returns the list of all Node objects from kubernetes
func (k *Kube) GetNodes() (*kapi.NodeList, error) {
return k.KClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{
func (k *Kube) GetNodes() ([]*kapi.Node, error) {
list := []*kapi.Node{}
err := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
return k.KClient.CoreV1().Nodes().List(ctx, opts)
}).EachListItem(context.TODO(), metav1.ListOptions{
ResourceVersion: "0",
}, func(obj runtime.Object) error {
list = append(list, obj.(*kapi.Node))
return nil
})
return list, err
}

// GetNode returns the Node resource from kubernetes apiserver, given its name
Expand Down Expand Up @@ -385,17 +406,31 @@ func (k *KubeOVN) GetEgressIP(name string) (*egressipv1.EgressIP, error) {
}

// GetEgressIPs returns the list of all EgressIP objects from kubernetes
func (k *KubeOVN) GetEgressIPs() (*egressipv1.EgressIPList, error) {
return k.EIPClient.K8sV1().EgressIPs().List(context.TODO(), metav1.ListOptions{
func (k *KubeOVN) GetEgressIPs() ([]*egressipv1.EgressIP, error) {
list := []*egressipv1.EgressIP{}
err := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
return k.EIPClient.K8sV1().EgressIPs().List(ctx, opts)
}).EachListItem(context.TODO(), metav1.ListOptions{
ResourceVersion: "0",
}, func(obj runtime.Object) error {
list = append(list, obj.(*egressipv1.EgressIP))
return nil
})
return list, err
}

// GetEgressFirewalls returns the list of all EgressFirewall objects from kubernetes
func (k *KubeOVN) GetEgressFirewalls() (*egressfirewall.EgressFirewallList, error) {
return k.EgressFirewallClient.K8sV1().EgressFirewalls(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{
func (k *KubeOVN) GetEgressFirewalls() ([]*egressfirewall.EgressFirewall, error) {
list := []*egressfirewall.EgressFirewall{}
err := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
return k.EgressFirewallClient.K8sV1().EgressFirewalls(metav1.NamespaceAll).List(ctx, opts)
}).EachListItem(context.TODO(), metav1.ListOptions{
ResourceVersion: "0",
}, func(obj runtime.Object) error {
list = append(list, obj.(*egressfirewall.EgressFirewall))
return nil
})
return list, err
}

func (k *KubeOVN) CreateCloudPrivateIPConfig(cloudPrivateIPConfig *ocpcloudnetworkapi.CloudPrivateIPConfig) (*ocpcloudnetworkapi.CloudPrivateIPConfig, error) {
Expand Down
34 changes: 17 additions & 17 deletions go-controller/pkg/kube/mocks/Interface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 7447607

Please sign in to comment.