Skip to content

Commit

Permalink
k8s: optimize API calls made to kube-apiserver
Browse files Browse the repository at this point in the history
With stores locally available we don't need to fetch the latest state
from kube-apiserver. Changed all calls to use the stores instead.

Signed-off-by: André Martins <andre@cilium.io>
  • Loading branch information
aanm authored and qmonnet committed Oct 12, 2022
1 parent 694892c commit 69e4c69
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 18 deletions.
10 changes: 9 additions & 1 deletion cilium/cmd/preflight_identity_crd_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,21 @@ func migrateIdentities(clientset k8sClient.Clientset, shutdowner hive.Shutdowner
return nil
}

type NodeGetter struct {
*k8s.K8sClient
*k8s.K8sCiliumClient
}

// initK8s connects to k8s with a allocator.Backend and an initialized
// allocator.Allocator, using the k8s config passed into the command.
func initK8s(ctx context.Context, clientset k8sClient.Clientset) (crdBackend allocator.Backend, crdAllocator *allocator.Allocator) {
log.Info("Setting up kubernetes client")

k8s.SetClients(clientset, clientset.Slim(), clientset, clientset)
if err := k8s.WaitForNodeInformation(ctx, k8s.Client()); err != nil {
if err := k8s.WaitForNodeInformation(ctx, &NodeGetter{
K8sClient: k8s.Client(),
K8sCiliumClient: k8s.CiliumClient(),
}); err != nil {
log.WithError(err).Fatal("Unable to connect to get node spec from apiserver")
}

Expand Down
2 changes: 1 addition & 1 deletion daemon/cmd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ func NewDaemon(ctx context.Context, cleaner *daemonCleanup,
d.ipcache,
d.cgroupManager,
)
nd.RegisterK8sNodeGetter(d.k8sWatcher)
nd.RegisterK8sGetters(d.k8sWatcher)
d.ipcache.RegisterK8sSyncedChecker(&d)

d.k8sWatcher.RegisterNodeSubscriber(d.endpointManager)
Expand Down
10 changes: 10 additions & 0 deletions pkg/k8s/annotate.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cilium/cilium/pkg/annotation"
"github.com/cilium/cilium/pkg/cidr"
"github.com/cilium/cilium/pkg/controller"
cilium_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
clientset "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned"
slimclientset "github.com/cilium/cilium/pkg/k8s/slim/k8s/client/clientset/versioned"
"github.com/cilium/cilium/pkg/logging/logfields"
Expand Down Expand Up @@ -151,3 +152,12 @@ func (k8sCli K8sClient) GetK8sNode(ctx context.Context, nodeName string) (*core_

return k8sCli.CoreV1().Nodes().Get(ctx, nodeName, v1.GetOptions{})
}

// GetCiliumNode returns the CiliumNode with the given nodeName.
func (k8sCiliumCli K8sCiliumClient) GetCiliumNode(ctx context.Context, nodeName string) (*cilium_v2.CiliumNode, error) {
if k8sCiliumCli.Interface == nil {
return nil, fmt.Errorf("GetK8sNode: No k8s, cannot access k8s nodes")
}

return k8sCiliumCli.CiliumV2().CiliumNodes().Get(ctx, nodeName, v1.GetOptions{})
}
19 changes: 10 additions & 9 deletions pkg/k8s/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (

"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/cilium/cilium/pkg/backoff"
ipamOption "github.com/cilium/cilium/pkg/ipam/option"
ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
k8sConst "github.com/cilium/cilium/pkg/k8s/constants"
slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
"github.com/cilium/cilium/pkg/logging/logfields"
Expand All @@ -28,11 +28,12 @@ const (
nodeRetrievalMaxRetries = 15
)

type nodeGetter interface {
type k8sGetter interface {
GetK8sNode(ctx context.Context, nodeName string) (*corev1.Node, error)
GetCiliumNode(ctx context.Context, nodeName string) (*ciliumv2.CiliumNode, error)
}

func waitForNodeInformation(ctx context.Context, nodeGetter nodeGetter, nodeName string) *nodeTypes.Node {
func waitForNodeInformation(ctx context.Context, k8sGetter k8sGetter, nodeName string) *nodeTypes.Node {
backoff := backoff.Exponential{
Min: time.Duration(200) * time.Millisecond,
Max: 2 * time.Minute,
Expand All @@ -41,7 +42,7 @@ func waitForNodeInformation(ctx context.Context, nodeGetter nodeGetter, nodeName
}

for retry := 0; retry < nodeRetrievalMaxRetries; retry++ {
n, err := retrieveNodeInformation(ctx, nodeGetter, nodeName)
n, err := retrieveNodeInformation(ctx, k8sGetter, nodeName)
if err != nil {
log.WithError(err).Warning("Waiting for k8s node information")
backoff.Wait(ctx)
Expand All @@ -54,7 +55,7 @@ func waitForNodeInformation(ctx context.Context, nodeGetter nodeGetter, nodeName
return nil
}

func retrieveNodeInformation(ctx context.Context, nodeGetter nodeGetter, nodeName string) (*nodeTypes.Node, error) {
func retrieveNodeInformation(ctx context.Context, nodeGetter k8sGetter, nodeName string) (*nodeTypes.Node, error) {
requireIPv4CIDR := option.Config.K8sRequireIPv4PodCIDR
requireIPv6CIDR := option.Config.K8sRequireIPv6PodCIDR
// At this point it's not clear whether the device auto-detection will
Expand All @@ -65,7 +66,7 @@ func retrieveNodeInformation(ctx context.Context, nodeGetter nodeGetter, nodeNam
var n *nodeTypes.Node

if option.Config.IPAM == ipamOption.IPAMClusterPool || option.Config.IPAM == ipamOption.IPAMClusterPoolV2 {
ciliumNode, err := CiliumClient().CiliumV2().CiliumNodes().Get(ctx, nodeName, v1.GetOptions{})
ciliumNode, err := nodeGetter.GetCiliumNode(ctx, nodeName)
if err != nil {
// If no CIDR is required, retrieving the node information is
// optional
Expand Down Expand Up @@ -130,9 +131,9 @@ func useNodeCIDR(n *nodeTypes.Node) {

// WaitForNodeInformation retrieves the node information via the CiliumNode or
// Kubernetes Node resource. This function will block until the information is
// received. nodeGetter is a function used to retrieved the node from either
// received. k8sGetter is a function used to retrieve the node from either
// the kube-apiserver or a local cache, depending on the caller.
func WaitForNodeInformation(ctx context.Context, nodeGetter nodeGetter) error {
func WaitForNodeInformation(ctx context.Context, k8sGetter k8sGetter) error {
// Use of the environment variable overwrites the node-name
// automatically derived
nodeName := nodeTypes.GetName()
Expand All @@ -146,7 +147,7 @@ func WaitForNodeInformation(ctx context.Context, nodeGetter nodeGetter) error {
return nil
}

if n := waitForNodeInformation(ctx, nodeGetter, nodeName); n != nil {
if n := waitForNodeInformation(ctx, k8sGetter, nodeName); n != nil {
nodeIP4 := n.GetNodeIP(false)
nodeIP6 := n.GetNodeIP(true)

Expand Down
42 changes: 42 additions & 0 deletions pkg/k8s/watchers/cilium_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
package watchers

import (
"context"
"sync"

k8sErrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"

"github.com/cilium/cilium/pkg/comparator"
Expand Down Expand Up @@ -141,3 +145,41 @@ func (k *K8sWatcher) ciliumNodeInit(ciliumNPClient *k8s.K8sCiliumClient, asyncCo
log.Info("Disconnected from key-value store, restarting CiliumNode watcher")
}
}

// GetCiliumNode returns the CiliumNode "nodeName" from the local store. If the
// local store is not initialized then it will fallback retrieving the node
// from kube-apiserver.
func (k *K8sWatcher) GetCiliumNode(ctx context.Context, nodeName string) (*cilium_v2.CiliumNode, error) {
var (
err error
nodeInterface interface{}
exists, getFromAPIServer bool
)
k.ciliumNodeStoreMU.RLock()
// k.ciliumNodeStore might not be set in all invocations of GetCiliumNode,
// for example, during Cilium initialization GetCiliumNode is called from
// WaitForNodeInformation, which happens before ciliumNodeStore,
// so we will fallback to perform an API request to kube-apiserver.
if k.ciliumNodeStore == nil {
getFromAPIServer = true
} else {
nodeInterface, exists, err = k.ciliumNodeStore.GetByKey(nodeName)
}
k.ciliumNodeStoreMU.RUnlock()

if getFromAPIServer {
// fallback to using the kube-apiserver
return k8s.CiliumClient().CiliumV2().CiliumNodes().Get(ctx, nodeName, v1.GetOptions{})
}

if err != nil {
return nil, err
}
if !exists {
return nil, k8sErrors.NewNotFound(schema.GroupResource{
Group: "cilium",
Resource: "CiliumNode",
}, nodeName)
}
return nodeInterface.(*cilium_v2.CiliumNode).DeepCopy(), nil
}
18 changes: 11 additions & 7 deletions pkg/nodediscovery/nodediscovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ const (

var log = logging.DefaultLogger.WithField(logfields.LogSubsys, nodeDiscoverySubsys)

type k8sNodeGetter interface {
type k8sGetters interface {
GetK8sNode(ctx context.Context, nodeName string) (*corev1.Node, error)
GetCiliumNode(ctx context.Context, nodeName string) (*ciliumv2.CiliumNode, error)
}

// NodeDiscovery represents a node discovery action
Expand All @@ -68,7 +69,7 @@ type NodeDiscovery struct {
Registered chan struct{}
localStateInitialized chan struct{}
NetConf *cnitypes.NetConf
k8sNodeGetter k8sNodeGetter
k8sGetters k8sGetters
localNodeLock lock.Mutex
localNode nodeTypes.Node
}
Expand Down Expand Up @@ -354,8 +355,9 @@ func (n *NodeDiscovery) UpdateCiliumNodeResource() {
performUpdate := true
if performGet {
var err error
nodeResource, err = ciliumClient.CiliumV2().CiliumNodes().Get(context.TODO(), nodeTypes.GetName(), metav1.GetOptions{})
nodeResource, err = n.k8sGetters.GetCiliumNode(context.TODO(), nodeTypes.GetName())
if err != nil {
log.WithError(err).Warning("Unable to get node resource")
performUpdate = false
nodeResource = &ciliumv2.CiliumNode{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -388,8 +390,10 @@ func (n *NodeDiscovery) UpdateCiliumNodeResource() {
}
} else {
if _, err := ciliumClient.CiliumV2().CiliumNodes().Create(context.TODO(), nodeResource, metav1.CreateOptions{}); err != nil {
if k8serrors.IsConflict(err) {
if k8serrors.IsConflict(err) || k8serrors.IsAlreadyExists(err) {
log.WithError(err).Warn("Unable to create CiliumNode resource, will retry")
// Backoff before retrying
time.Sleep(500 * time.Millisecond)
continue
}
log.WithError(err).Fatal("Unable to create CiliumNode resource")
Expand Down Expand Up @@ -419,7 +423,7 @@ func (n *NodeDiscovery) mutateNodeResource(nodeResource *ciliumv2.CiliumNode) er
// as this was added in sufficiently earlier versions of Cilium (v1.6).
// Source:
// https://github.com/cilium/cilium/commit/5c365f2c6d7930dcda0b8f0d5e6b826a64022a4f
k8sNode, err := n.k8sNodeGetter.GetK8sNode(
k8sNode, err := n.k8sGetters.GetK8sNode(
context.TODO(),
nodeTypes.GetName(),
)
Expand Down Expand Up @@ -707,8 +711,8 @@ func (n *NodeDiscovery) mutateNodeResource(nodeResource *ciliumv2.CiliumNode) er
return nil
}

func (n *NodeDiscovery) RegisterK8sNodeGetter(k8sNodeGetter k8sNodeGetter) {
n.k8sNodeGetter = k8sNodeGetter
func (n *NodeDiscovery) RegisterK8sGetters(k8sGetters k8sGetters) {
n.k8sGetters = k8sGetters
}

// LocalAllocCIDRsUpdated informs the agent that the local allocation CIDRs have
Expand Down

0 comments on commit 69e4c69

Please sign in to comment.