diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index bca15c68d..2ab60fd4a 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -114,6 +114,7 @@ func (sc *samplesCollector) Collect(ch chan<- prometheus.Metric) { logrus.Infof("metrics sample config retrieval failed with: %s", err.Error()) return } + cfg = cfg.DeepCopy() if cfg.Spec.ManagementState == operatorv1.Removed || cfg.Spec.ManagementState == operatorv1.Unmanaged { diff --git a/pkg/stub/files.go b/pkg/stub/files.go index 00fcc0162..06169775e 100644 --- a/pkg/stub/files.go +++ b/pkg/stub/files.go @@ -16,7 +16,11 @@ import ( func (h *Handler) processFiles(dir string, files []os.FileInfo, opcfg *v1.Config) error { for _, file := range files { if file.IsDir() { - logrus.Printf("processing subdir %s from dir %s", file.Name(), dir) + if opcfg.Status.Version != h.version { + logrus.Printf("processing subdir %s from dir %s", file.Name(), dir) + } else { + logrus.Debugf("processing subdir %s from dir %s", file.Name(), dir) + } subfiles, err := h.Filefinder.List(dir + "/" + file.Name()) if err != nil { return h.processError(opcfg, v1.SamplesExist, corev1.ConditionUnknown, err, "error reading in content: %v") @@ -28,7 +32,11 @@ func (h *Handler) processFiles(dir string, files []os.FileInfo, opcfg *v1.Config continue } - logrus.Printf("processing file %s from dir %s", file.Name(), dir) + if opcfg.Status.Version != h.version { + logrus.Printf("processing file %s from dir %s", file.Name(), dir) + } else { + logrus.Debugf("processing file %s from dir %s", file.Name(), dir) + } if strings.HasSuffix(dir, "imagestreams") { path := dir + "/" + file.Name() diff --git a/pkg/stub/handler.go b/pkg/stub/handler.go index f0fecf0d9..dc555fe4f 100644 --- a/pkg/stub/handler.go +++ b/pkg/stub/handler.go @@ -161,6 +161,8 @@ func (h *Handler) prepSamplesWatchEvent(kind, name string, annotations map[strin return nil, "", false, false, err } + cfg = cfg.DeepCopy() + if cfg.DeletionTimestamp != nil { // we do no 
return the cfg in this case because we do not want to bother with any progress tracking logrus.Printf("Received watch event %s but not upserting since deletion of the Config is in progress", kind+"/"+name) @@ -195,7 +197,7 @@ func (h *Handler) prepSamplesWatchEvent(kind, name string, annotations map[strin case "imagestream": filePath, inInventory = h.imagestreamFile[name] if !inInventory { - logrus.Printf("watch stream event %s not part of operators inventory", name) + logrus.Debugf("watch stream event %s not part of operators inventory", name) // we now have cases where sample providers are deleting entire imagestreams; // let's make sure there are no stale entries with inprogress / importerror processing := util.Condition(cfg, v1.ImageChangesInProgress) @@ -253,7 +255,9 @@ func (h *Handler) prepSamplesWatchEvent(kind, name string, annotations map[strin // we have gotten events for items early in the migration list but we have not // finished processing the list // avoid (re)upsert, but check import status - logrus.Printf("watch event for %s/%s while migration in progress, image in progress is false; will not update sample because of this event", kind, name) + if util.ConditionTrue(cfg, v1.MigrationInProgress) { + logrus.Printf("watch event for %s/%s while migration in progress, image in progress is false; will not update sample because of this event", kind, name) + } return cfg, "", false, false, nil } @@ -633,6 +637,7 @@ func (h *Handler) Handle(event util.Event) error { if err != nil { return err } + cfg = h.refetchCfgMinimizeConflicts(cfg) h.GoodConditionUpdate(cfg, corev1.ConditionFalse, v1.SamplesExist) dbg := "exist false update" logrus.Printf("CRDUPDATE %s", dbg) @@ -643,6 +648,7 @@ func (h *Handler) Handle(event util.Event) error { } } else { logrus.Println("Initiating finalizer processing for a SampleResource delete attempt") + cfg = h.refetchCfgMinimizeConflicts(cfg) h.RemoveFinalizer(cfg) dbg := "remove finalizer update" logrus.Printf("CRDUPDATE %s", 
dbg) @@ -745,6 +751,7 @@ func (h *Handler) Handle(event util.Event) error { // once the status version is in sync, we can turn off the migration condition if util.ConditionTrue(cfg, v1.MigrationInProgress) { + cfg = h.refetchCfgMinimizeConflicts(cfg) h.GoodConditionUpdate(cfg, corev1.ConditionFalse, v1.MigrationInProgress) dbg := " turn migration off" logrus.Printf("CRDUPDATE %s", dbg) @@ -766,15 +773,26 @@ func (h *Handler) Handle(event util.Event) error { // in progress if configChangeRequiresUpsert && util.ConditionTrue(cfg, v1.ImageChangesInProgress) { + cfg = h.refetchCfgMinimizeConflicts(cfg) h.GoodConditionUpdate(cfg, corev1.ConditionFalse, v1.ImageChangesInProgress) dbg := "change in progress from true to false for config change" logrus.Printf("CRDUPDATE %s", dbg) + // do not transfer config changes to status since we are just turning off in progress + // to start a new createSamples cycle return h.crdwrapper.UpdateStatus(cfg, dbg) } } - cfg.Status.ManagementState = operatorsv1api.Managed + cfg = h.refetchCfgMinimizeConflicts(cfg) + if cfg.Status.ManagementState != operatorsv1api.Managed { + cfg.Status.ManagementState = operatorsv1api.Managed + dbg := "change status management state to managed" + logrus.Printf("CRDUPDATE %s", dbg) + return h.crdwrapper.UpdateStatus(cfg, dbg) + } + // if coming from remove turn off + cfg = h.refetchCfgMinimizeConflicts(cfg) if util.ConditionTrue(cfg, v1.RemovePending) { now := kapis.Now() condition := util.Condition(cfg, v1.RemovePending) @@ -782,6 +800,9 @@ func (h *Handler) Handle(event util.Event) error { condition.LastUpdateTime = now condition.Status = corev1.ConditionFalse util.ConditionUpdate(cfg, condition) + dbg := "change remove pending to false" + logrus.Printf("CRDUPDATE %s", dbg) + return h.crdwrapper.UpdateStatus(cfg, dbg) } cfg = h.refetchCfgMinimizeConflicts(cfg) @@ -809,30 +830,13 @@ func (h *Handler) Handle(event util.Event) error { dbg := "upd status version" logrus.Printf("CRDUPDATE %s", dbg) return 
h.crdwrapper.UpdateStatus(cfg, dbg) - /*if cfg.ConditionFalse(cfg, v1.ImportImageErrorsExist) { - cfg.Status.Version = h.version - logrus.Printf("The samples are now at version %s", cfg.Status.Version) - logrus.Println("CRDUPDATE upd status version") - return h.crdwrapper.UpdateStatus(cfg) - } - logrus.Printf("An image import error occurred applying the latest configuration on version %s, problem resolution needed", h.version - return nil - - */ - } - - if len(cfg.Spec.Architectures) == 0 { - cfg = h.updateCfgArch(cfg) } - h.StoreCurrentValidConfig(cfg) - - // now that we have stored the skip lists in status, // cycle through the skip lists and update the managed flag if needed - for _, name := range cfg.Status.SkippedTemplates { + for _, name := range cfg.Spec.SkippedTemplates { h.setSampleManagedLabelToFalse("template", name) } - for _, name := range cfg.Status.SkippedImagestreams { + for _, name := range cfg.Spec.SkippedImagestreams { h.setSampleManagedLabelToFalse("imagestream", name) } @@ -841,15 +845,22 @@ func (h *Handler) Handle(event util.Event) error { if configChangeRequiresImportErrorUpdate && !configChangeRequiresUpsert { dbg := "config change did not require upsert but did change import errors" logrus.Printf("CRDUPDATE %s", dbg) + cfg = h.refetchCfgMinimizeConflicts(cfg) + // we have now "processed" the config and are executing changes, transfer current spec to status + h.StoreCurrentValidConfig(cfg) return h.crdwrapper.UpdateStatus(cfg, dbg) } if configChanged && !configChangeRequiresUpsert && util.ConditionTrue(cfg, v1.SamplesExist) { dbg := "bypassing upserts for non invasive config change after initial create" logrus.Printf("CRDUPDATE %s", dbg) + cfg = h.refetchCfgMinimizeConflicts(cfg) + // we have now "processed" the config and are executing changes, transfer current spec to status + h.StoreCurrentValidConfig(cfg) return h.crdwrapper.UpdateStatus(cfg, dbg) } + cfg = h.refetchCfgMinimizeConflicts(cfg) if !util.ConditionTrue(cfg, 
v1.ImageChangesInProgress) { // pass in true to force rebuild of maps, which we do here because at this point // we have taken on some form of config change @@ -868,7 +879,7 @@ func (h *Handler) Handle(event util.Event) error { if abortForDelete { // a delete has been initiated; let's revert in progress to false and allow the delete to complete, // including its removal of any sample we might have upserted with the above createSamples call - h.GoodConditionUpdate(h.refetchCfgMinimizeConflicts(cfg), corev1.ConditionFalse, v1.ImageChangesInProgress) + h.GoodConditionUpdate(cfg, corev1.ConditionFalse, v1.ImageChangesInProgress) // note, the imagestream watch cache gets cleared once the deletion/finalizer processing commences dbg := "progressing false because delete has arrived" logrus.Printf("CRDUPDATE %s", dbg) @@ -876,7 +887,6 @@ func (h *Handler) Handle(event util.Event) error { } if err != nil { - cfg = h.refetchCfgMinimizeConflicts(cfg) h.processError(cfg, v1.SamplesExist, corev1.ConditionUnknown, err, "error creating samples: %v") dbg := "setting samples exists to unknown" logrus.Printf("CRDUPDATE %s", dbg) @@ -887,7 +897,6 @@ func (h *Handler) Handle(event util.Event) error { return err } now := kapis.Now() - cfg = h.refetchCfgMinimizeConflicts(cfg) progressing := util.Condition(cfg, v1.ImageChangesInProgress) progressing.Reason = "" progressing.Message = "" @@ -909,6 +918,8 @@ func (h *Handler) Handle(event util.Event) error { progressing.Status = corev1.ConditionTrue } else { logrus.Debugln("there are no imagestreams available for import") + // we have now "processed" the config and are executing changes, transfer current spec to status + h.StoreCurrentValidConfig(cfg) // see if there are unskipped templates, to set SamplesExists to true if !util.ConditionTrue(cfg, v1.SamplesExist) { templatesProcessed := false @@ -921,7 +932,6 @@ func (h *Handler) Handle(event util.Event) error { break } if templatesProcessed { - cfg = h.refetchCfgMinimizeConflicts(cfg) 
h.GoodConditionUpdate(cfg, corev1.ConditionTrue, v1.SamplesExist) dbg := "exist true update templates only" logrus.Printf("CRDUPDATE %s", dbg) @@ -938,13 +948,16 @@ func (h *Handler) Handle(event util.Event) error { // of the "make a change" flow in our state machine cfg = h.initConditions(cfg) + // we have now "processed" the config and are executing changes, transfer current spec to status + h.StoreCurrentValidConfig(cfg) + dbg := "progressing true update" logrus.Printf("CRDUPDATE %s", dbg) return h.crdwrapper.UpdateStatus(cfg, dbg) } + cfg = h.refetchCfgMinimizeConflicts(cfg) if !util.ConditionTrue(cfg, v1.SamplesExist) { - cfg = h.refetchCfgMinimizeConflicts(cfg) h.GoodConditionUpdate(cfg, corev1.ConditionTrue, v1.SamplesExist) dbg := "exist true update" logrus.Printf("CRDUPDATE %s", dbg) @@ -954,45 +967,23 @@ func (h *Handler) Handle(event util.Event) error { // it is possible that all the cached imagestream events show that // the image imports are complete; hence we would not get any more // events until the next relist to clear out in progress; so let's - // cycle through them here now - if cache.AllUpsertEventsArrived() && util.ConditionTrue(cfg, v1.ImageChangesInProgress) { - keysToClear := []string{} - anyChange := false - ac := false + // clear it out now + if cache.AllUpsertEventsArrived() && + util.ConditionTrue(cfg, v1.ImageChangesInProgress) && + cfg.Spec.ManagementState == cfg.Status.ManagementState && + cfg.Status.Version == h.version { cfg = h.refetchCfgMinimizeConflicts(cfg) - for key, is := range cache.GetUpsertImageStreams() { - if is == nil { - // never got update, refetch - var e error - is, e = h.imageclientwrapper.Get(key) - if e != nil { - keysToClear = append(keysToClear, key) - anyChange = true - continue - } - } - cfg, _, ac = h.processImportStatus(is, cfg) - anyChange = anyChange || ac - } - for _, key := range keysToClear { - cache.RemoveUpsert(key) - util.ClearNameInReason(cfg, util.Condition(cfg, v1.ImageChangesInProgress).Reason, 
key) - util.ClearNameInReason(cfg, util.Condition(cfg, v1.ImportImageErrorsExist).Reason, key) - } - if len(strings.TrimSpace(util.Condition(cfg, v1.ImageChangesInProgress).Reason)) == 0 { - h.GoodConditionUpdate(cfg, corev1.ConditionFalse, v1.ImageChangesInProgress) - logrus.Println("The last in progress imagestream has completed (config event loop)") - } - if anyChange { - dbg := "updating in progress after examining cached imagestream events" - logrus.Printf("CRDUPDATE %s", dbg) - err = h.crdwrapper.UpdateStatus(cfg, dbg) - if err == nil && util.ConditionFalse(cfg, v1.ImageChangesInProgress) { - // only clear out cache if we got the update through - cache.ClearUpsertsCache() - } - return err + h.GoodConditionUpdate(cfg, corev1.ConditionFalse, v1.ImageChangesInProgress) + dbg := "updating in progress after examining cached imagestream events" + logrus.Printf("CRDUPDATE %s", dbg) + + err = h.crdwrapper.UpdateStatus(cfg, dbg) + if err == nil { + // only clear out cache if we got the update through + cache.ClearUpsertsCache() } + return err + } } return nil @@ -1006,6 +997,7 @@ func (h *Handler) setSampleManagedLabelToFalse(kind, name string) error { err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { stream, err = h.imageclientwrapper.Get(name) if err == nil && stream != nil && stream.Labels != nil { + stream = stream.DeepCopy() label, _ := stream.Labels[v1.SamplesManagedLabel] if label == "true" { stream.Labels[v1.SamplesManagedLabel] = "false" @@ -1019,6 +1011,7 @@ func (h *Handler) setSampleManagedLabelToFalse(kind, name string) error { err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { tpl, err = h.templateclientwrapper.Get(name) if err == nil && tpl != nil && tpl.Labels != nil { + tpl = tpl.DeepCopy() label, _ := tpl.Labels[v1.SamplesManagedLabel] if label == "true" { tpl.Labels[v1.SamplesManagedLabel] = "false" @@ -1090,6 +1083,9 @@ func (h *Handler) createSamples(cfg *v1.Config, updateIfPresent, registryChanged is = nil } + if is 
!= nil { + is = is.DeepCopy() + } err = h.upsertImageStream(imagestream, is, cfg) if err != nil { if updateIfPresent { @@ -1135,6 +1131,7 @@ func (h *Handler) createSamples(cfg *v1.Config, updateIfPresent, registryChanged t = nil } + t = t.DeepCopy() err = h.upsertTemplate(template, t, cfg) if err != nil { return false, err @@ -1149,9 +1146,9 @@ func (h *Handler) createSamples(cfg *v1.Config, updateIfPresent, registryChanged func (h *Handler) refetchCfgMinimizeConflicts(cfg *v1.Config) *v1.Config { c, e := h.crdwrapper.Get(cfg.Name) if e == nil { - return c + return c.DeepCopy() } - return cfg + return cfg.DeepCopy() } func getTemplateClient(restconfig *restclient.Config) (*templatev1client.TemplateV1Client, error) { diff --git a/pkg/stub/handler_test.go b/pkg/stub/handler_test.go index 129c2c25a..7d0a1e826 100644 --- a/pkg/stub/handler_test.go +++ b/pkg/stub/handler_test.go @@ -47,16 +47,19 @@ func TestWrongSampleResourceName(t *testing.T) { cfg.Name = "foo" cfg.Status.Conditions = nil err := h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) validate(true, err, "", cfg, []v1.ConfigConditionType{}, []corev1.ConditionStatus{}, t) } func TestNoArchOrDist(t *testing.T) { h, cfg, event := setup() err := h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) // image in progress (4th entry, array index 3) should still be false when there is no content ... 
a la z or ppc statuses := []corev1.ConditionStatus{corev1.ConditionFalse, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse} validate(true, err, "", cfg, conditions, statuses, t) err = h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) // and on a subsequent event, exists should still be false since we did not create any content previously validate(true, err, "", cfg, conditions, statuses, t) } @@ -64,14 +67,17 @@ func TestNoArchOrDist(t *testing.T) { func TestWithDist(t *testing.T) { h, cfg, event := setup() err := h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) statuses := []corev1.ConditionStatus{corev1.ConditionFalse, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse} validate(true, err, "", cfg, conditions, statuses, t) err = h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) // image in progress (4th entry, array index 3) should still be false when there is no content ... 
a la z or ppc statuses = []corev1.ConditionStatus{corev1.ConditionFalse, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse} validate(true, err, "", cfg, conditions, statuses, t) mimic(&h, x86ContentRootDir) err = h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) // with content present, image im progress should now be true statuses = []corev1.ConditionStatus{corev1.ConditionFalse, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse} validate(true, err, "", cfg, conditions, statuses, t) @@ -92,11 +98,13 @@ func TestWithArchDist(t *testing.T) { mimic(&h, x86ContentRootDir) err := h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) statuses := []corev1.ConditionStatus{corev1.ConditionFalse, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse} validate(true, err, "", cfg, conditions, statuses, t) err = h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) statuses = []corev1.ConditionStatus{corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse} validate(true, err, "", cfg, conditions, @@ -120,6 +128,7 @@ func TestWithBadArch(t *testing.T) { "bad", } h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) invalidConfig(t, "architecture bad unsupported", util.Condition(cfg, v1.ConfigurationValid)) } @@ -131,6 +140,7 @@ func TestManagementState(t *testing.T) { cfg.Spec.ManagementState = operatorsv1api.Unmanaged err := h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) statuses := []corev1.ConditionStatus{corev1.ConditionFalse, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse} validate(true, err, "", cfg, conditions, statuses, t) @@ 
-149,17 +159,27 @@ func TestManagementState(t *testing.T) { cfg.ResourceVersion = "2" cfg.Spec.ManagementState = operatorsv1api.Managed + // get status to managed err = h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) + if cfg.Status.ManagementState != operatorsv1api.Managed { + t.Fatalf("status not set to managed") + } + // should go to in progress + err = h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) statuses = []corev1.ConditionStatus{corev1.ConditionFalse, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse} validate(true, err, "", cfg, conditions, statuses, t) err = h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) // event after in progress set to true, sets exists to true statuses = []corev1.ConditionStatus{corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse} validate(true, err, "", cfg, conditions, statuses, t) // event after exists is true that should trigger samples upsert err = h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) // event after in progress set to true statuses = []corev1.ConditionStatus{corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse} validate(true, err, "", cfg, conditions, statuses, t) @@ -179,6 +199,7 @@ func TestManagementState(t *testing.T) { cfg.ResourceVersion = "3" cfg.Spec.ManagementState = operatorsv1api.Removed err = h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) // RemovePending now true statuses[4] = corev1.ConditionTrue validate(true, err, "", cfg, conditions, statuses, t) @@ -189,6 +210,7 @@ func TestManagementState(t *testing.T) { // verify while we are image in progress no false and the remove on hold setting is still set to true cfg.ResourceVersion = "4" err = h.Handle(event) + cfg, _ = 
h.crdwrapper.Get(cfg.Name) // SamplesExists, ImageChangesInProgress, ImportImageErrorsExists all false statuses[0] = corev1.ConditionFalse statuses[3] = corev1.ConditionFalse @@ -203,6 +225,7 @@ func TestManagementState(t *testing.T) { // remove pending should be false statuses[4] = corev1.ConditionFalse err = h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) validate(true, err, "", cfg, conditions, statuses, t) } @@ -219,6 +242,7 @@ func TestSkipped(t *testing.T) { mimic(&h, x86ContentRootDir) err := h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) validate(true, err, "", cfg, conditions, []corev1.ConditionStatus{corev1.ConditionFalse, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse}, t) fakeisclient := h.imageclientwrapper.(*fakeImageStreamClientWrapper) @@ -264,6 +288,7 @@ func TestSkipped(t *testing.T) { }, } h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) importerror = util.Condition(cfg, v1.ImportImageErrorsExist) if len(importerror.Reason) > 0 || importerror.Status == corev1.ConditionTrue { t.Fatalf("skipped imagestream still reporting error %#v", importerror) @@ -294,22 +319,37 @@ func TestTBRInaccessibleBit(t *testing.T) { func TestProcessed(t *testing.T) { h, cfg, event := setup() event.Object = cfg + mimic(&h, x86ContentRootDir) + + // initial boostrap creation of samples + h.Handle(event) + + // shortcut completed initial bootstrap, and then change the samples registry setting cfg.Spec.SamplesRegistry = "foo.io" + progress := util.Condition(cfg, v1.ImageChangesInProgress) + progress.Status = corev1.ConditionFalse + util.ConditionUpdate(cfg, progress) + exists := util.Condition(cfg, v1.SamplesExist) + exists.Status = corev1.ConditionTrue + util.ConditionUpdate(cfg, exists) + h.crdwrapper.Update(cfg) iskeys := getISKeys() tkeys := getTKeys() - mimic(&h, x86ContentRootDir) - err := h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) validate(true, err, 
"", cfg, conditions, - []corev1.ConditionStatus{corev1.ConditionFalse, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse}, t) + []corev1.ConditionStatus{corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse}, t) err = h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) + // in progress should still be true because imagestream upserts have not cleared validate(true, err, "", cfg, conditions, []corev1.ConditionStatus{corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse}, t) err = h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) validate(true, err, "", cfg, conditions, []corev1.ConditionStatus{corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse}, t) @@ -346,15 +386,32 @@ func TestProcessed(t *testing.T) { } } + // get status samples registry set to foo.io + err = h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) + if cfg.Status.SamplesRegistry != "foo.io" { + t.Fatalf("status did not pick up new samples registry") + } + // make sure registries are updated after already updating from the defaults + cfg, _ = h.crdwrapper.Get(cfg.Name) cfg.Spec.SamplesRegistry = "bar.io" cfg.ResourceVersion = "2" // fake out that the samples completed updating progressing := util.Condition(cfg, v1.ImageChangesInProgress) progressing.Status = corev1.ConditionFalse + // lack of copy fix previously masked that complete updating means version updated as well + cfg.Status.Version = h.version util.ConditionUpdate(cfg, progressing) + h.crdwrapper.Update(cfg) err = h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) + if cfg.Status.SamplesRegistry != "bar.io" { + t.Fatalf("second update to status 
registry not in status") + } + err = h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) validate(true, err, "", cfg, conditions, []corev1.ConditionStatus{corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse}, t) @@ -372,16 +429,24 @@ func TestProcessed(t *testing.T) { } // make sure registries are updated when sampleRegistry is the form of host/path + cfg, _ = h.crdwrapper.Get(cfg.Name) cfg.Spec.SamplesRegistry = "foo.io/bar" cfg.ResourceVersion = "3" // fake out that the samples completed updating progressing = util.Condition(cfg, v1.ImageChangesInProgress) progressing.Status = corev1.ConditionFalse util.ConditionUpdate(cfg, progressing) + h.crdwrapper.Update(cfg) // reset operator image to clear out previous registry image override mimic(&h, x86ContentRootDir) err = h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) + if cfg.Status.SamplesRegistry != "foo.io/bar" { + t.Fatalf("third update to samples registry not in status") + } + err = h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) validate(true, err, "", cfg, conditions, []corev1.ConditionStatus{corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse}, t) @@ -399,16 +464,24 @@ func TestProcessed(t *testing.T) { } // make sure registries are updated when sampleRegistry is the form of host:port/path + cfg, _ = h.crdwrapper.Get(cfg.Name) cfg.Spec.SamplesRegistry = "foo.io:1111/bar" cfg.ResourceVersion = "4" // fake out that the samples completed updating progressing = util.Condition(cfg, v1.ImageChangesInProgress) progressing.Status = corev1.ConditionFalse util.ConditionUpdate(cfg, progressing) + h.crdwrapper.Update(cfg) // reset operator image to clear out previous registry image override mimic(&h, x86ContentRootDir) err = h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) + if 
cfg.Status.SamplesRegistry != "foo.io:1111/bar" { + t.Fatalf("fourth update to samples registry no in status") + } + err = h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) validate(true, err, "", cfg, conditions, []corev1.ConditionStatus{corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse}, t) @@ -425,6 +498,7 @@ func TestProcessed(t *testing.T) { t.Fatalf("stream repo not updated %#v, %#v", is, h) } // make sure registries are updated when switch back to default + cfg, _ = h.crdwrapper.Get(cfg.Name) cfg.ResourceVersion = "5" cfg.Spec.SamplesRegistry = "" // also make sure processing occurs for disruptive config change even if progressing==true @@ -432,10 +506,17 @@ func TestProcessed(t *testing.T) { // fake out that the samples completed updating progressing.Status = corev1.ConditionFalse util.ConditionUpdate(cfg, progressing) + h.crdwrapper.Update(cfg) // reset operator image to clear out previous registry image override mimic(&h, x86ContentRootDir) err = h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) + if cfg.Status.SamplesRegistry != "" { + t.Fatalf("fifth update to samples registry not in status") + } + err = h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) is, _ = fakeisclient.Get("foo") if is == nil || strings.HasPrefix(is.Spec.DockerImageRepository, "bar.io") { t.Fatalf("foo stream repo still has bar.io") @@ -454,10 +535,11 @@ func TestImageStreamEvent(t *testing.T) { h, cfg, event := setup() mimic(&h, x86ContentRootDir) err := h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) statuses := []corev1.ConditionStatus{corev1.ConditionFalse, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse} validate(true, err, "", cfg, conditions, statuses, t) // expedite the stream events coming in - cache.ClearUpsertsCache() + //cache.ClearUpsertsCache() tagVersion := 
int64(1) is := &imagev1.ImageStream{ @@ -493,6 +575,7 @@ func TestImageStreamEvent(t *testing.T) { cfg.Status.SkippedImagestreams = []string{} cfg.Status.SkippedTemplates = []string{} h.processImageStreamWatchEvent(is, false) + cfg, _ = h.crdwrapper.Get(cfg.Name) validate(true, err, "", cfg, conditions, statuses, t) // now make sure when a non standard change event is not ignored and that we update @@ -507,6 +590,7 @@ func TestImageStreamEvent(t *testing.T) { // mimic img change condition event that sets exists to true err = h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) statuses[0] = corev1.ConditionTrue validate(true, err, "", cfg, conditions, statuses, t) @@ -514,6 +598,7 @@ func TestImageStreamEvent(t *testing.T) { // go false is.Annotations[v1.SamplesVersionAnnotation] = h.version h.processImageStreamWatchEvent(is, false) + cfg, _ = h.crdwrapper.Get(cfg.Name) validate(true, err, "", cfg, conditions, statuses, t) statuses[3] = corev1.ConditionFalse is = &imagev1.ImageStream{ @@ -545,6 +630,7 @@ func TestImageStreamEvent(t *testing.T) { }, } h.processImageStreamWatchEvent(is, false) + cfg, _ = h.crdwrapper.Get(cfg.Name) is = &imagev1.ImageStream{ ObjectMeta: metav1.ObjectMeta{ Name: "baz", @@ -574,6 +660,7 @@ func TestImageStreamEvent(t *testing.T) { }, } h.processImageStreamWatchEvent(is, false) + cfg, _ = h.crdwrapper.Get(cfg.Name) validate(true, err, "", cfg, conditions, statuses, t) } @@ -581,6 +668,7 @@ func TestImageStreamErrorRetry(t *testing.T) { h, cfg, event := setup() mimic(&h, x86ContentRootDir) err := h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) statuses := []corev1.ConditionStatus{corev1.ConditionFalse, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse} validate(true, err, "", cfg, conditions, statuses, t) // expedite the stream events coming in @@ -639,6 +727,7 @@ func TestImageStreamErrorRetry(t *testing.T) { } h.processImageStreamWatchEvent(is, 
false) + cfg, _ = h.crdwrapper.Get(cfg.Name) if !util.ConditionTrue(cfg, v1.ImportImageErrorsExist) { t.Fatalf("Import Error Condition not true: %#v", cfg) @@ -653,6 +742,7 @@ func TestImageStreamErrorRetry(t *testing.T) { initialImportErrorLastUpdateTime := util.Condition(cfg, v1.ImportImageErrorsExist).LastUpdateTime h.processImageStreamWatchEvent(is, false) + cfg, _ = h.crdwrapper.Get(cfg.Name) // refetch to see if updated importError := util.Condition(cfg, v1.ImportImageErrorsExist) if !importError.LastUpdateTime.Equal(&initialImportErrorLastUpdateTime) { @@ -668,6 +758,7 @@ func TestImageStreamErrorRetry(t *testing.T) { fifteenMinutesAgo := lastUpdateTime h.processImageStreamWatchEvent(is, false) + cfg, _ = h.crdwrapper.Get(cfg.Name) // refetch and make sure it has changed lastUpdateTime, _ = h.imagestreamRetry[is.Name] @@ -686,6 +777,7 @@ func TestImageStreamErrorRetry(t *testing.T) { } h.processImageStreamWatchEvent(is, false) + cfg, _ = h.crdwrapper.Get(cfg.Name) if util.ConditionTrue(cfg, v1.ImportImageErrorsExist) { t.Fatalf("Import Error Condition not true: %#v", cfg) @@ -696,9 +788,11 @@ func TestTemplateEvent(t *testing.T) { h, cfg, event := setup() mimic(&h, x86ContentRootDir) err := h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) statuses := []corev1.ConditionStatus{corev1.ConditionFalse, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse} validate(true, err, "", cfg, conditions, statuses, t) err = h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) statuses[0] = corev1.ConditionTrue validate(true, err, "", cfg, conditions, statuses, t) // expedite the template events coming in @@ -723,6 +817,8 @@ func setup() (Handler, *v1.Config, util.Event) { h := NewTestHandler() cfg, _ := h.CreateDefaultResourceIfNeeded(nil) cfg = h.initConditions(cfg) + cfg.Status.ManagementState = operatorsv1api.Managed + h.StoreCurrentValidConfig(cfg) h.crdwrapper.(*fakeCRDWrapper).cfg 
= cfg cache.ClearUpsertsCache() return h, cfg, util.Event{Object: cfg} @@ -786,6 +882,7 @@ func TestImageStreamRemovedFromPayloadWithProgressingErrors(t *testing.T) { }, } err := h.processImageStreamWatchEvent(is, false) + cfg, _ = h.crdwrapper.Get(cfg.Name) if err != nil { t.Fatal(err) } @@ -795,6 +892,7 @@ func TestImageStreamRemovedFromPayloadWithProgressingErrors(t *testing.T) { } is.Name = "bar" err = h.processImageStreamWatchEvent(is, false) + cfg, _ = h.crdwrapper.Get(cfg.Name) errors = util.Condition(cfg, v1.ImportImageErrorsExist) if strings.Contains(errors.Reason, "bar") { t.Fatal("import errors still has bar after it was no longer in payload") @@ -809,6 +907,7 @@ func TestImageStreamCreateErrorDegradedReason(t *testing.T) { fakeisclient := h.imageclientwrapper.(*fakeImageStreamClientWrapper) fakeisclient.geterrors = map[string]error{"foo": err} h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) _, reason := util.AnyConditionUnknown(cfg) if reason != "APIServerServiceUnavailableError" { t.Fatalf("unexpected reason %s", reason) @@ -831,6 +930,7 @@ func TestImageGetError(t *testing.T) { statuses := []corev1.ConditionStatus{corev1.ConditionFalse, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse} err := h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) if !kerrors.IsNotFound(iserr) { statuses[0] = corev1.ConditionUnknown statuses[3] = corev1.ConditionFalse @@ -853,6 +953,7 @@ func TestImageUpdateError(t *testing.T) { statuses := []corev1.ConditionStatus{corev1.ConditionFalse, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse} err := h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) statuses[0] = corev1.ConditionUnknown statuses[3] = corev1.ConditionFalse validate(false, err, "upsertstreamerror", cfg, conditions, statuses, t) @@ -1022,6 +1123,7 @@ func TestImageStreamImportError(t 
*testing.T) { needCreds.Status = corev1.ConditionTrue util.ConditionUpdate(cfg, needCreds) err := h.processImageStreamWatchEvent(is, false) + cfg, _ = h.crdwrapper.Get(cfg.Name) if err != nil { t.Fatalf("processImageStreamWatchEvent error %#v for stream %#v", err, is) } @@ -1090,6 +1192,7 @@ func TestImageStreamImportErrorRecovery(t *testing.T) { importError.Message = " import failed " util.ConditionUpdate(cfg, importError) err := h.processImageStreamWatchEvent(stream, false) + cfg, _ = h.crdwrapper.Get(cfg.Name) if err != nil { t.Fatalf("processImageStreamWatchEvent error %#v", err) } @@ -1160,6 +1263,7 @@ func TestTemplateGetError(t *testing.T) { statuses := []corev1.ConditionStatus{corev1.ConditionFalse, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse} err := h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) if !kerrors.IsNotFound(terr) { statuses[0] = corev1.ConditionUnknown statuses[3] = corev1.ConditionFalse @@ -1181,6 +1285,7 @@ func TestTemplateUpsertError(t *testing.T) { statuses := []corev1.ConditionStatus{corev1.ConditionFalse, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse} err := h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) statuses[0] = corev1.ConditionUnknown statuses[3] = corev1.ConditionFalse validate(false, err, "upsertstreamerror", cfg, conditions, statuses, t) @@ -1190,6 +1295,7 @@ func TestDeletedCR(t *testing.T) { h, cfg, event := setup() event.Deleted = true err := h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) statuses := []corev1.ConditionStatus{corev1.ConditionFalse, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse} validate(true, err, "", cfg, conditions, statuses, t) } @@ -1202,14 +1308,17 @@ func TestSameCR(t *testing.T) { // first pass on the resource creates 
the samples, exists (first entry, index 0) is still false statuses := []corev1.ConditionStatus{corev1.ConditionFalse, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse} err := h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) validate(true, err, "", cfg, conditions, statuses, t) err = h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) // next pass is when we expect exists to be true statuses[0] = corev1.ConditionTrue validate(true, err, "", cfg, conditions, statuses, t) err = h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) // now with content, should see no change in status after duplicate event, where imagestream import status has not changed validate(true, err, "", cfg, conditions, statuses, t) @@ -1220,6 +1329,7 @@ func TestBadTopDirList(t *testing.T) { fakefinder := h.Filefinder.(*fakeResourceFileLister) fakefinder.errors = map[string]error{x86ContentRootDir: fmt.Errorf("badtopdir")} err := h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) statuses := []corev1.ConditionStatus{corev1.ConditionUnknown, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse} validate(false, err, "badtopdir", cfg, conditions, statuses, t) } @@ -1230,6 +1340,7 @@ func TestBadSubDirList(t *testing.T) { fakefinder := h.Filefinder.(*fakeResourceFileLister) fakefinder.errors = map[string]error{x86ContentRootDir + "/imagestreams": fmt.Errorf("badsubdir")} err := h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) statuses := []corev1.ConditionStatus{corev1.ConditionUnknown, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse} validate(false, err, "badsubdir", cfg, conditions, statuses, t) _, reason := util.AnyConditionUnknown(cfg) @@ -1244,6 +1355,7 @@ func TestBadTopLevelStatus(t *testing.T) { fakestatus := 
h.crdwrapper.(*fakeCRDWrapper) fakestatus.updateerr = fmt.Errorf("badsdkupdate") err := h.Handle(event) + cfg, _ = h.crdwrapper.Get(cfg.Name) // with deferring sdk updates to the very end, the local object will still have valid statuses on it, even though the error // error returned by h.Handle indicates etcd was not updated statuses := []corev1.ConditionStatus{corev1.ConditionFalse, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionTrue, corev1.ConditionFalse, corev1.ConditionFalse, corev1.ConditionFalse} @@ -1706,9 +1818,15 @@ type fakeCRDWrapper struct { cfg *v1.Config } -func (f *fakeCRDWrapper) UpdateStatus(opcfg *v1.Config, dbg string) error { return f.updateerr } +func (f *fakeCRDWrapper) UpdateStatus(opcfg *v1.Config, dbg string) error { + f.cfg = opcfg + return f.updateerr +} -func (f *fakeCRDWrapper) Update(opcfg *v1.Config) error { return f.updateerr } +func (f *fakeCRDWrapper) Update(opcfg *v1.Config) error { + f.cfg = opcfg + return f.updateerr +} func (f *fakeCRDWrapper) Create(opcfg *v1.Config) error { return f.createerr } diff --git a/pkg/stub/imagestreams.go b/pkg/stub/imagestreams.go index 1d2159c80..d673cf665 100644 --- a/pkg/stub/imagestreams.go +++ b/pkg/stub/imagestreams.go @@ -27,11 +27,7 @@ func (h *Handler) processImageStreamWatchEvent(is *imagev1.ImageStream, deleted logrus.Printf("CRDUPDATE %s", dbg) return h.crdwrapper.UpdateStatus(cfg, dbg) } - if cfg != nil && util.ConditionTrue(cfg, v1.ImageChangesInProgress) { - logrus.Printf("Imagestream %s watch event do upsert %v; no errors in prep %v, possibly update operator conditions %v", is.Name, doUpsert, err == nil, cfg != nil) - } else { - logrus.Debugf("Imagestream %s watch event do upsert %v; no errors in prep %v, possibly update operator conditions %v", is.Name, doUpsert, err == nil, cfg != nil) - } + logrus.Debugf("Imagestream %s watch event do upsert %v; no errors in prep %v, possibly update operator conditions %v", is.Name, doUpsert, err == nil, cfg != nil) if cfg != nil { 
if util.IsUnsupportedArch(cfg) { logrus.Printf("ignoring watch event for imagestream %s ignored because we are on %s", @@ -46,11 +42,12 @@ func (h *Handler) processImageStreamWatchEvent(is *imagev1.ImageStream, deleted if cfg == nil { return nil } + cfg = h.refetchCfgMinimizeConflicts(cfg) // if we initiated the change, it will be in our cache, and we want to process // the in progress, import error conditions anyChange := false - nonMatchDetail := "" + //nonMatchDetail := "" if cache.UpsertsAmount() > 0 { cache.AddReceivedEventFromUpsert(is) if !util.ConditionTrue(cfg, v1.ImageChangesInProgress) || !util.ConditionTrue(cfg, v1.SamplesExist) { @@ -61,77 +58,41 @@ func (h *Handler) processImageStreamWatchEvent(is *imagev1.ImageStream, deleted logrus.Printf("caching imagestream event %s because we have not received all %d imagestream events", is.Name, cache.UpsertsAmount()) return nil } - streams := cache.GetUpsertImageStreams() - keysToClear := []string{} - cfg = h.refetchCfgMinimizeConflicts(cfg) - for key, is := range streams { - if is == nil { - // never got update, refetch - var e error - is, e = h.imageclientwrapper.Get(key) - if e != nil { - keysToClear = append(keysToClear, key) - anyChange = true - continue - } - } - - if !util.NameInReason(cfg, util.Condition(cfg, v1.ImageChangesInProgress).Reason, is.Name) && - !util.NameInReason(cfg, util.Condition(cfg, v1.ImportImageErrorsExist).Reason, is.Name) { - logrus.Printf("skipping progress check (caching phase) for %s because it has been removed from the image progress and import error conditions", is.Name) - continue - } + cache.ClearUpsertsCache() + } - // see if we should turn off pending, clear errors - cfg, nonMatchDetail, anyChange = h.processImportStatus(is, cfg) - if !anyChange && len(nonMatchDetail) > 0 { - logrus.Printf("imagestream %s still not finished with its image imports (caching phase), including %s", is.Name, nonMatchDetail) - // cached instances are processed at the end of the handler Config 
event flow - return nil - } - } - for _, key := range keysToClear { - cache.RemoveUpsert(key) - util.ClearNameInReason(cfg, util.Condition(cfg, v1.ImageChangesInProgress).Reason, key) - util.ClearNameInReason(cfg, util.Condition(cfg, v1.ImportImageErrorsExist).Reason, key) + _, skipped := h.skippedImagestreams[is.Name] + writeStatus := false + dbg := "" + cfg = h.refetchCfgMinimizeConflicts(cfg) + if util.ConditionTrue(cfg, v1.ImageChangesInProgress) && cache.AllUpsertEventsArrived() { + dbg = "clearing in progress with every imagestream reporting in" + writeStatus = true + h.GoodConditionUpdate(cfg, corev1.ConditionFalse, v1.ImageChangesInProgress) + } + if skipped && util.NameInReason(cfg, util.Condition(cfg, v1.ImportImageErrorsExist).Reason, is.Name) { + writeStatus = true + if len(dbg) > 0 { + dbg = dbg + "; " } - // at this point, all the processImportStatus calls have come back with anyChange == true - // unless we were unable to fetch any of the imagestreams - // and if necessary processImportStatus would have set in progress to false as needed; + dbg = fmt.Sprintf("%sclear %s from import errors", dbg, is.Name) + h.clearStreamFromImportError(is.Name, util.Condition(cfg, v1.ImportImageErrorsExist), cfg) + } + if !skipped { + cfg, anyChange = h.processImportStatus(is, cfg) if anyChange { - dbg := fmt.Sprintf("updating progress/error condition (within caching block) after results for %s", is.Name) - logrus.Printf("CRDUPDATE %s", dbg) - err = h.crdwrapper.UpdateStatus(cfg, dbg) - // we used to not clear the cache until we confirmed the update occurred; but for - // whatever reason, we started seeing more conflicts here on the k8s 1.13 rebase; - // and it turns out, after the `cache.UpsertsAmount() > 0` check above it is tricky - // to discern a retry because of an err here vs. 
our not having finished upserts yet; - // so since we have decided the condition, most likely we are done; so clear the cache - // and fall into our steady state process imagestream event individually - cache.ClearUpsertsCache() + writeStatus = true + if len(dbg) > 0 { + dbg = dbg + "; " + } + dbg = fmt.Sprintf("%supdate import errors for %s", dbg, is.Name) } - return err } - - cfg = h.refetchCfgMinimizeConflicts(cfg) - if !util.NameInReason(cfg, util.Condition(cfg, v1.ImageChangesInProgress).Reason, is.Name) && - !util.NameInReason(cfg, util.Condition(cfg, v1.ImportImageErrorsExist).Reason, is.Name) { - logrus.Printf("aborting progress check (post-cache) for %s because it has been removed from the image progress and import error conditions", is.Name) - } - // otherwise, if someone else changed it in such a way that we don't want to - // upsert it again, or we missed an event and this is a relist, or we had an update conflict - // after completing cache process, update conditions as needed - // as needed - cfg, nonMatchDetail, anyChange = h.processImportStatus(is, cfg) - if anyChange { - dbg := fmt.Sprintf("updating progress/error condition after results for %s", is.Name) + if writeStatus { logrus.Printf("CRDUPDATE %s", dbg) return h.crdwrapper.UpdateStatus(cfg, dbg) - } else { - if len(nonMatchDetail) > 0 { - logrus.Printf("imagestream %s still not finished with its image imports, including %s", is.Name, nonMatchDetail) - } } + return nil } @@ -162,6 +123,9 @@ func (h *Handler) processImageStreamWatchEvent(is *imagev1.ImageStream, deleted cache.AddUpsert(imagestream.Name) + if is != nil { + is = is.DeepCopy() + } err = h.upsertImageStream(imagestream, is, cfg) if err != nil { cache.RemoveUpsert(imagestream.Name) @@ -384,11 +348,9 @@ func (h *Handler) buildImageStreamErrorMessage(name, message string) string { return "" + message + "" } -func (h *Handler) processImportStatus(is *imagev1.ImageStream, cfg *v1.Config) (*v1.Config, string, bool) { - //pending := false 
+func (h *Handler) processImportStatus(is *imagev1.ImageStream, cfg *v1.Config) (*v1.Config, bool) { anyErrors := false importError := util.Condition(cfg, v1.ImportImageErrorsExist) - nonMatchDetail := "" anyConditionUpdate := false // in case we have to manipulate imagestream retry map h.mapsMutex.Lock() @@ -486,95 +448,12 @@ func (h *Handler) processImportStatus(is *imagev1.ImageStream, cfg *v1.Config) ( } } } - - /*if !anyErrors { - // it is possible after an upgrade that tags can be removed because of EOL processing; - // since we do not delete those EOL images from the imagestream status (so as to not break - // existing deployments referencing specific tags), it is possible that a valid configuration - // has less tags in the spec than the status (vs. an equal amount being the only valid combination) - if len(is.Spec.Tags) <= len(is.Status.Tags) { - for _, specTag := range is.Spec.Tags { - matched := false - foundTagInStatus := false - foundItemsInStatusTag := false - for _, statusTag := range is.Status.Tags { - logrus.Debugf("checking spec tag %s against status tag %s with num items %d", specTag.Name, statusTag.Tag, len(statusTag.Items)) - if specTag.Name == statusTag.Tag { - foundTagInStatus = true - // if the latest gens have no errors, see if we got gen match - if statusTag.Items != nil { - for _, event := range statusTag.Items { - foundItemsInStatusTag = true - if specTag.Generation != nil { - logrus.Debugf("checking status tag %d against spec tag %d", event.Generation, *specTag.Generation) - } - if specTag.Generation != nil && - *specTag.Generation <= event.Generation { - logrus.Debugf("got match") - matched = true - break - } - nonMatchDetail = fmt.Sprintf("spec tag %s is at generation %d, but status tag %s is at generation %d", specTag.Name, *specTag.Generation, statusTag.Tag, event.Generation) - } - } - } - } - if !matched { - pending = true - if len(nonMatchDetail) == 0 { - switch { - case foundTagInStatus == false: - nonMatchDetail = 
fmt.Sprintf("spec tag %s did not have a matching status tag", specTag.Name) - case foundItemsInStatusTag == false: - nonMatchDetail = fmt.Sprintf("status tag %s had no items in it", specTag.Name) - default: - nonMatchDetail = fmt.Sprintf("match failed for some unknown reason; spec: %#v status: %#v", is.Spec.Tags, is.Status.Tags) - } - } - break - } - } - if len(is.Spec.Tags) == 0 { - nonMatchDetail = fmt.Sprintf("there were no spec tags") - } - } else { - pending = true - nonMatchDetail = "the number of status tags is less than the number of spec tags" - } - }*/ - } else { - logrus.Debugf("no error/progress checks cause stream %s is skipped", is.Name) - // but if skipped, clear out any errors, since we do not care about errors for skipped - h.clearStreamFromImportError(is.Name, importError, cfg) - anyConditionUpdate = true } - processing := util.Condition(cfg, v1.ImageChangesInProgress) - logrus.Debugf("any errors %v for %s", anyErrors, is.Name) - // we check for processing == true here as well to avoid churn on relists logrus.Printf("There are no more image imports in flight for imagestream %s", is.Name) - if processing.Status == corev1.ConditionTrue { - now := kapis.Now() - // remove this imagestream name, including the space separator, from processing - logrus.Debugf("current reason %s ", processing.Reason) - replaceOccurs := util.NameInReason(cfg, processing.Reason, is.Name) - if replaceOccurs { - processing.Reason = util.ClearNameInReason(cfg, processing.Reason, is.Name) - logrus.Debugf("processing reason now %s", processing.Reason) - if len(strings.TrimSpace(processing.Reason)) == 0 { - logrus.Println("The last in progress imagestream has completed (import status check)") - processing.Status = corev1.ConditionFalse - processing.Reason = "" - processing.LastTransitionTime = now - } - processing.LastUpdateTime = now - util.ConditionUpdate(cfg, processing) - anyConditionUpdate = true - } - } - //} + cache.RemoveUpsert(is.Name) // clear out error for this stream 
if there were errors previously but no longer are // think a scheduled import failing then recovering @@ -584,5 +463,5 @@ func (h *Handler) processImportStatus(is *imagev1.ImageStream, cfg *v1.Config) ( anyConditionUpdate = true } - return cfg, nonMatchDetail, anyConditionUpdate + return cfg, anyConditionUpdate } diff --git a/pkg/stub/interfaces.go b/pkg/stub/interfaces.go index 8f84d3fde..fa0b4fa4d 100644 --- a/pkg/stub/interfaces.go +++ b/pkg/stub/interfaces.go @@ -219,8 +219,9 @@ type generatedCRDWrapper struct { } func (g *generatedCRDWrapper) UpdateStatus(sr *v1.Config, dbg string) error { + srCopy := sr.DeepCopy() return wait.Poll(3*time.Second, 30*time.Second, func() (bool, error) { - _, err := g.client.UpdateStatus(context.TODO(), sr, metav1.UpdateOptions{}) + _, err := g.client.UpdateStatus(context.TODO(), srCopy, metav1.UpdateOptions{}) if err == nil { return true, nil } @@ -236,8 +237,9 @@ func (g *generatedCRDWrapper) UpdateStatus(sr *v1.Config, dbg string) error { } func (g *generatedCRDWrapper) Update(sr *v1.Config) error { + srCopy := sr.DeepCopy() return wait.Poll(3*time.Second, 30*time.Second, func() (bool, error) { - _, err := g.client.Update(context.TODO(), sr, metav1.UpdateOptions{}) + _, err := g.client.Update(context.TODO(), srCopy, metav1.UpdateOptions{}) if err == nil { return true, nil } @@ -263,5 +265,9 @@ func (g *generatedCRDWrapper) Create(sr *v1.Config) error { } func (g *generatedCRDWrapper) Get(name string) (*v1.Config, error) { - return g.lister.Get(name) + c, err := g.lister.Get(name) + if err == nil && c != nil { + return c.DeepCopy(), nil + } + return c, err } diff --git a/pkg/stub/templates.go b/pkg/stub/templates.go index 33cb57040..4d4f6674b 100644 --- a/pkg/stub/templates.go +++ b/pkg/stub/templates.go @@ -16,7 +16,7 @@ func (h *Handler) processTemplateWatchEvent(t *templatev1.Template, deleted bool // we can observe high fetch rates on the config object) // imagestream image import tracking necessitates, after initial 
install or upgrade, requires the // fetch of the config object, so we did not rework to ordering of the version check within that method - if t.Annotations != nil && !deleted { + if t != nil && t.Annotations != nil && !deleted { isv, ok := t.Annotations[v1.SamplesVersionAnnotation] logrus.Debugf("Comparing template/%s version %s ok %v with git version %s", t.Name, isv, ok, h.version) if ok && isv == h.version { @@ -51,6 +51,9 @@ func (h *Handler) processTemplateWatchEvent(t *templatev1.Template, deleted bool // set t to nil so upsert will create t = nil } + if t != nil { + t = t.DeepCopy() + } err = h.upsertTemplate(template, t, cfg) if err != nil { if kerrors.IsAlreadyExists(err) { diff --git a/pkg/util/util.go b/pkg/util/util.go index f538d687a..64f5fede0 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -100,7 +100,7 @@ func Condition(s *samplev1.Config, c samplev1.ConfigConditionType) *samplev1.Con if s.Status.Conditions != nil { for _, rc := range s.Status.Conditions { if rc.Type == c { - return &rc + return rc.DeepCopy() } } } diff --git a/test/e2e/cluster_samples_operator_test.go b/test/e2e/cluster_samples_operator_test.go index 512fb3b69..3b9d4e09f 100644 --- a/test/e2e/cluster_samples_operator_test.go +++ b/test/e2e/cluster_samples_operator_test.go @@ -916,8 +916,6 @@ func TestSpecManagementStateField(t *testing.T) { verifyDeletedTemplatesNotRecreated(t) verifyClusterOperatorConditionsComplete(t, cfg.Status.Version, cfg.Status.ManagementState) - // get timestamp to check against in progress condition - now = kapis.Now() err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { // now switch back to default managed for any subsequent tests // and confirm all the default samples content exists @@ -946,22 +944,10 @@ func TestSpecManagementStateField(t *testing.T) { t.Fatalf("cfg status mgmt never went to managed %#v", verifyOperatorUp(t)) } - // wait for it to get into pending - err = wait.PollImmediate(1*time.Second, 3*time.Minute, func() (bool, 
error) { - cfg, err = crClient.SamplesV1().Configs().Get(context.TODO(), samplesapi.ConfigName, metav1.GetOptions{}) - if err != nil { - t.Logf("%v", err) - return false, nil - } - if util.Condition(cfg, samplesapi.ImageChangesInProgress).LastUpdateTime.After(now.Time) { - return true, nil - } - return false, nil - }) - if err != nil { - dumpPod(t) - t.Fatalf("error waiting for Config to get into pending: %v samples resource %#v", err, cfg) - } + // note, with the copy of cfg.Spec.ManagementState occurring in a separate UpdateStatus call, the + // subsequent event enables our optimized, only create missing samples, path, which means we + // do not bother with setting in progress to true, so there is no longer a need to check its progression + // now wait for it to get out of pending err = verifyConditionsCompleteSamplesAdded(t) if err != nil {