Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add "Update Event" to Kubernetes API #4157

Merged
merged 1 commit into from
Feb 5, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/api/errors/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func InterpretCreateError(err error, kind, name string) error {
}
}

// InterpretUpdateError converts a generic etcd error on a create
// InterpretUpdateError converts a generic etcd error on a update
// operation into the appropriate API error.
func InterpretUpdateError(err error, kind, name string) error {
switch {
Expand All @@ -54,7 +54,7 @@ func InterpretUpdateError(err error, kind, name string) error {
}
}

// InterpretDeleteError converts a generic etcd error on a create
// InterpretDeleteError converts a generic etcd error on a delete
// operation into the appropriate API error.
func InterpretDeleteError(err error, kind, name string) error {
switch {
Expand Down
8 changes: 4 additions & 4 deletions pkg/registry/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (r *Registry) UpdatePod(ctx api.Context, pod *api.Pod) error {
}
// There's no race with the scheduler, because either this write will fail because the host
// has been updated, or the host update will fail because this pod has been updated.
err = r.EtcdHelper.SetObj(podKey, pod)
err = r.EtcdHelper.SetObj(podKey, pod, 0 /* ttl */)
if err != nil {
return err
}
Expand Down Expand Up @@ -404,7 +404,7 @@ func (r *Registry) UpdateController(ctx api.Context, controller *api.Replication
if err != nil {
return err
}
err = r.SetObj(key, controller)
err = r.SetObj(key, controller, 0 /* ttl */)
return etcderr.InterpretUpdateError(err, "replicationController", controller.Name)
}

Expand Down Expand Up @@ -512,7 +512,7 @@ func (r *Registry) UpdateService(ctx api.Context, svc *api.Service) error {
if err != nil {
return err
}
err = r.SetObj(key, svc)
err = r.SetObj(key, svc, 0 /* ttl */)
return etcderr.InterpretUpdateError(err, "service", svc.Name)
}

Expand Down Expand Up @@ -605,7 +605,7 @@ func (r *Registry) CreateMinion(ctx api.Context, minion *api.Node) error {

func (r *Registry) UpdateMinion(ctx api.Context, minion *api.Node) error {
// TODO: Add some validations.
err := r.SetObj(makeNodeKey(minion.Name), minion)
err := r.SetObj(makeNodeKey(minion.Name), minion, 0 /* ttl */)
return etcderr.InterpretUpdateError(err, "minion", minion.Name)
}

Expand Down
11 changes: 11 additions & 0 deletions pkg/registry/event/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ func (r registry) Create(ctx api.Context, id string, obj runtime.Object) error {
return etcderr.InterpretCreateError(err, r.Etcd.EndpointName, id)
}

// Update replaces an existing instance of the object, and sets a ttl so that the event
// doesn't stay in the system forever.
func (r registry) Update(ctx api.Context, id string, obj runtime.Object) error {
key, err := r.Etcd.KeyFunc(ctx, id)
if err != nil {
return err
}
err = r.Etcd.Helper.SetObj(key, obj, r.ttl)
return etcderr.InterpretUpdateError(err, r.Etcd.EndpointName, id)
}

// NewEtcdRegistry returns a registry which will store Events in the given
// EtcdHelper. ttl is the time that Events will be retained by the system.
func NewEtcdRegistry(h tools.EtcdHelper, ttl uint64) generic.Registry {
Expand Down
102 changes: 102 additions & 0 deletions pkg/registry/event/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,105 @@ func TestEventCreate(t *testing.T) {
}
}
}

func TestEventUpdate(t *testing.T) {
eventA := &api.Event{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Reason: "forTesting",
}
eventB := &api.Event{
ObjectMeta: api.ObjectMeta{Name: "bar", Namespace: api.NamespaceDefault},
Reason: "for testing again",
}
eventC := &api.Event{
ObjectMeta: api.ObjectMeta{Name: "pan", Namespace: api.NamespaceDefault, ResourceVersion: "1"},
Reason: "for testing again something else",
}

nodeWithEventA := tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(testapi.Codec(), eventA),
ModifiedIndex: 1,
CreatedIndex: 1,
TTL: int64(testTTL),
},
},
E: nil,
}

nodeWithEventB := tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(testapi.Codec(), eventB),
ModifiedIndex: 1,
CreatedIndex: 1,
TTL: int64(testTTL),
},
},
E: nil,
}

nodeWithEventC := tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(testapi.Codec(), eventC),
ModifiedIndex: 1,
CreatedIndex: 1,
TTL: int64(testTTL),
},
},
E: nil,
}

emptyNode := tools.EtcdResponseWithError{
R: &etcd.Response{},
E: tools.EtcdErrorNotFound,
}

ctx := api.NewDefaultContext()
key := "foo"
path, err := etcdgeneric.NamespaceKeyFunc(ctx, "/registry/events", key)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}

table := map[string]struct {
existing tools.EtcdResponseWithError
expect tools.EtcdResponseWithError
toUpdate runtime.Object
errOK func(error) bool
}{
"doesNotExist": {
existing: emptyNode,
expect: nodeWithEventA,
toUpdate: eventA,
errOK: func(err error) bool { return err == nil },
},
"doesNotExist2": {
existing: emptyNode,
expect: nodeWithEventB,
toUpdate: eventB,
errOK: func(err error) bool { return err == nil },
},
"replaceExisting": {
existing: nodeWithEventA,
expect: nodeWithEventC,
toUpdate: eventC,
errOK: func(err error) bool { return err == nil },
},
}

for name, item := range table {
fakeClient, registry := NewTestEventEtcdRegistry(t)
fakeClient.Data[path] = item.existing
err := registry.Update(ctx, key, item.toUpdate)
if !item.errOK(err) {
t.Errorf("%v: unexpected error: %v", name, err)
}

if e, a := item.expect, fakeClient.Data[path]; !reflect.DeepEqual(e, a) {
t.Errorf("%v:\n%s", name, util.ObjectGoPrintDiff(e, a))
}
}
}
25 changes: 25 additions & 0 deletions pkg/registry/event/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,31 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE
}), nil
}

// Update replaces an existing Event instance in storage.registry, with the given instance.
func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) {
event, ok := obj.(*api.Event)
if !ok {
return nil, fmt.Errorf("not an event object: %#v", obj)
}
if api.Namespace(ctx) != "" {
if !api.ValidNamespace(ctx, &event.ObjectMeta) {
return nil, errors.NewConflict("event", event.Namespace, fmt.Errorf("event.namespace does not match the provided context"))
}
}
if errs := validation.ValidateEvent(event); len(errs) > 0 {
return nil, errors.NewInvalid("event", event.Name, errs)
}
api.FillObjectMetaSystemFields(ctx, &event.ObjectMeta)

return apiserver.MakeAsync(func() (runtime.Object, error) {
err := rs.registry.Update(ctx, event.Name, event)
if err != nil {
return nil, err
}
return rs.registry.Get(ctx, event.Name)
}), nil
}

func (rs *REST) Delete(ctx api.Context, id string) (<-chan apiserver.RESTResult, error) {
obj, err := rs.registry.Get(ctx, id)
if err != nil {
Expand Down
31 changes: 31 additions & 0 deletions pkg/registry/event/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,37 @@ func TestRESTCreate(t *testing.T) {
}
}

func TestRESTUpdate(t *testing.T) {
_, rest := NewTestREST()
eventA := testEvent("foo")
c, err := rest.Create(api.NewDefaultContext(), eventA)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
<-c
got, err := rest.Get(api.NewDefaultContext(), eventA.Name)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
if e, a := eventA, got; !reflect.DeepEqual(e, a) {
t.Errorf("diff: %s", util.ObjectDiff(e, a))
}
eventB := testEvent("bar")
u, err := rest.Update(api.NewDefaultContext(), eventB)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
<-u
got2, err := rest.Get(api.NewDefaultContext(), eventB.Name)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
if e, a := eventB, got2; !reflect.DeepEqual(e, a) {
t.Errorf("diff: %s", util.ObjectDiff(e, a))
}

}

func TestRESTDelete(t *testing.T) {
_, rest := NewTestREST()
eventA := testEvent("foo")
Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/generic/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (e *Etcd) Update(ctx api.Context, id string, obj runtime.Object) error {
return err
}
// TODO: verify that SetObj checks ResourceVersion before succeeding.
err = e.Helper.SetObj(key, obj)
err = e.Helper.SetObj(key, obj, 0 /* ttl */)
return etcderr.InterpretUpdateError(err, e.EndpointName, id)
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/tools/etcd_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,22 +281,22 @@ func (h *EtcdHelper) Delete(key string, recursive bool) error {
return err
}

// SetObj marshals obj via json, and stores under key. Will do an
// atomic update if obj's ResourceVersion field is set.
func (h *EtcdHelper) SetObj(key string, obj runtime.Object) error {
// SetObj marshals obj via json, and stores under key. Will do an atomic update if obj's ResourceVersion
// field is set. 'ttl' is time-to-live in seconds, and 0 means forever.
func (h *EtcdHelper) SetObj(key string, obj runtime.Object, ttl uint64) error {
data, err := h.Codec.Encode(obj)
if err != nil {
return err
}
if h.ResourceVersioner != nil {
if version, err := h.ResourceVersioner.ResourceVersion(obj); err == nil && version != 0 {
_, err = h.Client.CompareAndSwap(key, string(data), 0, "", version)
_, err = h.Client.CompareAndSwap(key, string(data), ttl, "", version)
return err // err is shadowed!
}
}

// Create will fail if a key already exists.
_, err = h.Client.Create(key, string(data), 0)
_, err = h.Client.Create(key, string(data), ttl)
return err
}

Expand Down
16 changes: 13 additions & 3 deletions pkg/tools/etcd_tools_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func TestSetObj(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
fakeClient := NewFakeEtcdClient(t)
helper := EtcdHelper{fakeClient, testapi.Codec(), versioner}
err := helper.SetObj("/some/key", obj)
err := helper.SetObj("/some/key", obj, 5)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
Expand All @@ -388,6 +388,10 @@ func TestSetObj(t *testing.T) {
if expect != got {
t.Errorf("Wanted %v, got %v", expect, got)
}
if e, a := uint64(5), fakeClient.LastSetTTL; e != a {
t.Errorf("Wanted %v, got %v", e, a)
}

}

func TestSetObjWithVersion(t *testing.T) {
Expand All @@ -404,7 +408,7 @@ func TestSetObjWithVersion(t *testing.T) {
}

helper := EtcdHelper{fakeClient, testapi.Codec(), versioner}
err := helper.SetObj("/some/key", obj)
err := helper.SetObj("/some/key", obj, 7)
if err != nil {
t.Fatalf("Unexpected error %#v", err)
}
Expand All @@ -417,13 +421,16 @@ func TestSetObjWithVersion(t *testing.T) {
if expect != got {
t.Errorf("Wanted %v, got %v", expect, got)
}
if e, a := uint64(7), fakeClient.LastSetTTL; e != a {
t.Errorf("Wanted %v, got %v", e, a)
}
}

func TestSetObjWithoutResourceVersioner(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
fakeClient := NewFakeEtcdClient(t)
helper := EtcdHelper{fakeClient, testapi.Codec(), nil}
err := helper.SetObj("/some/key", obj)
err := helper.SetObj("/some/key", obj, 3)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
Expand All @@ -436,6 +443,9 @@ func TestSetObjWithoutResourceVersioner(t *testing.T) {
if expect != got {
t.Errorf("Wanted %v, got %v", expect, got)
}
if e, a := uint64(3), fakeClient.LastSetTTL; e != a {
t.Errorf("Wanted %v, got %v", e, a)
}
}

func TestAtomicUpdate(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/tools/fake_etcd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func (f *FakeEtcdClient) setLocked(key, value string, ttl uint64) (*etcd.Respons
Value: value,
CreatedIndex: createdIndex,
ModifiedIndex: i,
TTL: int64(ttl),
},
},
}
Expand Down