Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion apis/placement/v1beta1/stageupdate_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ type UpdateRunSpec struct {

// The resource snapshot index of the selected resources to be updated across clusters.
// The index represents a group of resource snapshots that includes all the resources a ResourcePlacement selected.
// +kubebuilder:validation:Required
// +kubebuilder:validation:XValidation:rule="self == oldSelf",message="resourceSnapshotIndex is immutable"
// +kubebuilder:validation:Optional
ResourceSnapshotIndex string `json:"resourceSnapshotIndex"`

// The name of the update strategy that specifies the stages and the sequence
Expand Down Expand Up @@ -374,6 +374,11 @@ type UpdateRunStatus struct {
// +kubebuilder:validation:Optional
PolicyObservedClusterCount int `json:"policyObservedClusterCount,omitempty"`

// ResourceSnapshotIndexUsed records the resource snapshot index that the update run is based on.
// The index represents the same resource snapshots as specified in the spec field, or the latest.
// +kubbebuilder:validation:Optional
ResourceSnapshotIndexUsed string `json:"resourceSnapshotIndexUsed,omitempty"`

// ApplyStrategy is the apply strategy that the stagedUpdateRun is using.
// It is the same as the apply strategy in the CRP when the staged update run starts.
// The apply strategy is not updated during the update run even if it changes in the CRP.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1200,7 +1200,6 @@ spec:
type: string
required:
- placementName
- resourceSnapshotIndex
- stagedRolloutStrategyName
type: object
x-kubernetes-validations:
Expand Down Expand Up @@ -1895,6 +1894,11 @@ spec:
All clusters involved in the update run are selected from the list of clusters scheduled by the CRP according
to the current policy.
type: string
resourceSnapshotIndexUsed:
description: |-
ResourceSnapshotIndexUsed records the resource snapshot index that the update run is based on.
The index represents the same resource snapshots as specified in the spec field, or the latest.
type: string
stagedUpdateStrategySnapshot:
description: |-
UpdateStrategySnapshot is the snapshot of the UpdateStrategy used for the update run.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ spec:
type: string
required:
- placementName
- resourceSnapshotIndex
- stagedRolloutStrategyName
type: object
x-kubernetes-validations:
Expand Down Expand Up @@ -815,6 +814,11 @@ spec:
All clusters involved in the update run are selected from the list of clusters scheduled by the CRP according
to the current policy.
type: string
resourceSnapshotIndexUsed:
description: |-
ResourceSnapshotIndexUsed records the resource snapshot index that the update run is based on.
The index represents the same resource snapshots as specified in the spec field, or the latest.
type: string
stagedUpdateStrategySnapshot:
description: |-
UpdateStrategySnapshot is the snapshot of the UpdateStrategy used for the update run.
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/updaterun/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (r *Reconciler) executeUpdatingStage(
updateRunSpec := updateRun.GetUpdateRunSpec()
updatingStageStatus := &updateRunStatus.StagesStatus[updatingStageIndex]
// The parse error is ignored because the initialization should have caught it.
resourceIndex, _ := strconv.Atoi(updateRunSpec.ResourceSnapshotIndex)
resourceIndex, _ := strconv.Atoi(updateRunStatus.ResourceSnapshotIndexUsed)
resourceSnapshotName := fmt.Sprintf(placementv1beta1.ResourceSnapshotNameFmt, updateRunSpec.PlacementName, resourceIndex)
updateRunRef := klog.KObj(updateRun)
// Create the map of the toBeUpdatedBindings.
Expand Down
16 changes: 8 additions & 8 deletions pkg/controllers/updaterun/execution_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ var _ = Describe("UpdateRun execution tests - double stages", func() {
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())

By("Validating the initialization succeeded and the execution started")
initialized := generateSucceededInitializationStatus(crp, updateRun, policySnapshot, updateStrategy, clusterResourceOverride)
initialized := generateSucceededInitializationStatus(crp, updateRun, testResourceSnapshotIndex, policySnapshot, updateStrategy, clusterResourceOverride)
wantStatus = generateExecutionStartedStatus(updateRun, initialized)
validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "")

Expand Down Expand Up @@ -521,7 +521,7 @@ var _ = Describe("UpdateRun execution tests - double stages", func() {
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())

By("Validating the initialization succeeded and the execution started")
initialized := generateSucceededInitializationStatus(crp, updateRun, policySnapshot, updateStrategy, clusterResourceOverride)
initialized := generateSucceededInitializationStatus(crp, updateRun, testResourceSnapshotIndex, policySnapshot, updateStrategy, clusterResourceOverride)
wantStatus = generateExecutionStartedStatus(updateRun, initialized)
validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "")

Expand Down Expand Up @@ -680,7 +680,7 @@ var _ = Describe("UpdateRun execution tests - single stage", func() {
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())

By("Validating the initialization succeeded and the execution started")
initialized := generateSucceededInitializationStatusForSmallClusters(crp, updateRun, policySnapshot, updateStrategy)
initialized := generateSucceededInitializationStatusForSmallClusters(crp, updateRun, testResourceSnapshotIndex, policySnapshot, updateStrategy)
wantStatus = generateExecutionStartedStatus(updateRun, initialized)
validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "")

Expand Down Expand Up @@ -774,7 +774,7 @@ var _ = Describe("UpdateRun execution tests - single stage", func() {
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())

By("Validating the initialization succeeded and the execution started")
initialized := generateSucceededInitializationStatusForSmallClusters(crp, updateRun, policySnapshot, updateStrategy)
initialized := generateSucceededInitializationStatusForSmallClusters(crp, updateRun, testResourceSnapshotIndex, policySnapshot, updateStrategy)
wantStatus = generateExecutionStartedStatus(updateRun, initialized)
validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "")

Expand Down Expand Up @@ -883,7 +883,7 @@ var _ = Describe("UpdateRun execution tests - single stage", func() {
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())

By("Validating the initialization succeeded and the execution started")
initialized := generateSucceededInitializationStatusForSmallClusters(crp, updateRun, policySnapshot, updateStrategy)
initialized := generateSucceededInitializationStatusForSmallClusters(crp, updateRun, testResourceSnapshotIndex, policySnapshot, updateStrategy)
wantStatus = generateExecutionStartedStatus(updateRun, initialized)
validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "")

Expand Down Expand Up @@ -1014,7 +1014,7 @@ var _ = Describe("UpdateRun execution tests - single stage", func() {
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())

By("Validating the initialization succeeded and the execution started")
initialized := generateSucceededInitializationStatusForSmallClusters(crp, updateRun, policySnapshot, updateStrategy)
initialized := generateSucceededInitializationStatusForSmallClusters(crp, updateRun, testResourceSnapshotIndex, policySnapshot, updateStrategy)
wantStatus = generateExecutionStartedStatus(updateRun, initialized)
validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "")

Expand Down Expand Up @@ -1106,7 +1106,7 @@ var _ = Describe("UpdateRun execution tests - single stage", func() {
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())

By("Validating the initialization succeeded and the execution started")
initialized := generateSucceededInitializationStatusForSmallClusters(crp, updateRun, policySnapshot, updateStrategy)
initialized := generateSucceededInitializationStatusForSmallClusters(crp, updateRun, testResourceSnapshotIndex, policySnapshot, updateStrategy)
wantStatus = generateExecutionStartedStatus(updateRun, initialized)
validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "")

Expand Down Expand Up @@ -1163,7 +1163,7 @@ var _ = Describe("UpdateRun execution tests - single stage", func() {
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())

By("Validating the initialization succeeded and the execution started")
initialized := generateSucceededInitializationStatusForSmallClusters(crp, updateRun, policySnapshot, updateStrategy)
initialized := generateSucceededInitializationStatusForSmallClusters(crp, updateRun, testResourceSnapshotIndex, policySnapshot, updateStrategy)
wantStatus = generateExecutionStartedStatus(updateRun, initialized)
validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "")
})
Expand Down
87 changes: 62 additions & 25 deletions pkg/controllers/updaterun/initialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,30 +471,10 @@ func validateAfterStageTask(tasks []placementv1beta1.StageTask) error {
func (r *Reconciler) recordOverrideSnapshots(ctx context.Context, placementKey types.NamespacedName, updateRun placementv1beta1.UpdateRunObj) error {
updateRunRef := klog.KObj(updateRun)
updateRunSpec := updateRun.GetUpdateRunSpec()
placementName := placementKey.Name

snapshotIndex, err := strconv.Atoi(updateRunSpec.ResourceSnapshotIndex)
if err != nil || snapshotIndex < 0 {
err := controller.NewUserError(fmt.Errorf("invalid resource snapshot index `%s` provided, expected an integer >= 0", updateRunSpec.ResourceSnapshotIndex))
klog.ErrorS(err, "Failed to parse the resource snapshot index", "updateRun", updateRunRef)
// no more retries here.
return fmt.Errorf("%w: %s", errInitializedFailed, err.Error())
}

resourceSnapshotList, err := controller.ListAllResourceSnapshotWithAnIndex(ctx, r.Client, updateRunSpec.ResourceSnapshotIndex, placementName, placementKey.Namespace)
resourceSnapshotObjs, err := r.getResourceSnapshotObjs(ctx, placementKey, updateRun)
if err != nil {
klog.ErrorS(err, "Failed to list the resourceSnapshots associated with the placement",
"placement", placementKey, "resourceSnapshotIndex", snapshotIndex, "updateRun", updateRunRef)
// err can be retried.
return controller.NewAPIServerError(true, err)
}

resourceSnapshotObjs := resourceSnapshotList.GetResourceSnapshotObjs()
if len(resourceSnapshotObjs) == 0 {
err := controller.NewUserError(fmt.Errorf("no resourceSnapshots with index `%d` found for placement `%s`", snapshotIndex, placementKey))
klog.ErrorS(err, "No specified resourceSnapshots found", "updateRun", updateRunRef)
// no more retries here.
return fmt.Errorf("%w: %s", errInitializedFailed, err.Error())
return err
}

// Look for the master resourceSnapshot.
Expand All @@ -509,12 +489,18 @@ func (r *Reconciler) recordOverrideSnapshots(ctx context.Context, placementKey t

// No masterResourceSnapshot found.
if masterResourceSnapshot == nil {
err := controller.NewUnexpectedBehaviorError(fmt.Errorf("no master resourceSnapshot found for placement `%s` with index `%d`", placementKey, snapshotIndex))
err := controller.NewUnexpectedBehaviorError(fmt.Errorf("no master resourceSnapshot found for placement %s", placementKey))
klog.ErrorS(err, "Failed to find master resourceSnapshot", "updateRun", updateRunRef)
// no more retries here.
return fmt.Errorf("%w: %s", errInitializedFailed, err.Error())
}
klog.V(2).InfoS("Found master resourceSnapshot", "placement", placementKey, "index", snapshotIndex, "updateRun", updateRunRef)

klog.V(2).InfoS("Found master resourceSnapshot", "placement", placementKey, "masterResourceSnapshot", masterResourceSnapshot.GetName(), "updateRun", updateRunRef)

// Record the resource snapshot index used.
updateRunStatus := updateRun.GetUpdateRunStatus()
updateRunStatus.ResourceSnapshotIndexUsed = masterResourceSnapshot.GetLabels()[placementv1beta1.ResourceIndexLabel]
updateRun.SetUpdateRunStatus(*updateRunStatus)

resourceSnapshotRef := klog.KObj(masterResourceSnapshot)
// Fetch all the matching overrides.
Expand All @@ -526,7 +512,6 @@ func (r *Reconciler) recordOverrideSnapshots(ctx context.Context, placementKey t
}

// Pick the overrides associated with each target cluster.
updateRunStatus := updateRun.GetUpdateRunStatus()
for _, stageStatus := range updateRunStatus.StagesStatus {
for i := range stageStatus.Clusters {
clusterStatus := &stageStatus.Clusters[i]
Expand All @@ -543,6 +528,58 @@ func (r *Reconciler) recordOverrideSnapshots(ctx context.Context, placementKey t
return nil
}

// getResourceSnapshotObjs retrieves the list of resource snapshot objects from the specified ResourceSnapshotIndex.
// If ResourceSnapshotIndex is unspecified, it returns the list of latest resource snapshots.
func (r *Reconciler) getResourceSnapshotObjs(ctx context.Context, placementKey types.NamespacedName, updateRun placementv1beta1.UpdateRunObj) ([]placementv1beta1.ResourceSnapshotObj, error) {
updateRunRef := klog.KObj(updateRun)
updateRunSpec := updateRun.GetUpdateRunSpec()
var resourceSnapshotObjs []placementv1beta1.ResourceSnapshotObj
if updateRunSpec.ResourceSnapshotIndex != "" {
snapshotIndex, err := strconv.Atoi(updateRunSpec.ResourceSnapshotIndex)
if err != nil || snapshotIndex < 0 {
err := controller.NewUserError(fmt.Errorf("invalid resource snapshot index `%s` provided, expected an integer >= 0", updateRunSpec.ResourceSnapshotIndex))
klog.ErrorS(err, "Failed to parse the resource snapshot index", "updateRun", updateRunRef)
// no more retries here.
return nil, fmt.Errorf("%w: %s", errInitializedFailed, err.Error())
}

resourceSnapshotList, err := controller.ListAllResourceSnapshotWithAnIndex(ctx, r.Client, updateRunSpec.ResourceSnapshotIndex, placementKey.Name, placementKey.Namespace)
if err != nil {
klog.ErrorS(err, "Failed to list the resourceSnapshots associated with the placement",
"placement", placementKey, "resourceSnapshotIndex", snapshotIndex, "updateRun", updateRunRef)
// list err can be retried.
return nil, controller.NewAPIServerError(true, err)
}

resourceSnapshotObjs = resourceSnapshotList.GetResourceSnapshotObjs()
if len(resourceSnapshotObjs) == 0 {
err := controller.NewUserError(fmt.Errorf("no resourceSnapshots with index `%d` found for placement `%s`", snapshotIndex, placementKey))
klog.ErrorS(err, "No specified resourceSnapshots found", "updateRun", updateRunRef)
// no more retries here.
return resourceSnapshotObjs, fmt.Errorf("%w: %s", errInitializedFailed, err.Error())
}
return resourceSnapshotObjs, nil
}

klog.V(2).InfoS("No resource snapshot index specified, fetching latest resource snapshots", "placement", placementKey, "updateRun", updateRunRef)
latestResourceSnapshots, err := controller.ListLatestResourceSnapshots(ctx, r.Client, placementKey)
if err != nil {
klog.ErrorS(err, "Failed to list the latest resourceSnapshots associated with the placement",
"placement", placementKey, "updateRun", updateRunRef)
// list err can be retried.
return nil, controller.NewAPIServerError(true, err)
}

resourceSnapshotObjs = latestResourceSnapshots.GetResourceSnapshotObjs()
if len(resourceSnapshotObjs) == 0 {
err := fmt.Errorf("no latest resourceSnapshots found for placement `%s`. This might be a transient state, need retry", placementKey)
klog.ErrorS(err, "No latest resourceSnapshots found for placement. This might be transient, need retry", "placement", placementKey, "updateRun", updateRunRef)
// retryable error.
return resourceSnapshotObjs, err
}
return resourceSnapshotObjs, nil
}

// recordInitializationSucceeded records the successful initialization condition in the UpdateRun status.
func (r *Reconciler) recordInitializationSucceeded(ctx context.Context, updateRun placementv1beta1.UpdateRunObj) error {
updateRunStatus := updateRun.GetUpdateRunStatus()
Expand Down
Loading
Loading