Skip to content

Commit

Permalink
Add /pods endpoint support in kubeletstats receiver to add extra labels
Browse files Browse the repository at this point in the history
This commits adds ability to optionally fetch extra metadata from /pods endpoint in kubeletstats receiver and use that data to set additional labels on metric resource. For now only container.id label is supported
  • Loading branch information
dmitryax committed Jul 27, 2020
1 parent b0e40a0 commit 7bd18b0
Show file tree
Hide file tree
Showing 22 changed files with 628 additions and 47 deletions.
22 changes: 22 additions & 0 deletions receiver/kubeletstatsreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,25 @@ service:
receivers: [kubeletstats]
exporters: [file]
```

### Extra metadata labels

By default all produced metrics get resource labels based on what kubelet /stats/summary endpoint provides.
For some use cases it might be not enough. So it's possible to leverage other endpoints to fetch
additional metadata entities and set them as extra labels on metric resource.
The only additional label supported at the moment is `container.id`. If you want to have that label
added to your metrics, use `extra_metadata_labels` field to enable it, for example:

```yaml
receivers:
kubeletstats:
collection_interval: 10s
auth_type: "serviceAccount"
endpoint: "${K8S_NODE_NAME}:10250"
insecure_skip_verify: true
extra_metadata_labels:
- container.id
```

If `extra_metadata_labels` is not set, no additional API calls is done to fetch extra metadata.

6 changes: 6 additions & 0 deletions receiver/kubeletstatsreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,10 @@ type Config struct {
kubelet.ClientConfig `mapstructure:",squash"`
confignet.TCPAddr `mapstructure:",squash"`
CollectionInterval time.Duration `mapstructure:"collection_interval"`

// ExtraMetadataLabels contains list of extra metadata that should be taken from /pods endpoint
// and put as extra labels on metrics resource.
// No additional metadata is fetched by default, so there are no extra calls to /pods endpoint.
// Only container.id label is supported at the moment.
ExtraMetadataLabels []kubelet.MetadataLabel `mapstructure:"extra_metadata_labels"`
}
15 changes: 15 additions & 0 deletions receiver/kubeletstatsreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,19 @@ func TestLoadConfig(t *testing.T) {
},
CollectionInterval: duration,
}, saCfg)

metadataCfg := cfg.Receivers["kubeletstats/metadata"].(*Config)
require.Equal(t, &Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: "kubeletstats",
NameVal: "kubeletstats/metadata",
},
ClientConfig: kubelet.ClientConfig{
APIConfig: k8sconfig.APIConfig{
AuthType: "serviceAccount",
},
},
CollectionInterval: duration,
ExtraMetadataLabels: []kubelet.MetadataLabel{kubelet.MetadataLabelContainerID},
}, metadataCfg)
}
10 changes: 7 additions & 3 deletions receiver/kubeletstatsreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,14 @@ func (f *Factory) CreateTraceReceiver(
func (f *Factory) CreateMetricsReceiver(
ctx context.Context,
logger *zap.Logger,
cfg configmodels.Receiver,
baseCfg configmodels.Receiver,
consumer consumer.MetricsConsumerOld,
) (component.MetricsReceiver, error) {
cfg := baseCfg.(*Config)
err := kubelet.ValidateMetadataLabelsConfig(cfg.ExtraMetadataLabels)
if err != nil {
return nil, err
}
rest, err := f.restClient(logger, cfg)
if err != nil {
return nil, err
Expand All @@ -84,8 +89,7 @@ func (f *Factory) CreateMetricsReceiver(
}, nil
}

func (f *Factory) restClient(logger *zap.Logger, baseCfg configmodels.Receiver) (kubelet.RestClient, error) {
cfg := baseCfg.(*Config)
func (f *Factory) restClient(logger *zap.Logger, cfg *Config) (kubelet.RestClient, error) {
clientProvider, err := kubelet.NewClientProvider(cfg.Endpoint, &cfg.ClientConfig, logger)
if err != nil {
return nil, err
Expand Down
2 changes: 2 additions & 0 deletions receiver/kubeletstatsreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ require (
go.opentelemetry.io/collector v0.5.1-0.20200723232356-d4053cc823a0
go.uber.org/zap v1.15.0
google.golang.org/grpc/examples v0.0.0-20200723182653-9106c3fff523 // indirect
k8s.io/api v0.0.0-20190813020757-36bff7324fb7
k8s.io/apimachinery v0.0.0-20190809020650-423f5d784010
k8s.io/kubernetes v1.12.0
)

Expand Down
2 changes: 2 additions & 0 deletions receiver/kubeletstatsreceiver/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,7 @@ github.com/securego/gosec/v2 v2.3.0 h1:y/9mCF2WPDbSDpL3QDWZD3HHGrSYw0QSHnCqTfs4J
github.com/securego/gosec/v2 v2.3.0/go.mod h1:UzeVyUXbxukhLeHKV3VVqo7HdoQR9MrRfFmZYotn8ME=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shirou/gopsutil v0.0.0-20190901111213-e4ec7b275ada/go.mod h1:WWnYX4lzhCH5h/3YBfyVA3VbLYjlMZZAQcW9ojMexNc=
github.com/shirou/gopsutil v0.0.0-20200517204708-c89193f22d93 h1:+ZhxoIovCjs+mkd0pCBqczqvx/vl+emW8x04WM15Y7M=
github.com/shirou/gopsutil v0.0.0-20200517204708-c89193f22d93/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod h1:qsXQc7+bwAM3Q1u/4XEfrquwF8Lw7D7y5cD8CuHnfIc=
github.com/shurcooL/go v0.0.0-20180423040247-9e1955d9fb6e h1:MZM7FHLqUHYI0Y/mQAt3d2aYa0SiNms/hFqC9qJYolM=
Expand Down Expand Up @@ -1368,6 +1369,7 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.24.0 h1:UhZDfRO8JRQru4/+LlLE0BRKGF8L+PICnvYZmx/fEGA=
google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
Expand Down
14 changes: 12 additions & 2 deletions receiver/kubeletstatsreceiver/kubelet/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ import (
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"github.com/golang/protobuf/ptypes/timestamp"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.uber.org/zap"
stats "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
)

type metricDataAccumulator struct {
m []*consumerdata.MetricsData
m []*consumerdata.MetricsData
metadata Metadata
logger *zap.Logger
}

const (
Expand Down Expand Up @@ -59,10 +62,17 @@ func (a *metricDataAccumulator) podStats(podResource *resourcepb.Resource, s sta
}

func (a *metricDataAccumulator) containerStats(podResource *resourcepb.Resource, s stats.ContainerStats) {
resource, err := containerResource(podResource, s, a.metadata)
if err != nil {
a.logger.Warn("failed to fetch container metrics", zap.String("pod", podResource.Labels[labelPodName]),
zap.String("container", podResource.Labels[labelContainerName]), zap.Error(err))
return
}

// todo s.Logs
a.accumulate(
timestampProto(s.StartTime.Time),
containerResource(podResource, s),
resource,

cpuMetrics(containerPrefix, s.CPU),
memMetrics(containerPrefix, s.Memory),
Expand Down
9 changes: 8 additions & 1 deletion receiver/kubeletstatsreceiver/kubelet/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,17 @@ func (c *clientImpl) Get(path string) ([]byte, error) {
c.logger.Warn("failed to close response body", zap.Error(closeErr))
}
}()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to read Kubelet response body - %v", err)
}

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("kubelet request GET %s failed - %q, response: %q",
req.URL.String(), resp.Status, string(body))
}

return body, nil
}

Expand Down
19 changes: 19 additions & 0 deletions receiver/kubeletstatsreceiver/kubelet/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,19 @@ func TestErrOnRead(t *testing.T) {
require.Nil(t, resp)
}

func TestErrCode(t *testing.T) {
tr := &fakeRoundTripper{errCode: true}
baseURL := "http://localhost:9876"
client := &clientImpl{
baseURL: baseURL,
httpClient: http.Client{Transport: tr},
logger: zap.NewNop(),
}
resp, err := client.Get("/foo")
require.Error(t, err)
require.Nil(t, resp)
}

var _ http.RoundTripper = (*fakeRoundTripper)(nil)

type fakeRoundTripper struct {
Expand All @@ -258,6 +271,7 @@ type fakeRoundTripper struct {
failOnRT bool
errOnClose bool
errOnRead bool
errCode bool
}

func (f *fakeRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
Expand All @@ -273,7 +287,12 @@ func (f *fakeRoundTripper) RoundTrip(req *http.Request) (*http.Response, error)
} else {
reader = strings.NewReader("hello")
}
statusCode := 200
if f.errCode {
statusCode = 503
}
return &http.Response{
StatusCode: statusCode,
Body: &fakeReadCloser{
Reader: reader,
onClose: func() error {
Expand Down
24 changes: 24 additions & 0 deletions receiver/kubeletstatsreceiver/kubelet/conventions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kubelet

const (
labelContainerID = "container.id"
labelContainerName = "container.name"
labelNamespaceName = "k8s.namespace.name"
labelNodeName = "k8s.node.name"
labelPodName = "k8s.pod.name"
labelPodUID = "k8s.pod.uid"
)
105 changes: 105 additions & 0 deletions receiver/kubeletstatsreceiver/kubelet/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kubelet

import (
"fmt"
"regexp"

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
)

type MetadataLabel string

const (
MetadataLabelContainerID MetadataLabel = labelContainerID
)

var supportedLabels = map[MetadataLabel]bool{
MetadataLabelContainerID: true,
}

// ValidateMetadataLabelsConfig validates that provided list of metadata labels is supported
func ValidateMetadataLabelsConfig(labels []MetadataLabel) error {
labelsFound := map[MetadataLabel]bool{}
for _, label := range labels {
if _, supported := supportedLabels[label]; supported {
if _, duplicate := labelsFound[label]; duplicate {
return fmt.Errorf("duplicate found in extra_metadata_labels: %q ", label)
}
labelsFound[label] = true
} else {
return fmt.Errorf("label %q is not supported", label)
}
}
return nil
}

type Metadata struct {
Labels []MetadataLabel
PodsMetadata *v1.PodList
}

func NewMetadata(labels []MetadataLabel, podsMetadata *v1.PodList) Metadata {
return Metadata{
Labels: labels,
PodsMetadata: podsMetadata,
}
}

// setExtraLabels sets extra labels in `lables` map based on available metadata
func (m *Metadata) setExtraLabels(labels map[string]string, podUID string, containerName string) error {
for _, label := range m.Labels {
switch label {
case MetadataLabelContainerID:
containerID, err := m.getContainerID(podUID, containerName)
if err != nil {
return err
}
labels[labelContainerID] = containerID
return nil
}
}
return nil
}

// getContainerID retrieves container id from metadata for given pod UID and container name,
// returns an error if no container found in the metadata that matches the requirements.
func (m *Metadata) getContainerID(podUID string, containerName string) (string, error) {
if m.PodsMetadata == nil {
return "", errors.New("pods metadata were not fetched")
}

for _, pod := range m.PodsMetadata.Items {
if pod.UID == types.UID(podUID) {
for _, containerStatus := range pod.Status.ContainerStatuses {
if containerName == containerStatus.Name {
return stripContainerID(containerStatus.ContainerID), nil
}
}
}
}

return "", fmt.Errorf("pod %q with container %q not found in the fetched metadata", podUID, containerName)
}

var containerSchemeRegexp = regexp.MustCompile(`^[\w_-]+://`)

// stripContainerID returns a pure container id without the runtime scheme://
func stripContainerID(id string) string {
return containerSchemeRegexp.ReplaceAllString(id, "")
}
46 changes: 46 additions & 0 deletions receiver/kubeletstatsreceiver/kubelet/metadata_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kubelet

import (
"encoding/json"

v1 "k8s.io/api/core/v1"
)

// MetadataProvider wraps a RestClient, returning an unmarshaled
// Metadata.Summary struct from the kubelet API.
type MetadataProvider struct {
rc RestClient
}

func NewMetadataProvider(rc RestClient) *MetadataProvider {
return &MetadataProvider{rc: rc}
}

// Pods calls the /pods endpoint and unmarshals the
// results into a v1.PodList struct.
func (p *MetadataProvider) Pods() (*v1.PodList, error) {
pods, err := p.rc.Pods()
if err != nil {
return nil, err
}
var out v1.PodList
err = json.Unmarshal(pods, &out)
if err != nil {
return nil, err
}
return &out, nil
}

0 comments on commit 7bd18b0

Please sign in to comment.