Skip to content

Commit

Permalink
add framework.status
Browse files Browse the repository at this point in the history
Signed-off-by: haoqing0110 <qhao@redhat.com>
  • Loading branch information
haoqing0110 committed Apr 28, 2022
1 parent 11539e4 commit 467b5e5
Show file tree
Hide file tree
Showing 18 changed files with 354 additions and 146 deletions.
85 changes: 85 additions & 0 deletions pkg/controllers/framework/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package framework

import (
"errors"
"strings"
)

// Code is the Status code/type which is returned from plugins.
type Code int

// These are predefined codes used in a Status.
const (
// Success means that plugin ran correctly and found placement schedulable.
// NOTE: A nil status is also considered as "Success".
Success Code = iota
// Warning means that plugin ran correctly and found placement schedulable, but with some failures to notice.
Warning
// Error is used for internal plugin errors etc.
Error
// Misconfigured is used for internal plugin configuration errors, unexpected input, etc.
Misconfigured
// Skip is used when a plugin chooses to skip running.
Skip
)

type Status struct {
code Code
// reasons contains the message about status.
reasons []string
// Err contains the error message.
err error
// plugin is an optional field that records the plugin name.
plugin string
}

// Code returns code of the Status.
func (s *Status) Code() Code {
if s == nil {
return Success
}
return s.code
}

// Message returns a concatenated message on reasons of the Status.
func (s *Status) Message() string {
if s == nil {
return ""
}
return strings.Join(s.reasons, ", ")
}

// AppendReason appends given reason to the Status.
func (s *Status) AppendReason(reason string) {
s.reasons = append(s.reasons, reason)
}

// AsError returns nil if the status is a success; otherwise returns an "error" object
// with a concatenated message on reasons of the Status.
func (s *Status) AsError() error {
if s.Code() == Success {
return nil
}
if s.err != nil {
return s.err
}
return errors.New(s.Message())
}

// FailedPlugin returns the failed plugin name.
func (s *Status) Plugin() string {
return s.plugin
}

// NewStatus makes a Status out of the given arguments and returns its pointer.
func NewStatus(plugin string, code Code, reasons ...string) *Status {
s := &Status{
code: code,
reasons: reasons,
plugin: plugin,
}
if code == Error {
s.err = errors.New(s.Message())
}
return s
}
36 changes: 19 additions & 17 deletions pkg/controllers/scheduling/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
clusterlisterv1beta1 "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta1"
clusterapiv1 "open-cluster-management.io/api/cluster/v1"
clusterapiv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
"open-cluster-management.io/placement/pkg/controllers/framework"
"open-cluster-management.io/placement/pkg/plugins"
"open-cluster-management.io/placement/pkg/plugins/addon"
"open-cluster-management.io/placement/pkg/plugins/balance"
Expand All @@ -39,7 +40,7 @@ type Scheduler interface {
ctx context.Context,
placement *clusterapiv1beta1.Placement,
clusters []*clusterapiv1.ManagedCluster,
) (ScheduleResult, error)
) (ScheduleResult, *framework.Status)
}

type ScheduleResult interface {
Expand Down Expand Up @@ -167,8 +168,9 @@ func (s *pluginScheduler) Schedule(
ctx context.Context,
placement *clusterapiv1beta1.Placement,
clusters []*clusterapiv1.ManagedCluster,
) (ScheduleResult, error) {
) (ScheduleResult, *framework.Status) {
filtered := clusters
finalStatus := framework.NewStatus("", framework.Success, "")

results := &scheduleResult{
filteredRecords: map[string][]*clusterapiv1.ManagedCluster{},
Expand All @@ -179,12 +181,13 @@ func (s *pluginScheduler) Schedule(
filterPipline := []string{}

for _, f := range s.filters {
filterResult := f.Filter(ctx, placement, filtered)
filterResult, status := f.Filter(ctx, placement, filtered)
filtered = filterResult.Filtered
err := filterResult.Err

if err != nil {
return nil, err
if status.Code() == framework.Error || status.Code() == framework.Misconfigured {
return nil, status
} else if status.Code() == framework.Warning {
finalStatus = status
}

filterPipline = append(filterPipline, f.Name())
Expand All @@ -197,13 +200,13 @@ func (s *pluginScheduler) Schedule(
// For example, weights is {"Steady": 1, "Balance":1, "AddOn/default/ratio":3}.
weights, err := getWeights(s.prioritizerWeights, placement)
if err != nil {
return nil, err
return nil, framework.NewStatus("", framework.Misconfigured, err.Error())
}

// 2. Generate prioritizers for each placement whose weight != 0.
prioritizers, err := getPrioritizers(weights, s.handle)
if err != nil {
return nil, err
return nil, framework.NewStatus("", framework.Misconfigured, err.Error())
}

// 3. Calculate clusters scores.
Expand All @@ -213,12 +216,13 @@ func (s *pluginScheduler) Schedule(
}
for sc, p := range prioritizers {
// Get cluster score.
scoreResult := p.Score(ctx, placement, filtered)
scoreResult, status := p.Score(ctx, placement, filtered)
score := scoreResult.Scores
err := scoreResult.Err

if err != nil {
return nil, err
if status.Code() == framework.Error || status.Code() == framework.Misconfigured {
return nil, status
} else if status.Code() == framework.Warning {
finalStatus = status
}

// Record prioritizer score and weight
Expand Down Expand Up @@ -260,19 +264,19 @@ func (s *pluginScheduler) Schedule(

// set placement requeue time
for _, f := range s.filters {
if r := f.RequeueAfter(ctx, placement); r.RequeueTime != nil {
if r, _ := f.RequeueAfter(ctx, placement); r.RequeueTime != nil {
newRequeueAfter := time.Until(*r.RequeueTime)
results.requeueAfter = setRequeueAfter(results.requeueAfter, &newRequeueAfter)
}
}
for _, p := range prioritizers {
if r := p.RequeueAfter(ctx, placement); r.RequeueTime != nil {
if r, _ := p.RequeueAfter(ctx, placement); r.RequeueTime != nil {
newRequeueAfter := time.Until(*r.RequeueTime)
results.requeueAfter = setRequeueAfter(results.requeueAfter, &newRequeueAfter)
}
}

return results, nil
return results, finalStatus
}

// makeClusterDecisions selects clusters based on given cluster slice and then creates
Expand Down Expand Up @@ -374,12 +378,10 @@ func getPrioritizers(
WithPrioritizerName(k.BuiltIn).
Build()
default:
//TODO: show the failure in placement.status conditions
return nil, fmt.Errorf("incorrect builtin prioritizer: %s", k.BuiltIn)
}
} else {
if k.AddOn == nil {
//TODO: show the failure in placement.status conditions
return nil, fmt.Errorf("addOn should not be empty")
}
result[k] = addon.NewAddOnPrioritizerBuilder(handle).WithResourceName(k.AddOn.ResourceName).WithScoreName(k.AddOn.ScoreName).Build()
Expand Down
85 changes: 62 additions & 23 deletions pkg/controllers/scheduling/scheduling_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
clusterlisterv1beta1 "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta1"
clusterapiv1 "open-cluster-management.io/api/cluster/v1"
clusterapiv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
"open-cluster-management.io/placement/pkg/controllers/framework"
)

const (
Expand Down Expand Up @@ -284,9 +285,29 @@ func (c *schedulingController) syncPlacement(
}

// schedule placement with scheduler
scheduleResult, err := c.scheduler.Schedule(ctx, placement, clusters)
if err != nil {
return err
scheduleResult, status := c.scheduler.Schedule(ctx, placement, clusters)
misconfiguredCondition := newMisconfiguredCondition(status)
satisfiedCondition := newSatisfiedCondition(
placement.Spec.ClusterSets,
clusterSetNames,
len(bindings),
len(clusters),
len(scheduleResult.Decisions()),
scheduleResult.NumOfUnscheduled(),
status,
)

if status.Code() == framework.Misconfigured || status.Code() == framework.Error {
if err := c.updateStatus(
ctx,
placement,
int32(len(scheduleResult.Decisions())),
misconfiguredCondition,
satisfiedCondition,
); err != nil {
return err
}
return status.AsError()
}

// requeue placement if requeueAfter is defined in scheduleResult
Expand All @@ -306,10 +327,9 @@ func (c *schedulingController) syncPlacement(
return c.updateStatus(
ctx,
placement,
clusterSetNames,
len(bindings),
len(clusters),
scheduleResult,
int32(len(scheduleResult.Decisions())),
satisfiedCondition,
misconfiguredCondition,
)
}

Expand Down Expand Up @@ -432,27 +452,19 @@ func (c *schedulingController) getAvailableClusters(
func (c *schedulingController) updateStatus(
ctx context.Context,
placement *clusterapiv1beta1.Placement,
eligibleClusterSetNames []string,
numOfBindings,
numOfAvailableClusters int,
scheduleResult ScheduleResult,
numberOfSelectedClusters int32,
conditions ...metav1.Condition,
) error {
newPlacement := placement.DeepCopy()
newPlacement.Status.NumberOfSelectedClusters = int32(len(scheduleResult.Decisions()))
newPlacement.Status.NumberOfSelectedClusters = numberOfSelectedClusters

satisfiedCondition := newSatisfiedCondition(
placement.Spec.ClusterSets,
eligibleClusterSetNames,
numOfBindings,
numOfAvailableClusters,
len(scheduleResult.Decisions()),
scheduleResult.NumOfUnscheduled(),
)

meta.SetStatusCondition(&newPlacement.Status.Conditions, satisfiedCondition)
for _, c := range conditions {
meta.SetStatusCondition(&newPlacement.Status.Conditions, c)
}
if reflect.DeepEqual(newPlacement.Status, placement.Status) {
return nil
}

_, err := c.clusterClient.ClusterV1beta1().
Placements(newPlacement.Namespace).
UpdateStatus(ctx, newPlacement, metav1.UpdateOptions{})
Expand All @@ -467,6 +479,7 @@ func newSatisfiedCondition(
numOfAvailableClusters,
numOfFeasibleClusters,
numOfUnscheduledDecisions int,
status *framework.Status,
) metav1.Condition {
condition := metav1.Condition{
Type: clusterapiv1beta1.PlacementConditionSatisfied,
Expand All @@ -490,14 +503,22 @@ func newSatisfiedCondition(
"All ManagedClusterSets [%s] have no member ManagedCluster",
strings.Join(eligibleClusterSets, ","),
)
case status.Code() == framework.Error:
condition.Status = metav1.ConditionFalse
condition.Reason = "NotAllDecisionsScheduled"
condition.Message = status.AsError().Error()
case numOfFeasibleClusters == 0:
condition.Status = metav1.ConditionFalse
condition.Reason = "NoManagedClusterMatched"
condition.Message = "No ManagedCluster matches any of the cluster predicate"
case numOfUnscheduledDecisions == 0:
condition.Status = metav1.ConditionTrue
condition.Reason = "AllDecisionsScheduled"
condition.Message = "All cluster decisions scheduled"
if status.Code() == framework.Warning {
condition.Message = status.AsError().Error()
} else {
condition.Message = "All cluster decisions scheduled"
}
default:
condition.Status = metav1.ConditionFalse
condition.Reason = "NotAllDecisionsScheduled"
Expand All @@ -509,6 +530,24 @@ func newSatisfiedCondition(
return condition
}

func newMisconfiguredCondition(status *framework.Status) metav1.Condition {
if status.Code() == framework.Misconfigured {
return metav1.Condition{
Type: clusterapiv1beta1.PlacementConditionMisconfigured,
Status: metav1.ConditionTrue,
Reason: "Misconfigured",
Message: "Placement configurations check failed",
}
} else {
return metav1.Condition{
Type: clusterapiv1beta1.PlacementConditionMisconfigured,
Status: metav1.ConditionFalse,
Reason: "Succeedconfigured",
Message: "Placement configurations check pass",
}
}
}

// bind updates the cluster decisions in the status of the placementdecisions with the given
// cluster decision slice. New placementdecisions will be created if no one exists.
func (c *schedulingController) bind(
Expand Down
4 changes: 3 additions & 1 deletion pkg/controllers/scheduling/scheduling_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
clusterapiv1 "open-cluster-management.io/api/cluster/v1"
clusterapiv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
"open-cluster-management.io/placement/pkg/controllers/framework"
testinghelpers "open-cluster-management.io/placement/pkg/helpers/testing"
)

Expand All @@ -26,7 +27,7 @@ type testScheduler struct {
func (s *testScheduler) Schedule(ctx context.Context,
placement *clusterapiv1beta1.Placement,
clusters []*clusterapiv1.ManagedCluster,
) (ScheduleResult, error) {
) (ScheduleResult, *framework.Status) {
return s.result, nil
}

Expand Down Expand Up @@ -591,6 +592,7 @@ func TestNewSatisfiedCondition(t *testing.T) {
c.numOfAvailableClusters,
c.numOfFeasibleClusters,
c.numOfUnscheduledDecisions,
nil,
)

if condition.Status != c.expectedStatus {
Expand Down
3 changes: 2 additions & 1 deletion pkg/debugger/debugger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
clusterapiv1 "open-cluster-management.io/api/cluster/v1"
clusterapiv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
"open-cluster-management.io/placement/pkg/controllers/framework"
scheduling "open-cluster-management.io/placement/pkg/controllers/scheduling"
testinghelpers "open-cluster-management.io/placement/pkg/helpers/testing"
)
Expand Down Expand Up @@ -52,7 +53,7 @@ func (r *testResult) NumOfUnscheduled() int {
func (s *testScheduler) Schedule(ctx context.Context,
placement *clusterapiv1beta1.Placement,
clusters []*clusterapiv1.ManagedCluster,
) (scheduling.ScheduleResult, error) {
) (scheduling.ScheduleResult, *framework.Status) {
return s.result, nil
}

Expand Down
Loading

0 comments on commit 467b5e5

Please sign in to comment.