From 027bcc80eb0fbe7d87b147679d4d66b65df1c14d Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Thu, 10 Apr 2025 17:12:28 -0500 Subject: [PATCH 1/3] Add filebeat input parts of agent monitoring test --- .../application/monitoring/v1_monitor.go | 4 +- testing/integration/beat_receivers_test.go | 146 +++++++++++------- testing/integration/otel_test.go | 3 +- 3 files changed, 95 insertions(+), 58 deletions(-) diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index fc26fcb1377..911e86bb4b9 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -700,8 +700,7 @@ func (b *BeatsMonitor) getHttpStreams( httpStreams = append(httpStreams, httpStream) } // specifically for filebeat, we include input metrics - // disabled for filebeat receiver until https://github.com/elastic/beats/issues/43418 is resolved - if strings.EqualFold(name, "filebeat") && compInfo.RuntimeManager != component.OtelRuntimeManager { + if strings.EqualFold(name, "filebeat") { fbDataStreamName := "filebeat_input" fbDataset := fmt.Sprintf("elastic_agent.%s", fbDataStreamName) fbIndexName := fmt.Sprintf("metrics-elastic_agent.%s-%s", fbDataStreamName, monitoringNamespace) @@ -842,7 +841,6 @@ func processorsForAgentFilestream() []any { addFormattedIndexProcessor(), ) return processors - } // processorsForServiceComponentFilestream returns processors used for filestream streams for components running as diff --git a/testing/integration/beat_receivers_test.go b/testing/integration/beat_receivers_test.go index ee344c34dcd..e20d9c9185c 100644 --- a/testing/integration/beat_receivers_test.go +++ b/testing/integration/beat_receivers_test.go @@ -1,7 +1,6 @@ // Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. - //go:build integration package integration @@ -13,6 +12,7 @@ import ( "fmt" "io" "net/http" + "runtime" "strings" "testing" "text/template" @@ -42,6 +42,7 @@ func TestClassicAndReceiverAgentMonitoring(t *testing.T) { OS: []define.OS{ {Type: define.Linux}, {Type: define.Darwin}, + {Type: define.Windows}, }, Stack: &define.Stack{}, Sudo: true, @@ -56,69 +57,80 @@ func TestClassicAndReceiverAgentMonitoring(t *testing.T) { type test struct { dsType string dsDataset string - dsNamespace string - query map[string]any + query []map[string]any onlyCompareKeys bool ignoreFields []string } tests := []test{ { - dsType: "logs", - dsDataset: "elastic_agent", - dsNamespace: info.Namespace, - query: map[string]any{"match_phrase": map[string]any{"message": "Determined allowed capabilities"}}, + dsType: "logs", + dsDataset: "elastic_agent", + query: []map[string]any{ + {"match_phrase": map[string]any{"message": "Determined allowed capabilities"}}, + }, onlyCompareKeys: false, + ignoreFields: genIgnoredFields(runtime.GOOS), }, { - dsType: "metrics", - dsDataset: "elastic_agent.filebeat", - dsNamespace: info.Namespace, - query: map[string]any{"exists": map[string]any{"field": "beat.stats.libbeat.pipeline.queue.acked"}}, + dsType: "metrics", + dsDataset: "elastic_agent.filebeat", + query: []map[string]any{ + {"match_phrase": map[string]any{"metricset.name": "stats"}}, + {"match_phrase": map[string]any{"component.id": "filestream-monitoring"}}, + {"exists": map[string]any{"field": "beat.stats.libbeat.pipeline.queue.acked"}}, + }, onlyCompareKeys: true, ignoreFields: []string{ - // all process related metrics are dropped for beatreceivers + "beat.elasticsearch.cluster.id", "beat.stats.cgroup", "beat.stats.cpu", "beat.stats.handles", - "beat.stats.memstats", - "beat.stats.runtime", - "beat.elasticsearch.cluster.id", "beat.stats.libbeat.config", + "beat.stats.memstats", + "beat.stats.runtime.goroutines", }, }, { - dsType: "metrics", - dsDataset: "elastic_agent.metricbeat", - dsNamespace: info.Namespace, - query: map[string]any{"exists": map[string]any{"field": "beat.stats.libbeat.pipeline.queue.acked"}}, + dsType: "metrics", + dsDataset: "elastic_agent.metricbeat", + query: []map[string]any{ + {"match_phrase": map[string]any{"metricset.name": "stats"}}, + {"match_phrase": map[string]any{"component.id": "http/metrics-monitoring"}}, + {"exists": map[string]any{"field": "beat.stats.libbeat.pipeline.queue.acked"}}, + }, onlyCompareKeys: true, ignoreFields: []string{ - // all process related metrics are dropped for beatreceivers + "beat.elasticsearch.cluster.id", "beat.stats.cgroup", "beat.stats.cpu", "beat.stats.handles", - "beat.stats.memstats", - "beat.stats.runtime", - "beat.elasticsearch.cluster.id", "beat.stats.libbeat.config", + "beat.stats.memstats", + "beat.stats.runtime.goroutines", }, }, { dsType: "metrics", dsDataset: "elastic_agent.elastic_agent", - dsNamespace: info.Namespace, onlyCompareKeys: true, - query: map[string]any{"exists": map[string]any{"field": "system.process.memory.size"}}, + query: []map[string]any{ + {"match_phrase": map[string]any{"metricset.name": "json"}}, + {"match_phrase": map[string]any{"component.id": "elastic-agent"}}, + {"exists": map[string]any{"field": "system.process.memory.size"}}, + }, + }, + { + dsType: "metrics", + dsDataset: "elastic_agent.filebeat_input", + onlyCompareKeys: true, + query: []map[string]any{ + {"match_phrase": map[string]any{"metricset.name": "json"}}, + {"match_phrase": map[string]any{"component.id": "filestream-monitoring"}}, + {"exists": map[string]any{"field": "filebeat_input.bytes_processed_total"}}, + }, }, - // TODO: fbreceiver must support /inputs/ endpoint for this to work - // { - // dsType: "metrics", - // dsDataset: "elastic_agent.filebeat_input", - // dsNamespace: info.Namespace, - // query: map[string]any{"exists": map[string]any{"field": "filebeat_input.bytes_processed_total"}}, - // }, } installOpts := atesting.InstallOpts{ @@ -203,6 +215,9 @@ func TestClassicAndReceiverAgentMonitoring(t *testing.T) { d.ApiKey = string(apiKey) policy.Outputs["default"] = d + processNamespace := fmt.Sprintf("%s-%s", info.Namespace, "process") + policy.Agent.Monitoring["namespace"] = processNamespace + updatedPolicyBytes, err := yaml.Marshal(policy) require.NoErrorf(t, err, "error marshalling policy, struct was %v", policy) t.Cleanup(func() { @@ -252,11 +267,16 @@ func TestClassicAndReceiverAgentMonitoring(t *testing.T) { func() bool { findCtx, findCancel := context.WithTimeout(ctx, 10*time.Second) defer findCancel() - + mustClauses := []map[string]any{ + {"match": map[string]any{"data_stream.type": tc.dsType}}, + {"match": map[string]any{"data_stream.dataset": tc.dsDataset}}, + {"match": map[string]any{"data_stream.namespace": processNamespace}}, + } + mustClauses = append(mustClauses, tc.query...) rawQuery := map[string]any{ "query": map[string]any{ "bool": map[string]any{ - "must": tc.query, + "must": mustClauses, "filter": map[string]any{"range": map[string]any{"@timestamp": map[string]any{"gte": timestamp}}}, }, }, @@ -265,16 +285,16 @@ func TestClassicAndReceiverAgentMonitoring(t *testing.T) { }, } - index := tc.dsType + "-" + tc.dsDataset + "-" + tc.dsNamespace - docs, err := estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-"+index+"*", info.ESClient) + docs, err := estools.PerformQueryForRawQuery(findCtx, rawQuery, tc.dsType+"-*", info.ESClient) require.NoError(t, err) if docs.Hits.Total.Value != 0 { - agentDocs[index] = docs + key := tc.dsType + "-" + tc.dsDataset + "-" + processNamespace + agentDocs[key] = docs } return docs.Hits.Total.Value > 0 }, 2*time.Minute, 5*time.Second, - "agent monitoring classic no documents found for timestamp: %s, type: %s, dataset: %s, namespace: %s, query: %v", timestamp, tc.dsType, tc.dsDataset, tc.dsNamespace, tc.query) + "agent monitoring classic no documents found for timestamp: %s, type: %s, dataset: %s, namespace: %s, query: %v", timestamp, tc.dsType, tc.dsDataset, processNamespace, tc.query) } // 3. Uninstall @@ -283,6 +303,8 @@ func TestClassicAndReceiverAgentMonitoring(t *testing.T) { // 4. switch monitoring to the otel runtime policy.Agent.Monitoring["_runtime_experimental"] = "otel" + receiverNamespace := fmt.Sprintf("%s-%s", info.Namespace, "otel") + policy.Agent.Monitoring["namespace"] = receiverNamespace updatedPolicyBytes, err = yaml.Marshal(policy) require.NoErrorf(t, err, "error marshalling policy, struct was %v", policy) t.Cleanup(func() { @@ -330,11 +352,17 @@ func TestClassicAndReceiverAgentMonitoring(t *testing.T) { func() bool { findCtx, findCancel := context.WithTimeout(ctx, 10*time.Second) defer findCancel() + mustClauses := []map[string]any{ + {"match": map[string]any{"data_stream.type": tc.dsType}}, + {"match": map[string]any{"data_stream.dataset": tc.dsDataset}}, + {"match": map[string]any{"data_stream.namespace": receiverNamespace}}, + } + mustClauses = append(mustClauses, tc.query...) rawQuery := map[string]any{ "query": map[string]any{ "bool": map[string]any{ - "must": tc.query, + "must": mustClauses, "filter": map[string]any{"range": map[string]any{"@timestamp": map[string]any{"gte": timestampBeatReceiver}}}, }, }, @@ -343,17 +371,16 @@ func TestClassicAndReceiverAgentMonitoring(t *testing.T) { }, } - index := tc.dsType + "-" + tc.dsDataset + "-" + tc.dsNamespace - docs, err := estools.PerformQueryForRawQuery(findCtx, rawQuery, ".ds-"+index+"*", info.ESClient) + docs, err := estools.PerformQueryForRawQuery(findCtx, rawQuery, tc.dsType+"-*", info.ESClient) require.NoError(t, err) if docs.Hits.Total.Value != 0 { - key := tc.dsType + "-" + tc.dsDataset + "-" + tc.dsNamespace + key := tc.dsType + "-" + tc.dsDataset + "-" + receiverNamespace otelDocs[key] = docs } return docs.Hits.Total.Value > 0 }, 4*time.Minute, 5*time.Second, - "agent monitoring beats receivers no documents found for timestamp: %s, type: %s, dataset: %s, namespace: %s, query: %v", timestampBeatReceiver, tc.dsType, tc.dsDataset, tc.dsNamespace, tc.query) + "agent monitoring beats receivers no documents found for timestamp: %s, type: %s, dataset: %s, namespace: %s, query: %v", timestampBeatReceiver, tc.dsType, tc.dsDataset, receiverNamespace, tc.query) } // 6. Uninstall @@ -362,9 +389,8 @@ func TestClassicAndReceiverAgentMonitoring(t *testing.T) { // 7. Compare both documents are equivalent for _, tc := range tests[:3] { - key := tc.dsType + "-" + tc.dsDataset + "-" + tc.dsNamespace - agent := agentDocs[key].Hits.Hits[0].Source - otel := otelDocs[key].Hits.Hits[0].Source + agent := agentDocs[tc.dsType+"-"+tc.dsDataset+"-"+processNamespace].Hits.Hits[0].Source + otel := otelDocs[tc.dsType+"-"+tc.dsDataset+"-"+receiverNamespace].Hits.Hits[0].Source ignoredFields := []string{ // Expected to change between agentDocs and OtelDocs "@timestamp", @@ -373,18 +399,15 @@ func TestClassicAndReceiverAgentMonitoring(t *testing.T) { "agent.id", // agent.version is different because we force version 9.0.0 in CI "agent.version", + "data_stream.namespace", "elastic_agent.id", - "log.file.inode", - "log.file.fingerprint", - "log.file.path", - "log.offset", "event.ingested", } switch tc.onlyCompareKeys { case true: - AssertMapstrKeysEqual(t, agent, otel, append(ignoredFields, tc.ignoreFields...), fmt.Sprintf("expected document keys to be equal for dataset: %s", key)) + AssertMapstrKeysEqual(t, agent, otel, append(ignoredFields, tc.ignoreFields...), "expected document keys to be equal for "+tc.dsType+"-"+tc.dsDataset) case false: - AssertMapsEqual(t, agent, otel, ignoredFields, fmt.Sprintf("expected document to be equal for dataset: %s", key)) + AssertMapsEqual(t, agent, otel, append(ignoredFields, tc.ignoreFields...), "expected document to be equal for "+tc.dsType+"-"+tc.dsDataset) } } @@ -683,7 +706,6 @@ outputs: AssertMapstrKeysEqual(t, agentDoc, otelDoc, nil, "expected documents keys to be equal for metricset "+tt.metricset) AssertMapsEqual(t, agentDoc, otelDoc, ignoredFields, "expected documents to be equal for metricset "+tt.metricset) }) - } }) } @@ -695,3 +717,21 @@ func assertCollectorComponentsHealthy(t *assert.CollectT, status *atesting.Agent assertCollectorComponentsHealthy(t, componentStatus) } } + +func genIgnoredFields(goos string) []string { + switch goos { + case "windows": + return []string{ + "log.file.fingerprint", + "log.file.idxhi", + "log.file.idxlo", + } + default: + return []string{ + "log.file.device_id", + "log.file.fingerprint", + "log.file.inode", + "log.file.path", + } + } +} diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index e58750d9f62..3e9b152baf6 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -1658,11 +1658,10 @@ func AssertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg flatM1.Delete(f) flatM2.Delete(f) } - require.Equal(t, "", cmp.Diff(flatM1, flatM2), "expected maps to be equal") + require.Zero(t, cmp.Diff(flatM1, flatM2), msg) } func AssertMapstrKeysEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg string) { - t.Helper() // Delete all ignored fields. for _, f := range ignoredFields { From eac406fe125287b4d14baec6e06fe78b98e47ee9 Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Tue, 10 Jun 2025 09:44:36 -0500 Subject: [PATCH 2/3] updated monitoring tests for filebeat_input --- .../testdata/monitoring_config_full.yaml | 57 +++++++++++++++++++ .../application/monitoring/v1_monitor_test.go | 10 +--- 2 files changed, 58 insertions(+), 9 deletions(-) diff --git a/internal/pkg/agent/application/monitoring/testdata/monitoring_config_full.yaml b/internal/pkg/agent/application/monitoring/testdata/monitoring_config_full.yaml index ed34c1a4133..7ee1db80671 100644 --- a/internal/pkg/agent/application/monitoring/testdata/monitoring_config_full.yaml +++ b/internal/pkg/agent/application/monitoring/testdata/monitoring_config_full.yaml @@ -653,6 +653,63 @@ inputs: binary: filebeat id: filestream-monitoring target: component + - data_stream: + dataset: elastic_agent.filebeat_input + namespace: default + type: metrics + failure_threshold: 5 + hosts: + - placeholder + id: metrics-monitoring-filebeat-1 + index: metrics-elastic_agent.filebeat_input-default + json.is_array: true + metricsets: + - json + namespace: filebeat_input + path: /inputs/ + period: 1m0s + processors: + - add_fields: + fields: + dataset: elastic_agent.filebeat_input + target: event + - add_fields: + fields: + id: "" + process: filebeat + snapshot: false + version: placeholder + target: elastic_agent + - add_fields: + fields: + id: "" + target: agent + - copy_fields: + fail_on_error: false + fields: + - from: http.agent.beat.cpu + to: system.process.cpu + - from: http.agent.beat.memstats.memory_sys + to: system.process.memory.size + - from: http.agent.beat.handles + to: system.process.fd + - from: http.agent.beat.cgroup + to: system.process.cgroup + - from: http.agent.apm-server + to: apm-server + - from: http.filebeat_input + to: filebeat_input + ignore_missing: true + - drop_fields: + fields: + - http + - system + ignore_missing: true + - add_fields: + fields: + binary: filebeat + id: filestream-otel + target: component - data_stream: dataset: elastic_agent.elastic_agent namespace: default diff --git a/internal/pkg/agent/application/monitoring/v1_monitor_test.go b/internal/pkg/agent/application/monitoring/v1_monitor_test.go index 6e126f865e5..8ab48f0d9e3 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor_test.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor_test.go @@ -141,7 +141,6 @@ func TestMonitoringWithEndpoint(t *testing.T) { Enabled: true, MonitorMetrics: true, HTTP: &monitoringcfg.MonitoringHTTPConfig{ - Enabled: true, }, }, @@ -213,7 +212,6 @@ func TestMonitoringWithEndpoint(t *testing.T) { require.Equal(t, uint64(1234), streamValues["process.pid"]) } } - } } } @@ -222,7 +220,6 @@ func TestMonitoringWithEndpoint(t *testing.T) { } func TestMonitoringConfigMetricsInterval(t *testing.T) { - agentInfo, err := info.NewAgentInfo(context.Background(), false) require.NoError(t, err, "Error creating agent info") components := []component.Component{{ID: "foobeat", InputSpec: &component.InputRuntimeSpec{BinaryName: "filebeat"}}} @@ -317,7 +314,6 @@ func TestMonitoringConfigMetricsInterval(t *testing.T) { } for _, tc := range tcs { - t.Run(tc.name, func(t *testing.T) { b := &BeatsMonitor{ enabled: true, @@ -370,7 +366,6 @@ func TestMonitoringConfigMetricsInterval(t *testing.T) { } func TestMonitoringConfigMetricsFailureThreshold(t *testing.T) { - agentInfo, err := info.NewAgentInfo(context.Background(), false) require.NoError(t, err, "Error creating agent info") components := []component.Component{{ID: "foobeat", InputSpec: &component.InputRuntimeSpec{BinaryName: "filebeat"}}} @@ -552,7 +547,6 @@ func TestMonitoringConfigMetricsFailureThreshold(t *testing.T) { } for _, tc := range tcs { - t.Run(tc.name, func(t *testing.T) { b := &BeatsMonitor{ enabled: true, @@ -603,7 +597,6 @@ func TestMonitoringConfigMetricsFailureThreshold(t *testing.T) { } func TestErrorMonitoringConfigMetricsFailureThreshold(t *testing.T) { - agentInfo, err := info.NewAgentInfo(context.Background(), false) components := []component.Component{{ID: "foobeat", InputSpec: &component.InputRuntimeSpec{BinaryName: "filebeat"}}} require.NoError(t, err, "Error creating agent info") @@ -729,7 +722,6 @@ func TestErrorMonitoringConfigMetricsFailureThreshold(t *testing.T) { } for _, tc := range tcs { - t.Run(tc.name, func(t *testing.T) { b := &BeatsMonitor{ enabled: true, @@ -936,7 +928,7 @@ func TestMonitoringConfigForBeatsReceivers(t *testing.T) { } } } - assert.Len(t, streamsForInputMetrics, 2) + assert.Len(t, streamsForInputMetrics, 3) } func TestMonitoringWithOtelRuntime(t *testing.T) { From d403bf97e4ce4b4dd7a637cb61b9de4c68b0e555 Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Wed, 11 Jun 2025 11:46:42 -0500 Subject: [PATCH 3/3] add "log.offset" to ignored fields --- testing/integration/beat_receivers_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/testing/integration/beat_receivers_test.go b/testing/integration/beat_receivers_test.go index e20d9c9185c..87c9b9ca141 100644 --- a/testing/integration/beat_receivers_test.go +++ b/testing/integration/beat_receivers_test.go @@ -725,6 +725,7 @@ func genIgnoredFields(goos string) []string { "log.file.fingerprint", "log.file.idxhi", "log.file.idxlo", + "log.offset", } default: return []string{ @@ -732,6 +733,7 @@ func genIgnoredFields(goos string) []string { "log.file.fingerprint", "log.file.inode", "log.file.path", + "log.offset", } } }