Skip to content

Commit

Permalink
[processor/k8sattributes] Add optional k8s.cluster.uid resource attri…
Browse files Browse the repository at this point in the history
…bute (#23668)

**Description:** 
Add k8s.cluster.uid to attribute to k8sattributes processor and disable
it by default for backward compatibility. Users can set it to `true` to
populate cluster uid as part of resource attributes.

**Link to tracking Issue:**
#21974
  • Loading branch information
shivanshuraj1333 committed Aug 12, 2023
1 parent a6cc5ee commit 99fb6b6
Show file tree
Hide file tree
Showing 18 changed files with 129 additions and 8 deletions.
27 changes: 27 additions & 0 deletions .chloggen/k8smetadata.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: k8sattributes

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Added k8s.cluster.uid to k8sattributes processor to add cluster uid

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [21974]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions examples/kubernetes/otel-collector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ data:
- k8s.namespace.name
- k8s.node.name
- k8s.pod.start_time
- k8s.cluster.uid
# Pod labels which can be fetched via K8sattributeprocessor
labels:
- tag_name: key1
Expand Down
1 change: 1 addition & 0 deletions processor/k8sattributesprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type ExtractConfig struct {
// k8s.statefulset.name, k8s.statefulset.uid,
// k8s.container.name, container.image.name,
// container.image.tag, container.id
// k8s.cluster.uid
//
// Specifying anything other than these values will result in an error.
// By default, the following fields are extracted and added to spans, metrics and logs as attributes:
Expand Down
2 changes: 1 addition & 1 deletion processor/k8sattributesprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestLoadConfig(t *testing.T) {
APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeKubeConfig},
Passthrough: false,
Extract: ExtractConfig{
Metadata: []string{"k8s.pod.name", "k8s.pod.uid", "k8s.deployment.name", "k8s.namespace.name", "k8s.node.name", "k8s.pod.start_time"},
Metadata: []string{"k8s.pod.name", "k8s.pod.uid", "k8s.deployment.name", "k8s.namespace.name", "k8s.node.name", "k8s.pod.start_time", "k8s.cluster.uid"},
Annotations: []FieldExtractConfig{
{TagName: "a1", Key: "annotation-one", From: "pod"},
{TagName: "a2", Key: "annotation-two", Regex: "field=(?P<value>.+)", From: kube.MetadataFromPod},
Expand Down
12 changes: 12 additions & 0 deletions processor/k8sattributesprocessor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func TestE2E(t *testing.T) {
"k8s.annotations.workload": newExpectedValue(equal, "job"),
"k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-traces-job"),
"k8s.container.name": newExpectedValue(equal, "telemetrygen"),
"k8s.cluster.uid": newExpectedValue(exist, ""),
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
"container.image.tag": newExpectedValue(equal, "latest"),
"container.id": newExpectedValue(exist, ""),
Expand All @@ -124,6 +125,7 @@ func TestE2E(t *testing.T) {
"k8s.annotations.workload": newExpectedValue(equal, "statefulset"),
"k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-traces-statefulset"),
"k8s.container.name": newExpectedValue(equal, "telemetrygen"),
"k8s.cluster.uid": newExpectedValue(exist, ""),
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
"container.image.tag": newExpectedValue(equal, "latest"),
"container.id": newExpectedValue(exist, ""),
Expand All @@ -147,6 +149,7 @@ func TestE2E(t *testing.T) {
"k8s.annotations.workload": newExpectedValue(equal, "deployment"),
"k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-traces-deployment"),
"k8s.container.name": newExpectedValue(equal, "telemetrygen"),
"k8s.cluster.uid": newExpectedValue(exist, ""),
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
"container.image.tag": newExpectedValue(equal, "latest"),
"container.id": newExpectedValue(exist, ""),
Expand All @@ -168,6 +171,7 @@ func TestE2E(t *testing.T) {
"k8s.annotations.workload": newExpectedValue(equal, "daemonset"),
"k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-traces-daemonset"),
"k8s.container.name": newExpectedValue(equal, "telemetrygen"),
"k8s.cluster.uid": newExpectedValue(exist, ""),
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
"container.image.tag": newExpectedValue(equal, "latest"),
"container.id": newExpectedValue(exist, ""),
Expand All @@ -189,6 +193,7 @@ func TestE2E(t *testing.T) {
"k8s.annotations.workload": newExpectedValue(equal, "job"),
"k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-metrics-job"),
"k8s.container.name": newExpectedValue(equal, "telemetrygen"),
"k8s.cluster.uid": newExpectedValue(exist, ""),
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
"container.image.tag": newExpectedValue(equal, "latest"),
"container.id": newExpectedValue(exist, ""),
Expand All @@ -210,6 +215,7 @@ func TestE2E(t *testing.T) {
"k8s.annotations.workload": newExpectedValue(equal, "statefulset"),
"k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-metrics-statefulset"),
"k8s.container.name": newExpectedValue(equal, "telemetrygen"),
"k8s.cluster.uid": newExpectedValue(exist, ""),
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
"container.image.tag": newExpectedValue(equal, "latest"),
"container.id": newExpectedValue(exist, ""),
Expand All @@ -233,6 +239,7 @@ func TestE2E(t *testing.T) {
"k8s.annotations.workload": newExpectedValue(equal, "deployment"),
"k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-metrics-deployment"),
"k8s.container.name": newExpectedValue(equal, "telemetrygen"),
"k8s.cluster.uid": newExpectedValue(exist, ""),
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
"container.image.tag": newExpectedValue(equal, "latest"),
"container.id": newExpectedValue(exist, ""),
Expand All @@ -254,6 +261,7 @@ func TestE2E(t *testing.T) {
"k8s.annotations.workload": newExpectedValue(equal, "daemonset"),
"k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-metrics-daemonset"),
"k8s.container.name": newExpectedValue(equal, "telemetrygen"),
"k8s.cluster.uid": newExpectedValue(exist, ""),
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
"container.image.tag": newExpectedValue(equal, "latest"),
"container.id": newExpectedValue(exist, ""),
Expand All @@ -275,6 +283,7 @@ func TestE2E(t *testing.T) {
"k8s.annotations.workload": newExpectedValue(equal, "job"),
"k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-logs-job"),
"k8s.container.name": newExpectedValue(equal, "telemetrygen"),
"k8s.cluster.uid": newExpectedValue(exist, ""),
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
"container.image.tag": newExpectedValue(equal, "latest"),
"container.id": newExpectedValue(exist, ""),
Expand All @@ -296,6 +305,7 @@ func TestE2E(t *testing.T) {
"k8s.annotations.workload": newExpectedValue(equal, "statefulset"),
"k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-logs-statefulset"),
"k8s.container.name": newExpectedValue(equal, "telemetrygen"),
"k8s.cluster.uid": newExpectedValue(exist, ""),
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
"container.image.tag": newExpectedValue(equal, "latest"),
"container.id": newExpectedValue(exist, ""),
Expand All @@ -319,6 +329,7 @@ func TestE2E(t *testing.T) {
"k8s.annotations.workload": newExpectedValue(equal, "deployment"),
"k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-logs-deployment"),
"k8s.container.name": newExpectedValue(equal, "telemetrygen"),
"k8s.cluster.uid": newExpectedValue(exist, ""),
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
"container.image.tag": newExpectedValue(equal, "latest"),
"container.id": newExpectedValue(exist, ""),
Expand All @@ -340,6 +351,7 @@ func TestE2E(t *testing.T) {
"k8s.annotations.workload": newExpectedValue(equal, "daemonset"),
"k8s.labels.app": newExpectedValue(equal, "telemetrygen-"+testID+"-logs-daemonset"),
"k8s.container.name": newExpectedValue(equal, "telemetrygen"),
"k8s.cluster.uid": newExpectedValue(exist, ""),
"container.image.name": newExpectedValue(equal, "ghcr.io/open-telemetry/opentelemetry-collector-contrib/telemetrygen"),
"container.image.tag": newExpectedValue(equal, "latest"),
"container.id": newExpectedValue(exist, ""),
Expand Down
24 changes: 18 additions & 6 deletions processor/k8sattributesprocessor/internal/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,15 @@ func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules,
}

if newNamespaceInformer == nil {
newNamespaceInformer = newNamespaceSharedInformer
// if rules to extract metadata from namespace is configured use namespace shared informer containing
// all namespaces including kube-system which contains cluster uid information (kube-system-uid)
if c.extractNamespaceLabelsAnnotations() {
newNamespaceInformer = newNamespaceSharedInformer
} else {
// use kube-system shared informer to only watch kube-system namespace
// reducing overhead of watching all the namespaces
newNamespaceInformer = newKubeSystemSharedInformer
}
}

c.informer = newInformer(c.kc, c.Filters.Namespace, labelSelector, fieldSelector)
Expand All @@ -132,11 +140,7 @@ func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules,
return nil, err
}

if c.extractNamespaceLabelsAnnotations() {
c.namespaceInformer = newNamespaceInformer(c.kc)
} else {
c.namespaceInformer = NewNoOpInformer(c.kc)
}
c.namespaceInformer = newNamespaceInformer(c.kc)

if rules.DeploymentName || rules.DeploymentUID {
if newReplicaSetInformer == nil {
Expand Down Expand Up @@ -433,6 +437,14 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
tags[tagNodeName] = pod.Spec.NodeName
}

if c.Rules.ClusterUID {
if val, ok := c.Namespaces["kube-system"]; ok {
tags[tagClusterUID] = val.NamespaceUID
} else {
c.logger.Debug("unable to find kube-system namespace, cluster uid will not be available")
}
}

for _, r := range c.Rules.Labels {
r.extractFromPodMetadata(pod.Labels, tags, "k8s.pod.labels.%s")
}
Expand Down
23 changes: 23 additions & 0 deletions processor/k8sattributesprocessor/internal/kube/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"k8s.io/client-go/tools/cache"
)

const kubeSystemNamespace = "kube-system"

// InformerProvider defines a function type that returns a new SharedInformer. It is used to
// allow passing custom shared informers to the watch client.
type InformerProvider func(
Expand Down Expand Up @@ -73,6 +75,27 @@ func informerWatchFuncWithSelectors(client kubernetes.Interface, namespace strin
}
}

// newKubeSystemSharedInformer watches only kube-system namespace
func newKubeSystemSharedInformer(
client kubernetes.Interface,
) cache.SharedInformer {
informer := cache.NewSharedInformer(
&cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", kubeSystemNamespace).String()
return client.CoreV1().Namespaces().List(context.Background(), opts)
},
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", kubeSystemNamespace).String()
return client.CoreV1().Namespaces().Watch(context.Background(), opts)
},
},
&api_v1.Namespace{},
watchSyncPeriod,
)
return informer
}

func newNamespaceSharedInformer(
client kubernetes.Interface,
) cache.SharedInformer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ func Test_newSharedNamespaceInformer(t *testing.T) {
assert.NotNil(t, informer)
}

func Test_newKubeSystemSharedInformer(t *testing.T) {
client, err := newFakeAPIClientset(k8sconfig.APIConfig{})
require.NoError(t, err)
informer := newKubeSystemSharedInformer(client)
assert.NotNil(t, informer)
}

func Test_informerListFuncWithSelectors(t *testing.T) {
ls, fs, err := selectorsFromFilters(Filters{
Fields: []FieldFilter{
Expand Down
2 changes: 2 additions & 0 deletions processor/k8sattributesprocessor/internal/kube/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
tagNodeName = "k8s.node.name"
tagStartTime = "k8s.pod.start_time"
tagHostName = "k8s.pod.hostname"
tagClusterUID = "k8s.cluster.uid"
// MetadataFromPod is used to specify to extract metadata/labels/annotations from pod
MetadataFromPod = "pod"
// MetadataFromNamespace is used to specify to extract metadata/labels/annotations from namespace
Expand Down Expand Up @@ -203,6 +204,7 @@ type ExtractionRules struct {
ContainerID bool
ContainerImageName bool
ContainerImageTag bool
ClusterUID bool

Annotations []FieldExtractionRule
Labels []FieldExtractionRule
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ all_set:
enabled: true
container.image.tag:
enabled: true
k8s.cluster.uid:
enabled: true
k8s.container.name:
enabled: true
k8s.cronjob.name:
Expand Down Expand Up @@ -51,6 +53,8 @@ none_set:
enabled: false
container.image.tag:
enabled: false
k8s.cluster.uid:
enabled: false
k8s.container.name:
enabled: false
k8s.cronjob.name:
Expand Down
4 changes: 4 additions & 0 deletions processor/k8sattributesprocessor/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ status:
active: [dmitryax, rmfitzpatrick, fatsheep9146]
# resource attributes are exposed through a different configuration interface (extract::metadata).
resource_attributes:
k8s.cluster.uid:
description: Gives cluster uid identified with kube-system namespace
type: string
enabled: false
k8s.namespace.name:
description: The name of the namespace that the pod is running in.
type: string
Expand Down
7 changes: 7 additions & 0 deletions processor/k8sattributesprocessor/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const (
filterOPDoesNotExist = "does-not-exist"
metadataPodStartTime = "k8s.pod.start_time"
specPodHostName = "k8s.pod.hostname"
// TODO: use k8s.cluster.uid from semconv when available, and replace clusterUID with conventions.AttributeClusterUid
clusterUID = "k8s.cluster.uid"
)

// option represents a configuration option that can be passes.
Expand Down Expand Up @@ -50,6 +52,9 @@ func withPassthrough() option {
// enabledAttributes returns the list of resource attributes enabled by default.
func enabledAttributes() (attributes []string) {
defaultConfig := metadata.DefaultResourceAttributesConfig()
if defaultConfig.K8sClusterUID.Enabled {
attributes = append(attributes, clusterUID)
}
if defaultConfig.ContainerID.Enabled {
attributes = append(attributes, conventions.AttributeContainerID)
}
Expand Down Expand Up @@ -167,6 +172,8 @@ func withExtractMetadata(fields ...string) option {
p.rules.ContainerImageName = true
case conventions.AttributeContainerImageTag:
p.rules.ContainerImageTag = true
case clusterUID:
p.rules.ClusterUID = true
default:
return fmt.Errorf("\"%s\" is not a supported metadata field", field)
}
Expand Down
Loading

0 comments on commit 99fb6b6

Please sign in to comment.