Add kmsg receiver for Linux kernel log messages#309
Add kmsg receiver for Linux kernel log messages#309mheffner merged 8 commits intostreamfold:mainfrom
Conversation
ec31ead to
a12db1a
Compare
|
I'll fix conflicts once #310 is merged (otherwise tests are flaky). |
a12db1a to
53c17c9
Compare
Done |
mheffner
left a comment
There was a problem hiding this comment.
This looks like a great addition to me, thanks for adding this.
At some point would we want the ability to save the current offset so that rotel would begin reading at the same location in kmsg after a restart of rotel? This could happen similarly to the file receiver's persistence. However, I'm guessing on an IoT device restart there's a chance that the persistence file location is wiped.
Excellent idea, added in 36d2084 |
@neeme-praks-sympower, nice addition. Not a blocker for now but in the future you might want to leverage the end-to-end message acknowledgement feature to "track" the pending offsets at the receiver and acknowledge them once they've actually been exported. That way you'll get at-least-once semantics. Right now if you crash, you might update your offset before the messages are published and on restart you won't re-read them so you'll lose some messages, essentially at-most-once delivery. Checkout the Kafka receiver or File receiver for examples on how to wire in offset Metadata, handle message acknowledgement from the exporter, and track offsets acknowledgement to drive offset state persistence. |
mheffner
left a comment
There was a problem hiding this comment.
Looks good, thanks for taking a look at the feedback!
One small thing: I removed the integration-tests feature flag and moved it behind an env var to decouple it from build features, so I think I broke the merge. You can see the change in this commit, could you model a similar KMSG_INTEGRATION_TESTS envvar? 7a72d56. Besides that we should be good to go.
Add a new receiver that reads kernel messages from /dev/kmsg and converts them to OpenTelemetry logs. This enables collection of kernel-level diagnostics on Linux systems. Features: - Async I/O using AsyncFd for efficient event-based reading - Priority filtering by syslog level (0-7) - Configurable batching (size and timeout) - Ring buffer overflow recovery (EPIPE handling) - Continuation message tracking via kmsg.continuation attribute - Proper severity mapping to OpenTelemetry levels
Implement offset persistence for the kmsg receiver to resume reading from where it left off after a Rotel restart. This prevents duplicate processing of kernel messages during normal service restarts while correctly handling system reboots. Key features: - Persist last-read sequence number to a JSON file using atomic writes - Use Linux boot_id to detect system reboots and invalidate stale state - Configurable checkpoint interval (default 5s, minimum 100ms) - Directory fsync on shutdown for durability - Automatic resume on restart: reads from ring buffer start and skips already-processed messages based on persisted sequence
Prevent silent infinite retry loops when /dev/kmsg becomes inaccessible after a successful start. Track consecutive read error duration and exit after a configurable threshold (default 60 seconds). Changes: - Add max_read_error_duration config option (default 60s) - Exit immediately with friendly message for permission denied (EACCES), suggesting CAP_SYSLOG capability - Exit immediately with friendly message for device not found (ENOENT), noting container/chroot environments - For other errors: track duration, warn while within threshold, exit when threshold exceeded - Log debug message when reads recover after previous failures
Replace conditional_wait with poll_pending to prevent dropping in-flight batches when cancellation wins the select! race. The old conditional_wait used .take() which removed the future from the Option before awaiting. If cancellation fired first, the pending send future was dropped and the batch lost. The new poll_pending polls the future in place without taking ownership. If cancellation wins, the future remains in the Option and shutdown code can complete it via complete_pending_send().
Move boot time calculation from convert_to_otlp_logs (called per batch) to BatchSender initialization (called once at receiver start). This eliminates repeated /proc/uptime reads during normal operation. The convert_to_otlp_logs function now accepts boot_time_ns as a parameter, allowing the caller to control when and how boot time is obtained.
Previously, the kmsg receiver checkpointed offsets immediately after reading messages, which could cause data loss if the process crashed before messages were exported. This change defers offset persistence until downstream exporters acknowledge successful export. Key changes: - Add KmsgOffsetTracker to track in-flight sequences using a BTreeSet, enabling O(1) lookup of the lowest pending sequence for safe checkpoint calculation - Add KmsgOffsetCommitter as a separate task that processes acks and persists offsets, surviving the main receiver loop to drain remaining acks on shutdown - Attach KmsgMetadata with sequence numbers and ack channel to outgoing messages - Change resume logic to skip sequences < persisted (was <=) to match new semantics where persisted sequence is the one to resume from The persistable sequence is now the lowest pending sequence (if any are in-flight) or hwm+1 (if all acknowledged), ensuring we never checkpoint past unacknowledged messages.
Replace the integration-tests feature flag with an environment variable for kmsg integration tests, matching the pattern used for Kafka tests. Changes: - Add KMSG_INTEGRATION_TESTS env var handling in build.rs - Update test file to use cfg(kmsg_integration_tests = "true") - Pass env var to Docker container in helper script
a95c584 to
bb101a8
Compare
Even better, added in a4ee9d1. |
Applied a similar change in bb101a8. I rebased the entire branch on top of latest changes from upstream |
Summary
Add a new kmsg receiver for collecting Linux kernel log messages from
/dev/kmsg. This enables rotel to ingest kernel-level diagnostics and forward them as OpenTelemetry logs.Features
/dev/kmsgusing async I/Okmsg.continuationattributeConfiguration
Part of #299