/
stats_conn.go
223 lines (196 loc) · 6.94 KB
/
stats_conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
/*
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package topo
import (
"context"
"time"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
)
// Compile-time assertion that *StatsConn implements the Conn interface.
var _ Conn = (*StatsConn)(nil)
// topoStatsConnTimings holds the timings recorded by StatsConn, labeled by
// operation name and cell.
var topoStatsConnTimings = stats.NewMultiTimings(
	"TopologyConnOperations",
	"TopologyConnOperations timings",
	[]string{"Operation", "Cell"},
)

// topoStatsConnErrors holds the error counts recorded by StatsConn, labeled
// by operation name and cell.
var topoStatsConnErrors = stats.NewCountersWithMultiLabels(
	"TopologyConnErrors",
	"TopologyConnErrors errors per operation",
	[]string{"Operation", "Cell"},
)
// readOnlyErrorStrFormat is the message format for the error returned when a
// write operation is attempted on a read-only StatsConn. The first %s is the
// operation name and the second %s is the file or directory path.
const readOnlyErrorStrFormat = "cannot perform %s on %s as the topology server connection is read-only"
// StatsConn is a wrapper for a Conn that emits stats for every operation.
// It can also be switched to read-only, in which case the write and lock
// operations are rejected without reaching the wrapped Conn.
type StatsConn struct {
// cell is used as the "Cell" label value on every emitted stat.
cell string
// conn is the wrapped connection that every operation is delegated to.
conn Conn
// readOnly, when true, makes Create, Update, Delete, Lock and TryLock
// return a READ_ONLY error instead of calling conn.
readOnly bool
}
// NewStatsConn wraps conn in a StatsConn whose stats are labeled with cell.
// The returned connection starts out writable (not read-only).
func NewStatsConn(cell string, conn Conn) *StatsConn {
	st := new(StatsConn)
	st.cell = cell
	st.conn = conn
	st.readOnly = false
	return st
}
// ListDir is part of the Conn interface. The call is delegated to the wrapped
// connection; a timing is recorded under the "ListDir" operation for this
// cell, and the error counter is incremented if the wrapped call fails.
func (st *StatsConn) ListDir(ctx context.Context, dirPath string, full bool) ([]DirEntry, error) {
	labels := []string{"ListDir", st.cell}
	began := time.Now()
	defer topoStatsConnTimings.Record(labels, began)

	entries, err := st.conn.ListDir(ctx, dirPath, full)
	if err != nil {
		topoStatsConnErrors.Add(labels, int64(1))
	}
	return entries, err
}
// Create is part of the Conn interface. On a read-only connection it returns
// a READ_ONLY error without touching the wrapped connection or any stats.
// Otherwise the call is delegated, timed under the "Create" operation, and
// counted as an error if the wrapped call fails.
func (st *StatsConn) Create(ctx context.Context, filePath string, contents []byte) (Version, error) {
	const op = "Create"
	if st.readOnly {
		return nil, vterrors.Errorf(vtrpc.Code_READ_ONLY, readOnlyErrorStrFormat, op, filePath)
	}

	labels := []string{op, st.cell}
	began := time.Now()
	defer topoStatsConnTimings.Record(labels, began)

	created, err := st.conn.Create(ctx, filePath, contents)
	if err != nil {
		topoStatsConnErrors.Add(labels, int64(1))
	}
	return created, err
}
// Update is part of the Conn interface. On a read-only connection it returns
// a READ_ONLY error without touching the wrapped connection or any stats.
// Otherwise the call is delegated, timed under the "Update" operation, and
// counted as an error if the wrapped call fails.
func (st *StatsConn) Update(ctx context.Context, filePath string, contents []byte, version Version) (Version, error) {
	const op = "Update"
	if st.readOnly {
		return nil, vterrors.Errorf(vtrpc.Code_READ_ONLY, readOnlyErrorStrFormat, op, filePath)
	}

	labels := []string{op, st.cell}
	began := time.Now()
	defer topoStatsConnTimings.Record(labels, began)

	updated, err := st.conn.Update(ctx, filePath, contents, version)
	if err != nil {
		topoStatsConnErrors.Add(labels, int64(1))
	}
	return updated, err
}
// Get is part of the Conn interface. The call is delegated to the wrapped
// connection, timed under the "Get" operation for this cell, and counted as
// an error if the wrapped call fails. Results are returned unchanged.
func (st *StatsConn) Get(ctx context.Context, filePath string) ([]byte, Version, error) {
	labels := []string{"Get", st.cell}
	began := time.Now()
	defer topoStatsConnTimings.Record(labels, began)

	data, ver, err := st.conn.Get(ctx, filePath)
	if err != nil {
		topoStatsConnErrors.Add(labels, int64(1))
	}
	return data, ver, err
}
// List is part of the Conn interface. The call is delegated to the wrapped
// connection, timed under the "List" operation for this cell, and counted as
// an error if the wrapped call fails. Results are returned unchanged.
func (st *StatsConn) List(ctx context.Context, filePathPrefix string) ([]KVInfo, error) {
	labels := []string{"List", st.cell}
	began := time.Now()
	defer topoStatsConnTimings.Record(labels, began)

	infos, err := st.conn.List(ctx, filePathPrefix)
	if err != nil {
		topoStatsConnErrors.Add(labels, int64(1))
	}
	return infos, err
}
// Delete is part of the Conn interface. On a read-only connection it returns
// a READ_ONLY error without touching the wrapped connection or any stats.
// Otherwise the call is delegated, timed under the "Delete" operation, and
// counted as an error if the wrapped call fails.
func (st *StatsConn) Delete(ctx context.Context, filePath string, version Version) error {
	const op = "Delete"
	if st.readOnly {
		return vterrors.Errorf(vtrpc.Code_READ_ONLY, readOnlyErrorStrFormat, op, filePath)
	}

	labels := []string{op, st.cell}
	began := time.Now()
	defer topoStatsConnTimings.Record(labels, began)

	err := st.conn.Delete(ctx, filePath, version)
	if err != nil {
		topoStatsConnErrors.Add(labels, int64(1))
	}
	return err
}
// Lock is part of the Conn interface. It goes through internalLock in
// blocking mode, which ends up calling Lock on the wrapped connection.
func (st *StatsConn) Lock(ctx context.Context, dirPath, contents string) (LockDescriptor, error) {
	const blocking = true
	return st.internalLock(ctx, dirPath, contents, blocking)
}
// TryLock is part of the topo.Conn interface. It goes through internalLock in
// non-blocking mode, which ends up calling TryLock on the wrapped connection.
func (st *StatsConn) TryLock(ctx context.Context, dirPath, contents string) (LockDescriptor, error) {
	const blocking = false
	return st.internalLock(ctx, dirPath, contents, blocking)
}
// internalLock is the shared implementation behind Lock and TryLock. When
// isBlocking is true it calls Lock on the wrapped connection, otherwise it
// calls TryLock. On a read-only connection it returns a READ_ONLY error
// without touching the wrapped connection or any stats. Both variants are
// timed and error-counted under the same "Lock" operation key.
func (st *StatsConn) internalLock(ctx context.Context, dirPath, contents string, isBlocking bool) (LockDescriptor, error) {
	const op = "Lock"
	if st.readOnly {
		return nil, vterrors.Errorf(vtrpc.Code_READ_ONLY, readOnlyErrorStrFormat, op, dirPath)
	}

	labels := []string{op, st.cell}
	began := time.Now()
	defer topoStatsConnTimings.Record(labels, began)

	var (
		desc LockDescriptor
		err  error
	)
	if !isBlocking {
		desc, err = st.conn.TryLock(ctx, dirPath, contents)
	} else {
		desc, err = st.conn.Lock(ctx, dirPath, contents)
	}
	if err != nil {
		topoStatsConnErrors.Add(labels, int64(1))
	}
	return desc, err
}
// Watch is part of the Conn interface. The call is delegated to the wrapped
// connection and its results are returned unchanged. A timing is recorded
// under the "Watch" operation for this cell; it covers only the call that
// sets up the watch, not the lifetime of the returned channel. If the wrapped
// call fails, the error counter is incremented, consistent with the other
// error-returning operations of StatsConn.
func (st *StatsConn) Watch(ctx context.Context, filePath string) (current *WatchData, changes <-chan *WatchData, err error) {
	startTime := time.Now()
	statsKey := []string{"Watch", st.cell}
	defer topoStatsConnTimings.Record(statsKey, startTime)

	current, changes, err = st.conn.Watch(ctx, filePath)
	if err != nil {
		topoStatsConnErrors.Add(statsKey, int64(1))
	}
	return current, changes, err
}
// WatchRecursive is part of the Conn interface. The call is delegated to the
// wrapped connection and its results are returned unchanged. A timing is
// recorded under the "WatchRecursive" operation for this cell; it covers only
// the call that sets up the watch, not the lifetime of the returned channel.
// If the wrapped call fails, the error counter is incremented, consistent
// with the other error-returning operations of StatsConn.
func (st *StatsConn) WatchRecursive(ctx context.Context, path string) ([]*WatchDataRecursive, <-chan *WatchDataRecursive, error) {
	startTime := time.Now()
	statsKey := []string{"WatchRecursive", st.cell}
	defer topoStatsConnTimings.Record(statsKey, startTime)

	current, changes, err := st.conn.WatchRecursive(ctx, path)
	if err != nil {
		topoStatsConnErrors.Add(statsKey, int64(1))
	}
	return current, changes, err
}
// NewLeaderParticipation is part of the Conn interface. The call is delegated
// to the wrapped connection, timed under the "NewLeaderParticipation"
// operation for this cell, and counted as an error if the wrapped call fails.
func (st *StatsConn) NewLeaderParticipation(name, id string) (LeaderParticipation, error) {
	labels := []string{"NewLeaderParticipation", st.cell}
	began := time.Now()
	defer topoStatsConnTimings.Record(labels, began)

	participation, err := st.conn.NewLeaderParticipation(name, id)
	if err != nil {
		topoStatsConnErrors.Add(labels, int64(1))
	}
	return participation, err
}
// Close is part of the Conn interface. It closes the wrapped connection and
// records a timing under the "Close" operation for this cell.
func (st *StatsConn) Close() {
	labels := []string{"Close", st.cell}
	began := time.Now()
	defer topoStatsConnTimings.Record(labels, began)

	st.conn.Close()
}
// SetReadOnly sets the read-only flag of the connection. While the flag is
// true, Create, Update, Delete, Lock and TryLock return a READ_ONLY error
// instead of calling the wrapped connection; passing false re-enables them.
// The flag is a plain field written without any synchronization here.
func (st *StatsConn) SetReadOnly(readOnly bool) {
st.readOnly = readOnly
}
// IsReadOnly reports whether the connection is currently read-only, i.e.
// whether write and lock operations are being rejected.
func (st *StatsConn) IsReadOnly() bool {
return st.readOnly
}