forked from cilium/cilium
-
Notifications
You must be signed in to change notification settings - Fork 0
/
lock.go
181 lines (155 loc) · 4.78 KB
/
lock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
// Copyright 2016-2019 Authors of Cilium
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kvstore
import (
"context"
"fmt"
"time"
"github.com/cilium/cilium/pkg/debug"
"github.com/cilium/cilium/pkg/defaults"
"github.com/cilium/cilium/pkg/lock"
uuidfactor "github.com/cilium/cilium/pkg/uuid"
"github.com/davecgh/go-spew/spew"
"github.com/pborman/uuid"
"github.com/sirupsen/logrus"
)
var (
kvstoreLocks = pathLocks{lockPaths: map[string]lockOwner{}}
// staleLockTimeout is the timeout after which waiting for a believed
// other local lock user for the same key is given up on and etcd is
// asked directly. It is still highly unlikely that concurrent access
// occurs as only one consumer will manage to acquire the newly
// released lock. The only possibility of concurrent access is if a
// consumer is *still* holding the lock but this is highly unlikely
// given the duration of this timeout.
staleLockTimeout = defaults.KVStoreStaleLockTimeout
)
type KVLocker interface {
Unlock() error
// Comparator returns an object that should be used by the KVStore to make
// sure if the lock is still valid for its client or nil if no such
// verification exists.
Comparator() interface{}
}
// getLockPath returns the lock path representation of the given path.
func getLockPath(path string) string {
return path + ".lock"
}
type lockOwner struct {
created time.Time
id uuid.UUID
}
type pathLocks struct {
mutex lock.RWMutex
lockPaths map[string]lockOwner
}
func init() {
debug.RegisterStatusObject("kvstore-locks", &kvstoreLocks)
}
// DebugStatus implements debug.StatusObject to provide debug status collection
// ability
func (pl *pathLocks) DebugStatus() string {
pl.mutex.RLock()
str := spew.Sdump(pl.lockPaths)
pl.mutex.RUnlock()
return str
}
func (pl *pathLocks) runGC() {
pl.mutex.Lock()
for path, owner := range pl.lockPaths {
if time.Since(owner.created) > staleLockTimeout {
log.WithField("path", path).Error("Forcefully unlocking local kvstore lock")
delete(pl.lockPaths, path)
}
}
pl.mutex.Unlock()
}
func (pl *pathLocks) lock(ctx context.Context, path string) (id uuid.UUID, err error) {
for {
pl.mutex.Lock()
if _, ok := pl.lockPaths[path]; !ok {
id = uuidfactor.NewUUID()
pl.lockPaths[path] = lockOwner{
created: time.Now(),
id: id,
}
pl.mutex.Unlock()
return
}
pl.mutex.Unlock()
select {
case <-time.After(time.Duration(10) * time.Millisecond):
case <-ctx.Done():
err = fmt.Errorf("lock was cancelled: %s", ctx.Err())
return
}
}
}
func (pl *pathLocks) unlock(path string, id uuid.UUID) {
pl.mutex.Lock()
if owner, ok := pl.lockPaths[path]; ok && uuid.Equal(owner.id, id) {
delete(pl.lockPaths, path)
}
pl.mutex.Unlock()
}
// Lock is a lock return by LockPath
type Lock struct {
path string
id uuid.UUID
kvLock KVLocker
}
// LockPath locks the specified path. The key for the lock is not the path
// provided itself but the path with a suffix of ".lock" appended. The lock
// returned also contains a patch specific local Mutex which will be held.
//
// It is required to call Unlock() on the returned Lock to unlock
func LockPath(ctx context.Context, path string) (l *Lock, err error) {
id, err := kvstoreLocks.lock(ctx, path)
if err != nil {
return nil, err
}
lock, err := Client().LockPath(ctx, path)
if err != nil {
kvstoreLocks.unlock(path, id)
Trace("Failed to lock", err, logrus.Fields{fieldKey: path})
err = fmt.Errorf("error while locking path %s: %s", path, err)
return nil, err
}
Trace("Successful lock", err, logrus.Fields{fieldKey: path})
return &Lock{kvLock: lock, path: path, id: id}, err
}
// RunLockGC inspects all local kvstore locks to determine whether they have
// been held longer than the stale lock timeout, and if so, unlocks them
// forceably.
func RunLockGC() {
kvstoreLocks.runGC()
}
// Unlock unlocks a lock
func (l *Lock) Unlock() error {
if l == nil {
return nil
}
// Unlock kvstore mutex first
err := l.kvLock.Unlock()
if err != nil {
log.WithError(err).WithField("path", l.path).Error("Unable to unlock kvstore lock")
}
// unlock local lock even if kvstore cannot be unlocked
kvstoreLocks.unlock(l.path, l.id)
Trace("Unlocked", nil, logrus.Fields{fieldKey: l.path})
return err
}
func (l *Lock) Comparator() interface{} {
return l.kvLock.Comparator()
}