Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Federation] Add a worker queue to the generic sync controller. #44987

Merged
merged 1 commit into from
May 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions federation/pkg/federation-controller/sync/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
],
)

Expand Down
104 changes: 70 additions & 34 deletions federation/pkg/federation-controller/sync/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientv1 "k8s.io/client-go/pkg/api/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/workqueue"
federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
"k8s.io/kubernetes/federation/pkg/federatedtypes"
Expand Down Expand Up @@ -70,6 +72,9 @@ type FederationSyncController struct {
// Informer controller for resources that should be federated.
controller cache.Controller

// Work queue allowing parallel processing of resources
workQueue workqueue.Interface

// Backoff manager
backoff *flowcontrol.Backoff

Expand Down Expand Up @@ -110,6 +115,7 @@ func newFederationSyncController(client federationclientset.Interface, adapter f
clusterAvailableDelay: time.Second * 20,
smallDelay: time.Second * 3,
updateTimeout: time.Second * 30,
workQueue: workqueue.New(),
backoff: flowcontrol.NewBackOff(5*time.Second, time.Minute),
eventRecorder: recorder,
adapter: adapter,
Expand Down Expand Up @@ -215,17 +221,55 @@ func (s *FederationSyncController) Run(stopChan <-chan struct{}) {
go func() {
<-stopChan
s.informer.Stop()
s.workQueue.ShutDown()
}()
s.deliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
namespacedName := item.Value.(*types.NamespacedName)
s.reconcile(*namespacedName)
s.workQueue.Add(item)
})
s.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) {
s.reconcileOnClusterChange()
})

// TODO: Allow multiple workers.
go wait.Until(s.worker, time.Second, stopChan)

util.StartBackoffGC(s.backoff, stopChan)
}

// reconciliationStatus is the outcome reported by reconcile for a single
// resource. The worker uses it to decide whether, and how soon, the resource
// should be scheduled for another reconciliation pass.
type reconciliationStatus int

const (
	// statusAllOK means no follow-up is required: the resource is not
	// federated, was deleted successfully, or needed no operations.
	statusAllOK reconciliationStatus = iota

	// statusNeedsRecheck means updates were executed successfully and the
	// resource should be looked at again later to confirm the result.
	statusNeedsRecheck

	// statusError means reconciliation failed; the resource is redelivered
	// as a failed item.
	statusError

	// statusNotSynced means the controller's stores were not yet synced or
	// the list of ready clusters could not be obtained; the resource is
	// redelivered after a delay.
	statusNotSynced
)

func (s *FederationSyncController) worker() {
for {
obj, quit := s.workQueue.Get()
if quit {
return
}

item := obj.(*util.DelayingDelivererItem)
namespacedName := item.Value.(*types.NamespacedName)
status := s.reconcile(*namespacedName)
s.workQueue.Done(item)

switch status {
case statusAllOK:
break
case statusError:
s.deliver(*namespacedName, 0, true)
case statusNeedsRecheck:
s.deliver(*namespacedName, s.reviewDelay, false)
case statusNotSynced:
s.deliver(*namespacedName, s.reviewDelay, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't notice until I went to rebase on this, but shouldn't this be s.ClusterAvailableDelay?

}
}
}

func (s *FederationSyncController) deliverObj(obj pkgruntime.Object, delay time.Duration, failed bool) {
namespacedName := s.adapter.NamespacedName(obj)
s.deliver(namespacedName, delay, failed)
Expand Down Expand Up @@ -272,79 +316,72 @@ func (s *FederationSyncController) reconcileOnClusterChange() {
}
}

func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName) {
func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName) reconciliationStatus {
if !s.isSynced() {
s.deliver(namespacedName, s.clusterAvailableDelay, false)
return
return statusNotSynced
}

key := namespacedName.String()
kind := s.adapter.Kind()
cachedObj, exist, err := s.store.GetByKey(key)
if err != nil {
glog.Errorf("Failed to query main %s store for %v: %v", kind, key, err)
s.deliver(namespacedName, 0, true)
return
glog.Errorf("failed to query main %s store for %v: %v", kind, key, err)
return statusError
}

if !exist {
// Not federated, ignoring.
return
return statusAllOK
}

// Create a copy before modifying the resource to prevent racing
// with other readers.
copiedObj, err := api.Scheme.DeepCopy(cachedObj)
if err != nil {
glog.Errorf("Error in retrieving %s from store: %v", kind, err)
s.deliver(namespacedName, 0, true)
return
glog.Errorf("error in retrieving %s from store: %v", kind, err)
return statusError
}
if !s.adapter.IsExpectedType(copiedObj) {
glog.Errorf("Object is not the expected type: %v", copiedObj)
s.deliver(namespacedName, 0, true)
return
glog.Errorf("object is not the expected type: %v", copiedObj)
return statusError
}
obj := copiedObj.(pkgruntime.Object)
meta := s.adapter.ObjectMeta(obj)

if meta.DeletionTimestamp != nil {
if err := s.delete(obj, namespacedName); err != nil {
glog.Errorf("Failed to delete %s %s: %v", kind, namespacedName, err)
s.eventRecorder.Eventf(obj, api.EventTypeWarning, "DeleteFailed",
"%s delete failed: %v", strings.ToTitle(kind), err)
s.deliver(namespacedName, 0, true)
glog.Errorf("failed to delete %s %s: %v", kind, namespacedName, err)
return statusError
}
return
return statusAllOK
}

glog.V(3).Infof("Ensuring delete object from underlying clusters finalizer for %s: %s",
kind, namespacedName)
// Add the required finalizers before creating the resource in underlying clusters.
obj, err = s.deletionHelper.EnsureFinalizers(obj)
if err != nil {
glog.Errorf("Failed to ensure delete object from underlying clusters finalizer in %s %s: %v",
glog.Errorf("failed to ensure delete object from underlying clusters finalizer in %s %s: %v",
kind, namespacedName, err)
s.deliver(namespacedName, 0, false)
return
return statusError
}

glog.V(3).Infof("Syncing %s %s in underlying clusters", kind, namespacedName)

clusters, err := s.informer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get cluster list: %v", err)
s.deliver(namespacedName, s.clusterAvailableDelay, false)
return
glog.Errorf("failed to get cluster list: %v", err)
return statusNotSynced
}

operations := make([]util.FederatedOperation, 0)
for _, cluster := range clusters {
clusterObj, found, err := s.informer.GetTargetStore().GetByKey(cluster.Name, key)
if err != nil {
glog.Errorf("Failed to get %s from %s: %v", key, cluster.Name, err)
s.deliver(namespacedName, 0, true)
return
glog.Errorf("failed to get %s from %s: %v", key, cluster.Name, err)
return statusError
}

// The data should not be modified.
Expand Down Expand Up @@ -374,18 +411,17 @@ func (s *FederationSyncController) reconcile(namespacedName types.NamespacedName

if len(operations) == 0 {
// Everything is in order
return
return statusAllOK
}
err = s.updater.Update(operations, s.updateTimeout)

err = s.updater.Update(operations, s.updateTimeout)
if err != nil {
glog.Errorf("Failed to execute updates for %s: %v", key, err)
s.deliver(namespacedName, 0, true)
return
glog.Errorf("failed to execute updates for %s: %v", key, err)
return statusError
}

// Evertyhing is in order but lets be double sure
s.deliver(namespacedName, s.reviewDelay, false)
// Everything is in order but let's be double sure
return statusNeedsRecheck
}

// delete deletes the given resource or returns error if the deletion was not complete.
Expand Down