Skip to content

Commit

Permalink
Merge pull request #338 from gabemontero/more-aggressive-progress-false
Browse files Browse the repository at this point in the history
set progressing false on imagestream events as well if no active streams
  • Loading branch information
openshift-merge-robot committed Nov 17, 2020
2 parents 840c02c + 2c44279 commit 39ed058
Show file tree
Hide file tree
Showing 7 changed files with 313 additions and 86 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ The samples resource maintains the following conditions in its status:
-- False when all imagestream changes are acknowledged
-- The list of pending imagestreams will be represented by a configmap for each imagestream in the samples operator's namespace (where the imagestream name is the configmap name). When the imagestream has completed imports, the respective configmap for the imagestream is deleted.
- ImportCredentialsExist
-- A 'samples-registry-credentials' secret has been copied into the openshift namespace
-- Credentials for pulling from `registry.redhat.io` exist in the `pull-secret` Secret in the `openshift-config` namespace
- ConfigurationValid
-- True or false based on whether any of the restricted changes noted above have been submitted
- RemovePending
Expand All @@ -78,7 +78,7 @@ The samples resource maintains the following conditions in its status:
-- True when an error has occurred
-- The configmap associated with the imagestream will remain in the samples operator's namespace. There will be a key in the configmap's data map for each imagestreamtag, where the value of the entry will be the error message reported in the imagestream status.
- MigrationInProgress
-- True when the samples operator has detected that its version is different than the samples operator version the current samples set were installed with
-- This condition is deprecated as of the 4.7 branch of this repository. Upgrade tracking is now achieved via the other conditions and both the per imagestream configmaps and the imagestream-to-image configmap.

# CVO Cluster Operator Status

Expand Down
23 changes: 21 additions & 2 deletions pkg/stub/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,34 @@ func (h *Handler) buildFileMaps(cfg *v1.Config, forceRebuild bool) error {
cm = &corev1.ConfigMap{}
cm.Name = util.IST2ImageMap
cm.Namespace = v1.OperatorNamespace
cm.Annotations = map[string]string{}
cm.Annotations[v1.SamplesVersionAnnotation] = h.version
cm.Data = map[string]string{}
for key, value := range h.imagestreatagToImage {
cm.Data[key] = value
}
_, err = h.configmapclientwrapper.Create(cm)
if err != nil {
if _, err = h.configmapclientwrapper.Create(cm); err != nil {
return err
}
}
if err == nil {
if cm.Annotations == nil {
cm.Annotations = map[string]string{}
}
version, ok := cm.Annotations[v1.SamplesVersionAnnotation]
if !ok || version != h.version {
logrus.Printf("Updating %s configmap to version %s", util.IST2ImageMap, h.version)
cm.Annotations[v1.SamplesVersionAnnotation] = h.version
cm.Data = map[string]string{}
for key, value := range h.imagestreatagToImage {
cm.Data[key] = value
}
_, err = h.configmapclientwrapper.Update(cm)
if err != nil {
return err
}
}
}

return err
}
Expand Down
33 changes: 16 additions & 17 deletions pkg/stub/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,35 +166,34 @@ func (h *Handler) storeImageStreamTagError(isName, tagName, message string) erro
return err
}

func (h *Handler) clearImageStreamTagError(isName, tagName string) error {
func (h *Handler) clearImageStreamTagError(isName string, tags []string) error {
cm, err := h.configmapclientwrapper.Get(isName)
if err != nil {
if !kerrors.IsNotFound(err) {
logrus.Warningf("unexpected error on get of configmap %s: %s", isName, err.Error())
return err
}
cm := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: isName,
Namespace: v1.OperatorNamespace,
},
Data: map[string]string{},
}
cm, err = h.configmapclientwrapper.Create(cm)
if err != nil && !kerrors.IsAlreadyExists(err) {
return err
}
logrus.Printf("clearImageStreamTagError: stream %s already deleted so no worries on clearing tags", isName)
return nil
}
if cm.Data == nil {
return nil
}
_, hasTag := cm.Data[tagName]
if !hasTag {
return nil
hasTag := false
for _, tagName := range tags {
_, ok := cm.Data[tagName]
if !ok {
continue
}
hasTag = true
logrus.Printf("clearing error messages from configmap for stream %s and tag %s", isName, tagName)
delete(cm.Data, tagName)

}
delete(cm.Data, tagName)

_, err = h.configmapclientwrapper.Update(cm)
if hasTag {
_, err = h.configmapclientwrapper.Update(cm)
}
return err

}
153 changes: 117 additions & 36 deletions pkg/stub/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,7 @@ func (h *Handler) prepSamplesWatchEvent(kind, name string, annotations map[strin
return cfg, filePath, true, nil
}

if util.ConditionFalse(cfg, v1.ImageChangesInProgress) &&
(util.ConditionTrue(cfg, v1.MigrationInProgress) || h.version != cfg.Status.Version) {
if h.shouldSetVersion(cfg) {
// we have gotten events for items early in the migration list but we have not
// finished processing the list
// avoid (re)upsert, but check import status
Expand Down Expand Up @@ -759,15 +758,7 @@ func (h *Handler) Handle(event util.Event) error {
util.ConditionTrue(cfg, v1.SamplesExist) &&
util.ConditionFalse(cfg, v1.ImageChangesInProgress) &&
h.version == cfg.Status.Version {
logrus.Debugf("At steady state: config the same and exists is true, in progress false, and version correct")

// once the status version is in sync, we can turn off the migration condition
if util.ConditionTrue(cfg, v1.MigrationInProgress) {
h.GoodConditionUpdate(cfg, corev1.ConditionFalse, v1.MigrationInProgress)
dbg := " turn migration off"
logrus.Printf("CRDUPDATE %s", dbg)
return h.crdwrapper.UpdateStatus(cfg, dbg)
}
logrus.Printf("At steady state: config the same and exists is true, in progress false, and version correct")

// migration inevitably means we need to refresh the file cache as samples are added and
// deleted between releases, so force file map building
Expand Down Expand Up @@ -803,30 +794,7 @@ func (h *Handler) Handle(event util.Event) error {
}

cfg = h.refetchCfgMinimizeConflicts(cfg)
if util.ConditionFalse(cfg, v1.MigrationInProgress) &&
len(cfg.Status.Version) > 0 &&
h.version != cfg.Status.Version {
// delete ist to image map as we will need to build a new one;
// we do not remove this map during delete/remove processing
// to facilitate disconnected users deciding which images to mirror
err := h.configmapclientwrapper.Delete(util.IST2ImageMap)
if err != nil {
// simply retry
return err
}
logrus.Printf("Undergoing migration from %s to %s", cfg.Status.Version, h.version)
h.GoodConditionUpdate(cfg, corev1.ConditionTrue, v1.MigrationInProgress)
dbg := "turn migration on"
logrus.Printf("CRDUPDATE %s", dbg)
return h.crdwrapper.UpdateStatus(cfg, dbg)
}

cfg = h.refetchCfgMinimizeConflicts(cfg)
if !configChanged &&
util.ConditionTrue(cfg, v1.SamplesExist) &&
util.ConditionFalse(cfg, v1.ImageChangesInProgress) &&
util.Condition(cfg, v1.MigrationInProgress).LastUpdateTime.Before(&util.Condition(cfg, v1.ImageChangesInProgress).LastUpdateTime) &&
h.version != cfg.Status.Version {
if !configChanged && h.shouldSetVersion(cfg) {
if util.ConditionTrue(cfg, v1.ImportImageErrorsExist) {
logrus.Printf("An image import error occurred applying the latest configuration on version %s; this operator will periodically retry the import, or an administrator can investigate and remedy manually", h.version)
}
Expand Down Expand Up @@ -1073,7 +1041,7 @@ func (h *Handler) createSamples(cfg *v1.Config, updateIfPresent, registryChanged

// if after initial startup, and not migration, the only cfg change was changing the registry, since that does not impact
// the templates, we can move on
if len(unskippedTemplates) == 0 && registryChanged && util.ConditionTrue(cfg, v1.SamplesExist) && util.ConditionFalse(cfg, v1.MigrationInProgress) {
if len(unskippedTemplates) == 0 && registryChanged && h.sufficientSteadyStateForAnOperator(cfg) {
return false, nil
}

Expand Down Expand Up @@ -1138,3 +1106,116 @@ func GetNamespace() string {
b, _ := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/" + corev1.ServiceAccountNamespaceKey)
return string(b)
}

func (h *Handler) shouldSetProgressingFalse(cfg *v1.Config) bool {
present := h.numOfManagedImageStreamsPresent()
managed := h.numOfManagedImageStreams()
if present < managed {
logrus.Printf("shouldSetProgressingFalse: only %d of the %d imagestreams exist", present, managed)
}
return util.ConditionTrue(cfg, v1.SamplesExist) &&
util.ConditionTrue(cfg, v1.ImageChangesInProgress) &&
(len(h.activeImageStreams()) == 0 || util.ConditionTrue(cfg, v1.ImportImageErrorsExist)) &&
!h.upsertInProgress &&
present >= managed
}

func (h *Handler) shouldSetVersion(cfg *v1.Config) bool {
return util.ConditionTrue(cfg, v1.SamplesExist) &&
util.ConditionFalse(cfg, v1.ImageChangesInProgress) &&
!h.upsertInProgress &&
h.version != cfg.Status.Version &&
h.areStreamsAtCorrectVersion()
}

func (h *Handler) sufficientSteadyStateForAnOperator(cfg *v1.Config) bool {
return util.ConditionTrue(cfg, v1.SamplesExist) &&
util.ConditionFalse(cfg, v1.ImageChangesInProgress) &&
h.version == cfg.Status.Version
}

func (h *Handler) areStreamsAtCorrectVersion() bool {
for key, _ := range h.imagestreamFile {
// remember, our get wrapper goes through the lister, so we are only hitting the controller cache
skipped, ok := h.skippedImagestreams[key]
if skipped && ok {
continue
}
is, err := h.imageclientwrapper.Get(key)
if err != nil {
if !h.upsertInProgress {
// not all imagestreams may be created yet during initial or bulk creations
logrus.Warningf("areStreamsAtCorrectVersion: get of stream %s got error: %s", key, err.Error())
}
return false
}
if is == nil {
if !h.upsertInProgress {
// not all imagestreams may be created yet during initial or bulk creations
logrus.Warningf("areStreamsAtCorrectVersion: got nil for stream %s but no error", key)
}
return false
}
if is.Annotations == nil {
logrus.Warningf("areStreamsAtCorrectVersion: stream %s has no annotation map", key)
return false
}
isv, ok := is.Annotations[v1.SamplesVersionAnnotation]
if !ok {
logrus.Warningf("areStreamsAtCorrectVersion: stream %s does not have version annotation", key)
return false
}
if isv != h.version {
logrus.Warningf("areStreamsAtCorrectVersion: stream %s is at version %s instead of %s", key, isv, h.version)
return false
}
}
logrus.Printf("areStreamsAtCorrectVersion: all streams are now at the correct version %s", h.version)
return true
}

func (h *Handler) numOfManagedImageStreams() int {
rc := len(h.imagestreamFile) - len(h.skippedImagestreams)
return rc
}

func (h *Handler) numOfManagedImageStreamsPresent() int {
rc := 0
for key, _ := range h.imagestreamFile {
// remember, our get wrapper goes through the lister, so we are only hitting the controller cache
skipped, ok := h.skippedImagestreams[key]
if skipped && ok {
continue
}
is, err := h.imageclientwrapper.Get(key)
if err != nil {
if !h.upsertInProgress {
// not all imagestreams may be created yet during initial or bulk creations
logrus.Warningf("numOfManagedImageStreamsPresent: get of stream %s got error: %s", key, err.Error())
}
continue
}
if is == nil {
if !h.upsertInProgress {
// not all imagestreams may be created yet during initial or bulk creations
logrus.Warningf("numOfManagedImageStreamsPresent: got nil for stream %s but no error", key)
}
continue
}
if is.Labels == nil {
logrus.Warningf("numOfManagedImageStreamsPresent: stream %s has no labels map", key)
continue
}
managed, ok := is.Labels[v1.SamplesManagedLabel]
if !ok {
logrus.Warningf("numOfManagedImageStreamsPresent: stream %s does not have managed label", key)
continue
}
if managed != "true" {
logrus.Warningf("numOfManagedImageStreamsPresent: stream %s has managed value of %s but is not in internal skip list", key, managed)
continue
}
rc++
}
return rc
}
86 changes: 86 additions & 0 deletions pkg/stub/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1117,6 +1117,92 @@ func TestImageStreamImportError(t *testing.T) {
}
}

func TestImageStreamTagImportErrorRecovery(t *testing.T) {
two := int64(2)
one := int64(1)
stream := &imagev1.ImageStream{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Annotations: map[string]string{
v1.SamplesVersionAnnotation: TestVersion,
},
},
Spec: imagev1.ImageStreamSpec{
Tags: []imagev1.TagReference{
{
Name: "1",
Generation: &two,
},
{
Name: "2",
Generation: &one,
},
},
},
Status: imagev1.ImageStreamStatus{
Tags: []imagev1.NamedTagEventList{
{
Tag: "1",
Items: []imagev1.TagEvent{
{
Generation: two,
},
},
Conditions: []imagev1.TagEventCondition{
{
Status: corev1.ConditionFalse,
Generation: two,
},
},
},
{
Tag: "2",
Items: []imagev1.TagEvent{
{
Generation: two,
},
},
},
},
},
}
h, cfg, _ := setup()
mimic(&h, x86ContentRootDir)
dir := h.GetBaseDir(v1.X86Architecture, cfg)
files, _ := h.Filefinder.List(dir)
h.processFiles(dir, files, cfg)
importError := util.Condition(cfg, v1.ImportImageErrorsExist)
importError.Status = corev1.ConditionTrue
cm := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
},
Data: map[string]string{
"1": "could not import",
"2": "could not import",
},
}
fakecmclient := h.configmapclientwrapper.(*fakeConfigMapClientWrapper)
fakecmclient.configMaps["foo"] = cm
util.ConditionUpdate(cfg, importError)
err := h.processImageStreamWatchEvent(stream, false)
if err != nil {
t.Fatalf("processImageStreamWatchEvent error %#v", err)
}
cm, stillHasCM := fakecmclient.configMaps["foo"]
if !stillHasCM {
t.Fatalf("partially clean imagestream did not keep configmap")
}
_, stillHasCleanTag := cm.Data["2"]
if stillHasCleanTag {
t.Fatalf("partially clean imagestream still has clean tag in configmap")
}
_, stillHasBrokenTag := cm.Data["1"]
if !stillHasBrokenTag {
t.Fatalf("partially clean imagestream does not have tag that is still broke in configmap")
}
}

func TestImageStreamImportErrorRecovery(t *testing.T) {
two := int64(2)
one := int64(1)
Expand Down

0 comments on commit 39ed058

Please sign in to comment.