-
Notifications
You must be signed in to change notification settings - Fork 405
/
latency_tracker.go
107 lines (86 loc) · 3.38 KB
/
latency_tracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"context"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
endpointsrequest "k8s.io/apiserver/pkg/endpoints/request"
)
// NewETCDLatencyTracker wraps the given 'delegate' clientv3.KV in a
// decorator that measures how long calls made through it take, so
// the latency incurred can be tracked. The returned value satisfies
// clientv3.KV.
func NewETCDLatencyTracker(delegate clientv3.KV) clientv3.KV {
	tracker := new(clientV3KVLatencyTracker)
	tracker.KV = delegate
	return tracker
}
// clientV3KVLatencyTracker decorates a clientv3.KV instance and times
// each call so we can track the latency an API request incurs in etcd
// round trips (the time it takes to send data to etcd and get the
// complete response back).
//
// If an API request involves N (N>=1) round trips to etcd, then we will sum
// up the latency incurred in each round trip.
// It uses the context associated with the request in flight, so there
// is no state shared among the requests in flight, and so there is no
// concurrency overhead.
// If the goroutine executing the request handler makes concurrent calls
// to the underlying storage layer, that is protected since the latency
// tracking function TrackStorageLatency is thread safe.
//
// NOTE: Compact is an asynchronous process and is not associated with
// any request, so we will not be tracking its latency.
type clientV3KVLatencyTracker struct {
// KV is the wrapped delegate. Any clientv3.KV method this type does
// not override is promoted from it and therefore runs untimed.
clientv3.KV
}
// Put forwards to the delegate's Put and, once the call returns, reports
// the elapsed wall-clock time against the request carried by ctx.
func (c *clientV3KVLatencyTracker) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) {
	// The argument to the deferred call is evaluated right here, so the
	// clock starts before the delegate is invoked.
	defer func(begin time.Time) {
		endpointsrequest.TrackStorageLatency(ctx, time.Since(begin))
	}(time.Now())

	return c.KV.Put(ctx, key, val, opts...)
}
// Get forwards to the delegate's Get and, once the call returns, reports
// the elapsed wall-clock time against the request carried by ctx.
func (c *clientV3KVLatencyTracker) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
	// The argument to the deferred call is evaluated right here, so the
	// clock starts before the delegate is invoked.
	defer func(begin time.Time) {
		endpointsrequest.TrackStorageLatency(ctx, time.Since(begin))
	}(time.Now())

	return c.KV.Get(ctx, key, opts...)
}
// Delete forwards to the delegate's Delete and, once the call returns,
// reports the elapsed wall-clock time against the request carried by ctx.
func (c *clientV3KVLatencyTracker) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) {
	// The argument to the deferred call is evaluated right here, so the
	// clock starts before the delegate is invoked.
	defer func(begin time.Time) {
		endpointsrequest.TrackStorageLatency(ctx, time.Since(begin))
	}(time.Now())

	return c.KV.Delete(ctx, key, opts...)
}
// Do forwards the single operation to the delegate's Do and, once the call
// returns, reports the elapsed wall-clock time against the request carried
// by ctx.
func (c *clientV3KVLatencyTracker) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse, error) {
	// The argument to the deferred call is evaluated right here, so the
	// clock starts before the delegate is invoked.
	defer func(begin time.Time) {
		endpointsrequest.TrackStorageLatency(ctx, time.Since(begin))
	}(time.Now())

	return c.KV.Do(ctx, op)
}
// Txn opens a transaction on the delegate and returns it wrapped in a
// clientV3TxnTracker that carries ctx alongside the transaction.
func (c *clientV3KVLatencyTracker) Txn(ctx context.Context) clientv3.Txn {
	tracked := &clientV3TxnTracker{
		ctx: ctx,
		Txn: c.KV.Txn(ctx),
	}
	return tracked
}
// clientV3TxnTracker decorates a clientv3.Txn so that the etcd round trip
// performed by Commit is timed and attributed to the request that owns ctx.
//
// If, Then and Else are overridden on purpose. Were they merely promoted
// from the embedded clientv3.Txn, each would return whatever the wrapped
// transaction returns (the etcd client hands back its own txn, not this
// tracker), so a chained call such as
//
//	kv.Txn(ctx).If(cmps...).Then(ops...).Else(ops...).Commit()
//
// would end up invoking the undecorated Commit and the latency of the
// transaction would never be recorded.
type clientV3TxnTracker struct {
	// ctx is the context of the request in flight. It is retained here
	// because Commit takes no context argument.
	ctx context.Context
	clientv3.Txn
}

// If applies the comparisons to the wrapped transaction and returns the
// tracker itself so that the rest of the call chain stays instrumented.
func (t *clientV3TxnTracker) If(cs ...clientv3.Cmp) clientv3.Txn {
	t.Txn = t.Txn.If(cs...)
	return t
}

// Then applies the success operations to the wrapped transaction and
// returns the tracker itself so that the rest of the call chain stays
// instrumented.
func (t *clientV3TxnTracker) Then(ops ...clientv3.Op) clientv3.Txn {
	t.Txn = t.Txn.Then(ops...)
	return t
}

// Else applies the failure operations to the wrapped transaction and
// returns the tracker itself so that the rest of the call chain stays
// instrumented.
func (t *clientV3TxnTracker) Else(ops ...clientv3.Op) clientv3.Txn {
	t.Txn = t.Txn.Else(ops...)
	return t
}

// Commit commits the wrapped transaction and, once the call returns,
// reports the elapsed wall-clock time against the request carried by the
// stored context.
func (t *clientV3TxnTracker) Commit() (*clientv3.TxnResponse, error) {
	startedAt := time.Now()
	defer func() {
		endpointsrequest.TrackStorageLatency(t.ctx, time.Since(startedAt))
	}()

	return t.Txn.Commit()
}