diff --git a/pkg/stub/handler.go b/pkg/stub/handler.go index 6dcdaaa18..5a3d7860f 100644 --- a/pkg/stub/handler.go +++ b/pkg/stub/handler.go @@ -93,6 +93,8 @@ func NewSamplesOperatorHandler(kubeconfig *restclient.Config, h.imagestreamFile = make(map[string]string) h.templateFile = make(map[string]string) + h.imagestreamRetry = make(map[string]metav1.Time) + h.mapsMutex = sync.Mutex{} h.version = os.Getenv("RELEASE_VERSION") @@ -131,6 +133,8 @@ type Handler struct { imagestreamFile map[string]string templateFile map[string]string + imagestreamRetry map[string]metav1.Time + mapsMutex sync.Mutex upsertInProgress bool diff --git a/pkg/stub/handler_test.go b/pkg/stub/handler_test.go index 42bd28cb1..ae5113b5c 100644 --- a/pkg/stub/handler_test.go +++ b/pkg/stub/handler_test.go @@ -474,7 +474,14 @@ func TestImageStreamErrorRetry(t *testing.T) { Name: "foofoo", }, }, - }, + { + Name: "bar", + Generation: &tagVersion, + From: &corev1.ObjectReference{ + Kind: "DockerImage", + Name: "foofoo", + }, + }}, }, Status: imagev1.ImageStreamStatus{ Tags: []imagev1.NamedTagEventList{ @@ -483,12 +490,21 @@ func TestImageStreamErrorRetry(t *testing.T) { Conditions: []imagev1.TagEventCondition{ { Generation: tagVersion, - Status: corev1.ConditionFalse, - Message: "failed import", + Status: corev1.ConditionFalse, + Message: "failed import", }, }, }, - }, + { + Tag: "bar", + Conditions: []imagev1.TagEventCondition{ + { + Generation: tagVersion, + Status: corev1.ConditionFalse, + Message: "failed import", + }, + }, + }}, }, } @@ -498,6 +514,13 @@ func TestImageStreamErrorRetry(t *testing.T) { t.Fatalf("Import Error Condition not true: %#v", cfg) } + fakeisclient := h.imageclientwrapper.(*fakeImageStreamClientWrapper) + fakeimporter := fakeisclient.ImageStreamImports("foo").(*fakeImageStreamImporter) + + if fakeimporter.count != 2 { + t.Fatalf("incorrect amount of import calls %d", fakeimporter.count) + } + initialImportErrorLastUpdateTime := cfg.Condition(v1.ImportImageErrorsExist).LastUpdateTime h.processImageStreamWatchEvent(is, false) // refetch to see if updated @@ -507,26 +530,32 @@ func TestImageStreamErrorRetry(t *testing.T) { } // now let's push back 15 minutes and update - importError.LastUpdateTime.Time = metav1.Now().Add(-15 * time.Minute) + lastUpdateTime, _ := h.imagestreamRetry[is.Name] + lastUpdateTime.Time = metav1.Now().Add(-15 * time.Minute) + h.imagestreamRetry[is.Name] = lastUpdateTime cfg.ConditionUpdate(importError) // save a copy for compare - fifteenMinutesAgo := importError.LastUpdateTime + fifteenMinutesAgo := lastUpdateTime h.processImageStreamWatchEvent(is, false) // refetch and make sure it has changed - if cfg.Condition(v1.ImportImageErrorsExist).LastUpdateTime.Equal(&fifteenMinutesAgo) { + lastUpdateTime, _ = h.imagestreamRetry[is.Name] + if lastUpdateTime.Equal(&fifteenMinutesAgo) { t.Fatalf("Import Error Condition should have been updated: old update time %s new update time %s", initialImportErrorLastUpdateTime.String(), cfg.Condition(v1.ImportImageErrorsExist).LastUpdateTime.String()) } tagVersion = int64(2) - is.Status.Tags[0].Conditions[0] = imagev1.TagEventCondition{} - is.Status.Tags[0].Items = []imagev1.TagEvent{ - { - Generation: tagVersion, - }, + for _, tag := range is.Status.Tags { + tag.Conditions[0] = imagev1.TagEventCondition{} + tag.Items = []imagev1.TagEvent{ + { + Generation: tagVersion, + }, + } } + h.processImageStreamWatchEvent(is, false) if cfg.ConditionTrue(v1.ImportImageErrorsExist) { @@ -1318,6 +1347,7 @@ func NewTestHandler() Handler { h.imagestreamFile = make(map[string]string) h.templateFile = make(map[string]string) + h.imagestreamRetry = make(map[string]metav1.Time) h.version = TestVersion return h @@ -1408,11 +1438,12 @@ func (f *fakeResourceFileLister) List(dir string) ([]os.FileInfo, error) { } type fakeImageStreamClientWrapper struct { - streams map[string]*imagev1.ImageStream - upsertkeys map[string]bool - geterrors map[string]error - upserterrors map[string]error - listerrors map[string]error + streams map[string]*imagev1.ImageStream + upsertkeys map[string]bool + geterrors map[string]error + upserterrors map[string]error + listerrors map[string]error + fakeImageStreamImporter *fakeImageStreamImporter } func (f *fakeImageStreamClientWrapper) Get(name string) (*imagev1.ImageStream, error) { @@ -1462,12 +1493,18 @@ func (f *fakeImageStreamClientWrapper) Watch() (watch.Interface, error) { } func (f *fakeImageStreamClientWrapper) ImageStreamImports(namespace string) imagev1client.ImageStreamImportInterface { - return &fakeImageStreamImporter{} + if f.fakeImageStreamImporter == nil { + f.fakeImageStreamImporter = &fakeImageStreamImporter{} + } + return f.fakeImageStreamImporter } -type fakeImageStreamImporter struct {} +type fakeImageStreamImporter struct { + count int +} func (f *fakeImageStreamImporter) Create(*imagev1.ImageStreamImport) (*imagev1.ImageStreamImport, error) { + f.count++ return &imagev1.ImageStreamImport{}, nil } diff --git a/pkg/stub/imagestreams.go b/pkg/stub/imagestreams.go index 5e1d050a9..630025fb5 100644 --- a/pkg/stub/imagestreams.go +++ b/pkg/stub/imagestreams.go @@ -186,6 +186,8 @@ func (h *Handler) upsertImageStream(imagestreamInOperatorImage, imagestreamInClu // whether we are now skipping this imagestream, or are upserting it, remove any prior errors from the import error // condition; in the skip case, we don't want errors to a now skipped stream blocking availability status; in the upsert // case, any errors will cause the imagestream controller to attempt another image import + h.mapsMutex.Lock() + defer h.mapsMutex.Unlock() h.clearStreamFromImportError(imagestreamInOperatorImage.Name, opcfg.Condition(v1.ImportImageErrorsExist), opcfg) if _, isok := h.skippedImagestreams[imagestreamInOperatorImage.Name]; isok { @@ -317,6 +319,7 @@ func (h *Handler) coreUpdateDockerPullSpec(oldreg, newreg string, oldies []strin return oldreg } +// clearStreamFromImportError assumes the caller has call h.mapsMutex.Lock() and Unlock() appropriately func (h *Handler) clearStreamFromImportError(name string, importError *v1.ConfigCondition, cfg *v1.Config) *v1.ConfigCondition { if cfg.NameInReason(importError.Reason, name) { logrus.Printf("clearing imagestream %s from the import image error condition", name) @@ -333,6 +336,7 @@ func (h *Handler) clearStreamFromImportError(name string, importError *v1.Config importError.Status = corev1.ConditionFalse importError.Reason = "" importError.Message = "" + delete(h.imagestreamRetry, name) } else { importError.Status = corev1.ConditionTrue } @@ -351,6 +355,9 @@ func (h *Handler) processImportStatus(is *imagev1.ImageStream, cfg *v1.Config) ( importError := cfg.Condition(v1.ImportImageErrorsExist) nonMatchDetail := "" anyConditionUpdate := false + // in case we have to manipulate imagestream retry map + h.mapsMutex.Lock() + defer h.mapsMutex.Unlock() // need to check for error conditions outside of the spec/status comparison because in an error scenario // you can end up with less status tags than spec tags (especially if one tag refs another), but don't cite @@ -361,6 +368,19 @@ func (h *Handler) processImportStatus(is *imagev1.ImageStream, cfg *v1.Config) ( // so reaching this point means we have a prior upsert in progress, and we just want to track the status logrus.Debugf("checking tag spec/status for %s spec len %d status len %d", is.Name, len(is.Spec.Tags), len(is.Status.Tags)) + // get the retry time for this imagestream + now := kapis.Now() + lastRetryTime, ok := h.imagestreamRetry[is.Name] + retryIfNeeded := false + if !ok { + retryIfNeeded = true + } else { + // a little bit less than the 15 minute relist interval + tenMinutesAgo := now.Time.Add(-10 * time.Minute) + retryIfNeeded = lastRetryTime.Time.Before(tenMinutesAgo) + } + + for _, statusTag := range is.Status.Tags { // if an error occurred with the latest generation, let's give up as we are no longer "in progress" // in that case as well, but mark the import failure @@ -386,11 +406,10 @@ func (h *Handler) processImportStatus(is *imagev1.ImageStream, cfg *v1.Config) ( // add this imagestream to the Reason field; // we don't want to initiate imports repeatedly, but we do want to retry periodically as part // of relist - now := kapis.Now() - // a little bit less than the 15 minute relist interval - tenMinutesAgo := now.Time.Add(-10 * time.Minute) + if !cfg.NameInReason(importError.Reason, is.Name) || - importError.LastUpdateTime.Time.Before(tenMinutesAgo){ + retryIfNeeded { + h.imagestreamRetry[is.Name] = now if !cfg.NameInReason(importError.Reason, is.Name) { importError.Reason = importError.Reason + is.Name + " " importError.Message = importError.Message + "" + message + "" @@ -420,7 +439,6 @@ func (h *Handler) processImportStatus(is *imagev1.ImageStream, cfg *v1.Config) ( logrus.Printf("initiated an imagestreamimport retry for imagestream/tag %s/%s", is.Name, statusTag.Tag) } - break } } }