Merge pull request #37 from openshift-cloud-team/rebase-bot-master
openshift-merge-bot[bot] committed Jan 11, 2024
2 parents 5ff9de1 + a430a61 commit 97e8335
Showing 2,039 changed files with 100,963 additions and 41,460 deletions.
3 changes: 0 additions & 3 deletions cmd/manager/main.go
@@ -36,7 +36,6 @@ func printVersion() {
}

func main() {
ctrl.SetLogger(klogr.New())
printVersion()

err := ctrlCfg.ControllerCFG.LoadControllerConfig()
@@ -46,8 +45,6 @@ func main() {
}
ctrl.SetLogger(klogr.New().V(ctrlCfg.ControllerCFG.LogLevel))

printVersion()

// Get a config to talk to the api-server
cfg := config.GetConfigOrDie()
cfg.QPS = ctrlCfg.ControllerCFG.RuntimeConfig.QPS
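
Not part of this commit — a sketch of the net effect of these two hunks: the early ctrl.SetLogger(klogr.New()) call and the duplicate printVersion() are dropped, so the logger is configured exactly once, after the controller configuration is loaded and at the configured verbosity. Roughly:

func main() {
    printVersion()

    err := ctrlCfg.ControllerCFG.LoadControllerConfig()
    if err != nil {
        // error handling as in the collapsed context lines
    }
    // Logger configured once, after config load, so LogLevel takes effect.
    ctrl.SetLogger(klogr.New().V(ctrlCfg.ControllerCFG.LogLevel))

    // Get a config to talk to the api-server
    cfg := config.GetConfigOrDie()
    cfg.QPS = ctrlCfg.ControllerCFG.RuntimeConfig.QPS
    // remainder unchanged
}
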
187 changes: 99 additions & 88 deletions go.mod

Large diffs are not rendered by default.

406 changes: 208 additions & 198 deletions go.sum

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions pkg/config/controller_config.go
@@ -27,6 +27,7 @@ const (
flagServiceMaxConcurrentReconciles = "concurrent-service-syncs"
flagRouteReconciliationPeriod = "route-reconciliation-period"
flagNodeMonitorPeriod = "node-monitor-period"
flagServerGroupBatchSize = "sg-batch-size"
flagNetwork = "network"

defaultCloudProvider = "alibabacloud"
@@ -36,6 +37,7 @@ const (
defaultCloudConfig = ""
defaultRouteReconciliationPeriod = 5 * time.Minute
defaultNodeMonitorPeriod = 5 * time.Minute
defaultServerGroupBatchSize = 40
defaultNetwork = "vpc"
)

@@ -50,6 +52,7 @@ type ControllerConfig struct {
Controllers []string
FeatureGates string
ServiceMaxConcurrentReconciles int
ServerGroupBatchSize int
LogLevel int
DryRun bool
NetWork string
@@ -75,6 +78,7 @@ func (cfg *ControllerConfig) BindFlags(fs *pflag.FlagSet) {
fs.DurationVar(&cfg.RouteReconciliationPeriod.Duration, flagRouteReconciliationPeriod, defaultRouteReconciliationPeriod,
"The period for reconciling routes created for nodes by cloud provider. The minimum value is 1 minute")
fs.DurationVar(&cfg.NodeMonitorPeriod.Duration, flagNodeMonitorPeriod, defaultNodeMonitorPeriod, "The period for syncing NodeStatus in NodeController.")
fs.IntVar(&cfg.ServerGroupBatchSize, flagServerGroupBatchSize, defaultServerGroupBatchSize, "The batch size for syncing server group. The value range is 1-40")
fs.StringVar(&cfg.FeatureGates, flagFeatureGates, "", "A set of key=value pairs that describe feature gates for alpha/experimental features.")
fs.BoolVar(&cfg.AllowUntaggedCloud, "allow-untagged-cloud", false, "Allow the cluster to run without the cluster-id on cloud instances. This is a legacy mode of operation and a cluster-id will be required in the future.")
_ = fs.MarkDeprecated("allow-untagged-cloud", "This flag is deprecated and will be removed in a future release. A cluster-id will be required on cloud instances.")
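
Not part of this commit — the new sg-batch-size flag caps how many backends are synced to a server group per API call, with a documented range of 1-40 and a default of 40. A minimal sketch of how such a batch size is typically applied; the helper name is hypothetical:

// batchBackends splits backends into chunks of at most size entries,
// e.g. for batched server-group attach/detach calls. Hypothetical illustration.
func batchBackends(backends []string, size int) [][]string {
    if size < 1 {
        size = 1
    }
    var batches [][]string
    for start := 0; start < len(backends); start += size {
        end := start + size
        if end > len(backends) {
            end = len(backends)
        }
        batches = append(batches, backends[start:end])
    }
    return batches
}
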
26 changes: 17 additions & 9 deletions pkg/config/runtime_config.go
@@ -2,13 +2,17 @@ package config

import (
"github.com/spf13/pflag"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
"time"
)

const (
flagMetricsBindAddr = "metrics-bind-addr"
flagHealthProbeBindAddr = "health-probe-bind-addr"
flagPprofBindAddr = "pprof-bind-addr"
flagQPS = "kube-api-qps"
flagBurst = "kube-api-burst"
flagLeaderElect = "leader-elect"
@@ -18,18 +22,17 @@ const (
flagLeaderElectResourceName = "leader-elect-resource-name"
flagLeaderElectResourceNamespace = "leader-elect-resource-namespace"
flagLeaderElectRetryPeriod = "leader-elect-retry-period"
flagSyncPeriod = "sync-period"

defaultMetricsAddr = ":8080"
defaultHealthProbeBindAddress = ":10258"
defaultHealthProbeBindAddr = ":10258"
defaultPprofBindAddr = ":6060"
defaultLeaderElect = true
defaultLeaderElectLeaseDuration = 15 * time.Second
defaultElectRenewDeadline = 10 * time.Second
defaultLeaderElectRetryPeriod = 2 * time.Second
defaultLeaderElectResourceLock = "leases"
defaultLeaderElectResourceName = "ccm"
defaultLeaderElectResourceNamespace = "kube-system"
defaultSyncPeriod = 60 * time.Minute
defaultQPS = 20.0
defaultBurst = 30
)
@@ -38,21 +41,23 @@ const (
type RuntimeConfig struct {
MetricsBindAddress string
HealthProbeBindAddress string
PprofBindAddress string
LeaderElect bool
LeaderElectLeaseDuration time.Duration
LeaderElectRenewDeadline time.Duration
LeaderElectRetryPeriod time.Duration
LeaderElectResourceLock string
LeaderElectResourceName string
LeaderElectResourceNamespace string
SyncPeriod time.Duration
QPS float32
Burst int
}

func (c *RuntimeConfig) BindFlags(fs *pflag.FlagSet) {
fs.StringVar(&c.MetricsBindAddress, flagMetricsBindAddr, defaultMetricsAddr, "The address the metric endpoint binds to.")
fs.StringVar(&c.HealthProbeBindAddress, flagHealthProbeBindAddr, defaultHealthProbeBindAddress, "The address the health probes binds to.")
fs.StringVar(&c.HealthProbeBindAddress, flagHealthProbeBindAddr, defaultHealthProbeBindAddr, "The address the health probes binds to.")
fs.StringVar(&c.PprofBindAddress, flagPprofBindAddr, defaultPprofBindAddr,
"The address that the controller should bind to for serving pprof. It can be set to '' or 0 to disable the pprof serving.")
fs.Float32Var(&c.QPS, flagQPS, defaultQPS, "QPS to use while talking with kubernetes apiserver.")
fs.IntVar(&c.Burst, flagBurst, defaultBurst, "Burst to use while talking with kubernetes apiserver.")
fs.BoolVar(&c.LeaderElect, flagLeaderElect, defaultLeaderElect,
@@ -74,22 +79,25 @@ func (c *RuntimeConfig) BindFlags(fs *pflag.FlagSet) {
"The name of resource object that is used for locking during leader election. ")
fs.StringVar(&c.LeaderElectResourceNamespace, flagLeaderElectResourceNamespace, defaultLeaderElectResourceNamespace,
"The namespace of resource object that is used for locking during leader election.")
fs.DurationVar(&c.SyncPeriod, flagSyncPeriod, defaultSyncPeriod,
"Period at which the controller forces the repopulation of its local object stores.")

}

func BuildRuntimeOptions(rtCfg RuntimeConfig) manager.Options {
return manager.Options{
ClientDisableCacheFor: []client.Object{
&v1.Node{},
&v1.Service{},
&v1.Endpoints{},
&discovery.EndpointSlice{},
},
MetricsBindAddress: rtCfg.MetricsBindAddress,
HealthProbeBindAddress: rtCfg.HealthProbeBindAddress,
PprofBindAddress: rtCfg.PprofBindAddress,
LeaderElection: rtCfg.LeaderElect,
LeaderElectionID: rtCfg.LeaderElectResourceName,
LeaderElectionResourceLock: rtCfg.LeaderElectResourceLock,
LeaderElectionNamespace: rtCfg.LeaderElectResourceNamespace,
LeaseDuration: &rtCfg.LeaderElectLeaseDuration,
RenewDeadline: &rtCfg.LeaderElectRenewDeadline,
RetryPeriod: &rtCfg.LeaderElectRetryPeriod,
SyncPeriod: &rtCfg.SyncPeriod,
}
}
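
Not part of this commit — disabling the client cache for Node, Service, Endpoints and EndpointSlice objects makes reads of these high-churn types go straight to the API server rather than through an informer cache. A sketch of how these options are consumed; the wiring is assumed, not shown in this diff:

// Assumed call site; manager.New is the standard controller-runtime entry point.
mgr, err := manager.New(config.GetConfigOrDie(), BuildRuntimeOptions(ctrlCfg.ControllerCFG.RuntimeConfig))
if err != nil {
    klog.Fatalf("unable to create manager: %s", err.Error())
}
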
2 changes: 1 addition & 1 deletion pkg/controller/helper/endpoint_utils.go
@@ -2,7 +2,7 @@ package helper

import (
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
discovery "k8s.io/api/discovery/v1"
"strings"
)

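
Not part of this commit — this moves the EndpointSlice helpers from the deprecated discovery.k8s.io/v1beta1 API to the GA discovery.k8s.io/v1 API; the well-known service-name label is the same in both versions. A hedged usage sketch, where k8sClient and svc are assumed names:

// List the EndpointSlices backing a Service under discovery/v1.
var slices discovery.EndpointSliceList
err := k8sClient.List(context.TODO(), &slices,
    client.InNamespace(svc.Namespace),
    client.MatchingLabels{discovery.LabelServiceName: svc.Name})
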
1 change: 1 addition & 0 deletions pkg/controller/helper/event.go
@@ -13,6 +13,7 @@ const (
FailedRemoveHash = "FailedRemoveHash"
FailedUpdateStatus = "FailedUpdateStatus"
UnAvailableBackends = "UnAvailableLoadBalancer"
SkipSyncBackends = "SkipSyncBackends"
FailedSyncLB = "SyncLoadBalancerFailed"
SucceedCleanLB = "CleanLoadBalancer"
FailedCleanLB = "CleanLoadBalancerFailed"
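
Not part of this commit — SkipSyncBackends joins the existing event-reason constants, which are emitted through a record.EventRecorder. A hypothetical emission; the recorder, object, and message here are illustrative only:

recorder.Event(svc, v1.EventTypeNormal, SkipSyncBackends,
    "backends unchanged, skipping server group sync")
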
24 changes: 23 additions & 1 deletion pkg/controller/helper/node_utils.go
@@ -133,7 +133,6 @@ func FindNodeByNodeName(nodes []v1.Node, nodeName string) *v1.Node {
return &n
}
}
klog.Infof("node %s not found ", nodeName)
return nil
}

@@ -189,3 +188,26 @@ func IsNodeExcludeFromEdgeLoadBalancer(node *v1.Node) bool {
}
return false
}

func GetNodeInternalIP(node *v1.Node) (string, error) {
if len(node.Status.Addresses) == 0 {
return "", fmt.Errorf("node %s do not contains addresses", node.Name)
}
for _, addr := range node.Status.Addresses {
if addr.Type == v1.NodeInternalIP {
return addr.Address, nil
}
}
return "", fmt.Errorf("node %s can not find InternalIP in node addresses", node.Name)
}

func NodeInfo(node *v1.Node) string {
if node == nil {
return ""
}
pNode := node.DeepCopy()
pNode.ManagedFields = nil
pNode.Status.Images = nil
jsonByte, _ := json.Marshal(pNode)
return string(jsonByte)
}
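
Not part of this commit — GetNodeInternalIP returns the node's first NodeInternalIP address, and NodeInfo renders a node as JSON with the noisy ManagedFields and Status.Images stripped; both rely on fmt and encoding/json imports presumably added in a collapsed hunk. An assumed usage sketch:

ip, err := helper.GetNodeInternalIP(node)
if err != nil {
    log.Error(err, "resolve node internal IP", "node", node.Name)
} else {
    log.Info("backend address resolved", "node", node.Name, "ip", ip)
}
// Compact, de-noised node dump for verbose logs.
log.V(5).Info(helper.NodeInfo(node))
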
6 changes: 3 additions & 3 deletions pkg/controller/ingress/alb_controller.go
@@ -130,14 +130,14 @@ type albconfigReconciler struct {
maxConcurrentReconciles int
}

func (g *albconfigReconciler) setupWatches(_ context.Context, c controller.Controller) error {
func (g *albconfigReconciler) setupWatches(_ context.Context, c controller.Controller, mgr manager.Manager) error {
g.acEventChan = make(chan event.GenericEvent)
acEventHandler := NewEnqueueRequestsForAlbconfigEvent(g.k8sClient, g.eventRecorder, g.logger)
if err := c.Watch(&source.Channel{Source: g.acEventChan}, acEventHandler); err != nil {
return err
}

if err := c.Watch(&source.Kind{Type: &v1.AlbConfig{}}, acEventHandler); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &v1.AlbConfig{}), acEventHandler); err != nil {
return err
}

@@ -153,7 +153,7 @@ func (g *albconfigReconciler) SetupWithManager(ctx context.Context, mgr manager.
return err
}

if err := g.setupWatches(ctx, c); err != nil {
if err := g.setupWatches(ctx, c, mgr); err != nil {
return err
}

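
Not part of this commit — both changed Watch calls track the controller-runtime v0.15 API, where source.Kind turned from a struct literal into a constructor bound to the manager's cache, which is why the mgr parameter is now threaded into setupWatches. In general form:

// Pre-v0.15 (struct literal):
//     c.Watch(&source.Kind{Type: obj}, h)
// v0.15+ (constructor over the manager's cache):
//     c.Watch(source.Kind(mgr.GetCache(), obj), h)
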
9 changes: 5 additions & 4 deletions pkg/controller/ingress/event_handler.go
@@ -1,6 +1,7 @@
package ingress

import (
"context"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
@@ -30,15 +31,15 @@ type enqueueRequestsForAlbconfigEvent struct {
logger logr.Logger
}

func (h *enqueueRequestsForAlbconfigEvent) Create(e event.CreateEvent, queue workqueue.RateLimitingInterface) {
func (h *enqueueRequestsForAlbconfigEvent) Create(_ context.Context, e event.CreateEvent, queue workqueue.RateLimitingInterface) {
albconfig, ok := e.Object.(*v1.AlbConfig)
if ok {
h.logger.Info("controller: albconfig Create event", "albconfig", util.NamespacedName(albconfig).String())
h.enqueueAlbconfig(queue, albconfig)
}
}

func (h *enqueueRequestsForAlbconfigEvent) Update(e event.UpdateEvent, queue workqueue.RateLimitingInterface) {
func (h *enqueueRequestsForAlbconfigEvent) Update(_ context.Context, e event.UpdateEvent, queue workqueue.RateLimitingInterface) {
albconfigOld := e.ObjectOld.(*v1.AlbConfig)
albconfigNew := e.ObjectNew.(*v1.AlbConfig)

@@ -52,10 +53,10 @@ func (h *enqueueRequestsForAlbconfigEvent) Update(e event.UpdateEvent, queue wor
h.enqueueAlbconfig(queue, albconfigNew)
}

func (h *enqueueRequestsForAlbconfigEvent) Delete(e event.DeleteEvent, queue workqueue.RateLimitingInterface) {
func (h *enqueueRequestsForAlbconfigEvent) Delete(_ context.Context, e event.DeleteEvent, queue workqueue.RateLimitingInterface) {
}

func (h *enqueueRequestsForAlbconfigEvent) Generic(e event.GenericEvent, queue workqueue.RateLimitingInterface) {
func (h *enqueueRequestsForAlbconfigEvent) Generic(_ context.Context, e event.GenericEvent, queue workqueue.RateLimitingInterface) {
albconfig, ok := e.Object.(*v1.AlbConfig)
if ok {
h.logger.Info("controller: albconfig Generic event", "albconfig", util.NamespacedName(albconfig).String())
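
Not part of this commit — the added context.Context parameters match the context-aware handler.EventHandler interface introduced in controller-runtime v0.15, which has roughly this shape:

type EventHandler interface {
    Create(context.Context, event.CreateEvent, workqueue.RateLimitingInterface)
    Update(context.Context, event.UpdateEvent, workqueue.RateLimitingInterface)
    Delete(context.Context, event.DeleteEvent, workqueue.RateLimitingInterface)
    Generic(context.Context, event.GenericEvent, workqueue.RateLimitingInterface)
}
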
48 changes: 48 additions & 0 deletions pkg/controller/node/event_handler.go
@@ -0,0 +1,48 @@
package node

import (
"context"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

type enqueueRequestForNodeEvent struct{}

var _ handler.EventHandler = (*enqueueRequestForNodeEvent)(nil)

// NewEnqueueRequestForNodeEvent, event handler for node event
func NewEnqueueRequestForNodeEvent() *enqueueRequestForNodeEvent {
return &enqueueRequestForNodeEvent{}
}

func (h *enqueueRequestForNodeEvent) Create(_ context.Context, e event.CreateEvent, queue workqueue.RateLimitingInterface) {
node, ok := e.Object.(*v1.Node)
if ok && needAdd(node) {
queue.Add(reconcile.Request{
NamespacedName: types.NamespacedName{
Name: node.Name,
},
})
}
}

func (h *enqueueRequestForNodeEvent) Update(_ context.Context, e event.UpdateEvent, queue workqueue.RateLimitingInterface) {

}

func (h *enqueueRequestForNodeEvent) Delete(_ context.Context, e event.DeleteEvent, queue workqueue.RateLimitingInterface) {

}

func (h *enqueueRequestForNodeEvent) Generic(_ context.Context, e event.GenericEvent, queue workqueue.RateLimitingInterface) {

}

func needAdd(node *v1.Node) bool {
cloudTaint := findCloudTaint(node.Spec.Taints)
return cloudTaint != nil
}
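
Not part of this commit — findCloudTaint is referenced here but defined elsewhere in the package. A plausible sketch, assuming the conventional cloud-provider initialization taint key:

// Sketch of the assumed helper: return the uninitialized-node taint, if present.
func findCloudTaint(taints []v1.Taint) *v1.Taint {
    for i := range taints {
        // Well-known taint placed on nodes awaiting cloud-provider initialization.
        if taints[i].Key == "node.cloudprovider.kubernetes.io/uninitialized" {
            return &taints[i]
        }
    }
    return nil
}
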
15 changes: 8 additions & 7 deletions pkg/controller/node/node_controller.go
@@ -13,12 +13,12 @@ import (
"k8s.io/cloud-provider-alibaba-cloud/pkg/context/shared"
"k8s.io/cloud-provider-alibaba-cloud/pkg/controller/helper"
"k8s.io/cloud-provider-alibaba-cloud/pkg/provider"
"k8s.io/cloud-provider-alibaba-cloud/pkg/util"
"k8s.io/cloud-provider-alibaba-cloud/pkg/util/metric"
"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
@@ -73,7 +73,7 @@ func add(mgr manager.Manager, r *ReconcileNode) error {
}

// Watch for changes to primary resource AutoRepair
if err := c.Watch(&source.Kind{Type: &corev1.Node{}}, &handler.EnqueueRequestForObject{}); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Node{}), NewEnqueueRequestForNodeEvent()); err != nil {
return err
}

@@ -101,22 +101,21 @@ type ReconcileNode struct {
record record.EventRecorder
}

func (m *ReconcileNode) Reconcile(
ctx context.Context, request reconcile.Request,
) (reconcile.Result, error) {
klog.V(5).Infof("reconcile node: %s", request.NamespacedName)
func (m *ReconcileNode) Reconcile(_ context.Context, request reconcile.Request) (reconcile.Result, error) {
log.Info("reconcile node", "node", request.NamespacedName)
node := &corev1.Node{}
err := m.client.Get(context.TODO(), request.NamespacedName, node)
if err != nil {
if errors.IsNotFound(err) {
klog.Infof("node not found, skip")
log.Info("node not found, skip", "node", request.NamespacedName)
// Request object not found, could have been deleted
// after reconcile request.
// Owned objects are automatically garbage collected.
// For additional cleanup logic use finalizers.
// Return and don't requeue
return reconcile.Result{}, nil
}
log.Error(err, "get node error", "node", request.NamespacedName)
return reconcile.Result{}, err
}
return reconcile.Result{}, m.syncCloudNode(node)
@@ -216,12 +215,14 @@ func (m *ReconcileNode) syncNode(nodes []corev1.Node) error {
}

if cloudNode == nil {
log.V(5).Info(util.PrettyJson(helper.NodeInfo(node)))
// if cloud node has been deleted, try to delete node from cluster
condition := nodeConditionReady(m.client, node)
if condition != nil && condition.Status == corev1.ConditionUnknown {
log.Info("node is NotReady and cloud node can not found by prvdId, try to delete node from cluster ", "node", node.Name, "prvdId", node.Spec.ProviderID)
// ignore error, retry next loop
deleteNode(m, node)
continue
}

log.Info("cloud node not found by prvdId, skip update node address", "node", node.Name, "prvdId", node.Spec.ProviderID)
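
Not part of this commit — nodeConditionReady and deleteNode are referenced above but not shown in this diff. A plausible sketch of the former, assuming it simply surfaces the node's NodeReady condition; the client parameter is kept only to match the call site:

func nodeConditionReady(cli client.Client, node *corev1.Node) *corev1.NodeCondition {
    // cli is unused in this sketch; the real helper may refresh the node first.
    for i := range node.Status.Conditions {
        if node.Status.Conditions[i].Type == corev1.NodeReady {
            return &node.Status.Conditions[i]
        }
    }
    return nil
}
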
