Elasticsearch sink always using Kafka timestamp as @timestamp #22879

Open
xavirg opened this issue Apr 15, 2025 · 2 comments
Labels
  • sink: elasticsearch (anything `elasticsearch` sink related)
  • source: kafka (anything `kafka` source related)
  • type: bug (a code related bug)

Comments


xavirg commented Apr 15, 2025

A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Problem

When using the Elasticsearch sink's data_stream mode, Vector has a remap method to create the Elastic Common Schema @timestamp field before pushing the event:

if let Some(cfg) = mode.as_data_stream_config() {
    cfg.sync_fields(&mut log);
    cfg.remap_timestamp(&mut log);
};

/// If there is a `timestamp` field, rename it to the expected `@timestamp` for Elastic Common Schema.
pub fn remap_timestamp(&self, log: &mut LogEvent) {
    if let Some(timestamp_key) = log.timestamp_path().cloned() {
        if timestamp_key.to_string() == DATA_STREAM_TIMESTAMP_KEY {
            return;
        }
        log.rename_key(&timestamp_key, event_path!(DATA_STREAM_TIMESTAMP_KEY));
    }
}

However, I've found that the @timestamp value is always the Kafka message timestamp (i.e. %kafka.timestamp), even if the global timestamp field is set to anything else:

log_schema:
  timestamp_key: "custom_time"

I would expect this to make timestamp_key resolve to "custom_time" in the remap_timestamp method, so that its value is used as the new @timestamp.
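
That is, I would expect the rename to behave like this (a sketch using the event from the Example Data section below):

before remap_timestamp: { "custom_time": "2025-04-14T07:19:24.649687061Z", "log": "Failed to fetch the Moon\n" }
after remap_timestamp:  { "@timestamp": "2025-04-14T07:19:24.649687061Z", "log": "Failed to fetch the Moon\n" }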

Right now, to work around this, I'm manually overwriting %kafka.timestamp in a transform:

transforms:
  timestamp_fix:
    type: "remap"
    inputs:
      - "kafka"
    source: |-
      %kafka.timestamp = .custom_time
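
Presumably this works because, under the Vector log namespace, the sink resolves the timestamp from the Kafka source metadata (%kafka.timestamp), so overwriting that metadata changes what ends up in @timestamp.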

Configuration

acknowledgements:
  enabled: true
schema:
  log_namespace: true
log_schema:
  timestamp_key: "custom_time"

sources:
  kafka:
    type: kafka
    bootstrap_servers: "kafka:9092"
    topics:
    - logging.service.v2.ingestion
    session_timeout_ms: 10000
    drain_timeout_ms: 5000
    commit_interval_ms: 5000
    librdkafka_options:
      partition.assignment.strategy: "cooperative-sticky"
      enable.auto.commit: "true"
      heartbeat.interval.ms: "3000"
    group_id: "test.groupid"
    tls:
      enabled: false
    decoding:
      codec: "json"

sinks:
  opensearch:
    type: elasticsearch
    api_version: v8
    inputs:
      - kafka
    endpoints:
      - "https://opensearch:9200/"
    auth:
      strategy: "basic"
      user: "foo"
      password: "bar"
    mode: data_stream
    bulk:
      action: "create"
    data_stream:
      type: "service"
      dataset: "v2"
      namespace: ""
    batch:
      max_bytes: 10000000
    request:
      timeout_secs: 60
    tls:
      verify_certificate: false
      verify_hostname: false

Version

0.46

Debug Output

No response

Example Data

Event ingested at the Kafka topic:

{
	"Value": "{\"custom_time\":\"2025-04-14T07:19:24.649687061Z\",\"log\":\"Failed to fetch the Moon\\n\"}",
	"Offset": 1,
	"Key": null,
	"Partition": 1,
	"Headers": {},
	"Timestamp": "2025-04-15T09:52:34.778Z"
}

Vector's console output:

{"custom_time":"2025-04-14T07:19:24.649687061Z","log":"Failed to fetch the Moon\n"}

Resulting Elasticsearch document:

{
  "_index": "service-v2",
  "_id": "YyzdOJYB-GTU1tpUZXPf",
  "_score": 1,
  "_source": {
    "@timestamp": "2025-04-15T09:52:34.778Z",
    "custom_time": "2025-04-14T07:19:24.649687061Z",
    "data_stream": {
      "dataset": "v2",
      "namespace": "",
      "type": "service"
    },
    "log": "Failed to fetch the Moon\n"
  },
  "fields": {
    "@timestamp": [
      "2025-04-15T09:52:34.778Z"
    ]
  }
}

Additional Context

No response

References

No response

xavirg added the type: bug label on Apr 15, 2025
pront (Member) commented Apr 15, 2025

Hi @xavirg,

I noticed you set log_namespace: true, which changes how Vector accesses the timestamp field.

Basically, the following part of the config is only relevant when namespacing is OFF (the default):

log_schema:
  timestamp_key: "custom_time"

Instead, you have to use set_semantic_meaning:

set_semantic_meaning(.custom_time, "timestamp")
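
For example, as a dedicated remap transform between the kafka source and the sink (a minimal sketch reusing the names from your config; the transform name set_timestamp_meaning is arbitrary):

transforms:
  set_timestamp_meaning:
    type: "remap"
    inputs:
      - "kafka"
    source: |-
      set_semantic_meaning(.custom_time, "timestamp")

The sink's inputs would then reference set_timestamp_meaning instead of kafka, so the semantic meaning is attached before events reach the Elasticsearch sink.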

Implementation here:

    /// Fetches the "timestamp" path of the event. This is either from the "timestamp" semantic meaning (Vector namespace)
    /// or from the timestamp key set on the "Global Log Schema" (Legacy namespace).
    pub fn timestamp_path(&self) -> Option<&OwnedTargetPath> {
        match self.namespace() {
            LogNamespace::Vector => self.find_key_by_meaning("timestamp"),
            LogNamespace::Legacy => log_schema().timestamp_key_target_path(),
        }
    }

This comes up every now and then. There are more details in this issue: #22341 (comment)

pront closed this as completed on Apr 15, 2025
xavirg (Author) commented Apr 16, 2025

@pront

I tried with log namespacing disabled and it still does not work as expected (the Kafka timestamp is sent to the Elasticsearch sink).

Can you please try to reproduce it at least?

Maybe you closed this issue too fast.

pront reopened this on Apr 16, 2025
pront added the sink: elasticsearch and source: kafka labels on Apr 16, 2025