Skip to content

Commit

Permalink
Support recovery from in the middle of a rename.
Browse files Browse the repository at this point in the history
  • Loading branch information
brendandburns committed May 2, 2015
1 parent 262c34e commit 5d1a6b6
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 6 deletions.
18 changes: 14 additions & 4 deletions pkg/kubectl/cmd/rollingupdate.go
Expand Up @@ -129,16 +129,26 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg
return err
}

updaterClient := kubectl.NewRollingUpdaterClient(client)

var newRc *api.ReplicationController
// fetch rc
oldRc, err := client.ReplicationControllers(cmdNamespace).Get(oldName)
if err != nil {
return err
if !apierrors.IsNotFound(err) || len(image) == 0 || len(args) > 1 {
return err
}
// We're in the middle of a rename, look for an RC with a source annotation of oldName
newRc, err := kubectl.FindSourceController(updaterClient, cmdNamespace, oldName)
if err != nil {
return err
}
return kubectl.Rename(kubectl.NewRollingUpdaterClient(client), newRc, oldName)
}

keepOldName := false

mapper, typer := f.Object()
var newRc *api.ReplicationController

if len(filename) != 0 {
obj, err := resource.NewBuilder(mapper, typer, f.ClientMapperForCommand()).
Expand All @@ -161,7 +171,7 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg

if len(args) >= 2 {
newName = args[1]
} else {
} else if oldRc != nil {
newName, _ = kubectl.GetNextControllerAnnotation(oldRc)
}

Expand Down Expand Up @@ -217,7 +227,7 @@ func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, arg
filename, oldName)
}

updater := kubectl.NewRollingUpdater(newRc.Namespace, kubectl.NewRollingUpdaterClient(client))
updater := kubectl.NewRollingUpdater(newRc.Namespace, updaterClient)

var hasLabel bool
for key, oldValue := range oldRc.Spec.Selector {
Expand Down
28 changes: 26 additions & 2 deletions pkg/kubectl/rolling_updater.go
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
)

Expand Down Expand Up @@ -97,6 +98,20 @@ func SetNextControllerAnnotation(rc *api.ReplicationController, name string) {
rc.Annotations[nextControllerAnnotation] = name
}

func FindSourceController(r RollingUpdaterClient, namespace, name string) (*api.ReplicationController, error) {
list, err := r.ListReplicationControllers(namespace, labels.Everything())
if err != nil {
return nil, err
}
for ix := range list.Items {
rc := &list.Items[ix]
if rc.Annotations != nil && rc.Annotations[sourceIdAnnotation] == name {
return rc, nil
}
}
return nil, fmt.Errorf("couldn't find a replication controller with source id == %s/%s", namespace, name)
}

// Update all pods for a ReplicationController (oldRc) by creating a new
// controller (newRc) with 0 replicas, and synchronously resizing oldRc,newRc
// by 1 until oldRc has 0 replicas and newRc has the original # of desired
Expand Down Expand Up @@ -268,19 +283,24 @@ func (r *RollingUpdater) updateAndWait(rc *api.ReplicationController, interval,
}

func (r *RollingUpdater) rename(rc *api.ReplicationController, newName string) error {
return Rename(r.c, rc, newName)
}

func Rename(c RollingUpdaterClient, rc *api.ReplicationController, newName string) error {
oldName := rc.Name
rc.Name = newName
rc.ResourceVersion = ""

_, err := r.c.CreateReplicationController(rc.Namespace, rc)
_, err := c.CreateReplicationController(rc.Namespace, rc)
if err != nil {
return err
}
return r.c.DeleteReplicationController(rc.Namespace, oldName)
return c.DeleteReplicationController(rc.Namespace, oldName)
}

// RollingUpdaterClient abstracts access to ReplicationControllers.
type RollingUpdaterClient interface {
ListReplicationControllers(namespace string, selector labels.Selector) (*api.ReplicationControllerList, error)
GetReplicationController(namespace, name string) (*api.ReplicationController, error)
UpdateReplicationController(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error)
CreateReplicationController(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error)
Expand All @@ -297,6 +317,10 @@ type realRollingUpdaterClient struct {
client client.Interface
}

func (c *realRollingUpdaterClient) ListReplicationControllers(namespace string, selector labels.Selector) (*api.ReplicationControllerList, error) {
return c.client.ReplicationControllers(namespace).List(selector)
}

func (c *realRollingUpdaterClient) GetReplicationController(namespace, name string) (*api.ReplicationController, error) {
return c.client.ReplicationControllers(namespace).Get(name)
}
Expand Down
149 changes: 149 additions & 0 deletions pkg/kubectl/rolling_updater_test.go
Expand Up @@ -20,12 +20,14 @@ import (
"bytes"
"fmt"
"io/ioutil"
"reflect"
"testing"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
)

Expand Down Expand Up @@ -380,15 +382,162 @@ func TestRollingUpdater_preserveCleanup(t *testing.T) {
}
}

func TestRename(t *testing.T) {
tests := []struct {
namespace string
newName string
oldName string
err error
expectError bool
}{
{
namespace: "default",
newName: "bar",
oldName: "foo",
},
{
namespace: "default",
newName: "bar",
oldName: "foo",
err: fmt.Errorf("Test Error"),
expectError: true,
},
}
for _, test := range tests {
fakeClient := &rollingUpdaterClientImpl{
CreateReplicationControllerFn: func(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error) {
if namespace != test.namespace {
t.Errorf("unexepected namespace: %s, expected %s", namespace, test.namespace)
}
if rc.Name != test.newName {
t.Errorf("unexepected name: %s, expected %s", rc.Name, test.newName)
}
return rc, test.err
},
DeleteReplicationControllerFn: func(namespace, name string) error {
if namespace != test.namespace {
t.Errorf("unexepected namespace: %s, expected %s", namespace, test.namespace)
}
if name != test.oldName {
t.Errorf("unexepected name: %s, expected %s", name, test.oldName)
}
return nil
},
}
err := Rename(fakeClient, &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: test.namespace, Name: test.oldName}}, test.newName)
if err != nil && !test.expectError {
t.Errorf("unexpected error: %v", err)
}
if err == nil && test.expectError {
t.Errorf("unexpected non-error")
}
}
}

func TestFindSourceController(t *testing.T) {
ctrl1 := api.ReplicationController{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Annotations: map[string]string{
sourceIdAnnotation: "bar",
},
},
}
ctrl2 := api.ReplicationController{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Annotations: map[string]string{
sourceIdAnnotation: "foo",
},
},
}
ctrl3 := api.ReplicationController{
ObjectMeta: api.ObjectMeta{
Annotations: map[string]string{
sourceIdAnnotation: "baz",
},
},
}
tests := []struct {
list *api.ReplicationControllerList
expectedController *api.ReplicationController
err error
name string
expectError bool
}{
{
list: &api.ReplicationControllerList{},
expectError: true,
},
{
list: &api.ReplicationControllerList{
Items: []api.ReplicationController{ctrl1},
},
name: "foo",
expectError: true,
},
{
list: &api.ReplicationControllerList{
Items: []api.ReplicationController{ctrl1},
},
name: "bar",
expectedController: &ctrl1,
},
{
list: &api.ReplicationControllerList{
Items: []api.ReplicationController{ctrl1, ctrl2},
},
name: "bar",
expectedController: &ctrl1,
},
{
list: &api.ReplicationControllerList{
Items: []api.ReplicationController{ctrl1, ctrl2},
},
name: "foo",
expectedController: &ctrl2,
},
{
list: &api.ReplicationControllerList{
Items: []api.ReplicationController{ctrl1, ctrl2, ctrl3},
},
name: "baz",
expectedController: &ctrl3,
},
}
for _, test := range tests {
fakeClient := rollingUpdaterClientImpl{
ListReplicationControllersFn: func(namespace string, selector labels.Selector) (*api.ReplicationControllerList, error) {
return test.list, test.err
},
}
ctrl, err := FindSourceController(&fakeClient, "default", test.name)
if test.expectError && err == nil {
t.Errorf("unexpected non-error")
}
if !test.expectError && err != nil {
t.Errorf("unexpected error")
}
if !reflect.DeepEqual(ctrl, test.expectedController) {
t.Errorf("expected:\n%v\ngot:\n%v\n", test.expectedController, ctrl)
}
}
}

// rollingUpdaterClientImpl is a dynamic RollingUpdaterClient.
type rollingUpdaterClientImpl struct {
ListReplicationControllersFn func(namespace string, selector labels.Selector) (*api.ReplicationControllerList, error)
GetReplicationControllerFn func(namespace, name string) (*api.ReplicationController, error)
UpdateReplicationControllerFn func(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error)
CreateReplicationControllerFn func(namespace string, rc *api.ReplicationController) (*api.ReplicationController, error)
DeleteReplicationControllerFn func(namespace, name string) error
ControllerHasDesiredReplicasFn func(rc *api.ReplicationController) wait.ConditionFunc
}

func (c *rollingUpdaterClientImpl) ListReplicationControllers(namespace string, selector labels.Selector) (*api.ReplicationControllerList, error) {
return c.ListReplicationControllersFn(namespace, selector)
}

func (c *rollingUpdaterClientImpl) GetReplicationController(namespace, name string) (*api.ReplicationController, error) {
return c.GetReplicationControllerFn(namespace, name)
}
Expand Down

0 comments on commit 5d1a6b6

Please sign in to comment.