Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "kubectl: Make scaling smarter" #20030

Merged
merged 1 commit into from
Jan 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/client/unversioned/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func JobHasDesiredParallelism(c ExtensionsInterface, job *extensions.Job) wait.C
}

// desired parallelism can be either the exact number, in which case return immediately
if job.Spec.Parallelism != nil && job.Status.Active == *job.Spec.Parallelism {
if job.Status.Active == *job.Spec.Parallelism {
return true, nil
}
// otherwise count successful
Expand Down
4 changes: 0 additions & 4 deletions pkg/kubectl/cmd/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,6 @@ func RunScale(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, args []stri
errs := []error{}
for _, info := range infos {
if err := scaler.Scale(info.Namespace, info.Name, uint(count), precondition, retry, waitForReplicas); err != nil {
if scaleErr, ok := err.(kubectl.ScaleError); ok && scaleErr.FailureType == kubectl.AlreadyScaled {
cmdutil.PrintSuccess(mapper, shortOutput, out, info.Mapping.Resource, info.Name, "already scaled")
continue
}
errs = append(errs, err)
continue
}
Expand Down
26 changes: 5 additions & 21 deletions pkg/kubectl/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package kubectl

import (
goerrors "errors"
"fmt"
"strconv"
"time"
Expand Down Expand Up @@ -80,13 +79,8 @@ const (
ScaleGetFailure ScaleErrorType = iota
ScaleUpdateFailure
ScaleUpdateInvalidFailure
// AlreadyScaled is not really an error but we need a way to surface to the client that
// the scaling didn't happen because we already have the desired state the user asked for.
AlreadyScaled
)

var alreadyScaledErr = goerrors.New("desired replicas already equals the requested replicas")

// A ScaleError is returned when a scale request passes
// preconditions but fails to actually scale the controller.
type ScaleError struct {
Expand Down Expand Up @@ -118,14 +112,12 @@ func ScaleCondition(r Scaler, precondition *ScalePrecondition, namespace, name s
case nil:
return true, nil
case ScaleError:
switch e.FailureType {
case ScaleUpdateInvalidFailure:
// if it's invalid we shouldn't keep waiting
// if it's invalid we shouldn't keep waiting
if e.FailureType == ScaleUpdateInvalidFailure {
return false, err
case ScaleUpdateFailure:
}
if e.FailureType == ScaleUpdateFailure {
return false, nil
case AlreadyScaled:
return false, err
}
}
return false, err
Expand Down Expand Up @@ -157,10 +149,8 @@ func (scaler *ReplicationControllerScaler) ScaleSimple(namespace, name string, p
return err
}
}
if controller.Spec.Replicas == int(newSize) {
return ScaleError{AlreadyScaled, controller.ResourceVersion, alreadyScaledErr}
}
controller.Spec.Replicas = int(newSize)
// TODO: do retry on 409 errors here?
if _, err := scaler.c.ReplicationControllers(namespace).Update(controller); err != nil {
if errors.IsInvalid(err) {
return ScaleError{ScaleUpdateInvalidFailure, controller.ResourceVersion, err}
Expand Down Expand Up @@ -226,9 +216,6 @@ func (scaler *JobScaler) ScaleSimple(namespace, name string, preconditions *Scal
}
}
parallelism := int(newSize)
if job.Spec.Parallelism != nil && *job.Spec.Parallelism == parallelism {
return ScaleError{AlreadyScaled, job.ResourceVersion, alreadyScaledErr}
}
job.Spec.Parallelism = &parallelism
if _, err := scaler.c.Jobs(namespace).Update(job); err != nil {
if errors.IsInvalid(err) {
Expand Down Expand Up @@ -292,9 +279,6 @@ func (scaler *DeploymentScaler) ScaleSimple(namespace, name string, precondition
}
}
scale := extensions.ScaleFromDeployment(deployment)
if scale.Spec.Replicas == int(newSize) {
return ScaleError{AlreadyScaled, deployment.ResourceVersion, alreadyScaledErr}
}
scale.Spec.Replicas = int(newSize)
if _, err := scaler.c.Scales(namespace).Update("Deployment", scale); err != nil {
if errors.IsInvalid(err) {
Expand Down
76 changes: 0 additions & 76 deletions pkg/kubectl/scale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,31 +132,6 @@ func TestReplicationControllerScaleFailsPreconditions(t *testing.T) {
}
}

func TestReplicationControllerAlreadyScaled(t *testing.T) {
fake := testclient.NewSimpleFake(&api.ReplicationController{
Spec: api.ReplicationControllerSpec{
Replicas: 3,
},
})
scaler := ReplicationControllerScaler{fake}
count := uint(3)
name := "foo"
err := scaler.Scale("default", name, count, nil, nil, nil)
if err == nil {
t.Fatal("expected AlreadyScaled error, got nil")
}
if scaleErr, ok := err.(ScaleError); !ok || scaleErr.FailureType != AlreadyScaled {
t.Fatalf("expected AlreadyScaled error, got %s", scaleErr.FailureType)
}
actions := fake.Actions()
if len(actions) != 1 {
t.Fatalf("unexpected actions: %v, expected 1 action (get)", actions)
}
if action, ok := actions[0].(testclient.GetAction); !ok || action.GetResource() != "replicationcontrollers" || action.GetName() != name {
t.Errorf("unexpected action: %v, expected get-replicationController %s", actions[0], name)
}
}

func TestValidateReplicationController(t *testing.T) {
tests := []struct {
preconditions ScalePrecondition
Expand Down Expand Up @@ -387,32 +362,6 @@ func TestJobScaleFailsPreconditions(t *testing.T) {
}
}

func TestJobAlreadyScaled(t *testing.T) {
three := 3
fake := testclient.NewSimpleFake(&extensions.Job{
Spec: extensions.JobSpec{
Parallelism: &three,
},
})
scaler := JobScaler{&testclient.FakeExperimental{fake}}
count := uint(3)
name := "foo"
err := scaler.Scale("default", name, count, nil, nil, nil)
if err == nil {
t.Fatal("expected AlreadyScaled error, got nil")
}
if scaleErr, ok := err.(ScaleError); !ok || scaleErr.FailureType != AlreadyScaled {
t.Fatalf("expected AlreadyScaled error, got %s", scaleErr.FailureType)
}
actions := fake.Actions()
if len(actions) != 1 {
t.Fatalf("unexpected actions: %v, expected 1 action (get)", actions)
}
if action, ok := actions[0].(testclient.GetAction); !ok || action.GetResource() != "jobs" || action.GetName() != name {
t.Errorf("unexpected action: %v, expected get-job %s", actions[0], name)
}
}

func TestValidateJob(t *testing.T) {
zero, ten, twenty := 0, 10, 20
tests := []struct {
Expand Down Expand Up @@ -677,31 +626,6 @@ func TestDeploymentScaleFailsPreconditions(t *testing.T) {
}
}

func TestDeploymentAlreadyScaled(t *testing.T) {
fake := testclient.NewSimpleFake(&extensions.Deployment{
Spec: extensions.DeploymentSpec{
Replicas: 3,
},
})
scaler := DeploymentScaler{&testclient.FakeExperimental{fake}}
count := uint(3)
name := "foo"
err := scaler.Scale("default", name, count, nil, nil, nil)
if err == nil {
t.Fatal("expected AlreadyScaled error, got nil")
}
if scaleErr, ok := err.(ScaleError); !ok || scaleErr.FailureType != AlreadyScaled {
t.Fatalf("expected AlreadyScaled error, got %s", scaleErr.FailureType)
}
actions := fake.Actions()
if len(actions) != 1 {
t.Fatalf("unexpected actions: %v, expected 1 action (get)", actions)
}
if action, ok := actions[0].(testclient.GetAction); !ok || action.GetResource() != "deployments" || action.GetName() != name {
t.Errorf("unexpected action: %v, expected get-deployment %s", actions[0], name)
}
}

func TestValidateDeployment(t *testing.T) {
zero, ten, twenty := 0, 10, 20
tests := []struct {
Expand Down
56 changes: 27 additions & 29 deletions pkg/kubectl/stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
client "k8s.io/kubernetes/pkg/client/unversioned"
Expand Down Expand Up @@ -82,6 +83,30 @@ func ReaperForReplicationController(c client.Interface, timeout time.Duration) (
return &ReplicationControllerReaper{c, Interval, timeout}, nil
}

type ReplicationControllerReaper struct {
client.Interface
pollInterval, timeout time.Duration
}
type DaemonSetReaper struct {
client.Interface
pollInterval, timeout time.Duration
}
type JobReaper struct {
client.Interface
pollInterval, timeout time.Duration
}
type PodReaper struct {
client.Interface
}
type ServiceReaper struct {
client.Interface
}

type objInterface interface {
Delete(name string) error
Get(name string) (meta.Object, error)
}

// getOverlappingControllers finds rcs that this controller overlaps, as well as rcs overlapping this controller.
func getOverlappingControllers(c client.ReplicationControllerInterface, rc *api.ReplicationController) ([]api.ReplicationController, error) {
rcs, err := c.List(api.ListOptions{})
Expand All @@ -99,11 +124,6 @@ func getOverlappingControllers(c client.ReplicationControllerInterface, rc *api.
return matchingRCs, nil
}

type ReplicationControllerReaper struct {
client.Interface
pollInterval, timeout time.Duration
}

func (reaper *ReplicationControllerReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error {
rc := reaper.ReplicationControllers(namespace)
scaler, err := ScalerFor(api.Kind("ReplicationController"), *reaper)
Expand Down Expand Up @@ -161,9 +181,7 @@ func (reaper *ReplicationControllerReaper) Stop(namespace, name string, timeout
retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
waitForReplicas := NewRetryParams(reaper.pollInterval, timeout)
if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas); err != nil {
if scaleErr, ok := err.(ScaleError); !ok || scaleErr.FailureType != AlreadyScaled {
return err
}
return err
}
}
if err := rc.Delete(name); err != nil {
Expand All @@ -172,11 +190,6 @@ func (reaper *ReplicationControllerReaper) Stop(namespace, name string, timeout
return nil
}

type DaemonSetReaper struct {
client.Interface
pollInterval, timeout time.Duration
}

func (reaper *DaemonSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error {
ds, err := reaper.Extensions().DaemonSets(namespace).Get(name)
if err != nil {
Expand Down Expand Up @@ -214,11 +227,6 @@ func (reaper *DaemonSetReaper) Stop(namespace, name string, timeout time.Duratio
return nil
}

type JobReaper struct {
client.Interface
pollInterval, timeout time.Duration
}

func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error {
jobs := reaper.Extensions().Jobs(namespace)
pods := reaper.Pods(namespace)
Expand All @@ -240,9 +248,7 @@ func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gra
retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
waitForJobs := NewRetryParams(reaper.pollInterval, timeout)
if err = scaler.Scale(namespace, name, 0, nil, retry, waitForJobs); err != nil {
if scaleErr, ok := err.(ScaleError); !ok || scaleErr.FailureType != AlreadyScaled {
return err
}
return err
}
// at this point only dead pods are left, that should be removed
selector, _ := extensions.LabelSelectorAsSelector(job.Spec.Selector)
Expand Down Expand Up @@ -270,10 +276,6 @@ func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gra
return nil
}

type PodReaper struct {
client.Interface
}

func (reaper *PodReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error {
pods := reaper.Pods(namespace)
_, err := pods.Get(name)
Expand All @@ -287,10 +289,6 @@ func (reaper *PodReaper) Stop(namespace, name string, timeout time.Duration, gra
return nil
}

type ServiceReaper struct {
client.Interface
}

func (reaper *ServiceReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error {
services := reaper.Services(namespace)
_, err := services.Get(name)
Expand Down