kubectl: Make scaling smarter #18169

Merged · 1 commit · Jan 22, 2016
2 changes: 1 addition & 1 deletion pkg/client/unversioned/conditions.go
@@ -54,7 +54,7 @@ func JobHasDesiredParallelism(c ExtensionsInterface, job *extensions.Job) wait.C
}

// desired parallelism can be either the exact number, in which case return immediately
-if job.Status.Active == *job.Spec.Parallelism {
+if job.Spec.Parallelism != nil && job.Status.Active == *job.Spec.Parallelism {

Review comment (Contributor): Parallelism will never be nil, since it's defaulted if not present. No need to change that.

return true, nil
}
// otherwise count successful
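
As an aside on the review note above: a minimal, self-contained sketch of the guarded-dereference pattern the added check uses. The types here are simplified stand-ins, not the real API structs.

package main

import "fmt"

// JobSpec is a simplified stand-in; in the real API, Parallelism is a
// *int that the API server defaults when it is not set.
type JobSpec struct {
	Parallelism *int
}

// activeMatches guards the pointer before dereferencing, so a Job that
// was built locally (and never defaulted) cannot cause a nil-pointer panic.
func activeMatches(active int, spec JobSpec) bool {
	return spec.Parallelism != nil && active == *spec.Parallelism
}

func main() {
	fmt.Println(activeMatches(3, JobSpec{})) // false, and no panic
	p := 3
	fmt.Println(activeMatches(3, JobSpec{Parallelism: &p})) // true
}
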
4 changes: 4 additions & 0 deletions pkg/kubectl/cmd/scale.go
@@ -146,6 +146,10 @@ func RunScale(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, args []stri
errs := []error{}
for _, info := range infos {
if err := scaler.Scale(info.Namespace, info.Name, uint(count), precondition, retry, waitForReplicas); err != nil {
+if scaleErr, ok := err.(kubectl.ScaleError); ok && scaleErr.FailureType == kubectl.AlreadyScaled {
+cmdutil.PrintSuccess(mapper, shortOutput, out, info.Mapping.Resource, info.Name, "already scaled")

Review comment (Contributor, author): @kubernetes/rh-ux @kubernetes/goog-ux

+continue
+}
errs = append(errs, err)
continue
}
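
The control flow above treats one failure type as a success for reporting purposes. Below is a minimal sketch of that flow with simplified stand-in types; the real command uses kubectl.ScaleError and cmdutil.PrintSuccess instead of the local types and printf shown here.

package main

import (
	"errors"
	"fmt"
)

type ScaleErrorType int

const AlreadyScaled ScaleErrorType = iota // illustrative value

type ScaleError struct {
	FailureType ScaleErrorType
	Err         error
}

func (e ScaleError) Error() string { return e.Err.Error() }

func main() {
	errs := []error{}
	for _, name := range []string{"frontend", "backend"} {
		if err := scale(name); err != nil {
			// AlreadyScaled is reported as success, not collected as an error.
			if scaleErr, ok := err.(ScaleError); ok && scaleErr.FailureType == AlreadyScaled {
				fmt.Printf("%s already scaled\n", name)
				continue
			}
			errs = append(errs, err)
			continue
		}
		fmt.Printf("%s scaled\n", name)
	}
	fmt.Println("errors:", errs)
}

// scale is a stand-in for scaler.Scale.
func scale(name string) error {
	if name == "frontend" {
		return ScaleError{AlreadyScaled, errors.New("already at desired replicas")}
	}
	return nil
}
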
26 changes: 21 additions & 5 deletions pkg/kubectl/scale.go
@@ -17,6 +17,7 @@ limitations under the License.
package kubectl

import (
goerrors "errors"
"fmt"
"strconv"
"time"
@@ -79,8 +80,13 @@ const (
ScaleGetFailure ScaleErrorType = iota
ScaleUpdateFailure
ScaleUpdateInvalidFailure
+// AlreadyScaled is not really an error but we need a way to surface to the client that
+// the scaling didn't happen because we already have the desired state the user asked for.
+AlreadyScaled
)

+var alreadyScaledErr = goerrors.New("desired replicas already equals the requested replicas")

// A ScaleError is returned when a scale request passes
// preconditions but fails to actually scale the controller.
type ScaleError struct {
@@ -112,12 +118,14 @@ func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name s
case nil:
return true, nil
case ScaleError:
-// if it's invalid we shouldn't keep waiting
-if e.FailureType == ScaleUpdateInvalidFailure {
+switch e.FailureType {
+case ScaleUpdateInvalidFailure:
+// if it's invalid we shouldn't keep waiting
return false, err
-}
-if e.FailureType == ScaleUpdateFailure {
+case ScaleUpdateFailure:
return false, nil
+case AlreadyScaled:
+return false, err
}
}
return false, err
@@ -149,8 +157,10 @@ func (scaler *ReplicationControllerScaler) ScaleSimple(namespace, name string, p
return err
}
}
+if controller.Spec.Replicas == int(newSize) {
+return ScaleError{AlreadyScaled, controller.ResourceVersion, alreadyScaledErr}
+}
controller.Spec.Replicas = int(newSize)
-// TODO: do retry on 409 errors here?
if _, err := scaler.c.ReplicationControllers(namespace).Update(controller); err != nil {
if errors.IsInvalid(err) {
return ScaleError{ScaleUpdateInvalidFailure, controller.ResourceVersion, err}
@@ -216,6 +226,9 @@ func (scaler *JobScaler) ScaleSimple(namespace, name string, preconditions *Scal
}
}
parallelism := int(newSize)
+if job.Spec.Parallelism != nil && *job.Spec.Parallelism == parallelism {

Review comment (Contributor): Again, no need for the nil check.

Review comment (Contributor, author): I cringe at accessing a pointer without checking for nil first. I'm going to leave this in, since it doesn't hurt to have safer code. :)

+return ScaleError{AlreadyScaled, job.ResourceVersion, alreadyScaledErr}
+}
job.Spec.Parallelism = &parallelism
if _, err := scaler.c.Jobs(namespace).Update(job); err != nil {
if errors.IsInvalid(err) {
@@ -279,6 +292,9 @@ func (scaler *DeploymentScaler) ScaleSimple(namespace, name string, precondition
}
}
scale := extensions.ScaleFromDeployment(deployment)
+if scale.Spec.Replicas == int(newSize) {
+return ScaleError{AlreadyScaled, deployment.ResourceVersion, alreadyScaledErr}
+}
scale.Spec.Replicas = int(newSize)
if _, err := scaler.c.Scales(namespace).Update("Deployment", scale); err != nil {
if errors.IsInvalid(err) {
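
The three return shapes from ScaleCondition drive the retry loop: (true, nil) stops with success, (false, nil) retries, and (false, err) aborts immediately. Returning (false, err) for AlreadyScaled is what lets callers stop polling at once and branch on the typed error. Here is a stripped-down sketch of that contract; poll is a stand-in for the wait.Poll helper the real code uses.

package main

import (
	"errors"
	"fmt"
	"time"
)

var errAlreadyScaled = errors.New("desired replicas already equals the requested replicas")

// poll is a stand-in for wait.Poll: it retries cond until it reports
// done, returns an error, or the attempt budget runs out.
func poll(interval time.Duration, attempts int, cond func() (bool, error)) error {
	for i := 0; i < attempts; i++ {
		done, err := cond()
		if err != nil {
			return err // abort: invalid update, or AlreadyScaled
		}
		if done {
			return nil // success
		}
		time.Sleep(interval) // retriable failure, e.g. an update conflict
	}
	return errors.New("timed out waiting for the condition")
}

func main() {
	attempts := 0
	err := poll(time.Millisecond, 5, func() (bool, error) {
		attempts++
		return false, errAlreadyScaled
	})
	fmt.Println(err, "after", attempts, "attempt(s)") // aborts on the first try
}
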
76 changes: 76 additions & 0 deletions pkg/kubectl/scale_test.go
@@ -132,6 +132,31 @@ func TestReplicationControllerScaleFailsPreconditions(t *testing.T) {
}
}

func TestReplicationControllerAlreadyScaled(t *testing.T) {
fake := testclient.NewSimpleFake(&api.ReplicationController{
Spec: api.ReplicationControllerSpec{
Replicas: 3,
},
})
scaler := ReplicationControllerScaler{fake}
count := uint(3)
name := "foo"
err := scaler.Scale("default", name, count, nil, nil, nil)
if err == nil {
t.Fatal("expected AlreadyScaled error, got nil")
}
if scaleErr, ok := err.(ScaleError); !ok || scaleErr.FailureType != AlreadyScaled {
t.Fatalf("expected AlreadyScaled error, got %s", scaleErr.FailureType)
}
actions := fake.Actions()
if len(actions) != 1 {
t.Fatalf("unexpected actions: %v, expected 1 action (get)", actions)
}
if action, ok := actions[0].(testclient.GetAction); !ok || action.GetResource() != "replicationcontrollers" || action.GetName() != name {
t.Errorf("unexpected action: %v, expected get-replicationController %s", actions[0], name)
}
}

func TestValidateReplicationController(t *testing.T) {
tests := []struct {
preconditions ScalePrecondition
@@ -362,6 +387,32 @@ func TestJobScaleFailsPreconditions(t *testing.T) {
}
}

func TestJobAlreadyScaled(t *testing.T) {
three := 3
fake := testclient.NewSimpleFake(&extensions.Job{
Spec: extensions.JobSpec{
Parallelism: &three,
},
})
scaler := JobScaler{&testclient.FakeExperimental{fake}}
count := uint(3)
name := "foo"
err := scaler.Scale("default", name, count, nil, nil, nil)
if err == nil {
t.Fatal("expected AlreadyScaled error, got nil")
}
if scaleErr, ok := err.(ScaleError); !ok || scaleErr.FailureType != AlreadyScaled {
t.Fatalf("expected AlreadyScaled error, got %s", scaleErr.FailureType)
}
actions := fake.Actions()
if len(actions) != 1 {
t.Fatalf("unexpected actions: %v, expected 1 action (get)", actions)
}
if action, ok := actions[0].(testclient.GetAction); !ok || action.GetResource() != "jobs" || action.GetName() != name {
t.Errorf("unexpected action: %v, expected get-job %s", actions[0], name)
}
}

func TestValidateJob(t *testing.T) {
zero, ten, twenty := 0, 10, 20
tests := []struct {
@@ -626,6 +677,31 @@ func TestDeploymentScaleFailsPreconditions(t *testing.T) {
}
}

func TestDeploymentAlreadyScaled(t *testing.T) {
fake := testclient.NewSimpleFake(&extensions.Deployment{
Spec: extensions.DeploymentSpec{
Replicas: 3,
},
})
scaler := DeploymentScaler{&testclient.FakeExperimental{fake}}
count := uint(3)
name := "foo"
err := scaler.Scale("default", name, count, nil, nil, nil)
if err == nil {
t.Fatal("expected AlreadyScaled error, got nil")
}
if scaleErr, ok := err.(ScaleError); !ok || scaleErr.FailureType != AlreadyScaled {
t.Fatalf("expected AlreadyScaled error, got %s", scaleErr.FailureType)
}
actions := fake.Actions()
if len(actions) != 1 {
t.Fatalf("unexpected actions: %v, expected 1 action (get)", actions)
}
if action, ok := actions[0].(testclient.GetAction); !ok || action.GetResource() != "deployments" || action.GetName() != name {
t.Errorf("unexpected action: %v, expected get-deployment %s", actions[0], name)
}
}

func TestValidateDeployment(t *testing.T) {
zero, ten, twenty := 0, 10, 20
tests := []struct {
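
The three AlreadyScaled tests repeat the same type assertion; a small hypothetical helper (not part of this change) could factor it out. ScaleError and AlreadyScaled are mirrored here in simplified form so the sketch stays self-contained.

package main

import (
	"errors"
	"fmt"
)

// Simplified mirrors of the package types, for illustration only.
type ScaleErrorType int

const AlreadyScaled ScaleErrorType = 3 // illustrative value

type ScaleError struct {
	FailureType     ScaleErrorType
	ResourceVersion string
	ActualError     error
}

func (e ScaleError) Error() string { return e.ActualError.Error() }

// isAlreadyScaled is the check each test repeats.
func isAlreadyScaled(err error) bool {
	scaleErr, ok := err.(ScaleError)
	return ok && scaleErr.FailureType == AlreadyScaled
}

func main() {
	err := ScaleError{AlreadyScaled, "42", errors.New("desired replicas already equals the requested replicas")}
	fmt.Println(isAlreadyScaled(err))                 // true
	fmt.Println(isAlreadyScaled(errors.New("other"))) // false
}
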
56 changes: 29 additions & 27 deletions pkg/kubectl/stop.go
@@ -23,7 +23,6 @@ import (

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
client "k8s.io/kubernetes/pkg/client/unversioned"
@@ -83,30 +82,6 @@ func ReaperForReplicationController(c client.Interface, timeout time.Duration) (
return &ReplicationControllerReaper{c, Interval, timeout}, nil
}

-type ReplicationControllerReaper struct {
-client.Interface
-pollInterval, timeout time.Duration
-}
-type DaemonSetReaper struct {
-client.Interface
-pollInterval, timeout time.Duration
-}
-type JobReaper struct {
-client.Interface
-pollInterval, timeout time.Duration
-}
-type PodReaper struct {
-client.Interface
-}
-type ServiceReaper struct {
-client.Interface
-}
-
-type objInterface interface {
-Delete(name string) error
-Get(name string) (meta.Object, error)
-}

// getOverlappingControllers finds rcs that this controller overlaps, as well as rcs overlapping this controller.
func getOverlappingControllers(c client.ReplicationControllerInterface, rc *api.ReplicationController) ([]api.ReplicationController, error) {
rcs, err := c.List(api.ListOptions{})
@@ -124,6 +99,11 @@ func getOverlappingControllers(c client.ReplicationControllerInterface, rc *api.
return matchingRCs, nil
}

+type ReplicationControllerReaper struct {
+client.Interface
+pollInterval, timeout time.Duration
+}

func (reaper *ReplicationControllerReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error {
rc := reaper.ReplicationControllers(namespace)
scaler, err := ScalerFor(api.Kind("ReplicationController"), *reaper)
@@ -181,7 +161,9 @@ func (reaper *ReplicationControllerReaper) Stop(namespace, name string, timeout
retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
waitForReplicas := NewRetryParams(reaper.pollInterval, timeout)
if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas); err != nil {
-return err
+if scaleErr, ok := err.(ScaleError); !ok || scaleErr.FailureType != AlreadyScaled {
+return err
+}
}
}
if err := rc.Delete(name); err != nil {
@@ -190,6 +172,11 @@ func (reaper *ReplicationControllerReaper) Stop(namespace, name string, timeout
return nil
}

+type DaemonSetReaper struct {
+client.Interface
+pollInterval, timeout time.Duration
+}

func (reaper *DaemonSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error {
ds, err := reaper.Extensions().DaemonSets(namespace).Get(name)
if err != nil {
@@ -227,6 +214,11 @@ func (reaper *DaemonSetReaper) Stop(namespace, name string, timeout time.Duratio
return nil
}

+type JobReaper struct {
+client.Interface
+pollInterval, timeout time.Duration
+}

func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error {
jobs := reaper.Extensions().Jobs(namespace)
pods := reaper.Pods(namespace)
@@ -248,7 +240,9 @@ func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gra
retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
waitForJobs := NewRetryParams(reaper.pollInterval, timeout)
if err = scaler.Scale(namespace, name, 0, nil, retry, waitForJobs); err != nil {
-return err
+if scaleErr, ok := err.(ScaleError); !ok || scaleErr.FailureType != AlreadyScaled {
+return err
+}
}
// at this point only dead pods are left, that should be removed
selector, _ := extensions.LabelSelectorAsSelector(job.Spec.Selector)
@@ -276,6 +270,10 @@ func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gra
return nil
}

+type PodReaper struct {
+client.Interface
+}

func (reaper *PodReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error {
pods := reaper.Pods(namespace)
_, err := pods.Get(name)
@@ -289,6 +287,10 @@ func (reaper *PodReaper) Stop(namespace, name string, timeout time.Duration, gra
return nil
}

+type ServiceReaper struct {
+client.Interface
+}

func (reaper *ServiceReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error {
services := reaper.Services(namespace)
_, err := services.Get(name)
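
The reapers now tolerate exactly one failure type: AlreadyScaled means the resource is already at zero replicas, which is the state Stop wants before deleting, so it is safe to proceed. A self-contained sketch of that "tolerate one typed error" pattern, with simplified stand-in types:

package main

import (
	"errors"
	"fmt"
)

type ScaleErrorType int

const AlreadyScaled ScaleErrorType = 3 // illustrative value

type ScaleError struct {
	FailureType ScaleErrorType
	Err         error
}

func (e ScaleError) Error() string { return e.Err.Error() }

// stop scales to zero and then deletes; AlreadyScaled from the scale
// step is swallowed because it means the goal state is already reached.
func stop(scaleToZero func() error, del func() error) error {
	if err := scaleToZero(); err != nil {
		if scaleErr, ok := err.(ScaleError); !ok || scaleErr.FailureType != AlreadyScaled {
			return err // any other failure still aborts the Stop
		}
	}
	return del()
}

func main() {
	alreadyAtZero := func() error {
		return ScaleError{AlreadyScaled, errors.New("already at desired replicas")}
	}
	del := func() error { fmt.Println("deleted"); return nil }
	fmt.Println(stop(alreadyAtZero, del)) // prints "deleted", then <nil>
}
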