Skip to content

Commit

Permalink
[processor/k8sattributes] Add support for k8s.node.uid metadata
Browse files Browse the repository at this point in the history
Signed-off-by: ChrsMark <chrismarkou92@gmail.com>
  • Loading branch information
ChrsMark committed Mar 7, 2024
1 parent 368fcdf commit a2e8782
Show file tree
Hide file tree
Showing 13 changed files with 131 additions and 9 deletions.
2 changes: 1 addition & 1 deletion processor/k8sattributesprocessor/README.md
Expand Up @@ -159,7 +159,7 @@ k8sattributes/2:

## Role-based access control

The k8sattributesprocessor needs `get`, `watch` and `list` permissions on both `pods` and `namespaces` resources, for all namespaces and pods included in the configured filters. Additionally, when using `k8s.deployment.uid` or `k8s.deployment.name` the processor also needs `get`, `watch` and `list` permissions for `replicasets` resources. When extracting metadata from `node`, the processor needs `get`, `watch` and `list` permissions for `nodes` resources.
The k8sattributesprocessor needs `get`, `watch` and `list` permissions on both `pods` and `namespaces` resources, for all namespaces and pods included in the configured filters. Additionally, when using `k8s.deployment.uid` or `k8s.deployment.name` the processor also needs `get`, `watch` and `list` permissions for `replicasets` resources. When using `k8s.node.uid` or extracting metadata from `node`, the processor needs `get`, `watch` and `list` permissions for `nodes` resources.

Here is an example of a `ClusterRole` to give a `ServiceAccount` the necessary permissions for all pods, nodes, and namespaces in the cluster (replace `<OTEL_COL_NAMESPACE>` with a namespace where collector is deployed):

Expand Down
17 changes: 11 additions & 6 deletions processor/k8sattributesprocessor/config.go
Expand Up @@ -84,12 +84,17 @@ func (cfg *Config) Validate() error {
for _, field := range cfg.Extract.Metadata {
switch field {
case conventions.AttributeK8SNamespaceName, conventions.AttributeK8SPodName, conventions.AttributeK8SPodUID,
specPodHostName, metadataPodStartTime, conventions.AttributeK8SDeploymentName, conventions.AttributeK8SDeploymentUID,
conventions.AttributeK8SReplicaSetName, conventions.AttributeK8SReplicaSetUID, conventions.AttributeK8SDaemonSetName,
conventions.AttributeK8SDaemonSetUID, conventions.AttributeK8SStatefulSetName, conventions.AttributeK8SStatefulSetUID,
conventions.AttributeK8SContainerName, conventions.AttributeK8SJobName, conventions.AttributeK8SJobUID,
conventions.AttributeK8SCronJobName, conventions.AttributeK8SNodeName, conventions.AttributeContainerID,
conventions.AttributeContainerImageName, conventions.AttributeContainerImageTag, clusterUID:
specPodHostName, metadataPodStartTime,
conventions.AttributeK8SDeploymentName, conventions.AttributeK8SDeploymentUID,
conventions.AttributeK8SReplicaSetName, conventions.AttributeK8SReplicaSetUID,
conventions.AttributeK8SDaemonSetName, conventions.AttributeK8SDaemonSetUID,
conventions.AttributeK8SStatefulSetName, conventions.AttributeK8SStatefulSetUID,
conventions.AttributeK8SJobName, conventions.AttributeK8SJobUID,
conventions.AttributeK8SCronJobName,
conventions.AttributeK8SNodeName, conventions.AttributeK8SNodeUID,
conventions.AttributeK8SContainerName, conventions.AttributeContainerID,
conventions.AttributeContainerImageName, conventions.AttributeContainerImageTag,
clusterUID:
default:
return fmt.Errorf("\"%s\" is not a supported metadata field", field)
}
Expand Down
6 changes: 5 additions & 1 deletion processor/k8sattributesprocessor/internal/kube/client.go
Expand Up @@ -168,7 +168,7 @@ func New(logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules ExtractionRules,
}
}

if c.extractNodeLabelsAnnotations() {
if c.extractNodeLabelsAnnotations() || c.extractNodeUID() {
c.nodeInformer = newNodeSharedInformer(c.kc, c.Filters.Node)
}

Expand Down Expand Up @@ -927,6 +927,10 @@ func (c *WatchClient) extractNodeLabelsAnnotations() bool {
return false
}

// extractNodeUID reports whether the k8s.node.uid extraction rule is enabled,
// in which case the client needs a node informer to resolve node UIDs.
func (c *WatchClient) extractNodeUID() bool {
	return c.Rules.NodeUID
}

func (c *WatchClient) addOrUpdateNode(node *api_v1.Node) {
newNode := &Node{
Name: node.Name,
Expand Down
1 change: 1 addition & 0 deletions processor/k8sattributesprocessor/internal/kube/kube.go
Expand Up @@ -210,6 +210,7 @@ type ExtractionRules struct {
StatefulSetUID bool
StatefulSetName bool
Node bool
NodeUID bool
StartTime bool
ContainerName bool
ContainerID bool
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Expand Up @@ -29,6 +29,8 @@ all_set:
enabled: true
k8s.node.name:
enabled: true
k8s.node.uid:
enabled: true
k8s.pod.hostname:
enabled: true
k8s.pod.name:
Expand Down Expand Up @@ -75,6 +77,8 @@ none_set:
enabled: false
k8s.node.name:
enabled: false
k8s.node.uid:
enabled: false
k8s.pod.hostname:
enabled: false
k8s.pod.name:
Expand Down
4 changes: 4 additions & 0 deletions processor/k8sattributesprocessor/metadata.yaml
Expand Up @@ -86,6 +86,10 @@ resource_attributes:
description: The name of the Node.
type: string
enabled: true
k8s.node.uid:
description: The UID of the Node.
type: string
enabled: false
container.id:
description: Container ID. Usually a UUID, as for example used to identify Docker containers. The UUID might be abbreviated. Requires k8s.container.restart_count.
type: string
Expand Down
5 changes: 5 additions & 0 deletions processor/k8sattributesprocessor/options.go
Expand Up @@ -94,6 +94,9 @@ func enabledAttributes() (attributes []string) {
if defaultConfig.K8sNodeName.Enabled {
attributes = append(attributes, conventions.AttributeK8SNodeName)
}
if defaultConfig.K8sNodeUID.Enabled {
attributes = append(attributes, conventions.AttributeK8SNodeUID)
}
if defaultConfig.K8sPodHostname.Enabled {
attributes = append(attributes, specPodHostName)
}
Expand Down Expand Up @@ -163,6 +166,8 @@ func withExtractMetadata(fields ...string) option {
p.rules.CronJobName = true
case conventions.AttributeK8SNodeName:
p.rules.Node = true
case conventions.AttributeK8SNodeUID:
p.rules.NodeUID = true
case conventions.AttributeContainerID:
p.rules.ContainerID = true
case conventions.AttributeContainerImageName:
Expand Down
14 changes: 14 additions & 0 deletions processor/k8sattributesprocessor/processor.go
Expand Up @@ -166,6 +166,12 @@ func (kp *kubernetesprocessor) processResource(ctx context.Context, resource pco
resource.Attributes().PutStr(key, val)
}
}
nodeUID := kp.getUIDForPodsNode(nodeName)
if nodeUID != "" {
if _, found := resource.Attributes().Get(conventions.AttributeK8SNodeUID); !found {
resource.Attributes().PutStr(conventions.AttributeK8SNodeUID, nodeUID)
}
}
}
}

Expand Down Expand Up @@ -263,6 +269,14 @@ func (kp *kubernetesprocessor) getAttributesForPodsNode(nodeName string) map[str
return node.Attributes
}

// getUIDForPodsNode returns the UID of the node with the given name, or an
// empty string when the node is not known to the kube client.
func (kp *kubernetesprocessor) getUIDForPodsNode(nodeName string) string {
	if n, found := kp.kc.GetNode(nodeName); found {
		return n.NodeUID
	}
	return ""
}

// intFromAttribute extracts int value from an attribute stored as string or int
func intFromAttribute(val pcommon.Value) (int, error) {
switch val.Type() {
Expand Down
66 changes: 66 additions & 0 deletions processor/k8sattributesprocessor/processor_test.go
Expand Up @@ -880,6 +880,72 @@ func TestAddNodeLabels(t *testing.T) {
})
}

// TestAddNodeUID verifies that when the k8s.node.uid metadata rule is enabled,
// the processor attaches the node UID (together with the node's attributes) to
// resources whose pod is associated via the connection IP.
func TestAddNodeUID(t *testing.T) {
	uid := "asdfasdf-asdfasdf-asdf"
	m := newMultiTest(
		t,
		func() component.Config {
			cfg := createDefaultConfig().(*Config)
			cfg.Extract.Metadata = []string{"k8s.node.uid"}
			cfg.Extract.Labels = []FieldExtractConfig{}
			return cfg
		}(),
		nil,
	)

	const podIP = "1.1.1.1"
	// Node name -> attributes the fake client should report for that node.
	nodeAttrs := map[string]map[string]string{
		"node-1": {"nodelabel": "1"},
	}

	// Associate incoming data with a pod purely by connection address.
	m.kubernetesProcessorOperation(func(kp *kubernetesprocessor) {
		kp.podAssociations = []kube.Association{
			{
				Sources: []kube.AssociationSource{
					{From: "connection"},
				},
			},
		}
	})

	// Seed the fake client with a pod on node-1 and the node itself.
	m.kubernetesProcessorOperation(func(kp *kubernetesprocessor) {
		id := kube.PodIdentifier{
			kube.PodIdentifierAttributeFromConnection(podIP),
		}
		kp.kc.(*fakeClient).Pods[id] = &kube.Pod{Name: "test-2323", NodeName: "node-1"}
		kp.kc.(*fakeClient).Nodes = make(map[string]*kube.Node)
		for name, attrs := range nodeAttrs {
			kp.kc.(*fakeClient).Nodes[name] = &kube.Node{Attributes: attrs, NodeUID: uid}
		}
	})

	ctx := client.NewContext(context.Background(), client.Info{
		Addr: &net.IPAddr{IP: net.ParseIP(podIP)},
	})
	m.testConsume(
		ctx,
		generateTraces(),
		generateMetrics(),
		generateLogs(),
		func(err error) {
			assert.NoError(t, err)
		})

	m.assertBatchesLen(1)
	m.assertResourceObjectLen(0)
	m.assertResource(0, func(res pcommon.Resource) {
		// Expect exactly: pod IP, node UID, and the node label attribute.
		assert.Equal(t, 3, res.Attributes().Len())
		assertResourceHasStringAttribute(t, res, "k8s.pod.ip", podIP)
		assertResourceHasStringAttribute(t, res, "k8s.node.uid", uid)
		assertResourceHasStringAttribute(t, res, "nodelabel", "1")
	})
}

func TestProcessorAddContainerAttributes(t *testing.T) {
tests := []struct {
name string
Expand Down

0 comments on commit a2e8782

Please sign in to comment.