Enable Garbage collection by default for RS and RC #41282

Merged: 1 commit, Feb 27, 2017
1 change: 0 additions & 1 deletion cmd/kube-controller-manager/app/core.go
@@ -60,7 +60,6 @@ func startReplicationController(ctx ControllerContext) (bool, error) {
  ctx.ClientBuilder.ClientOrDie("replication-controller"),
  replicationcontroller.BurstReplicas,
  int(ctx.Options.LookupCacheSizeForRC),
- ctx.Options.EnableGarbageCollector,
  ).Run(int(ctx.Options.ConcurrentRCSyncs), ctx.Stop)
  return true, nil
  }
1 change: 0 additions & 1 deletion cmd/kube-controller-manager/app/extensions.go
@@ -64,7 +64,6 @@ func startReplicaSetController(ctx ControllerContext) (bool, error) {
  ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
  replicaset.BurstReplicas,
  int(ctx.Options.LookupCacheSizeForRS),
- ctx.Options.EnableGarbageCollector,
  ).Run(int(ctx.Options.ConcurrentRSSyncs), ctx.Stop)
  return true, nil
  }
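
Note: with the `EnableGarbageCollector` argument gone, the call sites above shrink by one parameter. A minimal sketch of the updated constructor call, assuming placeholder `rsInformer`/`podInformer` variables for the shared informers (they are wired up elsewhere and are not part of this diff):

```go
// Sketch only: how a caller constructs the ReplicaSet controller after this PR.
// rsInformer and podInformer are assumed shared-informer instances created
// elsewhere; the option names mirror the surrounding diff.
rsc := replicaset.NewReplicaSetController(
	rsInformer,  // extensionsinformers.ReplicaSetInformer (assumed variable)
	podInformer, // coreinformers.PodInformer (assumed variable)
	ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
	replicaset.BurstReplicas,
	int(ctx.Options.LookupCacheSizeForRS),
)
go rsc.Run(int(ctx.Options.ConcurrentRSSyncs), ctx.Stop)
```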
61 changes: 22 additions & 39 deletions pkg/controller/replicaset/replica_set.go
@@ -94,14 +94,10 @@ type ReplicaSetController struct {
  
  // Controllers that need to be synced
  queue workqueue.RateLimitingInterface
-
- // garbageCollectorEnabled denotes if the garbage collector is enabled. RC
- // manager behaves differently if GC is enabled.
- garbageCollectorEnabled bool
  }
  
  // NewReplicaSetController configures a replica set controller with the specified event recorder
- func NewReplicaSetController(rsInformer extensionsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicaSetController {
+ func NewReplicaSetController(rsInformer extensionsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int, lookupCacheSize int) *ReplicaSetController {
  if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
  metrics.RegisterMetricAndTrackRateLimiterUsage("replicaset_controller", kubeClient.Core().RESTClient().GetRateLimiter())
  }
@@ -118,7 +114,6 @@ func NewReplicaSetController(rsInformer extensionsinformers.ReplicaSetInformer,
  burstReplicas: burstReplicas,
  expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
  queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicaset"),
- garbageCollectorEnabled: garbageCollectorEnabled,
  }
  
  rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -487,19 +482,15 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte
  defer wg.Done()
  var err error
  
- if rsc.garbageCollectorEnabled {
-   var trueVar = true
-   controllerRef := &metav1.OwnerReference{
-     APIVersion: getRSKind().GroupVersion().String(),
-     Kind: getRSKind().Kind,
-     Name: rs.Name,
-     UID: rs.UID,
-     Controller: &trueVar,
-   }
-   err = rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
- } else {
-   err = rsc.podControl.CreatePods(rs.Namespace, &rs.Spec.Template, rs)
- }
+ var trueVar = true
+ controllerRef := &metav1.OwnerReference{
+   APIVersion: getRSKind().GroupVersion().String(),
+   Kind: getRSKind().Kind,
+   Name: rs.Name,
+   UID: rs.UID,
+   Controller: &trueVar,
+ }
+ err = rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
  if err != nil {
  // Decrement the expected number of creates because the informer won't observe this pod
  glog.V(2).Infof("Failed creation, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name)
@@ -595,27 +586,19 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
  // modify them, you need to copy it first.
  // TODO: Do the List and Filter in a single pass, or use an index.
  var filteredPods []*v1.Pod
- if rsc.garbageCollectorEnabled {
-   // list all pods to include the pods that don't match the rs`s selector
-   // anymore but has the stale controller ref.
-   pods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
-   if err != nil {
-     return err
-   }
-   cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, getRSKind())
-   filteredPods, err = cm.ClaimPods(pods)
-   if err != nil {
-     // Something went wrong with adoption or release.
-     // Requeue and try again so we don't leave orphans sitting around.
-     rsc.queue.Add(key)
-     return err
-   }
- } else {
-   pods, err := rsc.podLister.Pods(rs.Namespace).List(selector)
-   if err != nil {
-     return err
-   }
-   filteredPods = controller.FilterActivePods(pods)
- }
+ // list all pods to include the pods that don't match the rs`s selector
+ // anymore but has the stale controller ref.
+ pods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
+ if err != nil {
+   return err
+ }
+ cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, getRSKind())
+ filteredPods, err = cm.ClaimPods(pods)
+ if err != nil {
+   // Something went wrong with adoption or release.
+   // Requeue and try again so we don't leave orphans sitting around.
+   rsc.queue.Add(key)
+   return err
+ }
  
  var manageReplicasErr error
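
Note: since pods are now always created through `CreatePodsWithControllerRef`, every pod managed by a ReplicaSet carries a controller owner reference. A small, self-contained sketch of what that reference looks like; the name and UID are made up for illustration:

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Illustrative only: the kind of ownerReference the controller now always
	// attaches to pods it creates. Name and UID are hypothetical values.
	isController := true
	ref := metav1.OwnerReference{
		APIVersion: "extensions/v1beta1",
		Kind:       "ReplicaSet",
		Name:       "frontend",                             // hypothetical ReplicaSet name
		UID:        "d9a3d1a0-0000-0000-0000-000000000000", // hypothetical UID
		Controller: &isController,
	}
	// Because Controller is true, the garbage collector treats the ReplicaSet as
	// the managing owner and can cascade-delete the pod when the RS is deleted.
	fmt.Printf("%+v\n", ref)
}
```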
44 changes: 23 additions & 21 deletions pkg/controller/replicaset/replica_set_test.go
@@ -60,7 +60,6 @@ func testNewReplicaSetControllerFromClient(client clientset.Interface, stopCh ch
  client,
  burstReplicas,
  lookupCacheSize,
- false,
  )
  
  ret.podListerSynced = alwaysReady
@@ -147,7 +146,7 @@ func newReplicaSet(replicas int, selectorMap map[string]string) *extensions.Repl
  }
  
  // create a pod with the given phase for the given rs (same selectors and namespace)
- func newPod(name string, rs *extensions.ReplicaSet, status v1.PodPhase, lastTransitionTime *metav1.Time) *v1.Pod {
+ func newPod(name string, rs *extensions.ReplicaSet, status v1.PodPhase, lastTransitionTime *metav1.Time, properlyOwned bool) *v1.Pod {
  var conditions []v1.PodCondition
  if status == v1.PodRunning {
  condition := v1.PodCondition{Type: v1.PodReady, Status: v1.ConditionTrue}
@@ -156,11 +155,17 @@ func newPod(name string, rs *extensions.ReplicaSet, status v1.PodPhase, lastTran
  }
  conditions = append(conditions, condition)
  }
+ var controllerReference metav1.OwnerReference
+ if properlyOwned {
+   var trueVar = true
+   controllerReference = metav1.OwnerReference{UID: rs.UID, APIVersion: "v1beta1", Kind: "ReplicaSet", Name: rs.Name, Controller: &trueVar}
+ }
  return &v1.Pod{
  ObjectMeta: metav1.ObjectMeta{
-   Name: name,
-   Namespace: rs.Namespace,
-   Labels: rs.Spec.Selector.MatchLabels,
+   Name: name,
+   Namespace: rs.Namespace,
+   Labels: rs.Spec.Selector.MatchLabels,
+   OwnerReferences: []metav1.OwnerReference{controllerReference},
  },
  Status: v1.PodStatus{Phase: status, Conditions: conditions},
  }
@@ -172,7 +177,7 @@ func newPodList(store cache.Store, count int, status v1.PodPhase, labelMap map[s
  var trueVar = true
  controllerReference := metav1.OwnerReference{UID: rs.UID, APIVersion: "v1beta1", Kind: "ReplicaSet", Name: rs.Name, Controller: &trueVar}
  for i := 0; i < count; i++ {
- pod := newPod(fmt.Sprintf("%s%d", name, i), rs, status, nil)
+ pod := newPod(fmt.Sprintf("%s%d", name, i), rs, status, nil, false)
  pod.ObjectMeta.Labels = labelMap
  pod.OwnerReferences = []metav1.OwnerReference{controllerReference}
  if store != nil {
@@ -532,7 +537,6 @@ func TestWatchControllers(t *testing.T) {
  client,
  BurstReplicas,
  0,
- false,
  )
  informers.Start(stopCh)
  
@@ -1135,12 +1139,10 @@ func TestDeletionTimestamp(t *testing.T) {
  }
  
  // setupManagerWithGCEnabled creates a RS manager with a fakePodControl
- // and with garbageCollectorEnabled set to true
  func setupManagerWithGCEnabled(stopCh chan struct{}, objs ...runtime.Object) (manager *ReplicaSetController, fakePodControl *controller.FakePodControl, informers informers.SharedInformerFactory) {
  c := fakeclientset.NewSimpleClientset(objs...)
  fakePodControl = &controller.FakePodControl{}
  manager, informers = testNewReplicaSetControllerFromClient(c, stopCh, BurstReplicas, 0)
- manager.garbageCollectorEnabled = true
  
  manager.podControl = fakePodControl
  return manager, fakePodControl, informers
@@ -1156,7 +1158,7 @@ func TestDoNotPatchPodWithOtherControlRef(t *testing.T) {
  var trueVar = true
  otherControllerReference := metav1.OwnerReference{UID: uuid.NewUUID(), APIVersion: "v1beta1", Kind: "ReplicaSet", Name: "AnotherRS", Controller: &trueVar}
  // add to podLister a matching Pod controlled by another controller. Expect no patch.
- pod := newPod("pod", rs, v1.PodRunning, nil)
+ pod := newPod("pod", rs, v1.PodRunning, nil, true)
  pod.OwnerReferences = []metav1.OwnerReference{otherControllerReference}
  informers.Core().V1().Pods().Informer().GetIndexer().Add(pod)
  err := manager.syncReplicaSet(getKey(rs, t))
@@ -1178,7 +1180,7 @@ func TestPatchPodWithOtherOwnerRef(t *testing.T) {
  // ref, but has an owner ref pointing to other object. Expect a patch to
  // take control of it.
  unrelatedOwnerReference := metav1.OwnerReference{UID: uuid.NewUUID(), APIVersion: "batch/v1", Kind: "Job", Name: "Job"}
- pod := newPod("pod", rs, v1.PodRunning, nil)
+ pod := newPod("pod", rs, v1.PodRunning, nil, false)
  pod.OwnerReferences = []metav1.OwnerReference{unrelatedOwnerReference}
  informers.Core().V1().Pods().Informer().GetIndexer().Add(pod)
  
@@ -1200,7 +1202,7 @@ func TestPatchPodWithCorrectOwnerRef(t *testing.T) {
  // add to podLister a matching pod that has an ownerRef pointing to the rs,
  // but ownerRef.Controller is false. Expect a patch to take control it.
  rsOwnerReference := metav1.OwnerReference{UID: rs.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rs.Name}
- pod := newPod("pod", rs, v1.PodRunning, nil)
+ pod := newPod("pod", rs, v1.PodRunning, nil, false)
  pod.OwnerReferences = []metav1.OwnerReference{rsOwnerReference}
  informers.Core().V1().Pods().Informer().GetIndexer().Add(pod)
  
@@ -1221,8 +1223,8 @@ func TestPatchPodFails(t *testing.T) {
  informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(rs)
  // add to podLister two matching pods. Expect two patches to take control
  // them.
- informers.Core().V1().Pods().Informer().GetIndexer().Add(newPod("pod1", rs, v1.PodRunning, nil))
- informers.Core().V1().Pods().Informer().GetIndexer().Add(newPod("pod2", rs, v1.PodRunning, nil))
+ informers.Core().V1().Pods().Informer().GetIndexer().Add(newPod("pod1", rs, v1.PodRunning, nil, false))
+ informers.Core().V1().Pods().Informer().GetIndexer().Add(newPod("pod2", rs, v1.PodRunning, nil, false))
  // let both patches fail. The rs controller will assume it fails to take
  // control of the pods and requeue to try again.
  fakePodControl.Err = fmt.Errorf("Fake Error")
@@ -1249,9 +1251,9 @@ func TestPatchExtraPodsThenDelete(t *testing.T) {
  informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(rs)
  // add to podLister three matching pods. Expect three patches to take control
  // them, and later delete one of them.
- informers.Core().V1().Pods().Informer().GetIndexer().Add(newPod("pod1", rs, v1.PodRunning, nil))
- informers.Core().V1().Pods().Informer().GetIndexer().Add(newPod("pod2", rs, v1.PodRunning, nil))
- informers.Core().V1().Pods().Informer().GetIndexer().Add(newPod("pod3", rs, v1.PodRunning, nil))
+ informers.Core().V1().Pods().Informer().GetIndexer().Add(newPod("pod1", rs, v1.PodRunning, nil, false))
+ informers.Core().V1().Pods().Informer().GetIndexer().Add(newPod("pod2", rs, v1.PodRunning, nil, false))
+ informers.Core().V1().Pods().Informer().GetIndexer().Add(newPod("pod3", rs, v1.PodRunning, nil, false))
  err := manager.syncReplicaSet(getKey(rs, t))
  if err != nil {
  t.Fatal(err)
@@ -1268,7 +1270,7 @@ func TestUpdateLabelsRemoveControllerRef(t *testing.T) {
  manager, fakePodControl, informers := setupManagerWithGCEnabled(stopCh, rs)
  informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(rs)
  // put one pod in the podLister
- pod := newPod("pod", rs, v1.PodRunning, nil)
+ pod := newPod("pod", rs, v1.PodRunning, nil, false)
  pod.ResourceVersion = "1"
  var trueVar = true
  rsOwnerReference := metav1.OwnerReference{UID: rs.UID, APIVersion: "v1beta1", Kind: "ReplicaSet", Name: rs.Name, Controller: &trueVar}
@@ -1346,7 +1348,7 @@ func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) {
  now := metav1.Now()
  rs.DeletionTimestamp = &now
  informers.Extensions().V1beta1().ReplicaSets().Informer().GetIndexer().Add(rs)
- pod1 := newPod("pod1", rs, v1.PodRunning, nil)
+ pod1 := newPod("pod1", rs, v1.PodRunning, nil, false)
  informers.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
  
  // no patch, no create
@@ -1425,12 +1427,12 @@ func TestAvailableReplicas(t *testing.T) {
  
  // First pod becomes ready 20s ago
  moment := metav1.Time{Time: time.Now().Add(-2e10)}
- pod := newPod("pod", rs, v1.PodRunning, &moment)
+ pod := newPod("pod", rs, v1.PodRunning, &moment, true)
  informers.Core().V1().Pods().Informer().GetIndexer().Add(pod)
  
  // Second pod becomes ready now
  otherMoment := metav1.Now()
- otherPod := newPod("otherPod", rs, v1.PodRunning, &otherMoment)
+ otherPod := newPod("otherPod", rs, v1.PodRunning, &otherMoment, true)
  informers.Core().V1().Pods().Informer().GetIndexer().Add(otherPod)
  
  // This response body is just so we don't err out decoding the http response
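
Note: the `properlyOwned` flag added to `newPod` lets a test decide whether the fixture pod already carries a controller ownerRef to the ReplicaSet. A short usage sketch (variable names are illustrative, mirroring the test code above):

```go
// Sketch of how the updated helper might be used in a test body. rs is a
// *extensions.ReplicaSet fixture, as elsewhere in this file.
ownedPod := newPod("pod-a", rs, v1.PodRunning, nil, true)   // already has a controller ownerRef to rs
orphanPod := newPod("pod-b", rs, v1.PodRunning, nil, false) // no controller ownerRef; a sync would adopt it
informers.Core().V1().Pods().Informer().GetIndexer().Add(ownedPod)
informers.Core().V1().Pods().Informer().GetIndexer().Add(orphanPod)
```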
67 changes: 24 additions & 43 deletions pkg/controller/replication/replication_controller.go
@@ -89,14 +89,10 @@ type ReplicationManager struct {
  
  // Controllers that need to be synced
  queue workqueue.RateLimitingInterface
-
- // garbageCollectorEnabled denotes if the garbage collector is enabled. RC
- // manager behaves differently if GC is enabled.
- garbageCollectorEnabled bool
  }
  
  // NewReplicationManager configures a replication manager with the specified event recorder
- func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
+ func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer coreinformers.ReplicationControllerInformer, kubeClient clientset.Interface, burstReplicas int, lookupCacheSize int) *ReplicationManager {
  if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
  metrics.RegisterMetricAndTrackRateLimiterUsage("replication_controller", kubeClient.Core().RESTClient().GetRateLimiter())
  }
@@ -114,7 +110,6 @@ func NewReplicationManager(podInformer coreinformers.PodInformer, rcInformer cor
  burstReplicas: burstReplicas,
  expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
  queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicationmanager"),
- garbageCollectorEnabled: garbageCollectorEnabled,
  }
  
  rcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -484,19 +479,15 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*v1.Pod, rc *v1.Repl
  go func() {
  defer wg.Done()
  var err error
- if rm.garbageCollectorEnabled {
-   var trueVar = true
-   controllerRef := &metav1.OwnerReference{
-     APIVersion: getRCKind().GroupVersion().String(),
-     Kind: getRCKind().Kind,
-     Name: rc.Name,
-     UID: rc.UID,
-     Controller: &trueVar,
-   }
-   err = rm.podControl.CreatePodsWithControllerRef(rc.Namespace, rc.Spec.Template, rc, controllerRef)
- } else {
-   err = rm.podControl.CreatePods(rc.Namespace, rc.Spec.Template, rc)
- }
+ var trueVar = true
+ controllerRef := &metav1.OwnerReference{
+   APIVersion: getRCKind().GroupVersion().String(),
+   Kind: getRCKind().Kind,
+   Name: rc.Name,
+   UID: rc.UID,
+   Controller: &trueVar,
+ }
+ err = rm.podControl.CreatePodsWithControllerRef(rc.Namespace, rc.Spec.Template, rc, controllerRef)
  if err != nil {
  // Decrement the expected number of creates because the informer won't observe this pod
  glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
@@ -610,31 +601,21 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
  // modify them, you need to copy it first.
  // TODO: Do the List and Filter in a single pass, or use an index.
  var filteredPods []*v1.Pod
- if rm.garbageCollectorEnabled {
-   // list all pods to include the pods that don't match the rc's selector
-   // anymore but has the stale controller ref.
-   pods, err := rm.podLister.Pods(rc.Namespace).List(labels.Everything())
-   if err != nil {
-     utilruntime.HandleError(fmt.Errorf("Error getting pods for rc %q: %v", key, err))
-     rm.queue.Add(key)
-     return err
-   }
-   cm := controller.NewPodControllerRefManager(rm.podControl, rc, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), getRCKind())
-   filteredPods, err = cm.ClaimPods(pods)
-   if err != nil {
-     // Something went wrong with adoption or release.
-     // Requeue and try again so we don't leave orphans sitting around.
-     rm.queue.Add(key)
-     return err
-   }
- } else {
-   pods, err := rm.podLister.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelectorPreValidated())
-   if err != nil {
-     utilruntime.HandleError(fmt.Errorf("Error getting pods for rc %q: %v", key, err))
-     rm.queue.Add(key)
-     return err
-   }
-   filteredPods = controller.FilterActivePods(pods)
- }
+ // list all pods to include the pods that don't match the rc's selector
+ // anymore but has the stale controller ref.
+ pods, err := rm.podLister.Pods(rc.Namespace).List(labels.Everything())
+ if err != nil {
+   utilruntime.HandleError(fmt.Errorf("Error getting pods for rc %q: %v", key, err))
+   rm.queue.Add(key)
+   return err
+ }
+ cm := controller.NewPodControllerRefManager(rm.podControl, rc, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), getRCKind())
+ filteredPods, err = cm.ClaimPods(pods)
+ if err != nil {
+   // Something went wrong with adoption or release.
+   // Requeue and try again so we don't leave orphans sitting around.
+   rm.queue.Add(key)
+   return err
+ }
  
  var manageReplicasErr error
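
Note: both controllers now always take the `ClaimPods` path: list broadly with `labels.Everything()`, then adopt, keep, or release each pod based on its controller ownerRef and the selector. A toy, self-contained model of that decision, for intuition only; the real logic lives in `controller.PodControllerRefManager` and also handles patching, races, and deletion timestamps:

```go
package main

import "fmt"

// ownerRef is a stripped-down stand-in for metav1.OwnerReference, used only in
// this toy model.
type ownerRef struct {
	uid        string
	controller bool
}

// claim mimics the ClaimPods decision for a single pod: should this RC/RS
// manage the pod after the sync?
func claim(podRefs []ownerRef, ownerUID string, selectorMatches bool) bool {
	for _, r := range podRefs {
		if r.controller {
			// Already controlled: keep managing it only if it is ours and the
			// labels still match; otherwise it is released or ignored.
			return r.uid == ownerUID && selectorMatches
		}
	}
	// No controller yet: adopt it if (and only if) the selector matches.
	return selectorMatches
}

func main() {
	fmt.Println(claim(nil, "rs-uid", true))                                              // true:  orphan is adopted
	fmt.Println(claim([]ownerRef{{uid: "rs-uid", controller: true}}, "rs-uid", false))   // false: labels changed, pod is released
	fmt.Println(claim([]ownerRef{{uid: "other-uid", controller: true}}, "rs-uid", true)) // false: owned by another controller
}
```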