[processor/k8sattributes] Add support for k8s.node.uid metadata #31637

Merged 1 commit on Mar 13, 2024
27 changes: 27 additions & 0 deletions .chloggen/add_node_uid_k8s_attrib.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/k8sattributes

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for `k8s.node.uid` metadata

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31637]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
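For end users, the new attribute is opted into through the processor's `extract.metadata` list; it is not emitted by default. A minimal collector configuration sketch (pipeline wiring omitted, attribute list illustrative):

```yaml
processors:
  k8sattributes:
    extract:
      metadata:
        - k8s.node.name
        - k8s.node.uid
```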
2 changes: 1 addition & 1 deletion processor/k8sattributesprocessor/README.md
@@ -161,7 +161,7 @@ k8sattributes/2:

## Cluster-scoped RBAC

If you'd like to set up the k8sattributesprocessor to receive telemetry from across namespaces, it will need `get`, `watch` and `list` permissions on both `pods` and `namespaces` resources, for all namespaces and pods included in the configured filters. Additionally, when using `k8s.deployment.uid` or `k8s.deployment.name` the processor also needs `get`, `watch` and `list` permissions for `replicasets` resources. When extracting metadatas from `node`, the processor needs `get`, `watch` and `list` permissions for `nodes` resources.
If you'd like to set up the k8sattributesprocessor to receive telemetry from across namespaces, it will need `get`, `watch` and `list` permissions on both `pods` and `namespaces` resources, for all namespaces and pods included in the configured filters. Additionally, when using `k8s.deployment.uid` or `k8s.deployment.name` the processor also needs `get`, `watch` and `list` permissions for `replicasets` resources. When using `k8s.node.uid` or extracting metadata from `node`, the processor needs `get`, `watch` and `list` permissions for `nodes` resources.

Here is an example of a `ClusterRole` to give a `ServiceAccount` the necessary permissions for all pods, nodes, and namespaces in the cluster (replace `<OTEL_COL_NAMESPACE>` with a namespace where collector is deployed):
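The full example is collapsed in this view; a minimal sketch of such a `ClusterRole` and its binding (the resource names below are placeholders, not necessarily those used in the README) could look like:

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: otel-collector-k8sattributes  # placeholder name
rules:
  - apiGroups: [""]
    resources: ["pods", "namespaces", "nodes"]
    verbs: ["get", "watch", "list"]
  - apiGroups: ["apps"]
    resources: ["replicasets"]
    verbs: ["get", "watch", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: otel-collector-k8sattributes  # placeholder name
subjects:
  - kind: ServiceAccount
    name: otel-collector  # placeholder ServiceAccount name
    namespace: <OTEL_COL_NAMESPACE>
roleRef:
  kind: ClusterRole
  name: otel-collector-k8sattributes
  apiGroup: rbac.authorization.k8s.io
```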

17 changes: 11 additions & 6 deletions processor/k8sattributesprocessor/config.go
@@ -84,12 +84,17 @@ func (cfg *Config) Validate() error {
    for _, field := range cfg.Extract.Metadata {
        switch field {
        case conventions.AttributeK8SNamespaceName, conventions.AttributeK8SPodName, conventions.AttributeK8SPodUID,
            specPodHostName, metadataPodStartTime, conventions.AttributeK8SDeploymentName, conventions.AttributeK8SDeploymentUID,
            conventions.AttributeK8SReplicaSetName, conventions.AttributeK8SReplicaSetUID, conventions.AttributeK8SDaemonSetName,
            conventions.AttributeK8SDaemonSetUID, conventions.AttributeK8SStatefulSetName, conventions.AttributeK8SStatefulSetUID,
            conventions.AttributeK8SContainerName, conventions.AttributeK8SJobName, conventions.AttributeK8SJobUID,
            conventions.AttributeK8SCronJobName, conventions.AttributeK8SNodeName, conventions.AttributeContainerID,
            conventions.AttributeContainerImageName, conventions.AttributeContainerImageTag, clusterUID:
            specPodHostName, metadataPodStartTime,
            conventions.AttributeK8SDeploymentName, conventions.AttributeK8SDeploymentUID,
            conventions.AttributeK8SReplicaSetName, conventions.AttributeK8SReplicaSetUID,
            conventions.AttributeK8SDaemonSetName, conventions.AttributeK8SDaemonSetUID,
            conventions.AttributeK8SStatefulSetName, conventions.AttributeK8SStatefulSetUID,
            conventions.AttributeK8SJobName, conventions.AttributeK8SJobUID,
            conventions.AttributeK8SCronJobName,
            conventions.AttributeK8SNodeName, conventions.AttributeK8SNodeUID,
            conventions.AttributeK8SContainerName, conventions.AttributeContainerID,
            conventions.AttributeContainerImageName, conventions.AttributeContainerImageTag,
            clusterUID:
        default:
            return fmt.Errorf("\"%s\" is not a supported metadata field", field)
        }
6 changes: 5 additions & 1 deletion processor/k8sattributesprocessor/internal/kube/client.go
@@ -171,7 +171,7 @@ func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules,
        }
    }

    if c.extractNodeLabelsAnnotations() {
    if c.extractNodeLabelsAnnotations() || c.extractNodeUID() {
        c.nodeInformer = newNodeSharedInformer(c.kc, c.Filters.Node)
    }

@@ -930,6 +930,10 @@ func (c *WatchClient) extractNodeLabelsAnnotations() bool {
    return false
}

func (c *WatchClient) extractNodeUID() bool {
    return c.Rules.NodeUID
}

func (c *WatchClient) addOrUpdateNode(node *api_v1.Node) {
    newNode := &Node{
        Name: node.Name,
1 change: 1 addition & 0 deletions processor/k8sattributesprocessor/internal/kube/kube.go
@@ -210,6 +210,7 @@ type ExtractionRules struct {
    StatefulSetUID bool
    StatefulSetName bool
    Node bool
    NodeUID bool
    StartTime bool
    ContainerName bool
    ContainerID bool

@@ -29,6 +29,8 @@ all_set:
      enabled: true
    k8s.node.name:
      enabled: true
    k8s.node.uid:
      enabled: true
    k8s.pod.hostname:
      enabled: true
    k8s.pod.name:
@@ -75,6 +77,8 @@ none_set:
      enabled: false
    k8s.node.name:
      enabled: false
    k8s.node.uid:
      enabled: false
    k8s.pod.hostname:
      enabled: false
    k8s.pod.name:
4 changes: 4 additions & 0 deletions processor/k8sattributesprocessor/metadata.yaml
@@ -86,6 +86,10 @@ resource_attributes:
    description: The name of the Node.
    type: string
    enabled: true
  k8s.node.uid:
    description: The UID of the Node.
    type: string
    enabled: false
  container.id:
    description: Container ID. Usually a UUID, as for example used to identify Docker containers. The UUID might be abbreviated. Requires k8s.container.restart_count.
    type: string
5 changes: 5 additions & 0 deletions processor/k8sattributesprocessor/options.go
@@ -94,6 +94,9 @@ func enabledAttributes() (attributes []string) {
    if defaultConfig.K8sNodeName.Enabled {
        attributes = append(attributes, conventions.AttributeK8SNodeName)
    }
    if defaultConfig.K8sNodeUID.Enabled {
        attributes = append(attributes, conventions.AttributeK8SNodeUID)
    }
    if defaultConfig.K8sPodHostname.Enabled {
        attributes = append(attributes, specPodHostName)
    }
@@ -163,6 +166,8 @@ func withExtractMetadata(fields ...string) option {
                p.rules.CronJobName = true
            case conventions.AttributeK8SNodeName:
                p.rules.Node = true
            case conventions.AttributeK8SNodeUID:
                p.rules.NodeUID = true
            case conventions.AttributeContainerID:
                p.rules.ContainerID = true
            case conventions.AttributeContainerImageName:
14 changes: 14 additions & 0 deletions processor/k8sattributesprocessor/processor.go
@@ -166,6 +166,12 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pco
                resource.Attributes().PutStr(key, val)
            }
        }
        nodeUID := kp.getUIDForPodsNode(nodeName)
        if nodeUID != "" {
            if _, found := resource.Attributes().Get(conventions.AttributeK8SNodeUID); !found {
                resource.Attributes().PutStr(conventions.AttributeK8SNodeUID, nodeUID)
            }
        }
    }
}

@@ -263,6 +269,14 @@ func (kp *kubernetesprocessor) getAttributesForPodsNode(nodeName string) map[str
    return node.Attributes
}

func (kp *kubernetesprocessor) getUIDForPodsNode(nodeName string) string {
    node, ok := kp.kc.GetNode(nodeName)
    if !ok {
        return ""
    }
    return node.NodeUID
}

// intFromAttribute extracts int value from an attribute stored as string or int
func intFromAttribute(val pcommon.Value) (int, error) {
    switch val.Type() {
66 changes: 66 additions & 0 deletions processor/k8sattributesprocessor/processor_test.go
@@ -880,6 +880,72 @@ func TestAddNodeLabels(t *testing.T) {
    })
}

func TestAddNodeUID(t *testing.T) {
    nodeUID := "asdfasdf-asdfasdf-asdf"
    m := newMultiTest(
        t,
        func() component.Config {
            cfg := createDefaultConfig().(*Config)
            cfg.Extract.Metadata = []string{"k8s.node.uid"}
            cfg.Extract.Labels = []FieldExtractConfig{}
            return cfg
        }(),
        nil,
    )

    podIP := "1.1.1.1"
    nodes := map[string]map[string]string{
        "node-1": {
            "nodelabel": "1",
        },
    }
    m.kubernetesProcessorOperation(func(kp *kubernetesprocessor) {
        kp.podAssociations = []kube.Association{
            {
                Sources: []kube.AssociationSource{
                    {
                        From: "connection",
                    },
                },
            },
        }
    })

    m.kubernetesProcessorOperation(func(kp *kubernetesprocessor) {
        pi := kube.PodIdentifier{
            kube.PodIdentifierAttributeFromConnection(podIP),
        }
        kp.kc.(*fakeClient).Pods[pi] = &kube.Pod{Name: "test-2323", NodeName: "node-1"}
        kp.kc.(*fakeClient).Nodes = make(map[string]*kube.Node)
        for ns, labels := range nodes {
            kp.kc.(*fakeClient).Nodes[ns] = &kube.Node{Attributes: labels, NodeUID: nodeUID}
        }
    })

    ctx := client.NewContext(context.Background(), client.Info{
        Addr: &net.IPAddr{
            IP: net.ParseIP(podIP),
        },
    })
    m.testConsume(
        ctx,
        generateTraces(),
        generateMetrics(),
        generateLogs(),
        func(err error) {
            assert.NoError(t, err)
        })

    m.assertBatchesLen(1)
    m.assertResourceObjectLen(0)
    m.assertResource(0, func(res pcommon.Resource) {
        assert.Equal(t, 3, res.Attributes().Len())
        assertResourceHasStringAttribute(t, res, "k8s.pod.ip", podIP)
        assertResourceHasStringAttribute(t, res, "k8s.node.uid", nodeUID)
        assertResourceHasStringAttribute(t, res, "nodelabel", "1")
    })
}

func TestProcessorAddContainerAttributes(t *testing.T) {
    tests := []struct {
        name string