Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try to avoid Get to etcd in GuaranteedUpdate in Cacher #35415

Merged
merged 1 commit into from
Oct 26, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 15 additions & 1 deletion pkg/storage/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,21 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
}

// Implements storage.Interface.
func (c *Cacher) GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, preconditions *Preconditions, tryUpdate UpdateFunc) error {
func (c *Cacher) GuaranteedUpdate(
ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
preconditions *Preconditions, tryUpdate UpdateFunc, _ ...runtime.Object) error {
// Ignore the suggestion and try to pass down the current version of the object
// read from cache.
if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
glog.Errorf("GetByKey returned error: %v", err)
} else if exists {
currObj, copyErr := api.Scheme.Copy(elem.(*storeElement).Object)
if copyErr == nil {
return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate, currObj)
}
glog.Errorf("couldn't copy object: %v", copyErr)
}
// If we couldn't get the object, fallback to no-suggestion.
return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate)
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/etcd/etcd_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,10 @@ func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node
}

// Implements storage.Interface.
func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc) error {
func (h *etcdHelper) GuaranteedUpdate(
ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, _ ...runtime.Object) error {
// Ignore the suggestion about current object.
if ctx == nil {
glog.Errorf("Context is nil")
}
Expand Down
57 changes: 49 additions & 8 deletions pkg/storage/etcd3/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,22 +211,33 @@ func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.O
}

// GuaranteedUpdate implements storage.Interface.GuaranteedUpdate.
func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Object, ignoreNotFound bool, precondtions *storage.Preconditions, tryUpdate storage.UpdateFunc) error {
func (s *store) GuaranteedUpdate(
ctx context.Context, key string, out runtime.Object, ignoreNotFound bool,
precondtions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error {
v, err := conversion.EnforcePtr(out)
if err != nil {
panic("unable to convert output object to pointer")
}
key = keyWithPrefix(s.pathPrefix, key)
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
if err != nil {
return err
}
for {
origState, err := s.getState(getResp, key, v, ignoreNotFound)

var origState *objState
if len(suggestion) == 1 && suggestion[0] != nil {
origState, err = s.getStateFromObject(suggestion[0])
if err != nil {
return err
}
} else {
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
if err != nil {
return err
}
origState, err = s.getState(getResp, key, v, ignoreNotFound)
if err != nil {
return err
}
}

for {
if err := checkPreconditions(key, precondtions, origState.obj); err != nil {
return err
}
Expand Down Expand Up @@ -260,8 +271,12 @@ func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Ob
return err
}
if !txnResp.Succeeded {
getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
glog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)
origState, err = s.getState(getResp, key, v, ignoreNotFound)
if err != nil {
return err
}
continue
}
putResp := txnResp.Responses[0].GetResponsePut()
Expand Down Expand Up @@ -369,6 +384,32 @@ func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Va
return state, nil
}

func (s *store) getStateFromObject(obj runtime.Object) (*objState, error) {
state := &objState{
obj: obj,
meta: &storage.ResponseMeta{},
}

rv, err := s.versioner.ObjectResourceVersion(obj)
if err != nil {
return nil, fmt.Errorf("couldn't get resource version: %v", err)
}
state.rev = int64(rv)
state.meta.ResourceVersion = uint64(state.rev)

// Compute the serialized form - for that we need to temporarily clean
// its resource version field (those are not stored in etcd).
if err := s.versioner.UpdateObject(obj, 0); err != nil {
return nil, errors.New("resourceVersion cannot be set on objects store in etcd")
}
state.data, err = runtime.Encode(s.codec, obj)
if err != nil {
return nil, err
}
s.versioner.UpdateObject(state.obj, uint64(rv))
return state, nil
}

func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtime.Object, uint64, error) {
ret, ttlPtr, err := userUpdate(st.obj, *st.meta)
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion pkg/storage/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ type Interface interface {
// or zero value in 'ptrToType' parameter otherwise.
// If the object to update has the same value as previous, it won't do any update
// but will return the object in 'ptrToType' parameter.
// If 'suggestion' can contain zero or one element - in such case this can be used as
// a suggestion about the current version of the object to avoid read operation from
// storage to get it.
//
// Example:
//
Expand All @@ -166,5 +169,7 @@ type Interface interface {
// return cur, nil, nil
// }
// })
GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, precondtions *Preconditions, tryUpdate UpdateFunc) error
GuaranteedUpdate(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... This approach has a limitation that we have to put suggestion always at the end of the arg list. I have no strong opinion on this though... GU is not clean already. We really should clean this up at some point.

/cc @hongchaodeng

ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
precondtions *Preconditions, tryUpdate UpdateFunc, suggestion ...runtime.Object) error
}