Skip to content

Commit

Permalink
status: Generation should be set by what the sync worker is doing
Browse files Browse the repository at this point in the history
The primary mechanism where by a reconciling controller communicates
to a client is its status, and the client has to wait until the
controller observes its write before assuming the controller is ready.

When writing a client that is triggering an upgrade, we need to:

1. Set desiredUpdate and read resulting generation
2. Wait until status.generation == generation from 1
3. Wait until the update that is marked partial in status history is complete

This fixes the sync worker to be the one who propagates generation, so
that a client doesn't see 2 above when syncing a previous generation.
  • Loading branch information
smarterclayton committed Jan 25, 2019
1 parent cab1434 commit 8ea7884
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 26 deletions.
6 changes: 3 additions & 3 deletions pkg/cvo/cvo.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func New(
kubeClient: kubeClient,
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: namespace}),

queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "clusterversion"),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "clusterversion"),
availableUpdatesQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "availableupdates"),
}

Expand Down Expand Up @@ -337,7 +337,7 @@ func (optr *Operator) sync(key string) error {
// inform the config sync loop about our desired state
reconciling := resourcemerge.IsOperatorStatusConditionTrue(config.Status.Conditions, configv1.OperatorAvailable) &&
resourcemerge.IsOperatorStatusConditionFalse(config.Status.Conditions, configv1.OperatorProgressing)
status := optr.configSync.Update(desired, config.Spec.Overrides, reconciling)
status := optr.configSync.Update(config.Generation, desired, config.Spec.Overrides, reconciling)

// write cluster version status
return optr.syncStatus(original, config, status, errs)
Expand Down Expand Up @@ -448,7 +448,7 @@ func versionString(update configv1.Update) string {
func (optr *Operator) currentVersion() configv1.Update {
return configv1.Update{
Version: optr.releaseVersion,
Image: optr.releaseImage,
Image: optr.releaseImage,
}
}

Expand Down
17 changes: 13 additions & 4 deletions pkg/cvo/cvo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,7 @@ func TestOperator_sync(t *testing.T) {
{
name: "version is live and was recently synced, do nothing",
syncStatus: &SyncWorkerStatus{
Generation: 2,
Reconciling: true,
Completed: 1,
VersionHash: "xyz",
Expand Down Expand Up @@ -1114,6 +1115,7 @@ func TestOperator_sync(t *testing.T) {
{
name: "new available updates, version is live and was recently synced, sync",
syncStatus: &SyncWorkerStatus{
Generation: 2,
Reconciling: true,
Completed: 1,
Actual: configv1.Update{Image: "image/image:v4.0.1", Version: "0.0.1-abc"},
Expand Down Expand Up @@ -1195,6 +1197,7 @@ func TestOperator_sync(t *testing.T) {
{
name: "new available updates for the default upstream URL, client has no upstream",
syncStatus: &SyncWorkerStatus{
Generation: 2,
Reconciling: true,
Completed: 1,
Actual: configv1.Update{Image: "image/image:v4.0.1", Version: "0.0.1-abc"},
Expand Down Expand Up @@ -1277,6 +1280,7 @@ func TestOperator_sync(t *testing.T) {
{
name: "new available updates but for a different channel",
syncStatus: &SyncWorkerStatus{
Generation: 2,
Reconciling: true,
Completed: 1,
Actual: configv1.Update{Image: "image/image:v4.0.1", Version: "0.0.1-abc"},
Expand Down Expand Up @@ -1354,6 +1358,7 @@ func TestOperator_sync(t *testing.T) {
{
name: "user requested a version, sync loop hasn't started",
syncStatus: &SyncWorkerStatus{
Generation: 2,
Reconciling: true,
Completed: 1,
Actual: configv1.Update{Image: "image/image:v4.0.1", Version: "4.0.1"},
Expand All @@ -1365,7 +1370,7 @@ func TestOperator_sync(t *testing.T) {
client: fakeClientsetWithUpdates(&configv1.ClusterVersion{
ObjectMeta: metav1.ObjectMeta{
Name: "default",
Generation: 2,
Generation: 3,
},
Spec: configv1.ClusterVersionSpec{
ClusterID: configv1.ClusterID(id),
Expand All @@ -1388,7 +1393,7 @@ func TestOperator_sync(t *testing.T) {
expectUpdateStatus(t, act[1], "clusterversions", "", &configv1.ClusterVersion{
ObjectMeta: metav1.ObjectMeta{
Name: "default",
Generation: 2,
Generation: 3,
},
Spec: configv1.ClusterVersionSpec{
ClusterID: configv1.ClusterID(id),
Expand All @@ -1415,6 +1420,7 @@ func TestOperator_sync(t *testing.T) {
{
name: "user requested a version that isn't in the updates or history",
syncStatus: &SyncWorkerStatus{
Generation: 2,
Reconciling: true,
Completed: 1,
Actual: configv1.Update{Image: "image/image:v4.0.1", Version: "4.0.1"},
Expand All @@ -1426,7 +1432,7 @@ func TestOperator_sync(t *testing.T) {
client: fakeClientsetWithUpdates(&configv1.ClusterVersion{
ObjectMeta: metav1.ObjectMeta{
Name: "default",
Generation: 2,
Generation: 3,
},
Spec: configv1.ClusterVersionSpec{
ClusterID: configv1.ClusterID(id),
Expand All @@ -1453,7 +1459,7 @@ func TestOperator_sync(t *testing.T) {
expectUpdateStatus(t, act[1], "clusterversions", "", &configv1.ClusterVersion{
ObjectMeta: metav1.ObjectMeta{
Name: "default",
Generation: 2,
Generation: 3,
},
Spec: configv1.ClusterVersionSpec{
ClusterID: configv1.ClusterID(id),
Expand Down Expand Up @@ -1487,6 +1493,7 @@ func TestOperator_sync(t *testing.T) {
{
name: "user requested a version has duplicates",
syncStatus: &SyncWorkerStatus{
Generation: 2,
Reconciling: true,
Completed: 1,
Actual: configv1.Update{Image: "image/image:v4.0.1", Version: "4.0.1"},
Expand Down Expand Up @@ -1561,6 +1568,7 @@ func TestOperator_sync(t *testing.T) {
{
name: "image hash matches content hash, act as reconcile, no need to apply",
syncStatus: &SyncWorkerStatus{
Generation: 2,
Reconciling: true,
Completed: 1,
VersionHash: "y_Kc5IQiIyU=",
Expand Down Expand Up @@ -1620,6 +1628,7 @@ func TestOperator_sync(t *testing.T) {
{
name: "image hash does not match content hash, act as reconcile, no need to apply",
syncStatus: &SyncWorkerStatus{
Generation: 2,
Reconciling: true,
Completed: 1,
VersionHash: "y_Kc5IQiIyU=",
Expand Down
2 changes: 1 addition & 1 deletion pkg/cvo/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (optr *Operator) syncStatus(original, config *configv1.ClusterVersion, stat
original = config.DeepCopy()
}

config.Status.Generation = config.Generation
config.Status.Generation = status.Generation
if len(status.VersionHash) > 0 {
config.Status.VersionHash = status.VersionHash
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cvo/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ func (r *fakeSyncRecorder) StatusCh() <-chan SyncWorkerStatus {

func (r *fakeSyncRecorder) Start(stopCh <-chan struct{}) {}

func (r *fakeSyncRecorder) Update(desired configv1.Update, overrides []configv1.ComponentOverride, reconciling bool) *SyncWorkerStatus {
func (r *fakeSyncRecorder) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, reconciling bool) *SyncWorkerStatus {
r.Updates = append(r.Updates, desired)
return r.Returns
}
Expand Down
32 changes: 21 additions & 11 deletions pkg/cvo/sync_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
// ConfigSyncWorker abstracts how the image is synchronized to the server. Introduced for testing.
type ConfigSyncWorker interface {
Start(stopCh <-chan struct{})
Update(desired configv1.Update, overrides []configv1.ComponentOverride, reconciling bool) *SyncWorkerStatus
Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, reconciling bool) *SyncWorkerStatus
StatusCh() <-chan SyncWorkerStatus
}

Expand All @@ -42,6 +42,7 @@ type StatusReporter interface {

// SyncWork represents the work that should be done in a sync iteration.
type SyncWork struct {
Generation int64
Desired configv1.Update
Overrides []configv1.ComponentOverride
Reconciling bool
Expand All @@ -55,6 +56,8 @@ func (w SyncWork) Empty() bool {

// SyncWorkerStatus is the status of the sync worker at a given time.
type SyncWorkerStatus struct {
Generation int64

Step string
Failure error

Expand Down Expand Up @@ -145,13 +148,14 @@ func (w *SyncWorker) StatusCh() <-chan SyncWorkerStatus {
// the initial state or whatever the last recorded status was.
// TODO: in the future it may be desirable for changes that alter desired to wait briefly before returning,
// giving the sync loop the opportunity to observe our change and begin working towards it.
func (w *SyncWorker) Update(desired configv1.Update, overrides []configv1.ComponentOverride, reconciling bool) *SyncWorkerStatus {
func (w *SyncWorker) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, reconciling bool) *SyncWorkerStatus {
w.lock.Lock()
defer w.lock.Unlock()

work := &SyncWork{
Desired: desired,
Overrides: overrides,
Generation: generation,
Desired: desired,
Overrides: overrides,
}

if work.Empty() || equalSyncWork(w.work, work) {
Expand All @@ -164,7 +168,11 @@ func (w *SyncWorker) Update(desired configv1.Update, overrides []configv1.Compon
if reconciling {
work.Reconciling = true
}
w.status = SyncWorkerStatus{Reconciling: work.Reconciling, Actual: work.Desired}
w.status = SyncWorkerStatus{
Generation: generation,
Reconciling: work.Reconciling,
Actual: work.Desired,
}
}

// notify the sync loop that we changed config
Expand Down Expand Up @@ -310,6 +318,8 @@ func (w *SyncWorker) calculateNext(work *SyncWork) bool {
work.Overrides = w.work.Overrides
}

work.Generation = w.work.Generation

return changed
}

Expand Down Expand Up @@ -368,7 +378,7 @@ func (w *SyncWorker) Status() *SyncWorkerStatus {
// the update could not be completely applied. The status is updated as we progress.
// Cancelling the context will abort the execution of the sync.
func (w *SyncWorker) syncOnce(ctx context.Context, work *SyncWork, reporter StatusReporter) error {
glog.V(4).Infof("Running sync %s", versionString(work.Desired))
glog.V(4).Infof("Running sync %s on generation %d", versionString(work.Desired), work.Generation)
update := work.Desired

// cache the image until the release image changes
Expand Down Expand Up @@ -420,14 +430,14 @@ func (w *SyncWorker) apply(ctx context.Context, image *updatePayload, work *Sync
setAppliedAndPending(version, total, done)
fraction := float32(i) / float32(len(tasks))

reporter.Report(SyncWorkerStatus{Fraction: fraction, Step: "ApplyResources", Reconciling: work.Reconciling, VersionHash: image.ManifestHash, Actual: update})
reporter.Report(SyncWorkerStatus{Generation: work.Generation, Fraction: fraction, Step: "ApplyResources", Reconciling: work.Reconciling, VersionHash: image.ManifestHash, Actual: update})

glog.V(4).Infof("Running sync for %s", task)
glog.V(5).Infof("Manifest: %s", string(task.manifest.Raw))

if contextIsCancelled(ctx) {
err := fmt.Errorf("update was cancelled at %d/%d", i, len(tasks))
reporter.Report(SyncWorkerStatus{Failure: err, Fraction: fraction, Step: "ApplyResources", Reconciling: work.Reconciling, VersionHash: image.ManifestHash, Actual: update})
reporter.Report(SyncWorkerStatus{Generation: work.Generation, Failure: err, Fraction: fraction, Step: "ApplyResources", Reconciling: work.Reconciling, VersionHash: image.ManifestHash, Actual: update})
return err
}

Expand All @@ -438,7 +448,7 @@ func (w *SyncWorker) apply(ctx context.Context, image *updatePayload, work *Sync
}

if err := task.Run(version, w.builder); err != nil {
reporter.Report(SyncWorkerStatus{Failure: err, Fraction: fraction, Step: "ApplyResources", Reconciling: work.Reconciling, VersionHash: image.ManifestHash, Actual: update})
reporter.Report(SyncWorkerStatus{Generation: work.Generation, Failure: err, Fraction: fraction, Step: "ApplyResources", Reconciling: work.Reconciling, VersionHash: image.ManifestHash, Actual: update})
cause := errors.Cause(err)
if task.requeued == 0 && shouldRequeueOnErr(cause, task.manifest) {
task.requeued++
Expand All @@ -453,7 +463,7 @@ func (w *SyncWorker) apply(ctx context.Context, image *updatePayload, work *Sync

setAppliedAndPending(version, total, done)
work.Completed++
reporter.Report(SyncWorkerStatus{Fraction: 1, Completed: work.Completed, Reconciling: true, VersionHash: image.ManifestHash, Actual: update})
reporter.Report(SyncWorkerStatus{Generation: work.Generation, Fraction: 1, Completed: work.Completed, Reconciling: true, VersionHash: image.ManifestHash, Actual: update})

return nil
}
Expand Down Expand Up @@ -483,7 +493,7 @@ func runThrottledStatusNotifier(stopCh <-chan struct{}, interval time.Duration,
return
case next := <-ch:
// only throttle if we aren't on an edge
if next.Actual == last.Actual && next.Reconciling == last.Reconciling && (next.Failure != nil) == (last.Failure != nil) {
if next.Generation == last.Generation && next.Actual == last.Actual && next.Reconciling == last.Reconciling && (next.Failure != nil) == (last.Failure != nil) {
if err := throttle.Wait(ctx); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to throttle status notification: %v", err))
}
Expand Down
Loading

0 comments on commit 8ea7884

Please sign in to comment.