Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean-up and fixes in federated replica set #31744

Merged
merged 1 commit into from
Aug 31, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ var (
clusterAvailableDelay = 20 * time.Second
clusterUnavailableDelay = 60 * time.Second
allReplicaSetReviewDealy = 2 * time.Minute
updateTimeout = 30 * time.Second
)

func parseFederationReplicaSetReference(frs *extensionsv1.ReplicaSet) (*fed.FederatedReplicaSetPreferences, error) {
Expand Down Expand Up @@ -85,6 +86,8 @@ type ReplicaSetController struct {
replicasetDeliverer *fedutil.DelayingDeliverer
clusterDeliverer *fedutil.DelayingDeliverer
replicasetWorkQueue workqueue.Interface
// For updating members of federation.
fedUpdater fedutil.FederatedUpdater

replicaSetBackoff *flowcontrol.Backoff

Expand Down Expand Up @@ -170,6 +173,23 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
),
)

frsc.fedUpdater = fedutil.NewFederatedUpdater(frsc.fedReplicaSetInformer,
func(client kubeclientset.Interface, obj runtime.Object) error {
rs := obj.(*extensionsv1.ReplicaSet)
_, err := client.Extensions().ReplicaSets(rs.Namespace).Create(rs)
return err
},
func(client kubeclientset.Interface, obj runtime.Object) error {
rs := obj.(*extensionsv1.ReplicaSet)
_, err := client.Extensions().ReplicaSets(rs.Namespace).Update(rs)
return err
},
func(client kubeclientset.Interface, obj runtime.Object) error {
rs := obj.(*extensionsv1.ReplicaSet)
err := client.Extensions().ReplicaSets(rs.Namespace).Delete(rs.Name, &api.DeleteOptions{})
return err
})

return frsc
}

Expand Down Expand Up @@ -293,11 +313,25 @@ func (frsc *ReplicaSetController) worker() {
return
}
key := item.(string)
err := frsc.reconcileReplicaSet(key)
status, err := frsc.reconcileReplicaSet(key)
frsc.replicasetWorkQueue.Done(item)
if err != nil {
glog.Errorf("Error syncing cluster controller: %v", err)
frsc.deliverReplicaSetByKey(key, 0, true)
} else {
switch status {
case statusAllOk:
break
case statusError:
frsc.deliverReplicaSetByKey(key, 0, true)
case statusNeedRecheck:
frsc.deliverReplicaSetByKey(key, replicaSetReviewDelay, false)
case statusNotSynced:
frsc.deliverReplicaSetByKey(key, clusterAvailableDelay, false)
default:
glog.Errorf("Unhandled reconciliation status: %s", status)
frsc.deliverReplicaSetByKey(key, replicaSetReviewDelay, false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually - when default can happen? It would mean a different status, which suggests an error. So probably you should print error here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is a safety mechanism for someone missing to add appropriate handling here. Will add an error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
}
}
}
Expand Down Expand Up @@ -352,10 +386,18 @@ func (frsc *ReplicaSetController) schedule(frs *extensionsv1.ReplicaSet, cluster
return result
}

func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error {
type reconciliationStatus string

const (
statusAllOk = reconciliationStatus("ALL_OK")
statusNeedRecheck = reconciliationStatus("RECHECK")
statusError = reconciliationStatus("ERROR")
statusNotSynced = reconciliationStatus("NOSYNC")
)

func (frsc *ReplicaSetController) reconcileReplicaSet(key string) (reconciliationStatus, error) {
if !frsc.isSynced() {
frsc.deliverReplicaSetByKey(key, clusterAvailableDelay, false)
return nil
return statusNotSynced, nil
}

glog.V(4).Infof("Start reconcile replicaset %q", key)
Expand All @@ -364,31 +406,31 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error {

obj, exists, err := frsc.replicaSetStore.Store.GetByKey(key)
if err != nil {
return err
return statusError, err
}
if !exists {
// don't delete local replicasets for now
return nil
// don't delete local replicasets for now. Do not reconcile it anymore.
return statusAllOk, nil
}
frs := obj.(*extensionsv1.ReplicaSet)

clusters, err := frsc.fedReplicaSetInformer.GetReadyClusters()
if err != nil {
return err
return statusError, err
}

// collect current status and do schedule
allPods, err := frsc.fedPodInformer.GetTargetStore().List()
if err != nil {
return err
return statusError, err
}
podStatus, err := AnalysePods(frs, allPods, time.Now())
current := make(map[string]int64)
estimatedCapacity := make(map[string]int64)
for _, cluster := range clusters {
lrsObj, exists, err := frsc.fedReplicaSetInformer.GetTargetStore().GetByKey(cluster.Name, key)
if err != nil {
return err
return statusError, err
}
if exists {
lrs := lrsObj.(*extensionsv1.ReplicaSet)
Expand All @@ -405,61 +447,65 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) error {
glog.V(4).Infof("Start syncing local replicaset %s: %v", key, scheduleResult)

fedStatus := extensionsv1.ReplicaSetStatus{ObservedGeneration: frs.Generation}
operations := make([]fedutil.FederatedOperation, 0)
for clusterName, replicas := range scheduleResult {
// TODO: updater or parallelizer doesnn't help as results are needed for updating fed rs status
clusterClient, err := frsc.fedReplicaSetInformer.GetClientsetForCluster(clusterName)
if err != nil {
return err
}

lrsObj, exists, err := frsc.fedReplicaSetInformer.GetTargetStore().GetByKey(clusterName, key)
if err != nil {
return err
} else if !exists {
return statusError, err
}

lrs := &extensionsv1.ReplicaSet{
ObjectMeta: fedutil.CopyObjectMeta(frs.ObjectMeta),
Spec: frs.Spec,
}
specReplicas := int32(replicas)
lrs.Spec.Replicas = &specReplicas

if !exists {
if replicas > 0 {
lrs := &extensionsv1.ReplicaSet{
ObjectMeta: apiv1.ObjectMeta{
Name: frs.Name,
Namespace: frs.Namespace,
Labels: frs.Labels,
Annotations: frs.Annotations,
},
Spec: frs.Spec,
}
specReplicas := int32(replicas)
lrs.Spec.Replicas = &specReplicas
lrs, err = clusterClient.Extensions().ReplicaSets(frs.Namespace).Create(lrs)
if err != nil {
return err
}
fedStatus.Replicas += lrs.Status.Replicas
fedStatus.FullyLabeledReplicas += lrs.Status.FullyLabeledReplicas
operations = append(operations, fedutil.FederatedOperation{
Type: fedutil.OperationTypeAdd,
Obj: lrs,
ClusterName: clusterName,
})
}
} else {
lrs := lrsObj.(*extensionsv1.ReplicaSet)
lrsExpectedSpec := frs.Spec
specReplicas := int32(replicas)
lrsExpectedSpec.Replicas = &specReplicas
if !reflect.DeepEqual(lrs.Spec, lrsExpectedSpec) {
lrs.Spec = lrsExpectedSpec
lrs, err = clusterClient.Extensions().ReplicaSets(frs.Namespace).Update(lrs)
if err != nil {
return err
}
currentLrs := lrsObj.(*extensionsv1.ReplicaSet)
// Update existing replica set, if needed.
if !fedutil.ObjectMetaEquivalent(lrs.ObjectMeta, currentLrs.ObjectMeta) ||
!reflect.DeepEqual(lrs.Spec, currentLrs.Spec) {
operations = append(operations, fedutil.FederatedOperation{
Type: fedutil.OperationTypeUpdate,
Obj: lrs,
ClusterName: clusterName,
})
}
fedStatus.Replicas += lrs.Status.Replicas
fedStatus.FullyLabeledReplicas += lrs.Status.FullyLabeledReplicas
fedStatus.Replicas += currentLrs.Status.Replicas
fedStatus.FullyLabeledReplicas += currentLrs.Status.FullyLabeledReplicas
// leave the replicaset even the replicas dropped to 0
}
}
if fedStatus.Replicas != frs.Status.Replicas || fedStatus.FullyLabeledReplicas != frs.Status.FullyLabeledReplicas {
frs.Status = fedStatus
_, err = frsc.fedClient.Extensions().ReplicaSets(frs.Namespace).UpdateStatus(frs)
if err != nil {
return err
return statusError, err
}
}

return nil
if len(operations) == 0 {
// Everything is in order
return statusAllOk, nil
}
err = frsc.fedUpdater.Update(operations, updateTimeout)
if err != nil {
glog.Errorf("Failed to execute updates for %s: %v", key, err)
return statusError, err
}

// Some operations were made, reconcile after a while.
return statusNeedRecheck, nil
}

func (frsc *ReplicaSetController) reconcileReplicaSetsOnClusterChange() {
Expand Down