Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Require versioner in etcdHelper to be non-null. #21310

Merged
merged 1 commit into from
Feb 26, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
88 changes: 34 additions & 54 deletions pkg/storage/etcd/etcd_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ type etcdHelper struct {
client etcd.KeysAPI
codec runtime.Codec
copier runtime.ObjectCopier
// optional, has to be set to perform any atomic operations
// Note that versioner is required for etcdHelper to work correctly.
// The public constructors (NewStorage & NewEtcdStorage) are setting it
// correctly, so be careful when manipulating with it manually.
versioner storage.Versioner
// prefix for all etcd keys
pathPrefix string
Expand Down Expand Up @@ -159,10 +161,8 @@ func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Ob
if err != nil {
return err
}
if h.versioner != nil {
if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion may not be set on objects to be created")
}
if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion may not be set on objects to be created")
}
trace.Step("Version checked")

Expand Down Expand Up @@ -193,21 +193,16 @@ func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Objec
}

version := uint64(0)
if h.versioner != nil {
var err error
if version, err = h.versioner.ObjectResourceVersion(obj); err != nil {
return errors.New("couldn't get resourceVersion from object")
}
if version != 0 {
// We cannot store object with resourceVersion in etcd, we need to clear it here.
if err := h.versioner.UpdateObject(obj, nil, 0); err != nil {
return errors.New("resourceVersion cannot be set on objects store in etcd")
}
var err error
if version, err = h.versioner.ObjectResourceVersion(obj); err != nil {
return errors.New("couldn't get resourceVersion from object")
}
if version != 0 {
// We cannot store object with resourceVersion in etcd, we need to clear it here.
if err := h.versioner.UpdateObject(obj, nil, 0); err != nil {
return errors.New("resourceVersion cannot be set on objects store in etcd")
}
}
// TODO: If versioner is nil, then we may end up with having ResourceVersion set
// in the object and this will be incorrect ResourceVersion. We should fix it by
// requiring "versioner != nil" at the constructor level for 1.3 milestone.

var response *etcd.Response
data, err := runtime.Encode(h.codec, obj)
Expand All @@ -217,19 +212,17 @@ func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Objec
key = h.prefixEtcdKey(key)

create := true
if h.versioner != nil {
if version != 0 {
create = false
startTime := time.Now()
opts := etcd.SetOptions{
TTL: time.Duration(ttl) * time.Second,
PrevIndex: version,
}
response, err = h.client.Set(ctx, key, string(data), &opts)
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime)
if err != nil {
return err
}
if version != 0 {
create = false
startTime := time.Now()
opts := etcd.SetOptions{
TTL: time.Duration(ttl) * time.Second,
PrevIndex: version,
}
response, err = h.client.Set(ctx, key, string(data), &opts)
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime)
if err != nil {
return err
}
}
if create {
Expand Down Expand Up @@ -372,10 +365,8 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
if out != objPtr {
return body, nil, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr))
}
if h.versioner != nil {
_ = h.versioner.UpdateObject(objPtr, node.Expiration, node.ModifiedIndex)
// being unable to set the version does not prevent the object from being extracted
}
// being unable to set the version does not prevent the object from being extracted
_ = h.versioner.UpdateObject(objPtr, node.Expiration, node.ModifiedIndex)
return body, node, err
}

Expand Down Expand Up @@ -414,10 +405,8 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.F
return err
}
trace.Step("Object decoded")
if h.versioner != nil {
if err := h.versioner.UpdateList(listObj, response.Index); err != nil {
return err
}
if err := h.versioner.UpdateList(listObj, response.Index); err != nil {
return err
}
return nil
}
Expand Down Expand Up @@ -450,10 +439,8 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun
if err != nil {
return err
}
if h.versioner != nil {
// being unable to set the version does not prevent the object from being extracted
_ = h.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex)
}
// being unable to set the version does not prevent the object from being extracted
_ = h.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex)
if filter(obj) {
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
}
Expand Down Expand Up @@ -490,10 +477,8 @@ func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion strin
return err
}
trace.Step("Node list decoded")
if h.versioner != nil {
if err := h.versioner.UpdateList(listObj, index); err != nil {
return err
}
if err := h.versioner.UpdateList(listObj, index); err != nil {
return err
}
return nil
}
Expand Down Expand Up @@ -577,14 +562,9 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType
}

// Since update object may have a resourceVersion set, we need to clear it here.
if h.versioner != nil {
if err := h.versioner.UpdateObject(ret, meta.Expiration, 0); err != nil {
return errors.New("resourceVersion cannot be set on objects store in etcd")
}
if err := h.versioner.UpdateObject(ret, meta.Expiration, 0); err != nil {
return errors.New("resourceVersion cannot be set on objects store in etcd")
}
// TODO: If versioner is nil, then we may end up with having ResourceVersion set
// in the object and this will be incorrect ResourceVersion. We should fix it by
// requiring "versioner != nil" at the constructor level for 1.3 milestone.

data, err := runtime.Encode(h.codec, ret)
if err != nil {
Expand Down
16 changes: 0 additions & 16 deletions pkg/storage/etcd/etcd_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,22 +352,6 @@ func TestSetWithVersion(t *testing.T) {
}
}

func TestSetWithoutResourceVersioner(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)
helper := newEtcdHelper(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix())
helper.versioner = nil
returnedObj := &api.Pod{}
err := helper.Set(context.TODO(), "/some/key", obj, returnedObj, 3)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
if returnedObj.ResourceVersion != "" {
t.Errorf("Resource revision should not be set on returned objects")
}
}

func TestSetNilOutParam(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
server := etcdtesting.NewEtcdTestClientServer(t)
Expand Down
20 changes: 12 additions & 8 deletions pkg/storage/etcd/etcd_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ func exceptKey(except string) includeFunc {

// etcdWatcher converts a native etcd watch to a watch.Interface.
type etcdWatcher struct {
encoding runtime.Codec
encoding runtime.Codec
// Note that versioner is required for etcdWatcher to work correctly.
// There is no public constructor of it, so be careful when manipulating
// with it manually.
versioner storage.Versioner
transform TransformFunc

Expand Down Expand Up @@ -108,9 +111,12 @@ type etcdWatcher struct {
// watchWaitDuration is the amount of time to wait for an error from watch.
const watchWaitDuration = 100 * time.Millisecond

// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
// and a versioner, the versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(list bool, quorum bool, include includeFunc, filter storage.FilterFunc, encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc, cache etcdCache) *etcdWatcher {
// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes.
// The versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(
list bool, quorum bool, include includeFunc, filter storage.FilterFunc,
encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc,
cache etcdCache) *etcdWatcher {
w := &etcdWatcher{
encoding: encoding,
versioner: versioner,
Expand Down Expand Up @@ -310,10 +316,8 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
}

// ensure resource version is set on the object we load from etcd
if w.versioner != nil {
if err := w.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex); err != nil {
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err))
}
if err := w.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex); err != nil {
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err))
}

// perform any necessary transformation
Expand Down