Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move etcd_util.go to separate package #17414

Merged
merged 1 commit into from
Nov 25, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions cluster/addons/dns/kube2sky/kube2sky.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
kclientcmd "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
kframework "k8s.io/kubernetes/pkg/controller/framework"
kselector "k8s.io/kubernetes/pkg/fields"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
)
Expand Down Expand Up @@ -418,7 +418,7 @@ func newEtcdClient(etcdServer string) (*etcd.Client, error) {
err error
)
for attempt := 1; attempt <= maxConnectAttempts; attempt++ {
if _, err = etcdstorage.GetEtcdVersion(etcdServer); err == nil {
if _, err = etcdutil.GetEtcdVersion(etcdServer); err == nil {
break
}
if attempt == maxConnectAttempts {
Expand Down
14 changes: 7 additions & 7 deletions contrib/mesos/pkg/election/etcd_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/tools"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/watch"
Expand Down Expand Up @@ -91,10 +91,10 @@ func (e *etcdMasterElector) extendMaster(path, id string, ttl uint64, res *etcd.
// We don't handle the TTL delete w/o a write case here, it's handled in the next loop
// iteration.
_, err := e.etcd.CompareAndSwap(path, id, ttl, "", res.Node.ModifiedIndex)
if err != nil && !etcdstorage.IsEtcdTestFailed(err) {
if err != nil && !etcdutil.IsEtcdTestFailed(err) {
return "", err
}
if err != nil && etcdstorage.IsEtcdTestFailed(err) {
if err != nil && etcdutil.IsEtcdTestFailed(err) {
return "", nil
}
return id, nil
Expand All @@ -106,11 +106,11 @@ func (e *etcdMasterElector) extendMaster(path, id string, ttl uint64, res *etcd.
// returns "", err if an error occurred
func (e *etcdMasterElector) becomeMaster(path, id string, ttl uint64) (string, error) {
_, err := e.etcd.Create(path, id, ttl)
if err != nil && !etcdstorage.IsEtcdNodeExist(err) {
if err != nil && !etcdutil.IsEtcdNodeExist(err) {
// unexpected error
return "", err
}
if err != nil && etcdstorage.IsEtcdNodeExist(err) {
if err != nil && etcdutil.IsEtcdNodeExist(err) {
return "", nil
}
return id, nil
Expand All @@ -125,12 +125,12 @@ func (e *etcdMasterElector) handleMaster(path, id string, ttl uint64) (string, e
res, err := e.etcd.Get(path, false, false)

// Unexpected error, bail out
if err != nil && !etcdstorage.IsEtcdNotFound(err) {
if err != nil && !etcdutil.IsEtcdNotFound(err) {
return "", err
}

// There is no master, try to become the master.
if err != nil && etcdstorage.IsEtcdNotFound(err) {
if err != nil && etcdutil.IsEtcdNotFound(err) {
return e.becomeMaster(path, id, ttl)
}

Expand Down
6 changes: 3 additions & 3 deletions contrib/mesos/pkg/scheduler/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ import (
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/master/ports"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/tools"

// lock to this API version, compilation will fail when this becomes unsupported
Expand Down Expand Up @@ -894,7 +894,7 @@ func (s *SchedulerServer) buildFrameworkInfo() (info *mesos.FrameworkInfo, cred
func (s *SchedulerServer) fetchFrameworkID(client tools.EtcdClient) (*mesos.FrameworkID, error) {
if s.failoverTimeout > 0 {
if response, err := client.Get(meta.FrameworkIDKey, false, false); err != nil {
if !etcdstorage.IsEtcdNotFound(err) {
if !etcdutil.IsEtcdNotFound(err) {
return nil, fmt.Errorf("unexpected failure attempting to load framework ID from etcd: %v", err)
}
log.V(1).Infof("did not find framework ID in etcd")
Expand All @@ -905,7 +905,7 @@ func (s *SchedulerServer) fetchFrameworkID(client tools.EtcdClient) (*mesos.Fram
} else {
//TODO(jdef) this seems like a totally hackish way to clean up the framework ID
if _, err := client.Delete(meta.FrameworkIDKey, true); err != nil {
if !etcdstorage.IsEtcdNotFound(err) {
if !etcdutil.IsEtcdNotFound(err) {
return nil, fmt.Errorf("failed to delete framework ID from etcd: %v", err)
}
log.V(1).Infof("nothing to delete: did not find framework ID in etcd")
Expand Down
22 changes: 11 additions & 11 deletions pkg/api/errors/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ package etcd

import (
"k8s.io/kubernetes/pkg/api/errors"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
)

// InterpretListError converts a generic etcd error on a retrieval
// operation into the appropriate API error.
func InterpretListError(err error, kind string) error {
switch {
case etcdstorage.IsEtcdNotFound(err):
case etcdutil.IsEtcdNotFound(err):
return errors.NewNotFound(kind, "")
case etcdstorage.IsEtcdUnreachable(err):
case etcdutil.IsEtcdUnreachable(err):
return errors.NewServerTimeout(kind, "list", 2) // TODO: make configurable or handled at a higher level
default:
return err
Expand All @@ -38,9 +38,9 @@ func InterpretListError(err error, kind string) error {
// operation into the appropriate API error.
func InterpretGetError(err error, kind, name string) error {
switch {
case etcdstorage.IsEtcdNotFound(err):
case etcdutil.IsEtcdNotFound(err):
return errors.NewNotFound(kind, name)
case etcdstorage.IsEtcdUnreachable(err):
case etcdutil.IsEtcdUnreachable(err):
return errors.NewServerTimeout(kind, "get", 2) // TODO: make configurable or handled at a higher level
default:
return err
Expand All @@ -51,9 +51,9 @@ func InterpretGetError(err error, kind, name string) error {
// operation into the appropriate API error.
func InterpretCreateError(err error, kind, name string) error {
switch {
case etcdstorage.IsEtcdNodeExist(err):
case etcdutil.IsEtcdNodeExist(err):
return errors.NewAlreadyExists(kind, name)
case etcdstorage.IsEtcdUnreachable(err):
case etcdutil.IsEtcdUnreachable(err):
return errors.NewServerTimeout(kind, "create", 2) // TODO: make configurable or handled at a higher level
default:
return err
Expand All @@ -64,9 +64,9 @@ func InterpretCreateError(err error, kind, name string) error {
// operation into the appropriate API error.
func InterpretUpdateError(err error, kind, name string) error {
switch {
case etcdstorage.IsEtcdTestFailed(err), etcdstorage.IsEtcdNodeExist(err):
case etcdutil.IsEtcdTestFailed(err), etcdutil.IsEtcdNodeExist(err):
return errors.NewConflict(kind, name, err)
case etcdstorage.IsEtcdUnreachable(err):
case etcdutil.IsEtcdUnreachable(err):
return errors.NewServerTimeout(kind, "update", 2) // TODO: make configurable or handled at a higher level
default:
return err
Expand All @@ -77,9 +77,9 @@ func InterpretUpdateError(err error, kind, name string) error {
// operation into the appropriate API error.
func InterpretDeleteError(err error, kind, name string) error {
switch {
case etcdstorage.IsEtcdNotFound(err):
case etcdutil.IsEtcdNotFound(err):
return errors.NewNotFound(kind, name)
case etcdstorage.IsEtcdUnreachable(err):
case etcdutil.IsEtcdUnreachable(err):
return errors.NewServerTimeout(kind, "delete", 2) // TODO: make configurable or handled at a higher level
default:
return err
Expand Down
4 changes: 2 additions & 2 deletions pkg/apiserver/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"net/http"

"k8s.io/kubernetes/pkg/api/unversioned"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/util"
)

Expand Down Expand Up @@ -52,7 +52,7 @@ func errToAPIStatus(err error) *unversioned.Status {
status := http.StatusInternalServerError
switch {
//TODO: replace me with NewConflictErr
case etcdstorage.IsEtcdTestFailed(err):
case etcdutil.IsEtcdTestFailed(err):
status = http.StatusConflict
}
// Log errors that were not converted to an error status
Expand Down
3 changes: 2 additions & 1 deletion pkg/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ import (
thirdpartyresourcedataetcd "k8s.io/kubernetes/pkg/registry/thirdpartyresourcedata/etcd"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/tools"
"k8s.io/kubernetes/pkg/ui"
"k8s.io/kubernetes/pkg/util"
Expand Down Expand Up @@ -855,7 +856,7 @@ func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server {
addr = etcdUrl.Host
port = 4001
}
serversToValidate[fmt.Sprintf("etcd-%d", ix)] = apiserver.Server{Addr: addr, Port: port, Path: "/health", Validate: etcdstorage.EtcdHealthCheck}
serversToValidate[fmt.Sprintf("etcd-%d", ix)] = apiserver.Server{Addr: addr, Port: port, Path: "/health", Validate: etcdutil.EtcdHealthCheck}
}
return serversToValidate
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/master/master_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/intstr"

Expand Down Expand Up @@ -806,7 +807,7 @@ func testInstallThirdPartyAPIDeleteVersion(t *testing.T, version string) {
thirdPartyObj := extensions.ThirdPartyResourceData{}
err = master.thirdPartyStorage.Get(
context.TODO(), expectedDeletedKey, &thirdPartyObj, false)
if !etcdstorage.IsEtcdNotFound(err) {
if !etcdutil.IsEtcdNotFound(err) {
t.Errorf("expected deletion didn't happen: %v", err)
}
}
Expand Down Expand Up @@ -893,7 +894,7 @@ func testInstallThirdPartyResourceRemove(t *testing.T, version string) {
for _, key := range expectedDeletedKeys {
thirdPartyObj := extensions.ThirdPartyResourceData{}
err := master.thirdPartyStorage.Get(context.TODO(), key, &thirdPartyObj, false)
if !etcdstorage.IsEtcdNotFound(err) {
if !etcdutil.IsEtcdNotFound(err) {
t.Errorf("expected deletion didn't happen: %v", err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/registry/service/allocator/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"k8s.io/kubernetes/pkg/registry/service/allocator"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"

"golang.org/x/net/context"
)
Expand Down Expand Up @@ -174,7 +174,7 @@ func (e *Etcd) Refresh() (*api.RangeAllocation, error) {

existing := &api.RangeAllocation{}
if err := e.storage.Get(context.TODO(), e.baseKey, existing, false); err != nil {
if etcdstorage.IsEtcdNotFound(err) {
if etcdutil.IsEtcdNotFound(err) {
return nil, nil
}
return nil, etcderr.InterpretGetError(err, e.kind, "")
Expand Down
19 changes: 10 additions & 9 deletions pkg/storage/etcd/etcd_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/etcd/metrics"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/tools"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/watch"
Expand Down Expand Up @@ -179,7 +180,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object)
startTime := time.Now()
response, err := h.client.Delete(key, false)
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
if !IsEtcdNotFound(err) {
if !etcdutil.IsEtcdNotFound(err) {
// if the object that existed prior to the delete is returned by etcd, update out.
if err != nil || response.PrevNode != nil {
_, _, err = h.extractObj(response, err, out, false, true)
Expand Down Expand Up @@ -230,7 +231,7 @@ func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr r
response, err := h.client.Get(key, false, false)
metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime)

if err != nil && !IsEtcdNotFound(err) {
if err != nil && !etcdutil.IsEtcdNotFound(err) {
return "", nil, nil, err
}
body, node, err = h.extractObj(response, err, objPtr, ignoreNotFound, false)
Expand Down Expand Up @@ -284,7 +285,7 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.F
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
trace.Step("Etcd node read")
if err != nil {
if IsEtcdNotFound(err) {
if etcdutil.IsEtcdNotFound(err) {
return nil
}
return err
Expand Down Expand Up @@ -387,12 +388,12 @@ func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node
}
result, err := h.client.Get(key, true, true)
if err != nil {
index, ok := etcdErrorIndex(err)
if !ok {
index = 0
var index uint64
if etcdError, ok := err.(*etcd.EtcdError); ok {
index = etcdError.Index
}
nodes := make([]*etcd.Node, 0)
if IsEtcdNotFound(err) {
if etcdutil.IsEtcdNotFound(err) {
return nodes, index, nil
} else {
return nodes, index, err
Expand Down Expand Up @@ -464,7 +465,7 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType
startTime := time.Now()
response, err := h.client.Create(key, string(data), ttl)
metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime)
if IsEtcdNodeExist(err) {
if etcdutil.IsEtcdNodeExist(err) {
continue
}
_, _, err = h.extractObj(response, err, ptrToType, false, false)
Expand All @@ -479,7 +480,7 @@ func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType
// Swap origBody with data, if origBody is the latest etcd data.
response, err := h.client.CompareAndSwap(key, string(data), ttl, origBody, index)
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime)
if IsEtcdTestFailed(err) {
if etcdutil.IsEtcdTestFailed(err) {
// Try again.
continue
}
Expand Down