This repository has been archived by the owner on Apr 29, 2020. It is now read-only.
/
ops.go
172 lines (152 loc) · 4.14 KB
/
ops.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package consulutil
import (
"context"
"errors"
"fmt"
"path/filepath"
"runtime"
"github.com/hashicorp/consul/api"
)
// KVError encapsulates a consul error
type KVError struct {
Op string
Key string
KVError error
filename string
function string
lineNumber int
}
// Error implements the error and "pkg/util".CallsiteError interfaces.
func (err KVError) Error() string {
return fmt.Sprintf("%s failed for path %s: %s", err.Op, err.Key, err.KVError)
}
// LineNumber implements the "pkg/util".CallsiteError interface.
func (err KVError) LineNumber() int {
return err.lineNumber
}
// Filename implements the "pkg/util".CallsiteError interface.
func (err KVError) Filename() string {
return err.filename
}
// Function implements the "pkg/util".CallsiteError interface.
func (err KVError) Function() string {
return err.function
}
// NewKVError constructs a new KVError to wrap errors from Consul.
func NewKVError(op string, key string, err error) KVError {
var function string
// Skip one stack frame to get the file & line number of caller.
pc, file, line, ok := runtime.Caller(1)
if ok {
function = runtime.FuncForPC(pc).Name()
}
return KVError{
Op: op,
Key: key,
KVError: err,
filename: filepath.Base(file),
function: function,
lineNumber: line,
}
}
// CanceledError signifies that the Consul operation was explicitly canceled.
var CanceledError = errors.New("Consul operation canceled")
// ConsulLister is a portion of the interface for api.KV
type ConsulLister interface {
List(prefix string, opts *api.QueryOptions) (api.KVPairs, *api.QueryMeta, error)
}
type listReply struct {
pairs api.KVPairs
queryMeta *api.QueryMeta
err error
}
// List performs a KV List operation that can be canceled. When the "done" channel is
// closed, CanceledError will be immediately returned. (The HTTP RPC can't be canceled,
// but it will be ignored.) Errors from Consul will be wrapped in a KVError value.
func List(
clientKV ConsulLister,
done <-chan struct{},
prefix string,
options *api.QueryOptions,
) (api.KVPairs, *api.QueryMeta, error) {
resultChan := make(chan listReply, 1)
go func() {
pairs, queryMeta, err := clientKV.List(prefix, options)
if err != nil {
err = NewKVError("list", prefix, err)
}
resultChan <- listReply{pairs, queryMeta, err}
}()
select {
case <-done:
return nil, nil, CanceledError
case r := <-resultChan:
return r.pairs, r.queryMeta, r.err
}
}
type ConsulKeyser interface {
Keys(prefix, separator string, q *api.QueryOptions) ([]string, *api.QueryMeta, error)
}
type keysReply struct {
keys []string
queryMeta *api.QueryMeta
err error
}
// SafeKeys performs a KV Keys operation that can be canceled. When the "done"
// channel is closed, CanceledError will be immediately returned. (The HTTP RPC
// can't be canceled, but it will be ignored.) Errors from Consul will be
// wrapped in a KVError value.
func SafeKeys(
clientKV ConsulKeyser,
done <-chan struct{},
prefix string,
options *api.QueryOptions,
) ([]string, *api.QueryMeta, error) {
resultChan := make(chan keysReply, 1)
go func() {
keys, queryMeta, err := clientKV.Keys(prefix, "", options)
if err != nil {
err = NewKVError("keys", prefix, err)
}
resultChan <- keysReply{keys, queryMeta, err}
}()
select {
case <-done:
return nil, nil, CanceledError
case r := <-resultChan:
return r.keys, r.queryMeta, r.err
}
}
type ConsulGetter interface {
Get(key string, opts *api.QueryOptions) (*api.KVPair, *api.QueryMeta, error)
}
type getReply struct {
kvp *api.KVPair
queryMeta *api.QueryMeta
err error
}
// Like List, but for a single key instead of a list.
func Get(
ctx context.Context,
clientKV ConsulGetter,
key string,
options *api.QueryOptions,
) (*api.KVPair, *api.QueryMeta, error) {
resultChan := make(chan getReply, 1)
go func() {
kvp, queryMeta, err := clientKV.Get(key, options)
if err != nil {
err = NewKVError("get", key, err)
}
select {
case resultChan <- getReply{kvp, queryMeta, err}:
case <-ctx.Done():
}
}()
select {
case <-ctx.Done():
return nil, nil, CanceledError
case r := <-resultChan:
return r.kvp, r.queryMeta, r.err
}
}