Skip to content

Commit

Permalink
Add basic metrics and logging of etcd object sizes, group by operatio…
Browse files Browse the repository at this point in the history
…n and k8s type name
  • Loading branch information
jpbetz committed Nov 8, 2019
1 parent 6569bc7 commit cafa6ac
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 7 deletions.
26 changes: 26 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/metrics/metrics.go
Expand Up @@ -49,6 +49,22 @@ var (
},
[]string{"resource"},
)
requestSize = compbasemetrics.NewHistogramVec(
&compbasemetrics.HistogramOpts{
Name: "etcd_request_size_bytes",
Help: "Etcd request size in bytes for each operation and object type.",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"operation", "type"},
)
responseSize = compbasemetrics.NewHistogramVec(
&compbasemetrics.HistogramOpts{
Name: "etcd_response_size_bytes",
Help: "Etcd response size in bytes for each operation and object type.",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"operation", "type"},
)

deprecatedEtcdRequestLatenciesSummary = compbasemetrics.NewSummaryVec(
&compbasemetrics.SummaryOpts{
Expand All @@ -68,6 +84,8 @@ func Register() {
registerMetrics.Do(func() {
legacyregistry.MustRegister(etcdRequestLatency)
legacyregistry.MustRegister(objectCounts)
legacyregistry.MustRegister(requestSize)
legacyregistry.MustRegister(responseSize)

// TODO(danielqsj): Remove the following metrics, they are deprecated
legacyregistry.MustRegister(deprecatedEtcdRequestLatenciesSummary)
Expand All @@ -85,6 +103,14 @@ func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) {
deprecatedEtcdRequestLatenciesSummary.WithLabelValues(verb, resource).Observe(sinceInMicroseconds(startTime))
}

func RecordEtcdRequestSize(verb, resource string, sizeInBytes uint64) {
requestSize.WithLabelValues(verb, resource).Observe(float64(sizeInBytes))
}

func RecordEtcdResponseSize(verb, resource string, sizeInBytes uint64) {
responseSize.WithLabelValues(verb, resource).Observe(float64(sizeInBytes))
}

// Reset resets the etcd_request_duration_seconds metric.
func Reset() {
etcdRequestLatency.Reset()
Expand Down
56 changes: 49 additions & 7 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go
Expand Up @@ -29,6 +29,7 @@ import (
"time"

"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -115,10 +116,13 @@ func (s *store) Get(ctx context.Context, key string, resourceVersion string, out
key = path.Join(s.pathPrefix, key)
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
typeName := getTypeName(out)
if err != nil {
observeFailedOperation("get", typeName, startTime, uint64(len(key)), err)
return err
}
observeSuccessfulOperation("get", typeName, startTime, uint64(len(key)), size(getResp.Kvs))

if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
return err
}
Expand All @@ -141,6 +145,7 @@ func (s *store) Get(ctx context.Context, key string, resourceVersion string, out

// Create implements storage.Interface.Create.
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
typeName := getTypeName(obj)
if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion should not be set on objects to be created")
}
Expand Down Expand Up @@ -169,10 +174,11 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
).Then(
clientv3.OpPut(key, string(newData), opts...),
).Commit()
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
if err != nil {
observeFailedOperation("create", typeName, startTime, uint64(len(key)), err)
return err
}
observeSuccessfulOperation("create", typeName, startTime, uint64(len(newData)), 0)
if !txnResp.Succeeded {
return storage.NewKeyExistsError(key, 0)
}
Expand All @@ -197,10 +203,12 @@ func (s *store) Delete(ctx context.Context, key string, out runtime.Object, prec
func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc) error {
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key)
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
typeName := getTypeName(out)
if err != nil {
observeFailedOperation("get", typeName, startTime, uint64(len(key)), err)
return err
}
observeSuccessfulOperation("get", typeName, startTime, uint64(len(key)), size(getResp.Kvs))
for {
origState, err := s.getState(getResp, key, v, false)
if err != nil {
Expand All @@ -224,13 +232,16 @@ func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.O
).Commit()
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
if err != nil {
observeFailedOperation("delete", typeName, startTime, uint64(len(key)), err)
return err
}
if !txnResp.Succeeded {
getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
observeSuccessfulOperation("delete", typeName, startTime, uint64(len(key)), size(getResp.Kvs))
klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
continue
}
observeSuccessfulOperation("delete", typeName, startTime, uint64(len(key)), 0)
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
}
}
Expand All @@ -251,10 +262,12 @@ func (s *store) GuaranteedUpdate(
getCurrentState := func() (*objState, error) {
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
typeName := getTypeName(out)
if err != nil {
observeFailedOperation("get", typeName, startTime, uint64(len(key)), err)
return nil, err
}
observeSuccessfulOperation("get", typeName, startTime, uint64(len(key)), size(getResp.Kvs))
return s.getState(getResp, key, v, ignoreNotFound)
}

Expand Down Expand Up @@ -355,13 +368,15 @@ func (s *store) GuaranteedUpdate(
).Else(
clientv3.OpGet(key),
).Commit()
metrics.RecordEtcdRequestLatency("update", getTypeName(out), startTime)
typeName := getTypeName(out)
if err != nil {
observeFailedOperation("update", typeName, startTime, uint64(len(newData)), err)
return err
}
trace.Step("Transaction committed")
if !txnResp.Succeeded {
getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
observeSuccessfulOperation("update", typeName, startTime, uint64(len(key)), size(getResp.Kvs))
klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)
origState, err = s.getState(getResp, key, v, ignoreNotFound)
if err != nil {
Expand All @@ -371,6 +386,7 @@ func (s *store) GuaranteedUpdate(
mustCheckData = false
continue
}
observeSuccessfulOperation("update", typeName, startTime, uint64(len(newData)), 0)
putResp := txnResp.Responses[0].GetResponsePut()

return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
Expand All @@ -397,10 +413,12 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin
key = path.Join(s.pathPrefix, key)
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
typeName := getTypeName(listPtr)
if err != nil {
observeFailedOperation("get", typeName, startTime, uint64(len(key)), err)
return err
}
observeSuccessfulOperation("get", typeName, startTime, uint64(len(key)), size(getResp.Kvs))
if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
return err
}
Expand Down Expand Up @@ -575,13 +593,15 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
for {
startTime := time.Now()
getResp, err = s.client.KV.Get(ctx, key, options...)
metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
typeName := getTypeName(listPtr)
if err != nil {
observeFailedOperation("get", typeName, startTime, uint64(len(key)), err)
return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix)
}
if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
return err
}
observeSuccessfulOperation("get", typeName, startTime, uint64(len(key)), size(getResp.Kvs))
hasMore = getResp.More

if len(getResp.Kvs) == 0 && getResp.More {
Expand Down Expand Up @@ -855,3 +875,25 @@ func notFound(key string) clientv3.Cmp {
func getTypeName(obj interface{}) string {
return reflect.TypeOf(obj).String()
}

func observeFailedOperation(verb, typeName string, startTime time.Time, requestSize uint64, err error) {
metrics.RecordEtcdRequestLatency(verb, typeName, startTime)
metrics.RecordEtcdRequestSize(verb, typeName, requestSize)
klog.Infof("etcd request: operation:%s type:%q request-size:%d err:%v", verb, typeName, requestSize, err)
}

func observeSuccessfulOperation(verb, typeName string, startTime time.Time, requestSize, responseSize uint64) {
metrics.RecordEtcdRequestLatency(verb, typeName, startTime)
metrics.RecordEtcdRequestSize(verb, typeName, requestSize)
metrics.RecordEtcdResponseSize(verb, typeName,responseSize)
klog.Infof("etcd request: operation:%s type:%q request-size: %d, response-size: %d", verb, typeName, responseSize, responseSize)
}

func size(kvs []*mvccpb.KeyValue) uint64 {
var n uint64
for _, kv := range kvs {
n += uint64(len(kv.Key))
n += uint64(len(kv.Value))
}
return n
}

0 comments on commit cafa6ac

Please sign in to comment.