forked from hashicorp/vault
-
Notifications
You must be signed in to change notification settings - Fork 0
/
inmem_ha.go
167 lines (143 loc) · 3.3 KB
/
inmem_ha.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package inmem
import (
"fmt"
"sync"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/physical"
)
type InmemHABackend struct {
physical.Backend
locks map[string]string
l *sync.Mutex
cond *sync.Cond
logger log.Logger
}
type TransactionalInmemHABackend struct {
physical.Transactional
InmemHABackend
}
// NewInmemHA constructs a new in-memory HA backend. This is only for testing.
func NewInmemHA(_ map[string]string, logger log.Logger) (physical.Backend, error) {
be, err := NewInmem(nil, logger)
if err != nil {
return nil, err
}
in := &InmemHABackend{
Backend: be,
locks: make(map[string]string),
logger: logger,
l: new(sync.Mutex),
}
in.cond = sync.NewCond(in.l)
return in, nil
}
func NewTransactionalInmemHA(_ map[string]string, logger log.Logger) (physical.Backend, error) {
transInmem, err := NewTransactionalInmem(nil, logger)
if err != nil {
return nil, err
}
inmemHA := InmemHABackend{
Backend: transInmem,
locks: make(map[string]string),
logger: logger,
l: new(sync.Mutex),
}
in := &TransactionalInmemHABackend{
InmemHABackend: inmemHA,
Transactional: transInmem.(physical.Transactional),
}
in.cond = sync.NewCond(in.l)
return in, nil
}
// LockWith is used for mutual exclusion based on the given key.
func (i *InmemHABackend) LockWith(key, value string) (physical.Lock, error) {
l := &InmemLock{
in: i,
key: key,
value: value,
}
return l, nil
}
// LockMapSize is used in some tests to determine whether this backend has ever
// been used for HA purposes rather than simply for storage
func (i *InmemHABackend) LockMapSize() int {
return len(i.locks)
}
// HAEnabled indicates whether the HA functionality should be exposed.
// Currently always returns true.
func (i *InmemHABackend) HAEnabled() bool {
return true
}
// InmemLock is an in-memory Lock implementation for the HABackend
type InmemLock struct {
in *InmemHABackend
key string
value string
held bool
leaderCh chan struct{}
l sync.Mutex
}
func (i *InmemLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
i.l.Lock()
defer i.l.Unlock()
if i.held {
return nil, fmt.Errorf("lock already held")
}
// Attempt an async acquisition
didLock := make(chan struct{})
releaseCh := make(chan bool, 1)
go func() {
// Wait to acquire the lock
i.in.l.Lock()
_, ok := i.in.locks[i.key]
for ok {
i.in.cond.Wait()
_, ok = i.in.locks[i.key]
}
i.in.locks[i.key] = i.value
i.in.l.Unlock()
// Signal that lock is held
close(didLock)
// Handle an early abort
release := <-releaseCh
if release {
i.in.l.Lock()
delete(i.in.locks, i.key)
i.in.l.Unlock()
i.in.cond.Broadcast()
}
}()
// Wait for lock acquisition or shutdown
select {
case <-didLock:
releaseCh <- false
case <-stopCh:
releaseCh <- true
return nil, nil
}
// Create the leader channel
i.held = true
i.leaderCh = make(chan struct{})
return i.leaderCh, nil
}
func (i *InmemLock) Unlock() error {
i.l.Lock()
defer i.l.Unlock()
if !i.held {
return nil
}
close(i.leaderCh)
i.leaderCh = nil
i.held = false
i.in.l.Lock()
delete(i.in.locks, i.key)
i.in.l.Unlock()
i.in.cond.Broadcast()
return nil
}
func (i *InmemLock) Value() (bool, string, error) {
i.in.l.Lock()
val, ok := i.in.locks[i.key]
i.in.l.Unlock()
return ok, val, nil
}