Automated cherry pick of #14917 upstream release 1.1 #15321

Merged
2 changes: 1 addition & 1 deletion cmd/kube-controller-manager/app/controllermanager.go
@@ -115,7 +115,7 @@ func NewCMServer() *CMServer {
 		NamespaceSyncPeriod:               5 * time.Minute,
 		PVClaimBinderSyncPeriod:           10 * time.Second,
 		HorizontalPodAutoscalerSyncPeriod: 30 * time.Second,
-		DeploymentControllerSyncPeriod:    1 * time.Minute,
+		DeploymentControllerSyncPeriod:    30 * time.Second,
 		RegisterRetryCount:                10,
 		PodEvictionTimeout:                5 * time.Minute,
 		ClusterName:                       "kubernetes",
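Note for readers following along: this halves the resync interval, so the deployment controller re-lists and reconciles twice as often. A minimal standalone sketch of the polling pattern a sync period like this drives (illustrative names only; the real wiring lives in controllermanager.go):

package main

import (
	"fmt"
	"time"
)

// reconcileAll stands in for one controller pass: list every deployment
// and reconcile it toward its desired state.
func reconcileAll() {
	fmt.Println("reconciling deployments...")
}

func main() {
	syncPeriod := 30 * time.Second // the new default introduced by this cherry pick
	ticker := time.NewTicker(syncPeriod)
	defer ticker.Stop()
	for range ticker.C {
		reconcileAll()
	}
}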
71 changes: 27 additions & 44 deletions pkg/controller/deployment/deployment_controller.go
@@ -24,6 +24,7 @@ import (
 	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/apis/experimental"
+	"k8s.io/kubernetes/pkg/client/record"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
 	"k8s.io/kubernetes/pkg/fields"
 	"k8s.io/kubernetes/pkg/labels"
@@ -32,14 +33,20 @@ import (
 )
 
 type DeploymentController struct {
-	client    client.Interface
-	expClient client.ExperimentalInterface
+	client        client.Interface
+	expClient     client.ExperimentalInterface
+	eventRecorder record.EventRecorder
 }
 
 func New(client client.Interface) *DeploymentController {
+	eventBroadcaster := record.NewBroadcaster()
+	eventBroadcaster.StartLogging(glog.Infof)
+	eventBroadcaster.StartRecordingToSink(client.Events(""))
+
 	return &DeploymentController{
-		client:    client,
-		expClient: client.Experimental(),
+		client:        client,
+		expClient:     client.Experimental(),
+		eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "deployment-controller"}),
 	}
 }

@@ -92,9 +99,7 @@ func (d *DeploymentController) reconcileRollingUpdateDeployment(deployment exper
 		return err
 	}
 
-	allRCs := []*api.ReplicationController{}
-	allRCs = append(allRCs, oldRCs...)
-	allRCs = append(allRCs, newRC)
+	allRCs := append(oldRCs, newRC)
 
 	// Scale up, if we can.
 	scaledUp, err := d.scaleUp(allRCs, newRC, deployment)
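The three-line slice build collapses into a single append here. One general Go caveat worth keeping in mind, not specific to this PR: if the first slice has spare capacity, append reuses its backing array. A small sketch of both behaviors:

package main

import "fmt"

func main() {
	oldRCs := []string{"rc-1", "rc-2"}
	allRCs := append(oldRCs, "rc-new") // equivalent to the removed build-then-append
	fmt.Println(allRCs)                // [rc-1 rc-2 rc-new]
	// If oldRCs had been created with extra capacity, allRCs and oldRCs
	// would alias the same array; here cap(oldRCs) == 2, so append copies.
}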
@@ -153,26 +158,6 @@ func (d *DeploymentController) getNewRC(deployment experimental.Deployment) (*ap
 	return createdRC, nil
 }
 
-func (d *DeploymentController) getPodsForRCs(replicationControllers []*api.ReplicationController) ([]api.Pod, error) {
-	allPods := []api.Pod{}
-	for _, rc := range replicationControllers {
-		podList, err := d.client.Pods(rc.ObjectMeta.Namespace).List(labels.SelectorFromSet(rc.Spec.Selector), fields.Everything())
-		if err != nil {
-			return allPods, fmt.Errorf("error listing pods: %v", err)
-		}
-		allPods = append(allPods, podList.Items...)
-	}
-	return allPods, nil
-}
-
-func (d *DeploymentController) getReplicaCountForRCs(replicationControllers []*api.ReplicationController) int {
-	totalReplicaCount := 0
-	for _, rc := range replicationControllers {
-		totalReplicaCount += rc.Spec.Replicas
-	}
-	return totalReplicaCount
-}
-
 func (d *DeploymentController) scaleUp(allRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment experimental.Deployment) (bool, error) {
 	if newRC.Spec.Replicas == deployment.Spec.Replicas {
 		// Scaling up not required.
@@ -186,11 +171,7 @@ func (d *DeploymentController) scaleUp(allRCs []*api.ReplicationController, newR
 		maxSurge = util.GetValueFromPercent(maxSurge, deployment.Spec.Replicas)
 	}
 	// Find the total number of pods
-	allPods, err := d.getPodsForRCs(allRCs)
-	if err != nil {
-		return false, err
-	}
-	currentPodCount := len(allPods)
+	currentPodCount := deploymentUtil.GetReplicaCountForRCs(allRCs)
 	// Check if we can scale up.
 	maxTotalPods := deployment.Spec.Replicas + maxSurge
 	if currentPodCount >= maxTotalPods {
@@ -200,12 +181,16 @@ func (d *DeploymentController) scaleUp(allRCs []*api.ReplicationController, newR
 	// Scale up.
 	scaleUpCount := maxTotalPods - currentPodCount
 	scaleUpCount = int(math.Min(float64(scaleUpCount), float64(deployment.Spec.Replicas-newRC.Spec.Replicas)))
-	_, err = d.scaleRC(newRC, newRC.Spec.Replicas+scaleUpCount)
+	newReplicasCount := newRC.Spec.Replicas + scaleUpCount
+	_, err = d.scaleRC(newRC, newReplicasCount)
+	if err == nil {
+		d.eventRecorder.Eventf(&deployment, "ScalingRC", "Scaled up rc %s to %d", newRC.Name, newReplicasCount)
+	}
 	return true, err
 }
 
 func (d *DeploymentController) scaleDown(allRCs []*api.ReplicationController, oldRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment experimental.Deployment) (bool, error) {
-	oldPodsCount := d.getReplicaCountForRCs(oldRCs)
+	oldPodsCount := deploymentUtil.GetReplicaCountForRCs(oldRCs)
 	if oldPodsCount == 0 {
 		// Cant scale down further
 		return false, nil
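To make the surge arithmetic above concrete, a worked example under stated assumptions (maxSurge already resolved from a percentage to an absolute count, as util.GetValueFromPercent does earlier in the function):

package main

import (
	"fmt"
	"math"
)

func main() {
	desired := 10      // deployment.Spec.Replicas
	maxSurge := 2      // e.g. 20% of 10
	currentPods := 11  // deploymentUtil.GetReplicaCountForRCs(allRCs)
	newRCReplicas := 4 // newRC.Spec.Replicas

	maxTotalPods := desired + maxSurge // 12
	scaleUpCount := maxTotalPods - currentPods
	scaleUpCount = int(math.Min(float64(scaleUpCount), float64(desired-newRCReplicas)))
	// Scales the new RC from 4 to 5: the surge budget (1) is the binding cap here.
	fmt.Printf("scale new RC %d -> %d\n", newRCReplicas, newRCReplicas+scaleUpCount)
}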
@@ -220,13 +205,9 @@ func (d *DeploymentController) scaleDown(allRCs []*api.ReplicationController, ol
 	// Check if we can scale down.
 	minAvailable := deployment.Spec.Replicas - maxUnavailable
 	// Find the number of ready pods.
-	// TODO: Use MinReadySeconds once https://github.com/kubernetes/kubernetes/pull/12894 is merged.
-	readyPodCount := 0
-	allPods, err := d.getPodsForRCs(allRCs)
-	for _, pod := range allPods {
-		if api.IsPodReady(&pod) {
-			readyPodCount++
-		}
+	readyPodCount, err := deploymentUtil.GetAvailablePodsForRCs(d.client, allRCs)
+	if err != nil {
+		return false, fmt.Errorf("could not find available pods: %v", err)
 	}
 
 	if readyPodCount <= minAvailable {
Expand All @@ -245,18 +226,20 @@ func (d *DeploymentController) scaleDown(allRCs []*api.ReplicationController, ol
}
// Scale down.
scaleDownCount := int(math.Min(float64(targetRC.Spec.Replicas), float64(totalScaleDownCount)))
_, err = d.scaleRC(targetRC, targetRC.Spec.Replicas-scaleDownCount)
newReplicasCount := targetRC.Spec.Replicas - scaleDownCount
_, err = d.scaleRC(targetRC, newReplicasCount)
if err != nil {
return false, err
}
d.eventRecorder.Eventf(&deployment, "ScalingRC", "Scaled down rc %s to %d", targetRC.Name, newReplicasCount)
totalScaleDownCount -= scaleDownCount
}
return true, err
}

func (d *DeploymentController) updateDeploymentStatus(allRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment experimental.Deployment) error {
totalReplicas := d.getReplicaCountForRCs(allRCs)
updatedReplicas := d.getReplicaCountForRCs([]*api.ReplicationController{newRC})
totalReplicas := deploymentUtil.GetReplicaCountForRCs(allRCs)
updatedReplicas := deploymentUtil.GetReplicaCountForRCs([]*api.ReplicationController{newRC})
newDeployment := deployment
// TODO: Reconcile this with API definition. API definition talks about ready pods, while this just computes created pods.
newDeployment.Status = experimental.DeploymentStatus{
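The scale-down side mirrors the surge math: availability is guarded by minAvailable = desired - maxUnavailable, and only pods above that floor may be removed. A worked sketch, assuming (per the surrounding code) totalScaleDownCount starts at readyPodCount - minAvailable:

package main

import (
	"fmt"
	"math"
)

func main() {
	desired := 10
	maxUnavailable := 3 // e.g. 30% of 10
	readyPods := 9      // deploymentUtil.GetAvailablePodsForRCs(...)
	oldRCReplicas := 6

	minAvailable := desired - maxUnavailable // 7
	if readyPods <= minAvailable {
		fmt.Println("cannot scale down: would drop below minAvailable")
		return
	}
	totalScaleDownCount := readyPods - minAvailable // 2
	scaleDown := int(math.Min(float64(oldRCReplicas), float64(totalScaleDownCount)))
	fmt.Printf("scale old RC %d -> %d\n", oldRCReplicas, oldRCReplicas-scaleDown)
}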
4 changes: 4 additions & 0 deletions pkg/kubectl/describe.go
@@ -1435,6 +1435,10 @@ func (dd *DeploymentDescriber) Describe(namespace, name string) (string, error)
 			}
 			fmt.Fprintf(out, "NewReplicationController:\t%s\n", printReplicationControllersByLabels(newRCs))
 		}
+		events, err := dd.Events(namespace).Search(d)
+		if err == nil && events != nil {
+			DescribeEvents(events, out)
+		}
 		return nil
 	})
 }
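For context, Search looks up events recorded against the object, which is how the ScalingRC events emitted by the controller above become visible in kubectl describe. A hedged sketch of the same pattern outside the describer (printDeploymentEvents is a hypothetical helper, not part of this PR, and assumes the 1.1-era client interfaces already imported in this file):

// printDeploymentEvents fetches and prints events attached to a deployment.
func printDeploymentEvents(c client.Interface, d *experimental.Deployment, out io.Writer) {
	events, err := c.Events(d.Namespace).Search(d)
	if err != nil || events == nil {
		return
	}
	for _, e := range events.Items {
		fmt.Fprintf(out, "%s\t%s\t%s\n", e.Reason, e.Source.Component, e.Message)
	}
}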
37 changes: 37 additions & 0 deletions pkg/util/deployment/deployment.go
@@ -107,3 +107,40 @@ func GetPodTemplateSpecHash(template *api.PodTemplateSpec) uint32 {
 	util.DeepHashObject(podTemplateSpecHasher, template)
 	return podTemplateSpecHasher.Sum32()
 }

+// Returns the sum of Replicas of the given replication controllers.
+func GetReplicaCountForRCs(replicationControllers []*api.ReplicationController) int {
+	totalReplicaCount := 0
+	for _, rc := range replicationControllers {
+		totalReplicaCount += rc.Spec.Replicas
+	}
+	return totalReplicaCount
+}
+
+// Returns the number of available pods corresponding to the given RCs.
+func GetAvailablePodsForRCs(c client.Interface, rcs []*api.ReplicationController) (int, error) {
+	// TODO: Use MinReadySeconds once https://github.com/kubernetes/kubernetes/pull/12894 is merged.
+	allPods, err := getPodsForRCs(c, rcs)
+	if err != nil {
+		return 0, err
+	}
+	readyPodCount := 0
+	for _, pod := range allPods {
+		if api.IsPodReady(&pod) {
+			readyPodCount++
+		}
+	}
+	return readyPodCount, nil
+}
+
+func getPodsForRCs(c client.Interface, replicationControllers []*api.ReplicationController) ([]api.Pod, error) {
+	allPods := []api.Pod{}
+	for _, rc := range replicationControllers {
+		podList, err := c.Pods(rc.ObjectMeta.Namespace).List(labels.SelectorFromSet(rc.Spec.Selector), fields.Everything())
+		if err != nil {
+			return allPods, fmt.Errorf("error listing pods: %v", err)
+		}
+		allPods = append(allPods, podList.Items...)
+	}
+	return allPods, nil
+}
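Taken together, the two exported helpers give callers a desired-replica count and a ready-pod count for a set of RCs. A hedged usage sketch (summarizeRCs is illustrative, not part of this PR; deploymentUtil is the import alias the controller uses for this package):

// summarizeRCs reports desired vs. ready counts for a deployment's RCs.
func summarizeRCs(c client.Interface, allRCs []*api.ReplicationController) (string, error) {
	total := deploymentUtil.GetReplicaCountForRCs(allRCs)
	ready, err := deploymentUtil.GetAvailablePodsForRCs(c, allRCs)
	if err != nil {
		return "", fmt.Errorf("could not find available pods: %v", err)
	}
	return fmt.Sprintf("%d desired replicas, %d ready pods", total, ready), nil
}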