-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
lock.go
238 lines (208 loc) · 7.56 KB
/
lock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
/*
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd2topo
import (
	"context"
	"errors"
	"fmt"
	"path"

	"github.com/spf13/pflag"
	"go.etcd.io/etcd/api/v3/mvccpb"
	clientv3 "go.etcd.io/etcd/client/v3"

	"vitess.io/vitess/go/vt/log"
	"vitess.io/vitess/go/vt/proto/vtrpc"
	"vitess.io/vitess/go/vt/servenv"
	"vitess.io/vitess/go/vt/topo"
	"vitess.io/vitess/go/vt/vterrors"
)
// leaseTTL is the TTL passed to etcd's Grant (which interprets it as seconds)
// for the leases backing locks and leader election. It can be overridden with
// the --topo_etcd_lease_ttl flag.
var leaseTTL = 30
// init hooks the etcd2 lock flags into the flag parsing of every binary
// listed in topo.FlagBinaries.
func init() {
	for _, binaryName := range topo.FlagBinaries {
		servenv.OnParseFor(binaryName, registerEtcd2TopoLockFlags)
	}
}
// registerEtcd2TopoLockFlags binds --topo_etcd_lease_ttl to leaseTTL on fs,
// using the current value of leaseTTL as the flag default.
func registerEtcd2TopoLockFlags(fs *pflag.FlagSet) {
	const usage = "Lease TTL for locks and leader election. The client will use KeepAlive to keep the lease going."
	fs.IntVar(&leaseTTL, "topo_etcd_lease_ttl", leaseTTL, usage)
}
// newUniqueEphemeralKV creates a new file in the provided directory.
// It is linked to the Lease.
// Errors returned are converted to topo errors.
//
// The key is "<nodePath>/<leaseID>" and is written only if it does not exist
// yet (version 0), attached to leaseID. On success it returns the key and the
// revision at which the transaction committed. If the key already exists it
// returns ErrBadResponse.
func (s *Server) newUniqueEphemeralKV(ctx context.Context, cli *clientv3.Client, leaseID clientv3.LeaseID, nodePath string, contents string) (string, int64, error) {
	// Use the lease ID as the file name, so it's guaranteed unique.
	newKey := fmt.Sprintf("%v/%v", nodePath, leaseID)

	// Only create a new file if it doesn't exist already
	// (version = 0), to avoid two processes using the
	// same file name. Since we use the lease ID, this should never happen.
	txnresp, err := cli.Txn(ctx).
		If(clientv3.Compare(clientv3.Version(newKey), "=", 0)).
		Then(clientv3.OpPut(newKey, contents, clientv3.WithLease(leaseID))).
		Commit()
	if err != nil {
		// errors.Is (rather than ==) so a wrapped cancellation or deadline
		// error still triggers the cleanup below.
		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
			// Our context was canceled as we were sending
			// a creation request. We don't know if it
			// succeeded or not. In any case, let's try to
			// delete the node, so we don't leave an orphan
			// node behind for *leaseTTL time. A fresh context
			// is used because ctx is already done.
			if _, derr := cli.Delete(context.Background(), newKey); derr != nil {
				log.Errorf("cli.Delete(context.Background(), newKey) failed :%v", derr)
			}
		}
		return "", 0, convertError(err, newKey)
	}
	if !txnresp.Succeeded {
		// The key already exists, that should not happen.
		return "", 0, ErrBadResponse
	}

	// The key was created.
	return newKey, txnresp.Header.Revision, nil
}
// waitOnLastRev waits on all revisions of the files in the provided
// directory that have revisions smaller than the provided revision.
// It returns true only if there is no more other older files.
//
// revision is the creation revision of the caller's own key. Only keys under
// nodePath+"/" whose mod revision is at most revision-1 are considered
// blocking, and only the newest such key is waited on per call.
//
// Returns:
//   - (true, nil) when no older key remains.
//   - (false, nil) when the caller should call again: the blocking key was
//     deleted, or the watch ended without reporting a delete.
//   - (false, err) when the Get fails, the watch cannot be created, or ctx
//     is done.
func (s *Server) waitOnLastRev(ctx context.Context, cli *clientv3.Client, nodePath string, revision int64) (bool, error) {
	// Get the keys that are blocking us, if any.
	opts := append(clientv3.WithLastRev(), clientv3.WithMaxModRev(revision-1))
	lastKey, err := cli.Get(ctx, nodePath+"/", opts...)
	if err != nil {
		return false, convertError(err, nodePath)
	}
	if len(lastKey.Kvs) == 0 {
		// No older key, we're done waiting.
		return true, nil
	}

	// Wait for release on blocking key. Cancel the watch when we
	// exit this function.
	//
	// The Get above saw the key alive at lastKey.Header.Revision, so its
	// deletion must happen at a later revision. We watch from
	// Header.Revision+1 instead of from our own creation revision for a
	// specific reason: the caller retries this function in a loop, and a
	// lock that waits long enough can have its creation revision compacted
	// away. Every Watch would then fail immediately and the retry loop
	// would spin against etcd.
	key := string(lastKey.Kvs[0].Key)
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	wc := cli.Watch(ctx, key, clientv3.WithRev(lastKey.Header.Revision+1))
	if wc == nil {
		return false, vterrors.Errorf(vtrpc.Code_INTERNAL, "Watch failed")
	}

	select {
	case <-ctx.Done():
		return false, convertError(ctx.Err(), nodePath)
	case wresp := <-wc:
		for _, ev := range wresp.Events {
			if ev.Type == mvccpb.DELETE {
				// There might still be older keys,
				// but not this one.
				return false, nil
			}
		}
	}

	// The Watch stopped, we're not sure if there are more items.
	return false, nil
}
// etcdLockDescriptor implements topo.LockDescriptor.
//
// It is returned by lock() once the lock is held. The lock key is attached to
// the etcd lease identified by leaseID. Check refreshes that lease once, and
// Unlock revokes it.
type etcdLockDescriptor struct {
	// s is the Server whose etcd client performs the lease operations.
	s *Server
	// leaseID is the lease the lock key was created with.
	leaseID clientv3.LeaseID
}

// Compile-time assertion that etcdLockDescriptor satisfies topo.LockDescriptor.
var _ topo.LockDescriptor = (*etcdLockDescriptor)(nil)
// TryLock is part of the topo.Conn interface.
//
// Unlike Lock, it does not wait. It lists dirPath first, and if an ephemeral
// locks directory is already present it fails immediately with a
// topo.NodeExists error. Otherwise it acquires the lock the same way Lock does.
func (s *Server) TryLock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
	// A full listing is requested so each entry carries its Type and
	// Ephemeral flags.
	entries, err := s.ListDir(ctx, dirPath, true /*full*/)
	if err != nil {
		// convertError keeps the topo error codes callers rely on
		// (e.g. topo.ErrNoNode, topo.ErrInterrupted), at the cost of
		// some context.
		return nil, convertError(err, dirPath)
	}

	for _, entry := range entries {
		heldByOther := entry.Name == locksPath && entry.Type == topo.TypeDirectory && entry.Ephemeral
		if heldByOther {
			return nil, topo.NewError(topo.NodeExists, fmt.Sprintf("lock already exists at path %s", dirPath))
		}
	}

	// Nobody appears to hold the lock: acquire it.
	return s.lock(ctx, dirPath, contents)
}
// Lock is part of the topo.Conn interface.
//
// A non-full listing of dirPath serves only as an existence check: its entries
// are discarded. The call then blocks in lock() until the lock is acquired or
// an error occurs.
func (s *Server) Lock(ctx context.Context, dirPath, contents string) (topo.LockDescriptor, error) {
	_, err := s.ListDir(ctx, dirPath, false /*full*/)
	if err != nil {
		// convertError keeps the topo error codes callers rely on
		// (e.g. topo.ErrNoNode, topo.ErrInterrupted), at the cost of
		// some context.
		return nil, convertError(err, dirPath)
	}
	return s.lock(ctx, dirPath, contents)
}
// lock is used by both Lock() and primary election.
//
// The function proceeds in four steps:
//  1. Grant an etcd lease of leaseTTL and start a KeepAlive for it, tied to ctx.
//  2. Create a unique key attached to that lease under
//     path.Join(s.root, nodePath, locksPath).
//  3. Block until no older key remains in that directory.
//  4. Return a descriptor that owns the lease. Its Unlock revokes the lease,
//     which removes the key.
//
// On every failure after the lease was granted, the lease is revoked (best
// effort). This ensures the lease, its KeepAlive, and any key attached to it
// do not linger.
func (s *Server) lock(ctx context.Context, nodePath, contents string) (topo.LockDescriptor, error) {
	nodePath = path.Join(s.root, nodePath, locksPath)

	// Get a lease, set its KeepAlive.
	lease, err := s.cli.Grant(ctx, int64(leaseTTL))
	if err != nil {
		return nil, convertError(err, nodePath)
	}

	// revokeLease releases the lease on a failure path, which also deletes
	// any key attached to it. It uses a fresh context because ctx may
	// already be canceled here. A failure is only logged, since the caller
	// is about to return the original error anyway.
	revokeLease := func(leftBehind string) {
		if _, rerr := s.cli.Revoke(context.Background(), lease.ID); rerr != nil {
			log.Warningf("Revoke(%d) failed, may have left %v behind: %v", lease.ID, leftBehind, rerr)
		}
	}

	leaseKA, err := s.cli.KeepAlive(ctx, lease.ID)
	if err != nil {
		// A lease nobody keeps alive is useless; don't leave it granted.
		revokeLease("lease")
		return nil, convertError(err, nodePath)
	}
	go func() {
		// Drain the lease keepAlive channel, we're not
		// interested in its contents. The loop ends when the
		// client closes the channel (e.g. ctx done or lease revoked).
		for range leaseKA {
		}
	}()

	// Create an ephemeral node in the locks directory.
	key, revision, err := s.newUniqueEphemeralKV(ctx, s.cli, lease.ID, nodePath, contents)
	if err != nil {
		// Otherwise the KeepAlive above would keep this unused lease
		// alive for as long as ctx lives.
		revokeLease("lease")
		return nil, err
	}

	// Wait until all older nodes in the locks directory are gone.
	for {
		done, err := s.waitOnLastRev(ctx, s.cli, nodePath, revision)
		if err != nil {
			// We had an error waiting on the last node.
			// Revoke our lease, this will delete the file.
			revokeLease(key)
			return nil, err
		}
		if done {
			// No more older nodes, we're it!
			return &etcdLockDescriptor{
				s:       s,
				leaseID: lease.ID,
			}, nil
		}
	}
}
// Check is part of the topo.LockDescriptor interface.
//
// It sends a single KeepAliveOnce for the lock's lease. A non-nil result
// means the lease could not be refreshed, and the error is returned as a
// topo error.
func (ld *etcdLockDescriptor) Check(ctx context.Context) error {
	if _, err := ld.s.cli.KeepAliveOnce(ctx, ld.leaseID); err != nil {
		return convertError(err, "lease")
	}
	return nil
}
// Unlock is part of the topo.LockDescriptor interface.
//
// Releasing the lock means revoking its lease. etcd then deletes the keys
// attached to that lease, including the lock key created by lock().
func (ld *etcdLockDescriptor) Unlock(ctx context.Context) error {
	if _, err := ld.s.cli.Revoke(ctx, ld.leaseID); err != nil {
		return convertError(err, "lease")
	}
	return nil
}