Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add update to the pod etcd handler. #1713

Merged
merged 1 commit into from
Oct 14, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 25 additions & 0 deletions pkg/api/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package validation

import (
"reflect"
"strings"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
Expand Down Expand Up @@ -338,6 +339,30 @@ func ValidatePod(pod *api.Pod) errs.ErrorList {
return allErrs
}

// ValidatePodUpdate tests to see if the update is legal
func ValidatePodUpdate(newPod, oldPod *api.Pod) errs.ErrorList {
allErrs := errs.ErrorList{}

if len(newPod.DesiredState.Manifest.Containers) != len(oldPod.DesiredState.Manifest.Containers) {
allErrs = append(allErrs, errs.NewFieldInvalid("DesiredState.Manifest.Containers", newPod.DesiredState.Manifest.Containers))
return allErrs
}
pod := *newPod
pod.Labels = oldPod.Labels
pod.TypeMeta.ResourceVersion = oldPod.TypeMeta.ResourceVersion
// Tricky, we need to copy the container list so that we don't overwrite the update
var newContainers []api.Container
for ix, container := range pod.DesiredState.Manifest.Containers {
container.Image = oldPod.DesiredState.Manifest.Containers[ix].Image
newContainers = append(newContainers, container)
}
pod.DesiredState.Manifest.Containers = newContainers
if !reflect.DeepEqual(&pod, oldPod) {
allErrs = append(allErrs, errs.NewFieldInvalid("DesiredState.Manifest.Containers", newPod.DesiredState.Manifest.Containers))
}
return allErrs
}

// ValidateService tests if required fields in the service are set.
func ValidateService(service *api.Service) errs.ErrorList {
allErrs := errs.ErrorList{}
Expand Down
173 changes: 173 additions & 0 deletions pkg/api/validation/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,179 @@ func TestValidatePod(t *testing.T) {
}
}

func TestValidatePodUpdate(t *testing.T) {
tests := []struct {
a api.Pod
b api.Pod
isValid bool
test string
}{
{api.Pod{}, api.Pod{}, true, "nothing"},
{
api.Pod{
TypeMeta: api.TypeMeta{ID: "foo"},
},
api.Pod{
TypeMeta: api.TypeMeta{ID: "bar"},
},
false,
"ids",
},
{
api.Pod{
TypeMeta: api.TypeMeta{ID: "foo"},
Labels: map[string]string{
"foo": "bar",
},
},
api.Pod{
TypeMeta: api.TypeMeta{ID: "foo"},
Labels: map[string]string{
"bar": "foo",
},
},
true,
"labels",
},
{
api.Pod{
TypeMeta: api.TypeMeta{ID: "foo"},
DesiredState: api.PodState{
Manifest: api.ContainerManifest{
Containers: []api.Container{
{
Image: "foo:V1",
},
},
},
},
},
api.Pod{
TypeMeta: api.TypeMeta{ID: "foo"},
DesiredState: api.PodState{
Manifest: api.ContainerManifest{
Containers: []api.Container{
{
Image: "foo:V2",
},
{
Image: "bar:V2",
},
},
},
},
},
false,
"more containers",
},
{
api.Pod{
TypeMeta: api.TypeMeta{ID: "foo"},
DesiredState: api.PodState{
Manifest: api.ContainerManifest{
Containers: []api.Container{
{
Image: "foo:V1",
},
},
},
},
},
api.Pod{
TypeMeta: api.TypeMeta{ID: "foo"},
DesiredState: api.PodState{
Manifest: api.ContainerManifest{
Containers: []api.Container{
{
Image: "foo:V2",
},
},
},
},
},
true,
"image change",
},
{
api.Pod{
TypeMeta: api.TypeMeta{ID: "foo"},
DesiredState: api.PodState{
Manifest: api.ContainerManifest{
Containers: []api.Container{
{
Image: "foo:V1",
CPU: 100,
},
},
},
},
},
api.Pod{
TypeMeta: api.TypeMeta{ID: "foo"},
DesiredState: api.PodState{
Manifest: api.ContainerManifest{
Containers: []api.Container{
{
Image: "foo:V2",
CPU: 1000,
},
},
},
},
},
false,
"cpu change",
},
{
api.Pod{
TypeMeta: api.TypeMeta{ID: "foo"},
DesiredState: api.PodState{
Manifest: api.ContainerManifest{
Containers: []api.Container{
{
Image: "foo:V1",
Ports: []api.Port{
{HostPort: 8080, ContainerPort: 80},
},
},
},
},
},
},
api.Pod{
TypeMeta: api.TypeMeta{ID: "foo"},
DesiredState: api.PodState{
Manifest: api.ContainerManifest{
Containers: []api.Container{
{
Image: "foo:V2",
Ports: []api.Port{
{HostPort: 8000, ContainerPort: 80},
},
},
},
},
},
},
false,
"port change",
},
}

for _, test := range tests {
errs := ValidatePodUpdate(&test.a, &test.b)
if test.isValid {
if len(errs) != 0 {
t.Errorf("unexpected invalid: %s %v, %v", test.test, test.a, test.b)
}
} else {
if len(errs) == 0 {
t.Errorf("unexpected valid: %s %v, %v", test.test, test.a, test.b)
}
}
}
}

func TestValidateService(t *testing.T) {
testCases := []struct {
name string
Expand Down
41 changes: 40 additions & 1 deletion pkg/registry/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"strconv"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/constraint"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
Expand Down Expand Up @@ -198,7 +200,44 @@ func (r *Registry) assignPod(podID string, machine string) error {
}

func (r *Registry) UpdatePod(ctx api.Context, pod *api.Pod) error {
return fmt.Errorf("unimplemented!")
var podOut api.Pod
podKey := makePodKey(pod.ID)
err := r.EtcdHelper.ExtractObj(podKey, &podOut, false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From here to the set should be in an AtomicUpdate function. As written it's racy. The atomic update thingy will do the read for you.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not racy - it'll reject the write if the version changes in between (SetObj compareandswaps), whereas atomic just tries to keep merging. So technically this path is more correct right now than AtomicUpdate which should really be AtomicMergeHelper

----- Original Message -----

@@ -198,7 +198,37 @@ func (r *Registry) assignPod(podID string, machine
string) error {
}

func (r *Registry) UpdatePod(ctx api.Context, pod *api.Pod) error {

  • return fmt.Errorf("unimplemented!")
  • var podOut api.Pod
  • podKey := makePodKey(pod.ID)
  • err := r.EtcdHelper.ExtractObj(podKey, &podOut, false)

From here to the set should be in an AtomicUpdate function. As written it's
racy. The atomic update thingy will do the read for you.


Reply to this email directly or view it on GitHub:
https://github.com/GoogleCloudPlatform/kubernetes/pull/1713/files#r18782464

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, ok. Should be exactly the same behavior, then.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, actually I want the behavior. I don't want it to keep trying to do the update, it should try once, and then reject if the update failed.

if err != nil {
return err
}
scheduled := podOut.DesiredState.Host != ""
if scheduled {
pod.DesiredState.Host = podOut.DesiredState.Host
// If it's already been scheduled, limit the types of updates we'll accept.
errs := validation.ValidatePodUpdate(pod, &podOut)
if len(errs) != 0 {
return errors.NewInvalid("Pod", pod.ID, errs)
}
}
// There's no race with the scheduler, because either this write will fail because the host
// has been updated, or the host update will fail because this pod has been updated.
err = r.EtcdHelper.SetObj(podKey, pod)
if err != nil {
return err
}
if !scheduled {
// never scheduled, just update.
return nil
}
containerKey := makeContainerKey(podOut.DesiredState.Host)
return r.AtomicUpdate(containerKey, &api.ContainerManifestList{}, func(in runtime.Object) (runtime.Object, error) {
manifests := in.(*api.ContainerManifestList)
for ix := range manifests.Items {
if manifests.Items[ix].ID == pod.ID {
manifests.Items[ix] = pod.DesiredState.Manifest
return manifests, nil
}
}
// This really shouldn't happen
glog.Warningf("Couldn't find: %s in %#v", pod.ID, manifests)
return manifests, fmt.Errorf("Failed to update pod, couldn't find %s in %#v", pod.ID, manifests)
})
}

// DeletePod deletes an existing pod specified by its ID.
Expand Down