Skip to content

Commit

Permalink
Expose etcd client latency metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
wojtek-t committed Apr 1, 2019
1 parent b75e28f commit d5c9ad8
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 1 deletion.
1 change: 1 addition & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD
Expand Up @@ -62,6 +62,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd/metrics:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library",
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",
"//vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes:go_default_library",
Expand Down
28 changes: 27 additions & 1 deletion staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go
Expand Up @@ -38,6 +38,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd"
"k8s.io/apiserver/pkg/storage/etcd/metrics"
"k8s.io/apiserver/pkg/storage/value"
utiltrace "k8s.io/utils/trace"
)
Expand Down Expand Up @@ -111,7 +112,9 @@ func (s *store) Versioner() storage.Versioner {
// Get implements storage.Interface.Get.
func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
key = path.Join(s.pathPrefix, key)
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
if err != nil {
return err
}
Expand Down Expand Up @@ -156,11 +159,13 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
return storage.NewInternalError(err.Error())
}

startTime := time.Now()
txnResp, err := s.client.KV.Txn(ctx).If(
notFound(key),
).Then(
clientv3.OpPut(key, string(newData), opts...),
).Commit()
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
if err != nil {
return err
}
Expand Down Expand Up @@ -191,10 +196,12 @@ func (s *store) Delete(ctx context.Context, key string, out runtime.Object, prec
func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime.Object) error {
// We need to do get and delete in single transaction in order to
// know the value and revision before deleting it.
startTime := time.Now()
txnResp, err := s.client.KV.Txn(ctx).If().Then(
clientv3.OpGet(key),
clientv3.OpDelete(key),
).Commit()
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
if err != nil {
return err
}
Expand All @@ -212,7 +219,9 @@ func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime
}

func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions) error {
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key)
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
if err != nil {
return err
}
Expand All @@ -224,13 +233,15 @@ func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.O
if err := preconditions.Check(key, origState.obj); err != nil {
return err
}
startTime := time.Now()
txnResp, err := s.client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
).Then(
clientv3.OpDelete(key),
).Else(
clientv3.OpGet(key),
).Commit()
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
if err != nil {
return err
}
Expand All @@ -247,7 +258,7 @@ func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.O
func (s *store) GuaranteedUpdate(
ctx context.Context, key string, out runtime.Object, ignoreNotFound bool,
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error {
trace := utiltrace.New(fmt.Sprintf("GuaranteedUpdate etcd3: %s", reflect.TypeOf(out).String()))
trace := utiltrace.New(fmt.Sprintf("GuaranteedUpdate etcd3: %s", getTypeName(out)))
defer trace.LogIfLong(500 * time.Millisecond)

v, err := conversion.EnforcePtr(out)
Expand All @@ -257,7 +268,9 @@ func (s *store) GuaranteedUpdate(
key = path.Join(s.pathPrefix, key)

getCurrentState := func() (*objState, error) {
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -339,13 +352,15 @@ func (s *store) GuaranteedUpdate(
}
trace.Step("Transaction prepared")

startTime := time.Now()
txnResp, err := s.client.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
).Then(
clientv3.OpPut(key, string(newData), opts...),
).Else(
clientv3.OpGet(key),
).Commit()
metrics.RecordEtcdRequestLatency("update", getTypeName(out), startTime)
if err != nil {
return err
}
Expand Down Expand Up @@ -379,7 +394,9 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin
}

key = path.Join(s.pathPrefix, key)
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
if err != nil {
return err
}
Expand All @@ -399,7 +416,9 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin

func (s *store) Count(key string) (int64, error) {
key = path.Join(s.pathPrefix, key)
startTime := time.Now()
getResp, err := s.client.KV.Get(context.Background(), key, clientv3.WithRange(clientv3.GetPrefixRangeEnd(key)), clientv3.WithCountOnly())
metrics.RecordEtcdRequestLatency("listWithCount", key, startTime)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -554,7 +573,9 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
var lastKey []byte
var hasMore bool
for {
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key, options...)
metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
if err != nil {
return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix)
}
Expand Down Expand Up @@ -786,3 +807,8 @@ func appendListItem(v reflect.Value, data []byte, rev uint64, pred storage.Selec
func notFound(key string) clientv3.Cmp {
return clientv3.Compare(clientv3.ModRevision(key), "=", 0)
}

// getTypeName returns type name of an object for reporting purposes.
func getTypeName(obj interface{}) string {
return reflect.TypeOf(obj).String()
}
4 changes: 4 additions & 0 deletions test/integration/metrics/metrics_test.go
Expand Up @@ -61,6 +61,9 @@ func scrapeMetrics(s *httptest.Server) ([]*prometheuspb.MetricFamily, error) {
// Each line in the response body should contain all the data for a single metric.
var metrics []*prometheuspb.MetricFamily
scanner := bufio.NewScanner(resp.Body)
// Increase buffer size, since default one is too small for reading
// the /metrics contents.
scanner.Buffer(make([]byte, 10), 131072)
for scanner.Scan() {
var metric prometheuspb.MetricFamily
if err := proto.UnmarshalText(scanner.Text(), &metric); err != nil {
Expand Down Expand Up @@ -122,5 +125,6 @@ func TestApiserverMetrics(t *testing.T) {
checkForExpectedMetrics(t, metrics, []string{
"apiserver_request_total",
"apiserver_request_duration_seconds",
"etcd_request_duration_seconds",
})
}

0 comments on commit d5c9ad8

Please sign in to comment.