Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TTL support to etcd_helper in preparation for graceful delete #5516

Merged
merged 3 commits into from
Mar 16, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 0 additions & 5 deletions pkg/api/latest/latest.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,6 @@ var Codec = v1beta1.Codec
// accessor is the shared static metadata accessor for the API.
var accessor = meta.NewAccessor()

// ResourceVersioner describes a default versioner that can handle all types
// of versioning.
// TODO: when versioning changes, make this part of each API definition.
var ResourceVersioner = runtime.ResourceVersioner(accessor)

// SelfLinker can set or get the SelfLink field of all API types.
// TODO: when versioning changes, make this part of each API definition.
// TODO(lavalamp): Combine SelfLinker & ResourceVersioner interfaces, force all uses
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/latest/latest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

func TestResourceVersioner(t *testing.T) {
pod := internal.Pod{ObjectMeta: internal.ObjectMeta{ResourceVersion: "10"}}
version, err := ResourceVersioner.ResourceVersion(&pod)
version, err := accessor.ResourceVersion(&pod)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand All @@ -36,7 +36,7 @@ func TestResourceVersioner(t *testing.T) {
}

podList := internal.PodList{ListMeta: internal.ListMeta{ResourceVersion: "10"}}
version, err = ResourceVersioner.ResourceVersion(&podList)
version, err = accessor.ResourceVersion(&podList)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand Down
19 changes: 15 additions & 4 deletions pkg/api/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,27 @@ func HasObjectMetaSystemFieldValues(meta *ObjectMeta) bool {
len(meta.UID) != 0
}

// GetObjectMetaPtr returns a pointer to a provided object's ObjectMeta.
// ObjectMetaFor returns a pointer to a provided object's ObjectMeta.
// TODO: allow runtime.Unknown to extract this object
func ObjectMetaFor(obj runtime.Object) (*ObjectMeta, error) {
v, err := conversion.EnforcePtr(obj)
if err != nil {
return nil, err
}
var objectMeta *ObjectMeta
if err := runtime.FieldPtr(v, "ObjectMeta", &objectMeta); err != nil {
var meta *ObjectMeta
err = runtime.FieldPtr(v, "ObjectMeta", &meta)
return meta, err
}

// ListMetaFor returns a pointer to a provided object's ListMeta,
// or an error if the object does not have that pointer.
// TODO: allow runtime.Unknown to extract this object
func ListMetaFor(obj runtime.Object) (*ListMeta, error) {
v, err := conversion.EnforcePtr(obj)
if err != nil {
return nil, err
}
return objectMeta, nil
var meta *ListMeta
err = runtime.FieldPtr(v, "ListMeta", &meta)
return meta, err
}
2 changes: 1 addition & 1 deletion pkg/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func NewEtcdHelper(client tools.EtcdGetSet, version string) (helper tools.EtcdHe
if err != nil {
return helper, err
}
return tools.EtcdHelper{client, versionInterfaces.Codec, tools.RuntimeVersionAdapter{versionInterfaces.MetadataAccessor}}, nil
return tools.NewEtcdHelper(client, versionInterfaces.Codec), nil
}

// setDefaults fills in any fields not set that are required to have valid data.
Expand Down
4 changes: 2 additions & 2 deletions pkg/registry/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,9 @@ func (r *Registry) UpdateEndpoints(ctx api.Context, endpoints *api.Endpoints) er
}
// TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop.
err = r.AtomicUpdate(key, &api.Endpoints{}, true,
func(input runtime.Object) (runtime.Object, error) {
func(input runtime.Object) (runtime.Object, uint64, error) {
// TODO: racy - label query is returning different results for two simultaneous updaters
return endpoints, nil
return endpoints, 0, nil
})
return etcderr.InterpretUpdateError(err, "endpoints", endpoints.Name)
}
Expand Down
20 changes: 13 additions & 7 deletions pkg/registry/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ import (
)

func NewTestEtcdRegistry(client tools.EtcdClient) *Registry {
registry := NewRegistry(tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}}, nil)
registry := NewRegistry(tools.NewEtcdHelper(client, latest.Codec), nil)
return registry
}

func NewTestEtcdRegistryWithPods(client tools.EtcdClient) *Registry {
helper := tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}}
helper := tools.NewEtcdHelper(client, latest.Codec)
podStorage, _, _ := podetcd.NewREST(helper)
registry := NewRegistry(helper, pod.NewRegistry(podStorage))
return registry
Expand Down Expand Up @@ -195,6 +195,7 @@ func TestEtcdDeleteController(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
key, _ := makeControllerKey(ctx, "foo")
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0)
err := registry.DeleteController(ctx, "foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
Expand Down Expand Up @@ -532,6 +533,11 @@ func TestEtcdDeleteService(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
key, _ := makeServiceKey(ctx, "foo")
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0)
endpointsKey, _ := makeServiceEndpointsKey(ctx, "foo")
fakeClient.Set(endpointsKey, runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Protocol: "TCP"}), 0)

err := registry.DeleteService(ctx, "foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
Expand All @@ -540,13 +546,11 @@ func TestEtcdDeleteService(t *testing.T) {
if len(fakeClient.DeletedKeys) != 2 {
t.Errorf("Expected 2 delete, found %#v", fakeClient.DeletedKeys)
}
key, _ := makeServiceKey(ctx, "foo")
if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
}
key, _ = makeServiceEndpointsKey(ctx, "foo")
if fakeClient.DeletedKeys[1] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[1], key)
if fakeClient.DeletedKeys[1] != endpointsKey {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[1], endpointsKey)
}
}

Expand Down Expand Up @@ -906,6 +910,9 @@ func TestEtcdDeleteMinion(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
key := "/registry/minions/foo"
fakeClient.Set("/registry/minions/foo", runtime.EncodeOrDie(latest.Codec, &api.Node{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0)

err := registry.DeleteMinion(ctx, "foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
Expand All @@ -914,7 +921,6 @@ func TestEtcdDeleteMinion(t *testing.T) {
if len(fakeClient.DeletedKeys) != 1 {
t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys)
}
key := "/registry/minions/foo"
if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/event/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var testTTL uint64 = 60
func NewTestEventEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, generic.Registry) {
f := tools.NewFakeEtcdClient(t)
f.TestIndex = true
h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.MetadataAccessor()}}
h := tools.NewEtcdHelper(f, testapi.Codec())
return f, NewEtcdRegistry(h, testTTL)
}

Expand Down
36 changes: 25 additions & 11 deletions pkg/registry/generic/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,35 +251,49 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool
// TODO: expose TTL
creating := false
out := e.NewFunc()
err = e.Helper.AtomicUpdate(key, out, true, func(existing runtime.Object) (runtime.Object, error) {
version, err := e.Helper.ResourceVersioner.ResourceVersion(existing)
err = e.Helper.AtomicUpdate(key, out, true, func(existing runtime.Object) (runtime.Object, uint64, error) {
version, err := e.Helper.Versioner.ObjectResourceVersion(existing)
if err != nil {
return nil, err
return nil, 0, err
}
if version == 0 {
if !e.UpdateStrategy.AllowCreateOnUpdate() {
return nil, kubeerr.NewNotFound(e.EndpointName, name)
return nil, 0, kubeerr.NewNotFound(e.EndpointName, name)
}
creating = true
if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
return nil, err
return nil, 0, err
}
ttl := uint64(0)
if e.TTLFunc != nil {
ttl, err = e.TTLFunc(obj, true)
if err != nil {
return nil, 0, err
}
}
return obj, nil
return obj, ttl, nil
}

creating = false
newVersion, err := e.Helper.ResourceVersioner.ResourceVersion(obj)
newVersion, err := e.Helper.Versioner.ObjectResourceVersion(obj)
if err != nil {
return nil, err
return nil, 0, err
}
if newVersion != version {
// TODO: return the most recent version to a client?
return nil, kubeerr.NewConflict(e.EndpointName, name, fmt.Errorf("the resource was updated to %d", version))
return nil, 0, kubeerr.NewConflict(e.EndpointName, name, fmt.Errorf("the resource was updated to %d", version))
}
if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil {
return nil, err
return nil, 0, err
}
return obj, nil
ttl := uint64(0)
if e.TTLFunc != nil {
ttl, err = e.TTLFunc(obj, false)
if err != nil {
return nil, 0, err
}
}
return obj, ttl, nil
})

if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/registry/generic/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func hasCreated(t *testing.T, pod *api.Pod) func(runtime.Object) bool {
func NewTestGenericEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, *Etcd) {
f := tools.NewFakeEtcdClient(t)
f.TestIndex = true
h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.MetadataAccessor()}}
h := tools.NewEtcdHelper(f, testapi.Codec())
strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false}
return f, &Etcd{
NewFunc: func() runtime.Object { return &api.Pod{} },
Expand Down Expand Up @@ -624,16 +624,16 @@ func TestEtcdDelete(t *testing.T) {
"notExisting": {
existing: emptyNode,
expect: emptyNode,
errOK: func(err error) bool { return err == nil },
errOK: func(err error) bool { return errors.IsNotFound(err) },
},
}

for name, item := range table {
fakeClient, registry := NewTestGenericEtcdRegistry(t)
fakeClient.Data[path] = item.existing
_, err := registry.Delete(api.NewContext(), key)
obj, err := registry.Delete(api.NewContext(), key)
if !item.errOK(err) {
t.Errorf("%v: unexpected error: %v", name, err)
t.Errorf("%v: unexpected error: %v (%#v)", name, err, obj)
}

if item.expect.E != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/limitrange/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
func NewTestLimitRangeEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, generic.Registry) {
f := tools.NewFakeEtcdClient(t)
f.TestIndex = true
h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.MetadataAccessor()}}
h := tools.NewEtcdHelper(f, testapi.Codec())
return f, NewEtcdRegistry(h)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/namespace/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) {
fakeEtcdClient := tools.NewFakeEtcdClient(t)
fakeEtcdClient.TestIndex = true
helper := tools.EtcdHelper{Client: fakeEtcdClient, Codec: latest.Codec, ResourceVersioner: tools.RuntimeVersionAdapter{latest.ResourceVersioner}}
helper := tools.NewEtcdHelper(fakeEtcdClient, latest.Codec)
return fakeEtcdClient, helper
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/registry/pod/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,18 +160,18 @@ func (r *BindingREST) setPodHostTo(ctx api.Context, podID, oldMachine, machine s
if err != nil {
return nil, err
}
err = r.store.Helper.AtomicUpdate(podKey, &api.Pod{}, false, func(obj runtime.Object) (runtime.Object, error) {
err = r.store.Helper.AtomicUpdate(podKey, &api.Pod{}, false, func(obj runtime.Object) (runtime.Object, uint64, error) {
pod, ok := obj.(*api.Pod)
if !ok {
return nil, fmt.Errorf("unexpected object: %#v", obj)
return nil, 0, fmt.Errorf("unexpected object: %#v", obj)
}
if pod.Spec.Host != oldMachine || pod.Status.Host != oldMachine {
return nil, fmt.Errorf("pod %v is already assigned to host %q or %q", pod.Name, pod.Spec.Host, pod.Status.Host)
return nil, 0, fmt.Errorf("pod %v is already assigned to host %q or %q", pod.Name, pod.Spec.Host, pod.Status.Host)
}
pod.Spec.Host = machine
pod.Status.Host = machine
finalPod = pod
return pod, nil
return pod, 0, nil
})
return finalPod, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/pod/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (f *fakeCache) ClearPodStatus(namespace, name string) {
func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) {
fakeEtcdClient := tools.NewFakeEtcdClient(t)
fakeEtcdClient.TestIndex = true
helper := tools.EtcdHelper{Client: fakeEtcdClient, Codec: latest.Codec, ResourceVersioner: tools.RuntimeVersionAdapter{latest.ResourceVersioner}}
helper := tools.NewEtcdHelper(fakeEtcdClient, latest.Codec)
return fakeEtcdClient, helper
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/resourcequota/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
func newHelper(t *testing.T) (*tools.FakeEtcdClient, tools.EtcdHelper) {
fakeEtcdClient := tools.NewFakeEtcdClient(t)
fakeEtcdClient.TestIndex = true
helper := tools.EtcdHelper{Client: fakeEtcdClient, Codec: latest.Codec, ResourceVersioner: tools.RuntimeVersionAdapter{latest.ResourceVersioner}}
helper := tools.NewEtcdHelper(fakeEtcdClient, latest.Codec)
return fakeEtcdClient, helper
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/secret/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
func NewTestSecretEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, generic.Registry) {
f := tools.NewFakeEtcdClient(t)
f.TestIndex = true
h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.MetadataAccessor()}}
h := tools.NewEtcdHelper(f, testapi.Codec())
return f, NewEtcdRegistry(h)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/tools/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// Package tools implements general tools which depend on the api package.
// Package tools implements types which help work with etcd which depend on the api package.
// TODO: move this package to an etcd specific utility package.
package tools