This repository has been archived by the owner on Nov 9, 2020. It is now read-only.

vFile: include timeout for ETCD request #1850

Merged
merged 2 commits on Aug 30, 2017
Changes from all commits
92 changes: 62 additions & 30 deletions client_plugin/drivers/vfile/kvstore/etcdops/etcdops.go
@@ -42,11 +42,13 @@ import (
cluster
etcdClusterStateExisting: Used to indicate that this node is joining
an existing etcd cluster
requestTimeout: After how long should an etcd request timeout
etcdRequestTimeout: After how long should an etcd request timeout
etcdUpdateTimeout: Timeout for waiting etcd server change status
checkSleepDuration: How long to wait in any busy waiting situation
before checking again
gcTicker: ticker for garbage collector to run a collection
etcdClientCreateError: Error indicating failure to create etcd client
swarmUnhealthyErrorMsg: Message indicating swarm cluster is unhealthy
etcdSingleRef: if global refcount 0 -> 1, start SMB server
etcdNoRef: if global refcount 1 -> 0, shut down SMB server
*/
@@ -58,10 +60,12 @@ const (
etcdScheme = "http://"
etcdClusterStateNew = "new"
etcdClusterStateExisting = "existing"
requestTimeout = 5 * time.Second
etcdRequestTimeout = 2 * time.Second
Contributor:

How do you know the timeout value used here is the right one?

Contributor Author:

The etcdUpdateTimeout is in fact the old requestTimeout.
In the previous code, requestTimeout or 2 * requestTimeout was used when waiting for the ETCD server to change status, or for a key-value inside ETCD to be updated by other managers, so etcdUpdateTimeout basically keeps the old value for those cases.
The new etcdRequestTimeout is the timeout for a single ETCD operation to complete. I checked the example code in the ETCD codebase, and 2 seconds is the value used there for request timeouts.
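To make the distinction concrete, here is a minimal sketch of the two patterns, assuming the clientv3 API from github.com/coreos/etcd/clientv3; the package and helper names (getOnce, waitForValue) are illustrative and not code from this PR:

```go
// Illustrative sketch (not code from this PR): a per-call etcdRequestTimeout
// bounds a single etcd request, while the longer etcdUpdateTimeout bounds a
// busy-wait loop that polls for a value to change.
package etcdopsdemo

import (
	"context"
	"errors"
	"time"

	etcdClient "github.com/coreos/etcd/clientv3"
)

const (
	etcdRequestTimeout = 2 * time.Second  // deadline for a single etcd call
	etcdUpdateTimeout  = 10 * time.Second // overall wait for a state change
	checkSleepDuration = time.Second      // polling interval while waiting
)

// getOnce performs one etcd read bounded by etcdRequestTimeout.
func getOnce(client *etcdClient.Client, key string) (*etcdClient.GetResponse, error) {
	ctx, cancel := context.WithTimeout(context.Background(), etcdRequestTimeout)
	defer cancel()
	return client.Get(ctx, key)
}

// waitForValue polls key until it holds want, giving up after etcdUpdateTimeout.
// This mirrors the ticker/timer busy-wait pattern used in etcdops.go.
func waitForValue(client *etcdClient.Client, key, want string) error {
	ticker := time.NewTicker(checkSleepDuration)
	defer ticker.Stop()
	timer := time.NewTimer(etcdUpdateTimeout)
	defer timer.Stop()

	for {
		select {
		case <-ticker.C:
			resp, err := getOnce(client, key)
			if err != nil {
				continue // transient failure; keep retrying until the timer fires
			}
			if len(resp.Kvs) > 0 && string(resp.Kvs[0].Value) == want {
				return nil
			}
		case <-timer.C:
			return errors.New("timed out waiting for key update: " + key)
		}
	}
}
```

The per-call timeout keeps a single wedged etcd request from blocking a volume operation, while the longer update timeout still gives other managers time to change the watched state.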

etcdUpdateTimeout = 10 * time.Second
checkSleepDuration = time.Second
gcTicker = 5 * time.Second
etcdClientCreateError = "Failed to create etcd client"
swarmUnhealthyErrorMsg = "Swarm cluster maybe unhealthy"
etcdSingleRef = "1"
etcdNoRef = "0"
)
@@ -192,9 +196,12 @@ func (e *EtcdKVS) joinEtcdCluster(leaderAddr string) error {
"nodeID": nodeID},
).Error("Failed to join ETCD cluster on manager ")
}
defer etcd.Close()

// list all current ETCD members, check if this node is already added as a member
lresp, err := etcd.MemberList(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), etcdRequestTimeout)
lresp, err := etcd.MemberList(ctx)
cancel()
if err != nil {
log.WithFields(
log.Fields{"leaderAddr": leaderAddr,
@@ -227,7 +234,9 @@ func (e *EtcdKVS) joinEtcdCluster(leaderAddr string) error {
"peerAddr": peerAddr},
).Info("Already joined as a ETCD member and started. Action: remove self before re-join ")

_, err = etcd.MemberRemove(context.Background(), member.ID)
ctx, cancel = context.WithTimeout(context.Background(), etcdRequestTimeout)
_, err = etcd.MemberRemove(ctx, member.ID)
cancel()
if err != nil {
log.WithFields(
log.Fields{"peerAddr": peerAddr,
@@ -244,7 +253,9 @@ func (e *EtcdKVS) joinEtcdCluster(leaderAddr string) error {
initCluster := ""
if !existing {
peerAddrs := []string{peerAddr}
aresp, err := etcd.MemberAdd(context.Background(), peerAddrs)
ctx, cancel = context.WithTimeout(context.Background(), etcdRequestTimeout)
aresp, err := etcd.MemberAdd(ctx, peerAddrs)
cancel()
if err != nil {
log.WithFields(
log.Fields{"leaderAddr": leaderAddr,
@@ -299,7 +310,7 @@ func etcdService(cmd []string) {
func (e *EtcdKVS) checkLocalEtcd() error {
ticker := time.NewTicker(checkSleepDuration)
defer ticker.Stop()
timer := time.NewTimer(requestTimeout)
timer := time.NewTimer(etcdUpdateTimeout)
defer timer.Stop()

for {
@@ -545,11 +556,14 @@ func (e *EtcdKVS) CompareAndPut(key string, oldVal string, newVal string) bool {
return false
}
defer etcdAPI.Close()
txresp, err := etcdAPI.Txn(context.TODO()).If(

ctx, cancel := context.WithTimeout(context.Background(), etcdRequestTimeout)
txresp, err := etcdAPI.Txn(ctx).If(
etcdClient.Compare(etcdClient.Value(key), "=", oldVal),
).Then(
etcdClient.OpPut(key, newVal),
).Commit()
cancel()

if err != nil {
log.WithFields(
@@ -576,13 +590,16 @@ func (e *EtcdKVS) CompareAndPutOrFetch(key string,
return txresp, errors.New(etcdClientCreateError)
}
defer etcdAPI.Close()
txresp, err := etcdAPI.Txn(context.TODO()).If(

ctx, cancel := context.WithTimeout(context.Background(), etcdRequestTimeout)
txresp, err := etcdAPI.Txn(ctx).If(
etcdClient.Compare(etcdClient.Value(key), "=", oldVal),
).Then(
etcdClient.OpPut(key, newVal),
).Else(
etcdClient.OpGet(key),
).Commit()
cancel()

if err != nil {
// There was some error
@@ -605,7 +622,7 @@ func (e *EtcdKVS) CompareAndPutStateOrBusywait(key string, oldVal string, newVal

ticker := time.NewTicker(checkSleepDuration)
defer ticker.Stop()
timer := time.NewTimer(2 * requestTimeout)
timer := time.NewTimer(etcdUpdateTimeout)
defer timer.Stop()
for {
select {
@@ -670,15 +687,12 @@ func addrToEtcdClient(addr string) (*etcdClient.Client, error) {
s := strings.Split(addr, ":")
endpoint := s[0] + etcdClientPort
cfg := etcdClient.Config{
Endpoints: []string{endpoint},
Endpoints: []string{endpoint},
DialTimeout: etcdRequestTimeout,
}

etcd, err := etcdClient.New(cfg)
if err != nil {
Contributor:

Why is the error log removed?

Contributor Author (@luomiao, Aug 30, 2017):


Because this function is also called by the garbage collector and the List function.
When some of the swarm managers don't have the plugin installed (and therefore no ETCD on them), which is still a valid environment for vFile, this message would pollute the logs, and it isn't really necessary: the plugin loops over all the managers to find a valid one with the plugin installed. We only need the error log when none of the managers are valid, which is currently printed at: https://github.com/vmware/docker-volume-vsphere/pull/1850/files/a7f1b6a45f7bf37d664018534cfed073fc3d2e24#diff-ac388eb52d1ec58c6967185c9f547fe2R674
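To illustrate the caller-side pattern described here, a simplified, hypothetical sketch is shown below (the package, firstHealthyEtcdClient, and dialEtcd names are assumptions, not code from this PR): individual dial failures are skipped silently, and an error is logged only when no manager yields a working client.

```go
// Simplified sketch (not code from this PR) of the caller-side pattern:
// connection failures on individual managers are tolerated, and an error is
// logged only when no manager has a reachable etcd endpoint.
package etcdopsdemo

import (
	"errors"

	etcdClient "github.com/coreos/etcd/clientv3"
	log "github.com/sirupsen/logrus"
)

// dialEtcd stands in for addrToEtcdClient in this diff: it returns an error,
// without logging, when a manager has no etcd listening on the client port.
type dialEtcd func(addr string) (*etcdClient.Client, error)

// firstHealthyEtcdClient tries every swarm manager in turn and reports a
// failure only when none of them is reachable.
func firstHealthyEtcdClient(managerAddrs []string, dial dialEtcd) (*etcdClient.Client, error) {
	for _, addr := range managerAddrs {
		client, err := dial(addr)
		if err == nil {
			return client, nil
		}
		// Silently skip: this manager may simply not have the vFile plugin
		// (and hence no ETCD member) installed, which is a valid setup.
	}
	// Only when every manager failed is the error worth logging.
	log.WithFields(
		log.Fields{"managers": managerAddrs},
	).Error("Failed to create etcd client from any swarm manager ")
	return nil, errors.New("failed to create etcd client from any swarm manager")
}
```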

log.WithFields(
log.Fields{"endpoint": endpoint,
"error": err},
).Error("Failed to create ETCD Client ")
return nil, err
}

@@ -695,7 +709,7 @@ func (e *EtcdKVS) List(prefix string) ([]string, error) {
}
defer client.Close()

ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
ctx, cancel := context.WithTimeout(context.Background(), etcdRequestTimeout)
resp, err := client.Get(ctx, prefix, etcdClient.WithPrefix(),
etcdClient.WithSort(etcdClient.SortByKey, etcdClient.SortDescend))
cancel()
@@ -724,7 +738,7 @@ func (e *EtcdKVS) kvMapFromPrefix(prefix string) (map[string]string, error) {
}
defer client.Close()

ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
ctx, cancel := context.WithTimeout(context.Background(), etcdRequestTimeout)
resp, err := client.Get(ctx, prefix, etcdClient.WithPrefix(),
etcdClient.WithSort(etcdClient.SortByKey, etcdClient.SortDescend))
cancel()
@@ -761,14 +775,19 @@ func (e *EtcdKVS) WriteMetaData(entries []kvstore.KvPair) error {

// Lets write the metadata in a single transaction
// Use a transaction if more than one entries are to be written
ctx, cancel := context.WithTimeout(context.Background(), etcdRequestTimeout)
if len(entries) > 1 {
_, err = client.Txn(context.TODO()).Then(ops...).Commit()
_, err = client.Txn(ctx).Then(ops...).Commit()
} else {
_, err = client.Do(context.TODO(), ops[0])
_, err = client.Do(ctx, ops[0])
}
cancel()

if err != nil {
msg = fmt.Sprintf("Failed to write metadata. Reason: %v", err)
msg = fmt.Sprintf("Failed to write metadata: %v.", err)
if err == context.DeadlineExceeded {
msg += fmt.Sprintf(swarmUnhealthyErrorMsg)
}
log.Warningf(msg)
return errors.New(msg)
}
@@ -796,10 +815,16 @@ func (e *EtcdKVS) ReadMetaData(keys []string) ([]kvstore.KvPair, error) {
}

// Read all requested keys in one transaction
getresp, err := client.Txn(context.TODO()).Then(ops...).Commit()
ctx, cancel := context.WithTimeout(context.Background(), etcdRequestTimeout)
getresp, err := client.Txn(ctx).Then(ops...).Commit()
cancel()
if err != nil {
log.Warningf("Transactional metadata read failed: %v", err)
return entries, err
msg := fmt.Sprintf("Transactional metadata read failed: %v.", err)
if err == context.DeadlineExceeded {
msg += fmt.Sprintf(swarmUnhealthyErrorMsg)
}
log.Warningf(msg)
return entries, errors.New(msg)
}

// Check responses and append them in entries[]
@@ -852,9 +877,14 @@ func (e *EtcdKVS) DeleteMetaData(name string) error {
}

// Delete the metadata in a single transaction
_, err = client.Txn(context.TODO()).Then(ops...).Commit()
ctx, cancel := context.WithTimeout(context.Background(), etcdRequestTimeout)
_, err = client.Txn(ctx).Then(ops...).Commit()
cancel()
if err != nil {
msg = fmt.Sprintf("Failed to delete metadata for volume %s. Reason: %v", name, err)
msg = fmt.Sprintf("Failed to delete metadata for volume %s: %v", name, err)
if err == context.DeadlineExceeded {
msg += fmt.Sprintf(swarmUnhealthyErrorMsg)
}
log.Warningf(msg)
return errors.New(msg)
}
@@ -872,13 +902,13 @@ func (e *EtcdKVS) AtomicIncr(key string) error {

ticker := time.NewTicker(checkSleepDuration)
defer ticker.Stop()
timer := time.NewTimer(requestTimeout)
timer := time.NewTimer(etcdUpdateTimeout)
defer timer.Stop()

for {
select {
case <-ticker.C:
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
ctx, cancel := context.WithTimeout(context.Background(), etcdRequestTimeout)
resp, err := client.Get(ctx, key)
cancel()
if err != nil {
@@ -917,13 +947,13 @@ func (e *EtcdKVS) AtomicDecr(key string) error {

ticker := time.NewTicker(checkSleepDuration)
defer ticker.Stop()
timer := time.NewTimer(requestTimeout)
timer := time.NewTimer(etcdUpdateTimeout)
defer timer.Stop()

for {
select {
case <-ticker.C:
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
ctx, cancel := context.WithTimeout(context.Background(), etcdRequestTimeout)
resp, err := client.Get(ctx, key)
cancel()
if err != nil {
@@ -968,17 +998,19 @@ func (e *EtcdKVS) BlockingWaitAndGet(key string, value string, newKey string) (s
defer ticker.Stop()
// This call is used to block and wait for long
// running functions. Larger timeout is justified.
timer := time.NewTimer(dockerops.GetServiceStartTimeout() + 2*requestTimeout)
timer := time.NewTimer(dockerops.GetServiceStartTimeout() + etcdUpdateTimeout)
defer timer.Stop()

for {
select {
case <-ticker.C:
txresp, err := client.Txn(context.TODO()).If(
ctx, cancel := context.WithTimeout(context.Background(), etcdRequestTimeout)
txresp, err := client.Txn(ctx).If(
etcdClient.Compare(etcdClient.Value(key), "=", value),
).Then(
etcdClient.OpGet(newKey),
).Commit()
cancel()

if err != nil {
log.WithFields(