Skip to content

Commit

Permalink
retry every failed IST; track retry on imagestream basis vs. global b…
Browse files Browse the repository at this point in the history
…asis
  • Loading branch information
gabemontero authored and openshift-cherrypick-robot committed Nov 6, 2019
1 parent 533c999 commit 9ddd413
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 24 deletions.
4 changes: 4 additions & 0 deletions pkg/stub/handler.go
Expand Up @@ -93,6 +93,8 @@ func NewSamplesOperatorHandler(kubeconfig *restclient.Config,
h.imagestreamFile = make(map[string]string)
h.templateFile = make(map[string]string)

h.imagestreamRetry = make(map[string]metav1.Time)

h.mapsMutex = sync.Mutex{}
h.version = os.Getenv("RELEASE_VERSION")

Expand Down Expand Up @@ -131,6 +133,8 @@ type Handler struct {
imagestreamFile map[string]string
templateFile map[string]string

imagestreamRetry map[string]metav1.Time

mapsMutex sync.Mutex

upsertInProgress bool
Expand Down
75 changes: 56 additions & 19 deletions pkg/stub/handler_test.go
Expand Up @@ -474,7 +474,14 @@ func TestImageStreamErrorRetry(t *testing.T) {
Name: "foofoo",
},
},
},
{
Name: "bar",
Generation: &tagVersion,
From: &corev1.ObjectReference{
Kind: "DockerImage",
Name: "foofoo",
},
}},
},
Status: imagev1.ImageStreamStatus{
Tags: []imagev1.NamedTagEventList{
Expand All @@ -483,12 +490,21 @@ func TestImageStreamErrorRetry(t *testing.T) {
Conditions: []imagev1.TagEventCondition{
{
Generation: tagVersion,
Status: corev1.ConditionFalse,
Message: "failed import",
Status: corev1.ConditionFalse,
Message: "failed import",
},
},
},
},
{
Tag: "bar",
Conditions: []imagev1.TagEventCondition{
{
Generation: tagVersion,
Status: corev1.ConditionFalse,
Message: "failed import",
},
},
}},
},
}

Expand All @@ -498,6 +514,13 @@ func TestImageStreamErrorRetry(t *testing.T) {
t.Fatalf("Import Error Condition not true: %#v", cfg)
}

fakeisclient := h.imageclientwrapper.(*fakeImageStreamClientWrapper)
fakeimporter := fakeisclient.ImageStreamImports("foo").(*fakeImageStreamImporter)

if fakeimporter.count != 2 {
t.Fatalf("incorrect amount of import calls %d", fakeimporter.count)
}

initialImportErrorLastUpdateTime := cfg.Condition(v1.ImportImageErrorsExist).LastUpdateTime
h.processImageStreamWatchEvent(is, false)
// refetch to see if updated
Expand All @@ -507,26 +530,32 @@ func TestImageStreamErrorRetry(t *testing.T) {
}

// now let's push back 15 minutes and update
importError.LastUpdateTime.Time = metav1.Now().Add(-15 * time.Minute)
lastUpdateTime, _ := h.imagestreamRetry[is.Name]
lastUpdateTime.Time = metav1.Now().Add(-15 * time.Minute)
h.imagestreamRetry[is.Name] = lastUpdateTime
cfg.ConditionUpdate(importError)
// save a copy for compare
fifteenMinutesAgo := importError.LastUpdateTime
fifteenMinutesAgo := lastUpdateTime

h.processImageStreamWatchEvent(is, false)

// refetch and make sure it has changed
if cfg.Condition(v1.ImportImageErrorsExist).LastUpdateTime.Equal(&fifteenMinutesAgo) {
lastUpdateTime, _ = h.imagestreamRetry[is.Name]
if lastUpdateTime.Equal(&fifteenMinutesAgo) {
t.Fatalf("Import Error Condition should have been updated: old update time %s new update time %s", initialImportErrorLastUpdateTime.String(), cfg.Condition(v1.ImportImageErrorsExist).LastUpdateTime.String())
}

tagVersion = int64(2)
is.Status.Tags[0].Conditions[0] = imagev1.TagEventCondition{}
is.Status.Tags[0].Items = []imagev1.TagEvent{
{
Generation: tagVersion,
},
// Clear the failure condition on every status tag and record a successful
// import at the new generation. Index into the slice rather than ranging by
// value: a by-value range hands back a copy of each NamedTagEventList, so
// assigning to the copy's Items field would never reach the imagestream.
for i := range is.Status.Tags {
	tag := &is.Status.Tags[i]
	tag.Conditions[0] = imagev1.TagEventCondition{}
	tag.Items = []imagev1.TagEvent{
		{
			Generation: tagVersion,
		},
	}
}


h.processImageStreamWatchEvent(is, false)

if cfg.ConditionTrue(v1.ImportImageErrorsExist) {
Expand Down Expand Up @@ -1318,6 +1347,7 @@ func NewTestHandler() Handler {

h.imagestreamFile = make(map[string]string)
h.templateFile = make(map[string]string)
h.imagestreamRetry = make(map[string]metav1.Time)
h.version = TestVersion

return h
Expand Down Expand Up @@ -1408,11 +1438,12 @@ func (f *fakeResourceFileLister) List(dir string) ([]os.FileInfo, error) {
}

// fakeImageStreamClientWrapper is the test double used in place of the
// handler's real imagestream client wrapper.
//
// NOTE(review): the map fields look like canned streams, upsert tracking and
// per-call injected errors, presumably keyed by imagestream name; confirm
// against the Get/Create/Update/List fakes.
type fakeImageStreamClientWrapper struct {
	streams      map[string]*imagev1.ImageStream
	upsertkeys   map[string]bool
	geterrors    map[string]error
	upserterrors map[string]error
	listerrors   map[string]error
	// fakeImageStreamImporter is the importer handed out by
	// ImageStreamImports; it is nil until the first call creates it.
	fakeImageStreamImporter *fakeImageStreamImporter
}

func (f *fakeImageStreamClientWrapper) Get(name string) (*imagev1.ImageStream, error) {
Expand Down Expand Up @@ -1462,12 +1493,18 @@ func (f *fakeImageStreamClientWrapper) Watch() (watch.Interface, error) {
}

// ImageStreamImports returns the wrapper's fake importer, creating it on the
// first call and handing back the same instance afterwards so that the number
// of Create calls accumulates across invocations. The namespace argument is
// ignored.
func (f *fakeImageStreamClientWrapper) ImageStreamImports(namespace string) imagev1client.ImageStreamImportInterface {
	if f.fakeImageStreamImporter == nil {
		f.fakeImageStreamImporter = &fakeImageStreamImporter{}
	}
	return f.fakeImageStreamImporter
}

// fakeImageStreamImporter is a test double for the imagestream import client
// that keeps track of how many imports were requested.
type fakeImageStreamImporter struct {
	// count is incremented once per Create call.
	count int
}

// Create records that an import was requested by incrementing the importer's
// call counter, then returns an empty ImageStreamImport and a nil error. The
// supplied ImageStreamImport is ignored.
func (f *fakeImageStreamImporter) Create(*imagev1.ImageStreamImport) (*imagev1.ImageStreamImport, error) {
f.count++
return &imagev1.ImageStreamImport{}, nil
}

Expand Down
28 changes: 23 additions & 5 deletions pkg/stub/imagestreams.go
Expand Up @@ -186,6 +186,8 @@ func (h *Handler) upsertImageStream(imagestreamInOperatorImage, imagestreamInClu
// whether we are now skipping this imagestream, or are upserting it, remove any prior errors from the import error
// condition; in the skip case, we don't want errors to a now skipped stream blocking availability status; in the upsert
// case, any errors will cause the imagestream controller to attempt another image import
h.mapsMutex.Lock()
defer h.mapsMutex.Unlock()
h.clearStreamFromImportError(imagestreamInOperatorImage.Name, opcfg.Condition(v1.ImportImageErrorsExist), opcfg)

if _, isok := h.skippedImagestreams[imagestreamInOperatorImage.Name]; isok {
Expand Down Expand Up @@ -317,6 +319,7 @@ func (h *Handler) coreUpdateDockerPullSpec(oldreg, newreg string, oldies []strin
return oldreg
}

// clearStreamFromImportError assumes the caller has called h.mapsMutex.Lock() and Unlock() appropriately
func (h *Handler) clearStreamFromImportError(name string, importError *v1.ConfigCondition, cfg *v1.Config) *v1.ConfigCondition {
if cfg.NameInReason(importError.Reason, name) {
logrus.Printf("clearing imagestream %s from the import image error condition", name)
Expand All @@ -333,6 +336,7 @@ func (h *Handler) clearStreamFromImportError(name string, importError *v1.Config
importError.Status = corev1.ConditionFalse
importError.Reason = ""
importError.Message = ""
delete(h.imagestreamRetry, name)
} else {
importError.Status = corev1.ConditionTrue
}
Expand All @@ -351,6 +355,9 @@ func (h *Handler) processImportStatus(is *imagev1.ImageStream, cfg *v1.Config) (
importError := cfg.Condition(v1.ImportImageErrorsExist)
nonMatchDetail := ""
anyConditionUpdate := false
// in case we have to manipulate imagestream retry map
h.mapsMutex.Lock()
defer h.mapsMutex.Unlock()

// need to check for error conditions outside of the spec/status comparison because in an error scenario
// you can end up with less status tags than spec tags (especially if one tag refs another), but don't cite
Expand All @@ -361,6 +368,19 @@ func (h *Handler) processImportStatus(is *imagev1.ImageStream, cfg *v1.Config) (
// so reaching this point means we have a prior upsert in progress, and we just want to track the status
logrus.Debugf("checking tag spec/status for %s spec len %d status len %d", is.Name, len(is.Spec.Tags), len(is.Status.Tags))

// get the retry time for this imagestream
now := kapis.Now()
lastRetryTime, ok := h.imagestreamRetry[is.Name]
retryIfNeeded := false
if !ok {
retryIfNeeded = true
} else {
// a little bit less than the 15 minute relist interval
tenMinutesAgo := now.Time.Add(-10 * time.Minute)
retryIfNeeded = lastRetryTime.Time.Before(tenMinutesAgo)
}


for _, statusTag := range is.Status.Tags {
// if an error occurred with the latest generation, let's give up as we are no longer "in progress"
// in that case as well, but mark the import failure
Expand All @@ -386,11 +406,10 @@ func (h *Handler) processImportStatus(is *imagev1.ImageStream, cfg *v1.Config) (
// add this imagestream to the Reason field;
// we don't want to initiate imports repeatedly, but we do want to retry periodically as part
// of relist
now := kapis.Now()
// a little bit less than the 15 minute relist interval
tenMinutesAgo := now.Time.Add(-10 * time.Minute)

if !cfg.NameInReason(importError.Reason, is.Name) ||
importError.LastUpdateTime.Time.Before(tenMinutesAgo){
retryIfNeeded {
h.imagestreamRetry[is.Name] = now
if !cfg.NameInReason(importError.Reason, is.Name) {
importError.Reason = importError.Reason + is.Name + " "
importError.Message = importError.Message + "<imagestream/" + is.Name + ">" + message + "<imagestream/" + is.Name + ">"
Expand Down Expand Up @@ -420,7 +439,6 @@ func (h *Handler) processImportStatus(is *imagev1.ImageStream, cfg *v1.Config) (
logrus.Printf("initiated an imagestreamimport retry for imagestream/tag %s/%s", is.Name, statusTag.Tag)

}
break
}
}
}
Expand Down

0 comments on commit 9ddd413

Please sign in to comment.