Skip to content

Commit

Permalink
Merge pull request #371 from gabemontero/bypass-k8s-panic-46
Browse files Browse the repository at this point in the history
Bug 1950809: add DeepCopy to avoid SharedInformer cache mutation
  • Loading branch information
openshift-merge-robot committed Apr 28, 2021
2 parents 5d0e832 + c7d17be commit 45bc9dd
Show file tree
Hide file tree
Showing 9 changed files with 252 additions and 254 deletions.
1 change: 1 addition & 0 deletions pkg/metrics/metrics.go
Expand Up @@ -114,6 +114,7 @@ func (sc *samplesCollector) Collect(ch chan<- prometheus.Metric) {
logrus.Infof("metrics sample config retrieval failed with: %s", err.Error())
return
}
cfg = cfg.DeepCopy()

if cfg.Spec.ManagementState == operatorv1.Removed ||
cfg.Spec.ManagementState == operatorv1.Unmanaged {
Expand Down
12 changes: 10 additions & 2 deletions pkg/stub/files.go
Expand Up @@ -16,7 +16,11 @@ import (
func (h *Handler) processFiles(dir string, files []os.FileInfo, opcfg *v1.Config) error {
for _, file := range files {
if file.IsDir() {
logrus.Printf("processing subdir %s from dir %s", file.Name(), dir)
if opcfg.Status.Version != h.version {
logrus.Printf("processing subdir %s from dir %s", file.Name(), dir)
} else {
logrus.Debugf("processing subdir %s from dir %s", file.Name(), dir)
}
subfiles, err := h.Filefinder.List(dir + "/" + file.Name())
if err != nil {
return h.processError(opcfg, v1.SamplesExist, corev1.ConditionUnknown, err, "error reading in content: %v")
Expand All @@ -28,7 +32,11 @@ func (h *Handler) processFiles(dir string, files []os.FileInfo, opcfg *v1.Config

continue
}
logrus.Printf("processing file %s from dir %s", file.Name(), dir)
if opcfg.Status.Version != h.version {
logrus.Printf("processing file %s from dir %s", file.Name(), dir)
} else {
logrus.Debugf("processing file %s from dir %s", file.Name(), dir)
}

if strings.HasSuffix(dir, "imagestreams") {
path := dir + "/" + file.Name()
Expand Down
129 changes: 63 additions & 66 deletions pkg/stub/handler.go
Expand Up @@ -161,6 +161,8 @@ func (h *Handler) prepSamplesWatchEvent(kind, name string, annotations map[strin
return nil, "", false, false, err
}

cfg = cfg.DeepCopy()

if cfg.DeletionTimestamp != nil {
// we do not return the cfg in this case because we do not want to bother with any progress tracking
logrus.Printf("Received watch event %s but not upserting since deletion of the Config is in progress", kind+"/"+name)
Expand Down Expand Up @@ -195,7 +197,7 @@ func (h *Handler) prepSamplesWatchEvent(kind, name string, annotations map[strin
case "imagestream":
filePath, inInventory = h.imagestreamFile[name]
if !inInventory {
logrus.Printf("watch stream event %s not part of operators inventory", name)
logrus.Debugf("watch stream event %s not part of operators inventory", name)
// we now have cases where sample providers are deleting entire imagestreams;
// let's make sure there are no stale entries with inprogress / importerror
processing := util.Condition(cfg, v1.ImageChangesInProgress)
Expand Down Expand Up @@ -253,7 +255,9 @@ func (h *Handler) prepSamplesWatchEvent(kind, name string, annotations map[strin
// we have gotten events for items early in the migration list but we have not
// finished processing the list
// avoid (re)upsert, but check import status
logrus.Printf("watch event for %s/%s while migration in progress, image in progress is false; will not update sample because of this event", kind, name)
if util.ConditionTrue(cfg, v1.MigrationInProgress) {
logrus.Printf("watch event for %s/%s while migration in progress, image in progress is false; will not update sample because of this event", kind, name)
}
return cfg, "", false, false, nil
}

Expand Down Expand Up @@ -633,6 +637,7 @@ func (h *Handler) Handle(event util.Event) error {
if err != nil {
return err
}
cfg = h.refetchCfgMinimizeConflicts(cfg)
h.GoodConditionUpdate(cfg, corev1.ConditionFalse, v1.SamplesExist)
dbg := "exist false update"
logrus.Printf("CRDUPDATE %s", dbg)
Expand All @@ -643,6 +648,7 @@ func (h *Handler) Handle(event util.Event) error {
}
} else {
logrus.Println("Initiating finalizer processing for a SampleResource delete attempt")
cfg = h.refetchCfgMinimizeConflicts(cfg)
h.RemoveFinalizer(cfg)
dbg := "remove finalizer update"
logrus.Printf("CRDUPDATE %s", dbg)
Expand Down Expand Up @@ -745,6 +751,7 @@ func (h *Handler) Handle(event util.Event) error {

// once the status version is in sync, we can turn off the migration condition
if util.ConditionTrue(cfg, v1.MigrationInProgress) {
cfg = h.refetchCfgMinimizeConflicts(cfg)
h.GoodConditionUpdate(cfg, corev1.ConditionFalse, v1.MigrationInProgress)
dbg := " turn migration off"
logrus.Printf("CRDUPDATE %s", dbg)
Expand All @@ -766,22 +773,36 @@ func (h *Handler) Handle(event util.Event) error {
// in progress
if configChangeRequiresUpsert &&
util.ConditionTrue(cfg, v1.ImageChangesInProgress) {
cfg = h.refetchCfgMinimizeConflicts(cfg)
h.GoodConditionUpdate(cfg, corev1.ConditionFalse, v1.ImageChangesInProgress)
dbg := "change in progress from true to false for config change"
logrus.Printf("CRDUPDATE %s", dbg)
// do not transfer config changes to status since we are just turning off in progress
// to start a new createSamples cycle
return h.crdwrapper.UpdateStatus(cfg, dbg)
}
}

cfg.Status.ManagementState = operatorsv1api.Managed
cfg = h.refetchCfgMinimizeConflicts(cfg)
if cfg.Status.ManagementState != operatorsv1api.Managed {
cfg.Status.ManagementState = operatorsv1api.Managed
dbg := "change status management state to managed"
logrus.Printf("CRDUPDATE %s", dbg)
return h.crdwrapper.UpdateStatus(cfg, dbg)
}

// if coming from remove, turn off the RemovePending condition
cfg = h.refetchCfgMinimizeConflicts(cfg)
if util.ConditionTrue(cfg, v1.RemovePending) {
now := kapis.Now()
condition := util.Condition(cfg, v1.RemovePending)
condition.LastTransitionTime = now
condition.LastUpdateTime = now
condition.Status = corev1.ConditionFalse
util.ConditionUpdate(cfg, condition)
dbg := "change remove pending to false"
logrus.Printf("CRDUPDATE %s", dbg)
return h.crdwrapper.UpdateStatus(cfg, dbg)
}

cfg = h.refetchCfgMinimizeConflicts(cfg)
Expand Down Expand Up @@ -809,30 +830,13 @@ func (h *Handler) Handle(event util.Event) error {
dbg := "upd status version"
logrus.Printf("CRDUPDATE %s", dbg)
return h.crdwrapper.UpdateStatus(cfg, dbg)
/*if cfg.ConditionFalse(cfg, v1.ImportImageErrorsExist) {
cfg.Status.Version = h.version
logrus.Printf("The samples are now at version %s", cfg.Status.Version)
logrus.Println("CRDUPDATE upd status version")
return h.crdwrapper.UpdateStatus(cfg)
}
logrus.Printf("An image import error occurred applying the latest configuration on version %s, problem resolution needed", h.version
return nil
*/
}

if len(cfg.Spec.Architectures) == 0 {
cfg = h.updateCfgArch(cfg)
}

h.StoreCurrentValidConfig(cfg)

// now that we have stored the skip lists in status,
// cycle through the skip lists and update the managed flag if needed
for _, name := range cfg.Status.SkippedTemplates {
for _, name := range cfg.Spec.SkippedTemplates {
h.setSampleManagedLabelToFalse("template", name)
}
for _, name := range cfg.Status.SkippedImagestreams {
for _, name := range cfg.Spec.SkippedImagestreams {
h.setSampleManagedLabelToFalse("imagestream", name)
}

Expand All @@ -841,15 +845,22 @@ func (h *Handler) Handle(event util.Event) error {
if configChangeRequiresImportErrorUpdate && !configChangeRequiresUpsert {
dbg := "config change did not require upsert but did change import errors"
logrus.Printf("CRDUPDATE %s", dbg)
cfg = h.refetchCfgMinimizeConflicts(cfg)
// we have now "processed" the config and are executing changes, transfer current spec to status
h.StoreCurrentValidConfig(cfg)
return h.crdwrapper.UpdateStatus(cfg, dbg)
}

if configChanged && !configChangeRequiresUpsert && util.ConditionTrue(cfg, v1.SamplesExist) {
dbg := "bypassing upserts for non invasive config change after initial create"
logrus.Printf("CRDUPDATE %s", dbg)
cfg = h.refetchCfgMinimizeConflicts(cfg)
// we have now "processed" the config and are executing changes, transfer current spec to status
h.StoreCurrentValidConfig(cfg)
return h.crdwrapper.UpdateStatus(cfg, dbg)
}

cfg = h.refetchCfgMinimizeConflicts(cfg)
if !util.ConditionTrue(cfg, v1.ImageChangesInProgress) {
// pass in true to force rebuild of maps, which we do here because at this point
// we have taken on some form of config change
Expand All @@ -868,15 +879,14 @@ func (h *Handler) Handle(event util.Event) error {
if abortForDelete {
// a delete has been initiated; let's revert in progress to false and allow the delete to complete,
// including its removal of any sample we might have upserted with the above createSamples call
h.GoodConditionUpdate(h.refetchCfgMinimizeConflicts(cfg), corev1.ConditionFalse, v1.ImageChangesInProgress)
h.GoodConditionUpdate(cfg, corev1.ConditionFalse, v1.ImageChangesInProgress)
// note, the imagestream watch cache gets cleared once the deletion/finalizer processing commences
dbg := "progressing false because delete has arrived"
logrus.Printf("CRDUPDATE %s", dbg)
return h.crdwrapper.UpdateStatus(cfg, dbg)
}

if err != nil {
cfg = h.refetchCfgMinimizeConflicts(cfg)
h.processError(cfg, v1.SamplesExist, corev1.ConditionUnknown, err, "error creating samples: %v")
dbg := "setting samples exists to unknown"
logrus.Printf("CRDUPDATE %s", dbg)
Expand All @@ -887,7 +897,6 @@ func (h *Handler) Handle(event util.Event) error {
return err
}
now := kapis.Now()
cfg = h.refetchCfgMinimizeConflicts(cfg)
progressing := util.Condition(cfg, v1.ImageChangesInProgress)
progressing.Reason = ""
progressing.Message = ""
Expand All @@ -909,6 +918,8 @@ func (h *Handler) Handle(event util.Event) error {
progressing.Status = corev1.ConditionTrue
} else {
logrus.Debugln("there are no imagestreams available for import")
// we have now "processed" the config and are executing changes, transfer current spec to status
h.StoreCurrentValidConfig(cfg)
// see if there are unskipped templates, to set SamplesExists to true
if !util.ConditionTrue(cfg, v1.SamplesExist) {
templatesProcessed := false
Expand All @@ -921,7 +932,6 @@ func (h *Handler) Handle(event util.Event) error {
break
}
if templatesProcessed {
cfg = h.refetchCfgMinimizeConflicts(cfg)
h.GoodConditionUpdate(cfg, corev1.ConditionTrue, v1.SamplesExist)
dbg := "exist true update templates only"
logrus.Printf("CRDUPDATE %s", dbg)
Expand All @@ -938,13 +948,16 @@ func (h *Handler) Handle(event util.Event) error {
// of the "make a change" flow in our state machine
cfg = h.initConditions(cfg)

// we have now "processed" the config and are executing changes, transfer current spec to status
h.StoreCurrentValidConfig(cfg)

dbg := "progressing true update"
logrus.Printf("CRDUPDATE %s", dbg)
return h.crdwrapper.UpdateStatus(cfg, dbg)
}

cfg = h.refetchCfgMinimizeConflicts(cfg)
if !util.ConditionTrue(cfg, v1.SamplesExist) {
cfg = h.refetchCfgMinimizeConflicts(cfg)
h.GoodConditionUpdate(cfg, corev1.ConditionTrue, v1.SamplesExist)
dbg := "exist true update"
logrus.Printf("CRDUPDATE %s", dbg)
Expand All @@ -954,45 +967,23 @@ func (h *Handler) Handle(event util.Event) error {
// it is possible that all the cached imagestream events show that
// the image imports are complete; hence we would not get any more
// events until the next relist to clear out in progress; so let's
// cycle through them here now
if cache.AllUpsertEventsArrived() && util.ConditionTrue(cfg, v1.ImageChangesInProgress) {
keysToClear := []string{}
anyChange := false
ac := false
// clear it out now
if cache.AllUpsertEventsArrived() &&
util.ConditionTrue(cfg, v1.ImageChangesInProgress) &&
cfg.Spec.ManagementState == cfg.Status.ManagementState &&
cfg.Status.Version == h.version {
cfg = h.refetchCfgMinimizeConflicts(cfg)
for key, is := range cache.GetUpsertImageStreams() {
if is == nil {
// never got update, refetch
var e error
is, e = h.imageclientwrapper.Get(key)
if e != nil {
keysToClear = append(keysToClear, key)
anyChange = true
continue
}
}
cfg, _, ac = h.processImportStatus(is, cfg)
anyChange = anyChange || ac
}
for _, key := range keysToClear {
cache.RemoveUpsert(key)
util.ClearNameInReason(cfg, util.Condition(cfg, v1.ImageChangesInProgress).Reason, key)
util.ClearNameInReason(cfg, util.Condition(cfg, v1.ImportImageErrorsExist).Reason, key)
}
if len(strings.TrimSpace(util.Condition(cfg, v1.ImageChangesInProgress).Reason)) == 0 {
h.GoodConditionUpdate(cfg, corev1.ConditionFalse, v1.ImageChangesInProgress)
logrus.Println("The last in progress imagestream has completed (config event loop)")
}
if anyChange {
dbg := "updating in progress after examining cached imagestream events"
logrus.Printf("CRDUPDATE %s", dbg)
err = h.crdwrapper.UpdateStatus(cfg, dbg)
if err == nil && util.ConditionFalse(cfg, v1.ImageChangesInProgress) {
// only clear out cache if we got the update through
cache.ClearUpsertsCache()
}
return err
h.GoodConditionUpdate(cfg, corev1.ConditionFalse, v1.ImageChangesInProgress)
dbg := "updating in progress after examining cached imagestream events"
logrus.Printf("CRDUPDATE %s", dbg)

err = h.crdwrapper.UpdateStatus(cfg, dbg)
if err == nil {
// only clear out cache if we got the update through
cache.ClearUpsertsCache()
}
return err

}
}
return nil
Expand All @@ -1006,6 +997,7 @@ func (h *Handler) setSampleManagedLabelToFalse(kind, name string) error {
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
stream, err = h.imageclientwrapper.Get(name)
if err == nil && stream != nil && stream.Labels != nil {
stream = stream.DeepCopy()
label, _ := stream.Labels[v1.SamplesManagedLabel]
if label == "true" {
stream.Labels[v1.SamplesManagedLabel] = "false"
Expand All @@ -1019,6 +1011,7 @@ func (h *Handler) setSampleManagedLabelToFalse(kind, name string) error {
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
tpl, err = h.templateclientwrapper.Get(name)
if err == nil && tpl != nil && tpl.Labels != nil {
tpl = tpl.DeepCopy()
label, _ := tpl.Labels[v1.SamplesManagedLabel]
if label == "true" {
tpl.Labels[v1.SamplesManagedLabel] = "false"
Expand Down Expand Up @@ -1090,6 +1083,9 @@ func (h *Handler) createSamples(cfg *v1.Config, updateIfPresent, registryChanged
is = nil
}

if is != nil {
is = is.DeepCopy()
}
err = h.upsertImageStream(imagestream, is, cfg)
if err != nil {
if updateIfPresent {
Expand Down Expand Up @@ -1135,6 +1131,7 @@ func (h *Handler) createSamples(cfg *v1.Config, updateIfPresent, registryChanged
t = nil
}

t = t.DeepCopy()
err = h.upsertTemplate(template, t, cfg)
if err != nil {
return false, err
Expand All @@ -1149,9 +1146,9 @@ func (h *Handler) createSamples(cfg *v1.Config, updateIfPresent, registryChanged
// refetchCfgMinimizeConflicts re-reads the Config identified by cfg.Name via
// h.crdwrapper so that a subsequent status update is based on the most recent
// copy available, which reduces update conflicts.
//
// The returned value is always a deep copy: the freshly fetched Config when
// the Get succeeds, otherwise the cfg that was passed in. Callers mutate the
// returned object (conditions, status fields) before calling UpdateStatus, so
// it must never alias an object owned by someone else.
// NOTE(review): presumably crdwrapper.Get is served from a SharedInformer
// cache, whose objects must be treated as read-only — confirm against the
// wrapper implementation.
func (h *Handler) refetchCfgMinimizeConflicts(cfg *v1.Config) *v1.Config {
	fetched, err := h.crdwrapper.Get(cfg.Name)
	if err == nil {
		// Copy before handing out so caller mutations stay local.
		return fetched.DeepCopy()
	}
	// The refetch failed; fall back to a private copy of what the caller had.
	return cfg.DeepCopy()
}

func getTemplateClient(restconfig *restclient.Config) (*templatev1client.TemplateV1Client, error) {
Expand Down

0 comments on commit 45bc9dd

Please sign in to comment.