emit events for each new payload
deads2k committed Jul 22, 2020
1 parent 28e4400 commit 7d3621f
Showing 9 changed files with 60 additions and 28 deletions.
3 changes: 2 additions & 1 deletion pkg/cvo/cvo.go
@@ -195,7 +195,7 @@ func New(
kubeClient: kubeClient,
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: namespace}),

queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "clusterversion"),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "clusterversion"),
availableUpdatesQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "availableupdates"),
upgradeableQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "upgradeable"),

@@ -268,6 +268,7 @@ func (optr *Operator) InitializeFromPayload(restConfig *rest.Config, burstRestCo
Steps: 3,
},
optr.exclude,
optr.eventRecorder,
)

return nil
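For reference, the eventRecorder threaded through New() and InitializeFromPayload above follows the standard client-go broadcaster pattern (the diff shows eventBroadcaster.NewRecorder being used directly). A minimal sketch of that wiring, with the newEventRecorder helper name assumed for illustration:

```go
package cvo

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/record"
)

// newEventRecorder sketches how a recorder like optr.eventRecorder is
// typically built: a broadcaster forwards each recorded event to the
// Events API in the operator's namespace, stamped with the given
// component as its source.
func newEventRecorder(kubeClient kubernetes.Interface, namespace string) record.EventRecorder {
	broadcaster := record.NewBroadcaster()
	// Persist recorded events as corev1.Event objects in the cluster.
	broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
		Interface: kubeClient.CoreV1().Events(namespace),
	})
	return broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: namespace})
}
```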
20 changes: 11 additions & 9 deletions pkg/cvo/cvo_scenarios_test.go
@@ -11,16 +11,16 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/google/uuid"

"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/wait"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/wait"
dynamicfake "k8s.io/client-go/dynamic/fake"
clientgotesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"

configv1 "github.com/openshift/api/config/v1"
@@ -70,13 +70,14 @@ func setupCVOTest(payloadDir string) (*Operator, map[string]runtime.Object, *fak
})

o := &Operator{
namespace: "test",
name: "version",
namespace: "test",
name: "version",
enableDefaultClusterVersion: true,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cvo-loop-test"),
client: client,
cvLister: &clientCVLister{client: client},
exclude: "exclude-test",
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cvo-loop-test"),
client: client,
cvLister: &clientCVLister{client: client},
exclude: "exclude-test",
eventRecorder: record.NewFakeRecorder(100),
}

dynamicScheme := runtime.NewScheme()
@@ -92,6 +93,7 @@
Steps: 1,
},
"exclude-test",
record.NewFakeRecorder(100),
)
o.configSync = worker

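The tests replace the real recorder with record.NewFakeRecorder(100), which buffers each event on a channel as a "<type> <reason> <message>" string instead of writing to the API server. A minimal sketch of asserting on those buffered events (this test is illustrative, not part of the commit):

```go
package cvo

import (
	"strings"
	"testing"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/record"
)

func TestFakeRecorderSketch(t *testing.T) {
	rec := record.NewFakeRecorder(100) // buffer up to 100 events
	rec.Eventf(nil, corev1.EventTypeNormal, "RetrievePayload",
		"retrieving payload version=%q image=%q", "0.0.1", "image/image:v0.0.1")

	select {
	case ev := <-rec.Events: // FakeRecorder ignores the object and emits a formatted string
		if !strings.Contains(ev, "RetrievePayload") {
			t.Fatalf("unexpected event: %s", ev)
		}
	default:
		t.Fatal("expected an event to be recorded")
	}
}
```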
10 changes: 7 additions & 3 deletions pkg/cvo/cvo_test.go
@@ -30,6 +30,7 @@ import (
kfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
ktesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"

@@ -1593,9 +1594,9 @@ func TestOperator_sync(t *testing.T) {
Actual: configv1.Update{Image: "image/image:v4.0.1", Version: "0.0.1-abc"},
},
optr: Operator{
releaseImage: "image/image:v4.0.1",
namespace: "test",
name: "default",
releaseImage: "image/image:v4.0.1",
namespace: "test",
name: "default",
defaultUpstreamServer: "http://localhost:8080/graph",
availableUpdates: &availableUpdates{
Upstream: "",
@@ -2259,6 +2260,7 @@ func TestOperator_sync(t *testing.T) {
}
optr.configSync = &fakeSyncRecorder{Returns: expectStatus}
}
optr.eventRecorder = record.NewFakeRecorder(100)

err := optr.sync(optr.queueKey())
if err != nil && tt.wantErr == nil {
@@ -2626,6 +2628,7 @@ func TestOperator_availableUpdatesSync(t *testing.T) {
optr.proxyLister = &clientProxyLister{client: optr.client}
optr.coLister = &clientCOLister{client: optr.client}
optr.cvLister = &clientCVLister{client: optr.client}
optr.eventRecorder = record.NewFakeRecorder(100)

if tt.handler != nil {
s := httptest.NewServer(http.HandlerFunc(tt.handler))
@@ -3129,6 +3132,7 @@ func TestOperator_upgradeableSync(t *testing.T) {
optr.coLister = &clientCOLister{client: optr.client}
optr.cvLister = &clientCVLister{client: optr.client}
optr.upgradeableChecks = optr.defaultUpgradeableChecks()
optr.eventRecorder = record.NewFakeRecorder(100)

err := optr.upgradeableSync(optr.queueKey())
if err != nil && tt.wantErr == nil {
5 changes: 4 additions & 1 deletion pkg/cvo/metrics_test.go
@@ -12,6 +12,7 @@ import (
dto "github.com/prometheus/client_model/go"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"

configv1 "github.com/openshift/api/config/v1"
)
@@ -512,6 +513,7 @@ func Test_operatorMetrics_Collect(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.optr.eventRecorder = record.NewFakeRecorder(100)
if tt.optr.cvLister == nil {
tt.optr.cvLister = &cvLister{}
}
@@ -588,7 +590,8 @@ func Test_operatorMetrics_CollectTransitions(t *testing.T) {
},
},
optr: &Operator{
coLister: &coLister{},
coLister: &coLister{},
eventRecorder: record.NewFakeRecorder(100),
},
wants: func(t *testing.T, metrics []prometheus.Metric) {
if len(metrics) != 5 {
2 changes: 2 additions & 0 deletions pkg/cvo/status_test.go
@@ -7,6 +7,7 @@ import (

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/client-go/tools/record"

configv1 "github.com/openshift/api/config/v1"
"github.com/openshift/client-go/config/clientset/versioned/fake"
@@ -184,6 +185,7 @@ func TestOperator_syncFailingStatus(t *testing.T) {
},
},
),
eventRecorder: record.NewFakeRecorder(100),
},
wantErr: func(t *testing.T, err error) {
if err == nil || err.Error() != "bad" {
6 changes: 4 additions & 2 deletions pkg/cvo/sync_test.go
@@ -9,6 +9,8 @@ import (
"sync"
"testing"

"k8s.io/client-go/tools/record"

"github.com/davecgh/go-spew/spew"

"k8s.io/apimachinery/pkg/api/meta"
@@ -124,7 +126,7 @@ func Test_SyncWorker_apply(t *testing.T) {
testMapper.RegisterGVK(schema.GroupVersionKind{"test.cvo.io", "v1", "TestB"}, newTestBuilder(r, test.reactors))
testMapper.AddToMap(resourcebuilder.Mapper)

worker := &SyncWorker{}
worker := &SyncWorker{eventRecorder: record.NewFakeRecorder(100)}
worker.builder = NewResourceBuilder(nil, nil, nil)

ctx, cancel := context.WithCancel(context.Background())
@@ -310,7 +312,7 @@ func Test_SyncWorker_apply_generic(t *testing.T) {
dynamicClient := dynamicfake.NewSimpleDynamicClient(dynamicScheme)

up := &payload.Update{ReleaseImage: "test", ReleaseVersion: "v0.0.0", Manifests: manifests}
worker := &SyncWorker{}
worker := &SyncWorker{eventRecorder: record.NewFakeRecorder(100)}
worker.backoff.Steps = 1
worker.builder = &testResourceBuilder{
client: dynamicClient,
27 changes: 21 additions & 6 deletions pkg/cvo/sync_worker.go
@@ -10,6 +10,10 @@ import (
"sync"
"time"

corev1 "k8s.io/api/core/v1"

"k8s.io/client-go/tools/record"

"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
"k8s.io/klog"
@@ -130,6 +134,7 @@ type SyncWorker struct {
retriever PayloadRetriever
builder payload.ResourceBuilder
preconditions precondition.List
eventRecorder record.EventRecorder

// minimumReconcileInterval is the minimum time between reconcile attempts, and is
// used to define the maximum backoff interval when syncOnce() returns an error.
@@ -156,11 +161,12 @@

// NewSyncWorker initializes a ConfigSyncWorker that will retrieve payloads to disk, apply them via builder
// to a server, and obey limits about how often to reconcile or retry on errors.
func NewSyncWorker(retriever PayloadRetriever, builder payload.ResourceBuilder, reconcileInterval time.Duration, backoff wait.Backoff, exclude string) *SyncWorker {
func NewSyncWorker(retriever PayloadRetriever, builder payload.ResourceBuilder, reconcileInterval time.Duration, backoff wait.Backoff, exclude string, eventRecorder record.EventRecorder) *SyncWorker {
return &SyncWorker{
retriever: retriever,
builder: builder,
backoff: backoff,
retriever: retriever,
builder: builder,
backoff: backoff,
eventRecorder: eventRecorder,

minimumReconcileInterval: reconcileInterval,

@@ -177,8 +183,8 @@ func NewSyncWorker(retriever PayloadRetriever, builder payload.ResourceBuilder,
// NewSyncWorkerWithPreconditions initializes a ConfigSyncWorker that will retrieve payloads to disk, apply them via builder
// to a server, and obey limits about how often to reconcile or retry on errors.
// It allows providing preconditions for loading payload.
func NewSyncWorkerWithPreconditions(retriever PayloadRetriever, builder payload.ResourceBuilder, preconditions precondition.List, reconcileInterval time.Duration, backoff wait.Backoff, exclude string) *SyncWorker {
worker := NewSyncWorker(retriever, builder, reconcileInterval, backoff, exclude)
func NewSyncWorkerWithPreconditions(retriever PayloadRetriever, builder payload.ResourceBuilder, preconditions precondition.List, reconcileInterval time.Duration, backoff wait.Backoff, exclude string, eventRecorder record.EventRecorder) *SyncWorker {
worker := NewSyncWorker(retriever, builder, reconcileInterval, backoff, exclude, eventRecorder)
worker.preconditions = preconditions
return worker
}
@@ -474,6 +480,8 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in
validPayload := w.payload
if validPayload == nil || !equalUpdate(configv1.Update{Image: validPayload.ReleaseImage}, configv1.Update{Image: update.Image}) {
klog.V(4).Infof("Loading payload")
cvoObjectRef := &corev1.ObjectReference{APIVersion: "config.openshift.io/v1", Kind: "ClusterVersion", Name: "version"}
w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "RetrievePayload", "retrieving payload version=%q image=%q", update.Version, update.Image)
reporter.Report(SyncWorkerStatus{
Generation: work.Generation,
Step: "RetrievePayload",
@@ -483,6 +491,7 @@
})
info, err := w.retriever.RetrievePayload(ctx, update)
if err != nil {
w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "RetrievePayloadFailed", "retrieving payload failed version=%q image=%q failure=%v", update.Version, update.Image, err)
reporter.Report(SyncWorkerStatus{
Generation: work.Generation,
Failure: err,
@@ -494,8 +503,10 @@
return err
}

w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "VerifyPayload", "verifying payload version=%q image=%q", update.Version, update.Image)
payloadUpdate, err := payload.LoadUpdate(info.Directory, update.Image, w.exclude)
if err != nil {
w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "VerifyPayloadFailed", "verifying payload failed version=%q image=%q failure=%v", update.Version, update.Image, err)
reporter.Report(SyncWorkerStatus{
Generation: work.Generation,
Failure: err,
@@ -527,7 +538,9 @@ func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, maxWorkers in
if err := precondition.Summarize(w.preconditions.RunAll(ctx, precondition.ReleaseContext{DesiredVersion: payloadUpdate.ReleaseVersion})); err != nil {
if update.Force {
klog.V(4).Infof("Forcing past precondition failures: %s", err)
w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "PreconditionsForced", "preconditions forced for payload loaded version=%q image=%q failures=%v", update.Version, update.Image, err)
} else {
w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeWarning, "PreconditionsFailed", "preconditions failed for payload loaded version=%q image=%q failures=%v", update.Version, update.Image, err)
reporter.Report(SyncWorkerStatus{
Generation: work.Generation,
Failure: err,
@@ -540,9 +553,11 @@
return err
}
}
w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "PreconditionsPassed", "preconditions passed for payload loaded version=%q image=%q", update.Version, update.Image)
}

w.payload = payloadUpdate
w.eventRecorder.Eventf(cvoObjectRef, corev1.EventTypeNormal, "PayloadLoaded", "payload loaded version=%q image=%q", update.Version, update.Image)
klog.V(4).Infof("Payload loaded from %s with hash %s", payloadUpdate.ReleaseImage, payloadUpdate.ManifestHash)
}

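Every Eventf call added to syncOnce targets the same ObjectReference — the ClusterVersion singleton named "version" — so the payload lifecycle (retrieve, verify, preconditions, loaded) surfaces as events attached to that object. A condensed sketch of the pattern; the emitPayloadEvent helper is illustrative, not part of the commit:

```go
package cvo

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/record"
)

// emitPayloadEvent records a payload-lifecycle event against the
// ClusterVersion object "version" so it shows up alongside upgrade status.
func emitPayloadEvent(rec record.EventRecorder, eventType, reason, messageFmt string, args ...interface{}) {
	cvoObjectRef := &corev1.ObjectReference{
		APIVersion: "config.openshift.io/v1",
		Kind:       "ClusterVersion",
		Name:       "version",
	}
	rec.Eventf(cvoObjectRef, eventType, reason, messageFmt, args...)
}
```

For example, the success path above amounts to emitPayloadEvent(w.eventRecorder, corev1.EventTypeNormal, "PayloadLoaded", "payload loaded version=%q image=%q", update.Version, update.Image).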
6 changes: 4 additions & 2 deletions pkg/cvo/sync_worker_test.go
@@ -5,6 +5,8 @@ import (
"testing"
"time"

"k8s.io/client-go/tools/record"

configv1 "github.com/openshift/api/config/v1"
)

@@ -77,7 +79,7 @@ func Test_statusWrapper_ReportProgress(t *testing.T) {
w := &statusWrapper{
previousStatus: &tt.previous,
}
w.w = &SyncWorker{report: make(chan SyncWorkerStatus, 1)}
w.w = &SyncWorker{report: make(chan SyncWorkerStatus, 1), eventRecorder: record.NewFakeRecorder(100)}
w.Report(tt.next)
close(w.w.report)
if tt.want {
@@ -130,7 +132,7 @@ func Test_statusWrapper_ReportGeneration(t *testing.T) {
w := &statusWrapper{
previousStatus: &tt.previous,
}
w.w = &SyncWorker{report: make(chan SyncWorkerStatus, 1)}
w.w = &SyncWorker{report: make(chan SyncWorkerStatus, 1), eventRecorder: record.NewFakeRecorder(100)}
w.Report(tt.next)
close(w.w.report)

9 changes: 5 additions & 4 deletions pkg/start/start_integration_test.go
@@ -28,6 +28,7 @@ import (
randutil "k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/klog"

configv1 "github.com/openshift/api/config/v1"
@@ -240,7 +241,7 @@ func TestIntegrationCVO_initializeAndUpgrade(t *testing.T) {
options.PayloadOverride = filepath.Join(dir, "ignored")
controllers := options.NewControllerContext(cb)

worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "")
worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "", record.NewFakeRecorder(100))
controllers.CVO.SetSyncWorkerForTesting(worker)

ctx, cancel := context.WithCancel(context.Background())
@@ -392,7 +393,7 @@ func TestIntegrationCVO_initializeAndHandleError(t *testing.T) {
options.ResyncInterval = 3 * time.Second
controllers := options.NewControllerContext(cb)

worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Duration: time.Second, Factor: 1.2}, "")
worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Duration: time.Second, Factor: 1.2}, "", record.NewFakeRecorder(100))
controllers.CVO.SetSyncWorkerForTesting(worker)

ctx, cancel := context.WithCancel(context.Background())
@@ -497,7 +498,7 @@ func TestIntegrationCVO_gracefulStepDown(t *testing.T) {
options.NodeName = "test-node"
controllers := options.NewControllerContext(cb)

worker := cvo.NewSyncWorker(&mapPayloadRetriever{}, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "")
worker := cvo.NewSyncWorker(&mapPayloadRetriever{}, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "", record.NewFakeRecorder(100))
controllers.CVO.SetSyncWorkerForTesting(worker)

lock, err := createResourceLock(cb, ns, ns)
@@ -669,7 +670,7 @@ metadata:
t.Fatal(err)
}

worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "")
worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}, "", record.NewFakeRecorder(100))
controllers.CVO.SetSyncWorkerForTesting(worker)

ctx, cancel := context.WithCancel(context.Background())
