Support cluster-autoscaler 1.30.x yaml status (#297)
* Add parsing of yaml status, present since cluster-autoscaler 1.30
* Remove parsing of LongNotStarted due to deprecation since cluster-autoscaler 1.22
antonincms committed Aug 2, 2024
1 parent 95b7daa commit 2d8cd56
Showing 16 changed files with 825 additions and 234 deletions.
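The first commit-message bullet refers to cluster-autoscaler 1.30 switching its status configmap from the legacy human-readable text to a YAML document. As a rough, hypothetical sketch of what parsing that YAML involves, the snippet below unmarshals an abbreviated status into stand-in structs; the YAML keys and field names are inferred from the accessors used later in this diff (`Health.NodeCounts.Registered.Ready`, `CloudProviderTarget`, `MinSize`, `MaxSize`, ...), not copied from cluster-autoscaler or kubestitute.

```go
// Sketch only: stand-in types and assumed YAML keys, inferred from this diff.
package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

type unreadyCounts struct {
	Total int `yaml:"total"`
}

type registeredCounts struct {
	Total      int           `yaml:"total"`
	Ready      int           `yaml:"ready"`
	NotStarted int           `yaml:"notStarted"`
	Unready    unreadyCounts `yaml:"unready"`
}

type nodeCounts struct {
	Registered       registeredCounts `yaml:"registered"`
	LongUnregistered int              `yaml:"longUnregistered"`
}

type nodeGroupHealth struct {
	NodeCounts          nodeCounts `yaml:"nodeCounts"`
	CloudProviderTarget int        `yaml:"cloudProviderTarget"`
	MinSize             int        `yaml:"minSize"`
	MaxSize             int        `yaml:"maxSize"`
}

type nodeGroup struct {
	Name   string          `yaml:"name"`
	Health nodeGroupHealth `yaml:"health"`
}

type status struct {
	NodeGroups []nodeGroup `yaml:"nodeGroups"`
}

// Abbreviated, made-up status document in the assumed 1.30 YAML shape.
const example = `
nodeGroups:
  - name: my-asg
    health:
      nodeCounts:
        registered:
          total: 3
          ready: 2
          notStarted: 1
          unready:
            total: 0
        longUnregistered: 0
      cloudProviderTarget: 3
      minSize: 1
      maxSize: 10
`

func main() {
	var s status
	if err := yaml.Unmarshal([]byte(example), &s); err != nil {
		panic(err)
	}
	// Prints the ready-node count of the first node group: 2.
	fmt.Println(s.NodeGroups[0].Health.NodeCounts.Registered.Ready)
}
```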
33 changes: 17 additions & 16 deletions README.md
@@ -56,22 +56,23 @@ Follow Kubestitute documentation for Helm deployment [here](./helm/kubestitute).
### <a id="Configuration_Optional_args"></a>Optional args

The kubestitute container accepts the parameters below as arguments.
| Key | Description | Default |
| ----------------------------------------------- | --------------------------------------------------------------------------------------------------------------------- | ------------------------------------ |
| clusterautoscaler-namespace | The namespace the clusterautoscaler belongs to. | kube-system |
| clusterautoscaler-status-name | The name of the clusterautoscaler status configmap. | cluster-autoscaler-status |
| cluster-autoscaler-priority-expander-config-map | The name of the clusterautoscaler priority expander config map. | cluster-autoscaler-priority-expander |
| priority-expander-enabled | Is the PriorityExpander controller enabled. | `false` |
| priority-expander-namespace | The namespace the _unique_ priority expander object belongs to. | kubestitute-system |
| priority-expander-name | The only accepted name for the priority expander object. | priority-expander-default |
| dev | Enable dev mode for logging. | `false` |
| v | Logs verbosity. 0 => panic, 1 => error, 2 => warning, 3 => info, 4 => debug | 3 |
| asg-poll-interval | AutoScaling Groups polling interval (used to generate custom metrics about ASGs). | 30 |
| eviction-timeout | The timeout in seconds for pods eviction on Instance deletion. | 300 |
| instances-max-concurrent-reconciles | The maximum number of concurrent Reconciles which can be run for Instances. | 10 |
| metrics-bind-address | The address the metric endpoint binds to. | :8080 |
| health-probe-bind-address | The address the probe endpoint binds to. | :8081 |
| leader-elect | Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager. | `false` |
| Key | Description | Default |
| ----------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------ |
| clusterautoscaler-namespace | The namespace the clusterautoscaler belongs to. | kube-system |
| clusterautoscaler-status-name | The name of the clusterautoscaler status configmap. | cluster-autoscaler-status |
| cluster-autoscaler-priority-expander-config-map | The name of the clusterautoscaler priority expander config map. | cluster-autoscaler-priority-expander |
| clusterautoscaler-status-legacy-format | Set if the clusterautoscaler status configmap uses the legacy readable format emitted by cluster-autoscaler up to version 1.29 (see the flag-wiring sketch below this table). | `false` |
| priority-expander-enabled | Is the PriorityExpander controller enabled. | `false` |
| priority-expander-namespace | The namespace the _unique_ priority expander object belongs to. | kubestitute-system |
| priority-expander-name | The only accepted name for the priority expander object. | priority-expander-default |
| dev | Enable dev mode for logging. | `false` |
| v | Logs verbosity. 0 => panic, 1 => error, 2 => warning, 3 => info, 4 => debug | 3 |
| asg-poll-interval | AutoScaling Groups polling interval (used to generate custom metrics about ASGs). | 30 |
| eviction-timeout | The timeout in seconds for pods eviction on Instance deletion. | 300 |
| instances-max-concurrent-reconciles | The maximum number of concurrent Reconciles which can be run for Instances. | 10 |
| metrics-bind-address | The address the metric endpoint binds to. | :8080 |
| health-probe-bind-address | The address the probe endpoint binds to. | :8081 |
| leader-elect | Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager. | `false` |
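The new `clusterautoscaler-status-legacy-format` argument in the table above is what selects between the two status parsers used by the controllers below. As a rough illustration of the wiring, here is a self-contained sketch; the flag-to-struct plumbing is assumed (kubestitute's actual `main.go` is not part of this diff), while the struct fields mirror the `PriorityExpanderReconcilerConfiguration` changes further down and the defaults come from this table.

```go
// Hypothetical wiring sketch; only the struct fields and defaults are taken
// from this commit, the rest is illustrative.
package main

import (
	"flag"
	"fmt"
)

// Local stand-in for controllers.PriorityExpanderReconcilerConfiguration
// as modified in this commit.
type PriorityExpanderReconcilerConfiguration struct {
	ClusterAutoscalerNamespace          string
	ClusterAutoscalerStatusName         string
	ClusterAutoscalerStatusLegacyFormat bool
	ClusterAutoscalerPEConfigMapName    string
	PriorityExpanderNamespace           string
	PriorityExpanderName                string
}

func main() {
	legacy := flag.Bool("clusterautoscaler-status-legacy-format", false,
		"Set if the cluster-autoscaler status configmap uses the legacy readable format (cluster-autoscaler <= 1.29).")
	flag.Parse()

	cfg := PriorityExpanderReconcilerConfiguration{
		ClusterAutoscalerNamespace:          "kube-system",
		ClusterAutoscalerStatusName:         "cluster-autoscaler-status",
		ClusterAutoscalerStatusLegacyFormat: *legacy,
		ClusterAutoscalerPEConfigMapName:    "cluster-autoscaler-priority-expander",
		PriorityExpanderNamespace:           "kubestitute-system",
		PriorityExpanderName:                "priority-expander-default",
	}
	fmt.Printf("%+v\n", cfg)
}
```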

## CustomResourceDefinitions

3 changes: 2 additions & 1 deletion api/v1alpha1/scheduler_types.go
@@ -161,7 +161,8 @@ type IntOrArithmeticOperation struct {
// It is based on ASG health status.
type Field string

// All Field constants
// All Field constants.
// LongNotStarted is deprecated and will always be 0.
const (
FieldReady Field = "Ready"
FieldUnready Field = "Unready"
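The `Field` constant block above is collapsed in this view. For reference, here is a sketch of the full set implied by the `switch` in `nodeGroupIntOrFieldValue` further down; only the `FieldReady` and `FieldUnready` string values are visible in this diff, the remaining values are assumed to follow the same pattern.

```go
// Sketch: constant names come from the switch cases below; string values for
// everything other than FieldReady and FieldUnready are assumed.
package v1alpha1

// Field is based on ASG health status.
type Field string

// All Field constants.
// LongNotStarted is deprecated and will always be 0.
const (
	FieldReady               Field = "Ready"
	FieldUnready             Field = "Unready"
	FieldNotStarted          Field = "NotStarted"
	FieldLongNotStarted      Field = "LongNotStarted"
	FieldRegistered          Field = "Registered"
	FieldLongUnregistered    Field = "LongUnregistered"
	FieldCloudProviderTarget Field = "CloudProviderTarget"
)
```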
44 changes: 27 additions & 17 deletions controllers/priorityexpander_controller.go
@@ -44,11 +44,12 @@ import (
)

type PriorityExpanderReconcilerConfiguration struct {
ClusterAutoscalerNamespace string
ClusterAutoscalerStatusName string
ClusterAutoscalerPEConfigMapName string
PriorityExpanderNamespace string
PriorityExpanderName string
ClusterAutoscalerNamespace string
ClusterAutoscalerStatusName string
ClusterAutoscalerStatusLegacyFormat bool
ClusterAutoscalerPEConfigMapName string
PriorityExpanderNamespace string
PriorityExpanderName string
}

type PriorityExpanderReconciler struct {
@@ -125,20 +126,30 @@ func (r *PriorityExpanderReconciler) Reconcile(ctx context.Context, req ctrl.Req
}

// ... and parse it.
status := clusterautoscaler.ParseReadableString(readableStatus)
var status *clusterautoscaler.ClusterAutoscalerStatus
if !r.Configuration.ClusterAutoscalerStatusLegacyFormat {
s, err := clusterautoscaler.ParseYamlStatus(readableStatus)
if err != nil {
log.Error(err, "Unable to parse status configmap yaml content")
return ctrl.Result{}, fmt.Errorf("unable to parse status configmap yaml content: %w", err)
}
status = s
} else {
status = clusterautoscaler.ParseReadableStatus(readableStatus)
}

var oroot = map[string]map[string]int32{}
oroot := map[string]map[string]int32{}
for _, node := range status.NodeGroups {
oroot[node.Name] = make(map[string]int32)
oroot[node.Name]["CloudProviderTarget"] = node.Health.CloudProviderTarget
oroot[node.Name]["Ready"] = node.Health.Ready
oroot[node.Name]["Unready"] = node.Health.Unready
oroot[node.Name]["NotStarted"] = node.Health.NotStarted
oroot[node.Name]["LongNotStarted"] = node.Health.LongNotStarted
oroot[node.Name]["Registered"] = node.Health.Registered
oroot[node.Name]["LongUnregistered"] = node.Health.LongUnregistered
oroot[node.Name]["MinSize"] = node.Health.MinSize
oroot[node.Name]["MaxSize"] = node.Health.MaxSize
oroot[node.Name]["CloudProviderTarget"] = int32(node.Health.CloudProviderTarget)
oroot[node.Name]["Ready"] = int32(node.Health.NodeCounts.Registered.Ready)
oroot[node.Name]["Unready"] = int32(node.Health.NodeCounts.Registered.Unready.Total)
oroot[node.Name]["NotStarted"] = int32(node.Health.NodeCounts.Registered.NotStarted)
oroot[node.Name]["LongNotStarted"] = 0
oroot[node.Name]["Registered"] = int32(node.Health.NodeCounts.Registered.Total)
oroot[node.Name]["LongUnregistered"] = int32(node.Health.NodeCounts.LongUnregistered)
oroot[node.Name]["MinSize"] = int32(node.Health.MinSize)
oroot[node.Name]["MaxSize"] = int32(node.Health.MaxSize)
}

// Create new PriorityExpander template and parse it
@@ -169,7 +180,6 @@ func (r *PriorityExpanderReconciler) Reconcile(ctx context.Context, req ctrl.Req
}

op, err := ctrl.CreateOrUpdate(ctx, r.Client, &pecm, func() error {

pecm.Data = map[string]string{
"priorities": buf.String(),
}
55 changes: 32 additions & 23 deletions controllers/scheduler_controller.go
@@ -55,8 +55,9 @@ const (

// SchedulerReconcilerConfiguration wraps configuration for the SchedulerReconciler.
type SchedulerReconcilerConfiguration struct {
ClusterAutoscalerNamespace string
ClusterAutoscalerStatusName string
ClusterAutoscalerNamespace string
ClusterAutoscalerStatusName string
ClusterAutoscalerStatusLegacyFormat bool
}

// SchedulerReconciler reconciles a Scheduler object
@@ -138,13 +139,23 @@ func (r *SchedulerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}

// Parse it and retrieve NodeGroups from targets and fallbacks
status := clusterautoscaler.ParseReadableString(readableStatus)
var status *clusterautoscaler.ClusterAutoscalerStatus
if !r.Configuration.ClusterAutoscalerStatusLegacyFormat {
s, err := clusterautoscaler.ParseYamlStatus(readableStatus)
if err != nil {
log.Error(err, "Unable to parse status configmap yaml content")
return ctrl.Result{}, fmt.Errorf("unable to parse status configmap yaml content: %w", err)
}
status = s
} else {
status = clusterautoscaler.ParseReadableStatus(readableStatus)
}
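The YAML-versus-legacy dispatch above is now duplicated in both the PriorityExpander and Scheduler reconcilers. One possible follow-up is a small shared helper such as the sketch below; it is not part of this commit and relies only on the `ParseYamlStatus` and `ParseReadableStatus` signatures visible in this diff.

```go
// Hypothetical helper centralizing the status-parsing dispatch used by both
// controllers (assumes the clusterautoscaler package from this repository).
func parseClusterAutoscalerStatus(raw string, legacyFormat bool) (*clusterautoscaler.ClusterAutoscalerStatus, error) {
	if legacyFormat {
		// The legacy readable parser does not return an error in this diff.
		return clusterautoscaler.ParseReadableStatus(raw), nil
	}
	return clusterautoscaler.ParseYamlStatus(raw)
}
```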

asgTargets := scheduler.Spec.ASGTargets
if len(asgTargets) == 0 {
asgTargets = []string{scheduler.Spec.ASGTarget}
}
targetNodeGroups := make([]clusterautoscaler.NodeGroup, 0, len(asgTargets))
targetNodeGroups := make([]clusterautoscaler.NodeGroupStatus, 0, len(asgTargets))
for _, target := range asgTargets {
targetNodeGroup := clusterautoscaler.GetNodeGroupWithName(status.NodeGroups, target)
if targetNodeGroup == nil {
@@ -162,12 +173,12 @@ func (r *SchedulerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (

// Update target statuses
for i := range targetNodeGroups {
for _, s := range []clusterautoscaler.ScaleUpStatus{
clusterautoscaler.ScaleUpNeeded,
clusterautoscaler.ScaleUpNotNeeded,
clusterautoscaler.ScaleUpInProgress,
clusterautoscaler.ScaleUpNoActivity,
clusterautoscaler.ScaleUpBackoff,
for _, s := range []clusterautoscaler.ClusterAutoscalerConditionStatus{
clusterautoscaler.ClusterAutoscalerNeeded,
clusterautoscaler.ClusterAutoscalerNotNeeded,
clusterautoscaler.ClusterAutoscalerInProgress,
clusterautoscaler.ClusterAutoscalerNoActivity,
clusterautoscaler.ClusterAutoscalerBackoff,
} {
targetNodeGroupStatus := metrics.SchedulerTargetNodeGroupStatus.With(prometheus.Labels{
"node_group_name": targetNodeGroups[i].Name,
@@ -277,7 +288,7 @@ func (r *SchedulerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if down > 0 {
scaleDownAllowed := false
for i := range targetNodeGroups {
if targetNodeGroups[i].ScaleUp.Status != clusterautoscaler.ScaleUpBackoff {
if targetNodeGroups[i].ScaleUp.Status != clusterautoscaler.ClusterAutoscalerBackoff {
scaleDownAllowed = true
break
}
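The `ClusterAutoscaler*` condition constants used above replace the former `ScaleUp*` constants, but their definitions are not shown in this diff. Below is a plausible sketch, assuming the string values mirror the ScaleUp states of the legacy readable status.

```go
// Sketch only: constant names taken from this diff, string values assumed.
package clusterautoscaler

// ClusterAutoscalerConditionStatus is the status of a cluster-autoscaler condition.
type ClusterAutoscalerConditionStatus string

const (
	ClusterAutoscalerNeeded     ClusterAutoscalerConditionStatus = "Needed"
	ClusterAutoscalerNotNeeded  ClusterAutoscalerConditionStatus = "NotNeeded"
	ClusterAutoscalerInProgress ClusterAutoscalerConditionStatus = "InProgress"
	ClusterAutoscalerNoActivity ClusterAutoscalerConditionStatus = "NoActivity"
	ClusterAutoscalerBackoff    ClusterAutoscalerConditionStatus = "Backoff"
)
```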
@@ -511,7 +522,7 @@ func getMatchedPolicy(m []matchedPolicy, p corev1alpha1.SchedulerPolicy) *matche
// nodeGroupIntOrFieldValue returns the desired value matching IntOrField.
// Field returns the NodeGroup Field value and has priority over Int if a valid
// Field is given.
func nodeGroupIntOrFieldValue(ngs []clusterautoscaler.NodeGroup, iof corev1alpha1.IntOrField) int32 {
func nodeGroupIntOrFieldValue(ngs []clusterautoscaler.NodeGroupStatus, iof corev1alpha1.IntOrField) int32 {
if iof.FieldVal == nil {
return iof.IntVal
}
@@ -520,39 +531,37 @@ func nodeGroupIntOrFieldValue(ngs []clusterautoscaler.NodeGroup, iof corev1alpha
switch *iof.FieldVal {
case corev1alpha1.FieldReady:
for i := range ngs {
val += ngs[i].Health.Ready
val += int32(ngs[i].Health.NodeCounts.Registered.Ready)
}
case corev1alpha1.FieldUnready:
for i := range ngs {
val += ngs[i].Health.Unready
val += int32(ngs[i].Health.NodeCounts.Registered.Unready.Total)
}
case corev1alpha1.FieldNotStarted:
for i := range ngs {
val += ngs[i].Health.NotStarted
val += int32(ngs[i].Health.NodeCounts.Registered.NotStarted)
}
case corev1alpha1.FieldLongNotStarted:
for i := range ngs {
val += ngs[i].Health.LongNotStarted
}
// Field deprecated, do nothing.
case corev1alpha1.FieldRegistered:
for i := range ngs {
val += ngs[i].Health.Registered
val += int32(ngs[i].Health.NodeCounts.Registered.Total)
}
case corev1alpha1.FieldLongUnregistered:
for i := range ngs {
val += ngs[i].Health.LongUnregistered
val += int32(ngs[i].Health.NodeCounts.LongUnregistered)
}
case corev1alpha1.FieldCloudProviderTarget:
for i := range ngs {
val += ngs[i].Health.CloudProviderTarget
val += int32(ngs[i].Health.CloudProviderTarget)
}
}

return val
}
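To make the effect of the new accessors concrete, here is a tiny self-contained illustration of what the reworked `FieldReady` branch computes: ready counts now live under the nested `NodeCounts.Registered` structure and are summed across node groups as `int32`. The types below are local stand-ins for the example, not kubestitute's.

```go
// Self-contained illustration of the FieldReady summing logic after this change.
package main

import "fmt"

type registered struct{ Ready int }
type nodeCounts struct{ Registered registered }
type health struct{ NodeCounts nodeCounts }
type nodeGroupStatus struct{ Health health }

// sumReady mirrors the FieldReady case of nodeGroupIntOrFieldValue.
func sumReady(ngs []nodeGroupStatus) int32 {
	var val int32
	for i := range ngs {
		val += int32(ngs[i].Health.NodeCounts.Registered.Ready)
	}
	return val
}

func main() {
	ngs := []nodeGroupStatus{
		{Health: health{NodeCounts: nodeCounts{Registered: registered{Ready: 2}}}},
		{Health: health{NodeCounts: nodeCounts{Registered: registered{Ready: 3}}}},
	}
	fmt.Println(sumReady(ngs)) // prints 5
}
```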

// matchPolicy returns whether the given NodeGroups match the desired Scheduler policy.
func matchPolicy(ngs []clusterautoscaler.NodeGroup, policy corev1alpha1.SchedulerPolicy) bool {
func matchPolicy(ngs []clusterautoscaler.NodeGroupStatus, policy corev1alpha1.SchedulerPolicy) bool {
left := nodeGroupIntOrFieldValue(ngs, policy.LeftOperand)
right := nodeGroupIntOrFieldValue(ngs, policy.RightOperand)

@@ -576,7 +585,7 @@ func matchPolicy(ngs []clusterautoscaler.NodeGroup, policy corev1alpha1.Schedule
}

// replicas returns the number of required replicas.
func nodeGroupReplicas(ngs []clusterautoscaler.NodeGroup, operation corev1alpha1.IntOrArithmeticOperation) int32 {
func nodeGroupReplicas(ngs []clusterautoscaler.NodeGroupStatus, operation corev1alpha1.IntOrArithmeticOperation) int32 {
if operation.OperationVal == nil {
return operation.IntVal
}