Skip to content

Commit

Permalink
Merge pull request #376 from csrwng/fix_exclude_clusteroperators_44
Browse files Browse the repository at this point in the history
Bug 1841239: Avoid pre-creating clusteroperators that should be excluded
  • Loading branch information
openshift-merge-robot committed Sep 12, 2020
2 parents e84a52a + 44b7cb9 commit 9515b23
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 24 deletions.
2 changes: 1 addition & 1 deletion hack/cluster-version-util/task_graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func newTaskGraphCmd() *cobra.Command {

func runTaskGraphCmd(cmd *cobra.Command, args []string) error {
manifestDir := args[0]
release, err := payload.LoadUpdate(manifestDir, "")
release, err := payload.LoadUpdate(manifestDir, "", "")
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions lib/resourcebuilder/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ const (
UpdatingMode Mode = iota
ReconcilingMode
InitializingMode
PrecreatingMode
)

type Interface interface {
Expand Down
10 changes: 8 additions & 2 deletions pkg/cvo/cvo.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (vcb *verifyClientBuilder) HTTPClient() (*http.Client, error) {
// controller that loads and applies content to the cluster. It returns an error if the payload appears to
// be in error rather than continuing.
func (optr *Operator) InitializeFromPayload(restConfig *rest.Config, burstRestConfig *rest.Config) error {
update, err := payload.LoadUpdate(optr.defaultPayloadDir(), optr.releaseImage)
update, err := payload.LoadUpdate(optr.defaultPayloadDir(), optr.releaseImage, optr.exclude)
if err != nil {
return fmt.Errorf("the local release contents are invalid - no current version can be determined from disk: %v", err)
}
Expand Down Expand Up @@ -663,7 +663,11 @@ func (b *resourceBuilder) builderFor(m *lib.Manifest, state payload.State) (reso
}

if b.clusterOperators != nil && m.GVK == configv1.SchemeGroupVersion.WithKind("ClusterOperator") {
return cvointernal.NewClusterOperatorBuilder(b.clusterOperators, *m), nil
client, err := clientset.NewForConfig(config)
if err != nil {
return nil, err
}
return cvointernal.NewClusterOperatorBuilder(b.clusterOperators, client.ConfigV1().ClusterOperators(), *m), nil
}
if resourcebuilder.Mapper.Exists(m.GVK) {
return resourcebuilder.New(resourcebuilder.Mapper, config, *m)
Expand Down Expand Up @@ -694,6 +698,8 @@ func stateToMode(state payload.State) resourcebuilder.Mode {
return resourcebuilder.UpdatingMode
case payload.ReconcilingPayload:
return resourcebuilder.ReconcilingMode
case payload.PrecreatingPayload:
return resourcebuilder.PrecreatingMode
default:
panic(fmt.Sprintf("unexpected payload state %d", int(state)))
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/cvo/cvo_scenarios_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func setupCVOTest(payloadDir string) (*Operator, map[string]runtime.Object, *fak
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cvo-loop-test"),
client: client,
cvLister: &clientCVLister{client: client},
exclude: "exclude-test",
}

dynamicScheme := runtime.NewScheme()
Expand All @@ -90,7 +91,7 @@ func setupCVOTest(payloadDir string) (*Operator, map[string]runtime.Object, *fak
wait.Backoff{
Steps: 1,
},
"",
"exclude-test",
)
o.configSync = worker

Expand Down
42 changes: 32 additions & 10 deletions pkg/cvo/internal/operatorstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"
"unicode"

kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
Expand Down Expand Up @@ -48,16 +49,16 @@ func readClusterOperatorV1OrDie(objBytes []byte) *configv1.ClusterOperator {
}

type clusterOperatorBuilder struct {
client ClusterOperatorsGetter
raw []byte
modifier resourcebuilder.MetaV1ObjectModifierFunc
mode resourcebuilder.Mode
client ClusterOperatorsGetter
createClient configclientv1.ClusterOperatorInterface
raw []byte
modifier resourcebuilder.MetaV1ObjectModifierFunc
mode resourcebuilder.Mode
}

func newClusterOperatorBuilder(config *rest.Config, m lib.Manifest) resourcebuilder.Interface {
return NewClusterOperatorBuilder(clientClusterOperatorsGetter{
getter: configclientv1.NewForConfigOrDie(config).ClusterOperators(),
}, m)
client := configclientv1.NewForConfigOrDie(config).ClusterOperators()
return NewClusterOperatorBuilder(clientClusterOperatorsGetter{getter: client}, client, m)
}

// ClusterOperatorsGetter abstracts object access with a client or a cache lister.
Expand All @@ -75,10 +76,11 @@ func (g clientClusterOperatorsGetter) Get(name string) (*configv1.ClusterOperato

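// NewClusterOperatorBuilder accepts the ClusterOperatorsGetter interface which may be implemented by a
// client or a lister cache, and a createClient which is used to create the ClusterOperator and update
// its status when the builder runs in PrecreatingMode.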
func NewClusterOperatorBuilder(client ClusterOperatorsGetter, m lib.Manifest) resourcebuilder.Interface {
func NewClusterOperatorBuilder(client ClusterOperatorsGetter, createClient configclientv1.ClusterOperatorInterface, m lib.Manifest) resourcebuilder.Interface {
return &clusterOperatorBuilder{
client: client,
raw: m.Raw,
client: client,
createClient: createClient,
raw: m.Raw,
}
}

Expand All @@ -97,6 +99,26 @@ func (b *clusterOperatorBuilder) Do(ctx context.Context) error {
if b.modifier != nil {
b.modifier(os)
}

// create the object, and if we successfully created, update the status
if b.mode == resourcebuilder.PrecreatingMode {
clusterOperator, err := b.createClient.Create(os)
if err != nil {
if kerrors.IsAlreadyExists(err) {
return nil
}
return err
}
clusterOperator.Status.RelatedObjects = os.Status.DeepCopy().RelatedObjects
if _, err := b.createClient.UpdateStatus(clusterOperator); err != nil {
if kerrors.IsConflict(err) {
return nil
}
return err
}
return nil
}

return waitForOperatorStatusToBeDone(ctx, 1*time.Second, b.client, os, b.mode)
}

Expand Down
34 changes: 27 additions & 7 deletions pkg/cvo/sync_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in
return err
}

payloadUpdate, err := payload.LoadUpdate(info.Directory, update.Image)
payloadUpdate, err := payload.LoadUpdate(info.Directory, update.Image, w.exclude)
if err != nil {
reporter.Report(SyncWorkerStatus{
Generation: work.Generation,
Expand Down Expand Up @@ -585,12 +585,14 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w
}
graph := payload.NewTaskGraph(tasks)
graph.Split(payload.SplitOnJobs)
var precreateObjects bool
switch work.State {
case payload.InitializingPayload:
// Create every component in parallel to maximize reaching steady
// state.
graph.Parallelize(payload.FlattenByNumberAndComponent)
maxWorkers = len(graph.Nodes)
precreateObjects = true
case payload.ReconcilingPayload:
// Run the graph in random order during reconcile so that we don't
// hang on any particular component - we seed from the number of
Expand All @@ -608,6 +610,28 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w
// perform an orderly roll out by payload order, using some parallelization
// but avoiding out of order creation so components have some base
graph.Parallelize(payload.ByNumberAndComponent)
precreateObjects = true
}

// in specific modes, attempt to precreate a set of known types (currently ClusterOperator) without
// retries
if precreateObjects {
payload.RunGraph(ctx, graph, 8, func(ctx context.Context, tasks []*payload.Task) error {
for _, task := range tasks {
if err := ctx.Err(); err != nil {
return cr.ContextError(err)
}
if task.Manifest.GVK != configv1.SchemeGroupVersion.WithKind("ClusterOperator") {
continue
}
if err := w.builder.Apply(ctx, task.Manifest, payload.PrecreatingPayload); err != nil {
klog.V(2).Infof("Unable to precreate resource %s: %v", task, err)
continue
}
klog.V(4).Infof("Precreated resource %s", task)
}
return nil
})
}

// update each object
Expand All @@ -621,7 +645,7 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w
klog.V(4).Infof("Running sync for %s", task)
klog.V(5).Infof("Manifest: %s", string(task.Manifest.Raw))

ov, ok := getOverrideForManifest(work.Overrides, w.exclude, task.Manifest)
ov, ok := getOverrideForManifest(work.Overrides, task.Manifest)
if ok && ov.Unmanaged {
klog.V(4).Infof("Skipping %s as unmanaged", task)
continue
Expand Down Expand Up @@ -916,7 +940,7 @@ func newMultipleError(errs []error) error {
}

// getOverrideForManifest returns the override and true when override exists for manifest.
func getOverrideForManifest(overrides []configv1.ComponentOverride, excludeIdentifier string, manifest *lib.Manifest) (configv1.ComponentOverride, bool) {
func getOverrideForManifest(overrides []configv1.ComponentOverride, manifest *lib.Manifest) (configv1.ComponentOverride, bool) {
for idx, ov := range overrides {
kind, namespace, name := manifest.GVK.Kind, manifest.Object().GetNamespace(), manifest.Object().GetName()
if ov.Kind == kind &&
Expand All @@ -925,10 +949,6 @@ func getOverrideForManifest(overrides []configv1.ComponentOverride, excludeIdent
return overrides[idx], true
}
}
excludeAnnotation := fmt.Sprintf("exclude.release.openshift.io/%s", excludeIdentifier)
if annotations := manifest.Object().GetAnnotations(); annotations != nil && annotations[excludeAnnotation] == "true" {
return configv1.ComponentOverride{Unmanaged: true}, true
}
return configv1.ComponentOverride{}, false
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Test
apiVersion: v1
metadata:
name: file-20-yml
annotations:
exclude.release.openshift.io/exclude-test: "true"
22 changes: 21 additions & 1 deletion pkg/payload/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ const (
// Our goal is to get the entire payload created, even if some
// operators are still converging.
InitializingPayload
// PrecreatingPayload indicates we are selectively creating
// specific resources during a first pass of the payload to
// provide better visibility of error conditions during
// install and upgrade.
PrecreatingPayload
)

// Initializing is true if the state is InitializingPayload.
Expand Down Expand Up @@ -102,7 +107,7 @@ type Update struct {
Manifests []lib.Manifest
}

func LoadUpdate(dir, releaseImage string) (*Update, error) {
func LoadUpdate(dir, releaseImage, excludeIdentifier string) (*Update, error) {
payload, tasks, err := loadUpdatePayloadMetadata(dir, releaseImage)
if err != nil {
return nil, err
Expand Down Expand Up @@ -149,6 +154,15 @@ func LoadUpdate(dir, releaseImage string) (*Update, error) {
errs = append(errs, errors.Wrapf(err, "error parsing %s", file.Name()))
continue
}
// Filter out manifests that should be excluded based on annotation
filteredMs := []lib.Manifest{}
for _, manifest := range ms {
if shouldExclude(excludeIdentifier, &manifest) {
continue
}
filteredMs = append(filteredMs, manifest)
}
ms = filteredMs
for i := range ms {
ms[i].OriginalFilename = filepath.Base(file.Name())
}
Expand All @@ -174,6 +188,12 @@ func LoadUpdate(dir, releaseImage string) (*Update, error) {
return payload, nil
}

// shouldExclude reports whether manifest must be filtered out of the payload
// for the given excludeIdentifier. A manifest is excluded when it carries the
// annotation "exclude.release.openshift.io/<excludeIdentifier>" with the exact
// value "true".
//
// An empty excludeIdentifier never excludes anything: callers pass "" to mean
// that no exclusion is configured, and without this guard the lookup key would
// degenerate to the bare prefix "exclude.release.openshift.io/".
func shouldExclude(excludeIdentifier string, manifest *lib.Manifest) bool {
	if excludeIdentifier == "" {
		return false
	}
	excludeAnnotation := fmt.Sprintf("exclude.release.openshift.io/%s", excludeIdentifier)
	// Indexing a nil map is safe and yields the zero value, so a manifest
	// without annotations simply fails the comparison below.
	return manifest.Object().GetAnnotations()[excludeAnnotation] == "true"
}

// ValidateDirectory checks if a directory can be a candidate update by
// looking for known files. It returns an error if the directory cannot
// be an update.
Expand Down
2 changes: 1 addition & 1 deletion pkg/payload/payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func Test_loadUpdatePayload(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := LoadUpdate(tt.args.dir, tt.args.releaseImage)
got, err := LoadUpdate(tt.args.dir, tt.args.releaseImage, "exclude-test")
if (err != nil) != tt.wantErr {
t.Errorf("loadUpdatePayload() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/payload/task_graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ func Test_TaskGraph_real(t *testing.T) {
if len(path) == 0 {
t.Skip("TEST_GRAPH_PATH unset")
}
p, err := LoadUpdate(path, "arbitrary/image:1")
p, err := LoadUpdate(path, "arbitrary/image:1", "")
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit 9515b23

Please sign in to comment.