prometheus receiver: Memory Leak #31591

Open
henrikrexed opened this issue Mar 5, 2024 · 16 comments

Labels
bug Something isn't working receiver/prometheus Prometheus receiver

Comments

@henrikrexed

Component(s)

No response

What happened?

Description

After running several benchmarks to compare Fluent Bit and the OpenTelemetry Collector, I discovered a memory leak in the collector when using the Prometheus receiver (with a kubernetes_sd_config scrape config).

Steps to Reproduce

Here is the repo with all the assets for my tests:
https://github.com/isItObservable/fluentbit-vs-collector

Expected Result

The memory usage of the collector should remain stable under the same constant load.

Actual Result

When running a 24h test with constant load, the collector consumes 10 GB of RAM and then crashes.

(image: benchmarks)

Collector version

v0.90.0

Environment information

Environment

GKE cluster with --machine-type=e2-standard-4 --num-nodes=2

OpenTelemetry Collector configuration

receivers:
      prometheus:
        config:
          scrape_configs:
          - job_name: opentelemetry-collector
            scrape_interval: 5s
            static_configs:
            - targets:
              - ${MY_POD_IP}:8888
          - job_name: kepler
            scrape_interval: 5s
            static_configs:
            - targets:
              - kepler.kepler.svc.cluster.local:9102
            relabel_configs:
            - source_labels: [__name__]
              regex: 'kepler_process_uncore_joules'
              action: drop
            - source_labels: [__name__]
              regex: 'go_*'
              action: drop
            - action: labeldrop
              regex: container_id
            - action: labeldrop
              regex: pid
          - job_name: kubesatemetrics
            scrape_interval: 5s
            static_configs:
            - targets:
              - prometheus-kube-state-metrics.default.svc.cluster.local:8080
          - job_name: node-exporter
            scrape_interval: 5s
            static_configs:
            - targets:
              - prometheus-prometheus-node-exporter.default.svc.cluster.local:9100
          - job_name: kubernetes-pods
            kubernetes_sd_configs:
            - role: pod
            relabel_configs:
            - action: keep
              regex: true
              source_labels:
              - __meta_kubernetes_pod_annotation_prometheus_io_scrape
            - action: replace
              regex: (https?)
              source_labels:
              - __meta_kubernetes_pod_annotation_prometheus_io_scheme
              target_label: __scheme__
            - action: replace
              regex: (.+)
              source_labels:
              - __meta_kubernetes_pod_annotation_prometheus_io_path
              target_label: __metrics_path__
            - action: replace
              regex: ([^:]+)(?::\d+)?;(\d+)
              replacement: $1:$2
              source_labels:
              - __address__
              - __meta_kubernetes_pod_annotation_prometheus_io_port
              target_label: __address__
            - action: labelmap
              regex: __meta_kubernetes_pod_label_(.+)
            - action: replace
              source_labels:
              - __meta_kubernetes_namespace
              target_label: kubernetes_namespace
            - action: replace
              source_labels:
              - __meta_kubernetes_pod_name
              target_label: kubernetes_pod_name
            - action: drop
              regex: Pending|Succeeded|Failed
              source_labels:
              - __meta_kubernetes_pod_phase
      filelog:
        include:
          - /var/log/pods/*/*/*.log
        start_at: beginning
        include_file_path: true
        include_file_name: false
        operators:
          # Find out which format is used by kubernetes
          - type: add
            id: receivetiming
            field: resource["receiverTime"]
            value: 'EXPR(now().UnixMicro())'
          - type: router
            id: get-format
            routes:
              - output: parser-docker
                expr: 'body matches "^\\{"'
              - output: parser-crio
                expr: 'body matches "^[^ Z]+ "'
              - output: parser-containerd
                expr: 'body matches "^[^ Z]+Z"'
          # Parse CRI-O format
          - type: regex_parser
            id: parser-crio
            regex: '^(?P<time>[^ Z]+) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$'
            output: extract_metadata_from_filepath
            timestamp:
              parse_from: attributes.time
              layout_type: gotime
              layout: '2006-01-02T15:04:05.999999999Z07:00'
          # Parse CRI-Containerd format
          - type: regex_parser
            id: parser-containerd
            regex: '^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) ?(?P<log>.*)$'
            output: extract_metadata_from_filepath
            timestamp:
              parse_from: attributes.time
              layout: '%Y-%m-%dT%H:%M:%S.%LZ'
          # Parse Docker format
          - type: json_parser
            id: parser-docker
            output: extract_metadata_from_filepath
            timestamp:
              parse_from: attributes.time
              layout: '%Y-%m-%dT%H:%M:%S.%LZ'
          - type: move
            from: attributes.log
            to: body
          # Extract metadata from file path
          - type: regex_parser
            id: extract_metadata_from_filepath
            regex: '^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]{36})\/(?P<container_name>[^\._]+)\/(?P<restart_count>\d+)\.log$'
            parse_from: attributes["log.file.path"]
            cache:
              size: 128  # default maximum amount of Pods per Node is 110
          # Rename attributes
          - type: move
            from: attributes.stream
            to: attributes["log.iostream"]
          - type: move
            from: attributes.container_name
            to: resource["k8s.container.name"]
          - type: move
            from: attributes.namespace
            to: resource["k8s.namespace.name"]
          - type: move
            from: attributes.pod_name
            to: resource["k8s.pod.name"]
          - type: move
            from: attributes.restart_count
            to: resource["k8s.container.restart_count"]
          - type: move
            from: attributes.uid
            to: resource["k8s.pod.uid"]
    
      
      
      otlp:
        protocols:
          grpc:
          http:


    processors:
      batch:
        send_batch_max_size: 1000
        timeout: 30s
        send_batch_size : 800
    
      transform/setstarttime:
        log_statements:
              context: log
              statements:
                - set(resource.attributes["processing.startime"],UnixMicro(Now()))  where resource.attributes["receiverTime"] != nil
    
      transform/setendtime:
        log_statements:
              context: log
              statements:
                - set(resource.attributes["processing.endtime"],UnixMicro(Now())) where resource.attributes["receiverTime"] != nil
    
      cumulativetodelta:
    
      filter:
        error_mode: ignore
        metrics:
          metric:
            - 'type == METRIC_DATA_TYPE_HISTOGRAM'
    
      k8sattributes:
        auth_type: "serviceAccount"
        passthrough: false
        filter:
           node_from_env_var: K8S_NODE_NAME
        extract:
          metadata:
            - k8s.pod.name
            - k8s.pod.uid
            - k8s.deployment.name
            - k8s.namespace.name
            - k8s.node.name
            - k8s.pod.start_time
          # Pod labels which can be fetched via K8sattributeprocessor
          labels:
            - tag_name: key1
              key: label1
              from: pod
            - tag_name: key2
              key: label2
              from: pod
        # Pod association using resource attributes and connection
        pod_association:
          - sources:
             - from: resource_attribute
               name: k8s.pod.uid
             - from: resource_attribute
               name: k8s.pod.name
          - sources:
             - from: connection
      memory_limiter:
        check_interval: 1s
        limit_percentage: 70
        spike_limit_percentage: 30
      
      resource:
        attributes:
        - key: k8s.cluster.name 
          value: $CLUSTERNAME
          action: insert
        - key: dt.kubernetes.cluster.id
          value: $CLUSTER_ID
          action: insert
    
      transform:
        log_statements:
          context: log
          statements:
            - merge_maps(cache,ExtractPatterns(attributes["log.file.path"],"^.*/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\\-]{36})/(?P<container_name>[^\\._]+)/(?P<restart_count>\\d+)\\.log$"), "upsert") where attributes["log.file.path"] != nil
            - set(resource.attributes["k8s.namespace"],cache["namespace"]) where cache["namespace"]!= nil
            - set(resource.attributes["k8s.pod.name"],cache["pod_name"]) where cache["pod_name"]!= nil
            - set(resource.attributes["k8s.pod.uid"],cache["uid"]) where cache["uid"]!= nil
            - set(resource.attributes["k8s.container.name"],cache["container_name"]) where cache["uid"]!= nil
      transform/docker:
        log_statements:
          context: log
          statements:
            - merge_maps(cache,ParseJSON(body), "upsert") where body!= nil
            - set(body,cache["log"]) where cache["log"] != nil
      transform/metrics:
        metric_statements:
          - context: metric
            statements:
              - replace_pattern(unit, "_", "") where IsMatch(unit,".*[_]{1}.*")
              - replace_pattern(name,"^(.*)$","longer.name.$$1") where Len(name) <= 4
              - set(resource.attributes["cumulative"],"true") where aggregation_temporality == AGGREGATION_TEMPORALITY_CUMULATIVE
            
      transform/crio:
        log_statements:
          context: log
          statements:
            - merge_maps(cache,ExtractPatterns(body,"^(?P<time>[^Z]+)Z (?P<stream>stdout|stderr) (?P<logtag>[^\\s]*) ?(?P<log>.*)$"), "upsert") where body != nil
            - set(body,cache["log"]) where cache["log"] != nil      
      transform/containerd:
        log_statements:
          context: log
          statements:
            - merge_maps(cache,ExtractPatterns(body,"^(?P<time>[^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^\\s]*) ?(?P<log>.*)$"), "upsert") where body != nil
            - merge_maps(cache,ExtractPatterns(body,"^(?P<time>\\d+/\\d+/\\d+\\s+\\d+:\\d+\\d+) (?P<log>.*)$"), "upsert") where attributes["log_name"]!= "MeshAccessLog" and cache["log"]!= nil and not IsMap(cache["log"])
            - set(body,cache["log"]) where cache["log"] != nil
            - merge_maps(cache,ParseJSON(body), "upsert") where IsMap(body)
            - set(body,cache["message"]) where cache["message"] != nil
            - set(body,cache["msg"]) where cache["msg"] != nil
            - set(severity_text,cache["level"]) where cache["level"] != nil
            - set(severity_text,cache["severity"]) where cache["severity"] != nil
            - set(severity_number,SEVERITY_NUMBER_INFO) where cache["level"] == "INFO"
            - set(severity_number,SEVERITY_NUMBER_INFO) where cache["severity"] == "info"
            - set(attributes["loggerName"],cache["loggerName"]) where cache["loggerName"] != nil
    connectors:
      routing:
        default_pipelines:  [logs/default]
        error_mode: ignore
        table:
          - statement: route() where attributes["container.runtime"] == "crio"
            pipelines: [logs/crio]
          - statement: route() where attributes["container.runtime"] == "docker"
            pipelines: [logs/docker]
          - statement: route() where attributes["container.runtime"] == "containerd"
            pipelines: [logs/containerd]
      routing/metrics:
        default_pipelines: [metrics/default]
        error_mode: ignore
        table:
          - statement: route() where attributes["cumulative"]=="true"
            pipelines: [metrics/conversion]
    
    exporters:
      logging:
        verbosity: detailed
     
      otlphttp:
        endpoint: $DT_ENDPOINT/api/v2/otlp
        headers:
          Authorization: "Api-Token $DT_API_TOKEN"
      
    
    
    service:
      pipelines:
        logs:
          receivers: [filelog,otlp]
          processors: [transform/setstarttime,memory_limiter,k8sattributes,resource,transform/setendtime,batch]
          exporters: [otlphttp]
        metrics:
          receivers: [otlp,prometheus]
          processors: [memory_limiter,filter,resource, transform/metrics,k8sattributes]
          exporters: [routing/metrics]
        traces:
          receivers: [otlp]
          processors: [memory_limiter,k8sattributes,resource,batch]
          exporters: [otlphttp]
        metrics/default:
          receivers: [routing/metrics]
          processors: [batch]
          exporters: [otlphttp]
        metrics/conversion:
          receivers: [routing/metrics]
          processors: [cumulativetodelta,batch]
          exporters: [otlphttp]
      telemetry:
        metrics:
          address: $MY_POD_IP:8888

Log output

No response

Additional context

No response

@henrikrexed henrikrexed added bug Something isn't working needs triage New item requiring triage labels Mar 5, 2024
@crobert-1 crobert-1 added the receiver/prometheus Prometheus receiver label Mar 5, 2024
@crobert-1
Member

Hello @henrikrexed, is there a specific reason you believe this to be caused by the Prometheus receiver instead of any of the other components included in your configuration?

Contributor

github-actions bot commented Mar 5, 2024

Pinging code owners for receiver/prometheus: @Aneurysm9 @dashpole. See Adding Labels via Comments if you do not have permissions to add labels yourself.

@henrikrexed
Author

I have run tests with only logs, then with logs and traces, and then added the Prometheus receiver.
The memory consumption only explodes when I add the Prometheus receiver.

BTW, when using the target allocator, the memory consumption is much more stable.
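
(For context, the target allocator mentioned here is the one shipped with the OpenTelemetry operator. As a rough illustration only, and not part of the original report, enabling it on an operator-managed collector looks roughly like the sketch below; the resource name is hypothetical, and the target allocator also needs a service account with RBAC to list pods.)

apiVersion: opentelemetry.io/v1alpha1
kind: OpenTelemetryCollector
metadata:
  name: otel-with-ta          # hypothetical name
spec:
  mode: statefulset
  replicas: 2
  targetAllocator:
    enabled: true             # the operator spreads scrape targets across the replicas
  config: |
    receivers:
      prometheus:
        config:
          scrape_configs:
          - job_name: kubernetes-pods
            scrape_interval: 5s
            kubernetes_sd_configs:
            - role: pod
    exporters:
      logging:
    service:
      pipelines:
        metrics:
          receivers: [prometheus]
          exporters: [logging]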

@dashpole
Contributor

dashpole commented Mar 5, 2024

Would you mind providing a simpler reproduction case? Ideally with just the prometheus receiver + otlp exporter, and a single job in the prometheus receiver that scrapes a workload we can run ourselves (e.g. node exporter, KSM).
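
For reference, a stripped-down configuration along those lines could look roughly like the sketch below (not the reporter's actual setup; the exporter endpoint is a placeholder):

receivers:
  prometheus:
    config:
      scrape_configs:
      - job_name: node-exporter
        scrape_interval: 5s
        static_configs:
        - targets:
          - prometheus-prometheus-node-exporter.default.svc.cluster.local:9100

exporters:
  otlp:
    endpoint: otlp-gateway.example:4317   # placeholder endpoint
    tls:
      insecure: true

service:
  pipelines:
    metrics:
      receivers: [prometheus]
      exporters: [otlp]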

@dashpole dashpole removed the needs triage New item requiring triage label Mar 5, 2024
@henrikrexed
Author

Let me build a collector pipeline with only metrics (but I want to run it for 24h to confirm the behavior).
I won't be able to run this test until tomorrow (I have a soak test running with the target allocator to confirm previous results).

@henrikrexed
Author

Hi,

So I have created a statefulset collector with 2 replicas that is only collecting Prometheus metrics and enriching the data.
After 10h, both replicas consume 8 GB of RAM. I will let the test run a bit and then re-run the same test with the ballast extension.
Here is the collector CRD used:

apiVersion: opentelemetry.io/v1alpha1
kind: OpenTelemetryCollector
metadata:
  name: otel
  labels:
    app: opentelemetry
spec:
  mode: statefulset
  replicas: 2
  serviceAccount: otelcontribcol
  image: otel/opentelemetry-collector-contrib:0.90.0
  ports:
    - name: prometheus
      port: 9090
      targetPort: 9090
  env:
    - name: CLUSTER_ID
      valueFrom:
        secretKeyRef:
          name: dynatrace
          key: clusterid
    - name: MY_POD_IP
      valueFrom:
        fieldRef:
          apiVersion: v1
          fieldPath: status.podIP
    - name: K8S_NODE_NAME
      valueFrom:
        fieldRef:
          fieldPath: spec.nodeName
    - name: DT_ENDPOINT
      valueFrom:
        secretKeyRef:
          name: dynatrace
          key: dynatrace_oltp_url
    - name: DT_API_TOKEN
      valueFrom:
        secretKeyRef:
          name: dynatrace
          key: dt_api_token
    - name: CLUSTERNAME
      valueFrom:
        secretKeyRef:
          name: dynatrace
          key: clustername
  config: |
    receivers:
      prometheus:
        config:
          scrape_configs:
          - job_name: opentelemetry-collector
            scrape_interval: 5s
            static_configs:
            - targets:
              - ${MY_POD_IP}:8888
          - job_name: kepler
            scrape_interval: 5s
            static_configs:
            - targets:
              - kepler.kepler.svc.cluster.local:9102
            relabel_configs:
            - source_labels: [__name__]
              regex: 'kepler_process_uncore_joules'
              action: drop
            - source_labels: [__name__]
              regex: 'go_*'
              action: drop
            - action: labeldrop
              regex: container_id
            - action: labeldrop
              regex: pid
          - job_name: kubesatemetrics
            scrape_interval: 5s
            static_configs:
            - targets:
              - prometheus-kube-state-metrics.default.svc.cluster.local:8080
          - job_name: node-exporter
            scrape_interval: 5s
            static_configs:
            - targets:
              - prometheus-prometheus-node-exporter.default.svc.cluster.local:9100
          - job_name: kubernetes-pods
            kubernetes_sd_configs:
            - role: pod
            relabel_configs:
            - action: keep
              regex: true
              source_labels:
              - __meta_kubernetes_pod_annotation_prometheus_io_scrape
            - action: replace
              regex: (https?)
              source_labels:
              - __meta_kubernetes_pod_annotation_prometheus_io_scheme
              target_label: __scheme__
            - action: replace
              regex: (.+)
              source_labels:
              - __meta_kubernetes_pod_annotation_prometheus_io_path
              target_label: __metrics_path__
            - action: replace
              regex: ([^:]+)(?::\d+)?;(\d+)
              replacement: $1:$2
              source_labels:
              - __address__
              - __meta_kubernetes_pod_annotation_prometheus_io_port
              target_label: __address__
            - action: labelmap
              regex: __meta_kubernetes_pod_label_(.+)
            - action: replace
              source_labels:
              - __meta_kubernetes_namespace
              target_label: kubernetes_namespace
            - action: replace
              source_labels:
              - __meta_kubernetes_pod_name
              target_label: kubernetes_pod_name
            - action: drop
              regex: Pending|Succeeded|Failed
              source_labels:
              - __meta_kubernetes_pod_phase
     
          



    processors:
    
      cumulativetodelta:
    
     
      filter:
        error_mode: ignore
        metrics:
          metric:
            - 'type == METRIC_DATA_TYPE_HISTOGRAM'
            - 'IsMatch(name, "kafka.consumer.*")'
      batch:
        send_batch_max_size: 1000
        timeout: 30s
        send_batch_size : 800
      
      transform/metrics:
        metric_statements:
          - context: metric
            statements:
            - set(resource.attributes["k8s.pod.name"],resource.attributes["pod_name"]) where  resource.attributes["pod_name"]!= nil
            - set(resource.attributes["k8s.namespace.name"], resource.attributes["container_namespace"]) where  resource.attributes["container_namespace"] != nil
            - replace_pattern(name,"^(.*)$","longer.name.$$1") where Len(name) <= 4
            - set(resource.attributes["cumulative"],"true") where aggregation_temporality == AGGREGATION_TEMPORALITY_CUMULATIVE
          - context: datapoint
            statements:
            - set(attributes["k8s.pod.name"],attributes["pod_name"]) where  attributes["pod_name"]!= nil
            - set(attributes["k8s.namespace.name"], attributes["container_namespace"]) where  attributes["container_namespace"] != nil
      k8sattributes:
        auth_type: "serviceAccount"
        passthrough: false
        filter:
           node_from_env_var: K8S_NODE_NAME
        extract:
          metadata:
            - k8s.pod.name
            - k8s.pod.uid
            - k8s.deployment.name
            - k8s.namespace.name
            - k8s.node.name
            - k8s.pod.start_time
          # Pod labels which can be fetched via K8sattributeprocessor
          labels:
            - tag_name: key1
              key: label1
              from: pod
            - tag_name: key2
              key: label2
              from: pod
        # Pod association using resource attributes and connection
        pod_association:
          - sources:
             - from: resource_attribute
               name: k8s.pod.name
          - sources:
             - from: connection
      memory_limiter:
        check_interval: 1s
        limit_percentage: 70
        spike_limit_percentage: 30
    
      resource:
        attributes:
        - key: k8s.cluster.name 
          value: $CLUSTERNAME
          action: insert
        - key: dt.kubernetes.cluster.id
          value: $CLUSTER_ID
          action: insert
      
    
    exporters:
      
     
      otlphttp:
        endpoint: $DT_ENDPOINT/api/v2/otlp
        headers:
          Authorization: "Api-Token $DT_API_TOKEN"
    
    connectors:
      routing/metrics:
          default_pipelines: [metrics/default]
          error_mode: ignore
          table:
            - statement: route() where attributes["cumulative"]=="true"
              pipelines: [metrics/conversion]
    
     extensions:
        memory_ballast:
          size_in_percentage: 20
        zpages:
          endpoint: "$MY_POD_IP:55679"
    service:
     
      pipelines:     
        extensions: memory_ballast
        metrics:
          receivers: [prometheus]
          processors: [memory_limiter,filter, resource,transform/metrics,k8sattributes]
          exporters: [routing/metrics]
        metrics/conversion:
         receivers: [routing/metrics]
         processors: [cumulativetodelta,batch]
         exporters: [otlphttp]
        metrics/default:
          receivers: [routing/metrics]
          processors: [batch]
          exporters: [otlphttp]
      telemetry:
        metrics:
          address: $MY_POD_IP:8888
![collector_process](https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/81800915/a8475950-6931-452d-9730-117bbd18c7c2)
![ressource_usage](https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/81800915/cd79fc1a-6b2c-4517-a51c-d30b8293f847)

I also added a few graphs of the test:

(image: ingest)

@henrikrexed
Author

(image: ressource_usage)

@henrikrexed
Author

(image: collector_process)

@henrikrexed
Author

Here are the results of the same test, but with the ballast extension.

(image: ressource_ballast)

@henrikrexed
Author

(image: collector_process_ballazst)

@dashpole
Contributor

@braydonk, who was looking into memory usage recently.

@braydonk
Contributor

Hey @henrikrexed,

I'm wondering if you might be able to gather some profiles from these collectors. My initial guess for this steady increase is that the shape of the metrics is causing the scrape cache to grow continuously, but I can't easily verify this without more info; I hope profiles will help.

These are the steps to get the info:

  1. Add the pprofextension to your collector build (if you're just using contrib this will already be there)
  2. Add the following to the collector config posted above:
extensions:
  pprof:
...
service:
  extensions: [pprof] # along with whatever other extensions you have

This starts a pprof endpoint on <host>:1777.
3. Pull a heap profile and goroutine profile:

wget http://<host>:1777/debug/pprof/heap
wget http://<host>:1777/debug/pprof/goroutine

If you're running the test over a long period of time, maybe doing this hourly would be good with a scheduled job of some kind.

With these profiles I can take a look at the differences over time to see what part of the heap is growing/verify there aren't leaking goroutines.
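
One note on reaching that endpoint from another pod: the pprof extension listens on localhost by default, so the address may need to be set explicitly. A minimal sketch, reusing the ${MY_POD_IP} variable from the configs above:

extensions:
  pprof:
    endpoint: ${MY_POD_IP}:1777   # default is localhost:1777, which is not reachable from outside the pod

service:
  extensions: [pprof]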

@henrikrexed
Author

As promised during KubeCon, I have run a test with this scrape config:

- job_name: kepler
  scrape_interval: 5s
  static_configs:
  - targets:
    - kepler.kepler.svc.cluster.local:9102
  relabel_configs:
  - source_labels: [__name__]
    regex: 'kepler_process_uncore_joules'
    action: drop
  - source_labels: [__name__]
    regex: 'scrape_'
    action: drop
  - source_labels: [__name__]
    regex: 'go_*'
    action: drop
  - source_labels: [__name__]
    regex: 'up_'
    action: drop
  - action: labeldrop
    regex: container_id
  - action: labeldrop
    regex: pid
  - action: replace
    source_labels: [pod_name]
    target_label: kubernetes_pod_name
  - action: replace
    source_labels: [container_namespace]
    target_label: kubernetes_namespace_name
  - action: replace
    source_labels: [exported_instance]
    target_label: kubernetes_node_name

and then with kubernetes_sd_config (for envoy):

- job_name: kubernetes-pods
  kubernetes_sd_configs:
  - role: pod
  relabel_configs:
  - action: keep
    regex: true
    source_labels:
    - __meta_kubernetes_pod_annotation_prometheus_io_scrape
  - action: replace
    regex: (https?)
    source_labels:
    - __meta_kubernetes_pod_annotation_prometheus_io_scheme
    target_label: __scheme__
  - action: replace
    regex: (.+)
    source_labels:
    - __meta_kubernetes_pod_annotation_prometheus_io_path
    target_label: __metrics_path__
  - action: replace
    regex: ([^:]+)(?::\d+)?;(\d+)
    replacement: $1:$2
    source_labels:
    - __address__
    - __meta_kubernetes_pod_annotation_prometheus_io_port
    target_label: __address__
  - action: labelmap
    regex: __meta_kubernetes_pod_label_(.+)
  - action: replace
    source_labels:
    - __meta_kubernetes_namespace
    target_label: kubernetes_namespace
  - action: replace
    source_labels:
    - __meta_kubernetes_pod_name
    target_label: kubernetes_pod_name
  - action: drop
    regex: Pending|Succeeded|Failed
    source_labels:
    - __meta_kubernetes_pod_phase
- job_name: 'istiod'
  kubernetes_sd_configs:
  - role: endpoints
    namespaces:
      names:
      - istio-system
  relabel_configs:
  - source_labels: [__meta_kubernetes_service_name, __meta_kubernetes_endpoint_port_name]
    action: keep
    regex: istiod;http-monitoring
- job_name: 'envoy-stats'
  metrics_path: /stats/prometheus
  kubernetes_sd_configs:
  - role: pod
  relabel_configs:
  - source_labels: [__meta_kubernetes_pod_container_port_name]
    action: keep
    regex: '.*-envoy-prom'

Both tests had pprof enabled.
The first test shows the memory leak, and the other one has a memory usage that is "normal".
It seems that the relabeling config with labeldrop could be causing this memory pressure.

I have more than 12 pprof files. How can I share them?

@braydonk
Contributor

I think it should be possible to attach the profile files directly in a GitHub comment. If not, perhaps you could email or Slack them to me, since I'm not sure if other folks will care as much to see the profiles themselves.

@braydonk
Contributor

braydonk commented Apr 13, 2024

Thanks @henrikrexed for sending the profiles over. I took a look last night. I can see where the memory is growing, but I don't understand why.

The growing region of memory is the cache of metric identities in the cumulativetodeltaprocessor:

func (t *MetricTracker) Convert(in MetricPoint) (out DeltaValue, valid bool) {
    metricID := in.Identity
    metricPoint := in.Value
    if !metricID.IsSupportedMetricType() {
        return
    }
    // NaN is used to signal "stale" metrics.
    // These are ignored for now.
    // https://github.com/open-telemetry/opentelemetry-collector/pull/3423
    if metricID.IsFloatVal() && math.IsNaN(metricPoint.FloatValue) {
        return
    }
    b := identityBufferPool.Get().(*bytes.Buffer)
    b.Reset()
    metricID.Write(b)
    hashableID := b.String()
    identityBufferPool.Put(b)
    s, ok := t.states.LoadOrStore(hashableID, &State{
        PrevPoint: metricPoint,
    })

The metric cache keys are a hash of the tracking.MetricIdentity type:

type MetricIdentity struct {
    Resource               pcommon.Resource
    InstrumentationLibrary pcommon.InstrumentationScope
    MetricType             pmetric.MetricType
    MetricIsMonotonic      bool
    MetricName             string
    MetricUnit             string
    StartTimestamp         pcommon.Timestamp
    Attributes             pcommon.Map
    MetricValueType        pmetric.NumberDataPointValueType
}

If any values of the MetricIdentity change, that would change the hash causing it to be recognized as a new metric and added to the cache. So if these values are continuously changing for some metrics, then the cache could constantly grow as it adds new state entries.

This is where I get lost, though: I tried various things to reproduce this setup short of spinning up an actual k8s cluster with identically configured apps, but I never got a scenario where the cumulativetodelta cache constantly grows in any of my setups. So at this point it's unclear to me whether this is a legitimate problem with how the receivers/processors interact in this scenario, or some really hard-to-spot configuration footgun.

The biggest hunch that I have is that the StartTimestamp in the metric identity is the most likely thing to be constantly changing and messing with the cache. But I can't imagine why it's changing.

@braydonk
Contributor

Adding a recommendation here: I would suggest using the max_staleness configuration on the cumulativetodelta processor. With scrape intervals of 5s, a max_staleness of 1m would probably be a good idea. This means that if cardinality got leaked, the state of old cardinality won't be tracked indefinitely.
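
A minimal sketch of that setting (max_staleness is a documented option of the cumulativetodelta processor; 1m matches the suggestion above):

processors:
  cumulativetodelta:
    max_staleness: 1m   # state for series not seen within the last minute is dropped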
