Skip to content

Commit e82e597

Browse files
committed
feat: align collection timestamps in Prometheus metrics scraping
1 parent 43dd6c9 commit e82e597

File tree

2 files changed

+69
-7
lines changed

2 files changed

+69
-7
lines changed

cmd/start.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ func runAgent(cmd *cobra.Command, args []string) error {
116116
}()
117117

118118
// Main scraping loop
119+
// Use ticker for interval, but align collection timestamps to interval boundaries
119120
ticker := time.NewTicker(cfg.Agent.Interval)
120121
defer ticker.Stop()
121122

@@ -125,8 +126,9 @@ func runAgent(cmd *cobra.Command, args []string) error {
125126
logger.String("prometheus_endpoint", cfg.Prometheus.Endpoint),
126127
logger.String("server_endpoint", cfg.Server.Endpoint))
127128

128-
// Scrape immediately on start
129-
if err := scrapeAndSend(scraper, sender, cfg.Agent.ServerID); err != nil {
129+
// Scrape immediately on start with aligned timestamp
130+
collectionTime := time.Now().Truncate(cfg.Agent.Interval)
131+
if err := scrapeAndSendWithTimestamp(scraper, sender, cfg.Agent.ServerID, collectionTime); err != nil {
130132
logger.Error("Initial scrape failed", logger.Err(err))
131133
}
132134

@@ -135,30 +137,44 @@ func runAgent(cmd *cobra.Command, args []string) error {
135137
select {
136138
case <-ctx.Done():
137139
return nil
138-
case <-ticker.C:
139-
if err := scrapeAndSend(scraper, sender, cfg.Agent.ServerID); err != nil {
140+
case tickTime := <-ticker.C:
141+
// Align collection time to interval boundary
142+
collectionTime := tickTime.Truncate(cfg.Agent.Interval)
143+
if err := scrapeAndSendWithTimestamp(scraper, sender, cfg.Agent.ServerID, collectionTime); err != nil {
140144
logger.Error("Scrape failed", logger.Err(err))
141145
}
142146
}
143147
}
144148
}
145149

146-
func scrapeAndSend(scraper *prometheus.Scraper, sender *report.Sender, serverID string) error {
150+
// scrapeAndSendWithTimestamp scrapes metrics and adds aligned collection timestamp
151+
func scrapeAndSendWithTimestamp(scraper *prometheus.Scraper, sender *report.Sender, serverID string, collectionTime time.Time) error {
147152
// Scrape Prometheus exporter
148153
data, err := scraper.Scrape()
149154
if err != nil {
150155
return fmt.Errorf("failed to scrape prometheus: %w", err)
151156
}
152157

158+
// Add explicit timestamps to metrics (aligned to collection time)
159+
// This ensures all agents report metrics at the same logical time boundaries
160+
dataWithTimestamp := prometheus.AddTimestamps(data, collectionTime)
161+
153162
// Save to buffer (WAL pattern - actual sending happens in background)
154-
if err := sender.SendPrometheus(data, serverID); err != nil {
163+
if err := sender.SendPrometheus(dataWithTimestamp, serverID); err != nil {
155164
return fmt.Errorf("failed to buffer prometheus data: %w", err)
156165
}
157166

158-
logger.Debug("Prometheus data scraped and buffered", logger.Int("bytes", len(data)))
167+
logger.Debug("Prometheus data scraped and buffered",
168+
logger.Int("bytes", len(dataWithTimestamp)),
169+
logger.Time("collection_time", collectionTime))
159170
return nil
160171
}
161172

173+
// Legacy function kept for backwards compatibility
174+
func scrapeAndSend(scraper *prometheus.Scraper, sender *report.Sender, serverID string) error {
175+
return scrapeAndSendWithTimestamp(scraper, sender, serverID, time.Now().Truncate(5*time.Second))
176+
}
177+
162178
func runInBackground() error {
163179
// Check config exists
164180
if err := config.RequireConfig(cfgFile); err != nil {

internal/prometheus/scraper.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
package prometheus
22

33
import (
4+
"bufio"
5+
"bytes"
46
"fmt"
57
"io"
68
"net/http"
9+
"strings"
710
"time"
811

912
"github.com/node-pulse/agent/internal/logger"
@@ -68,3 +71,46 @@ func (s *Scraper) Verify() error {
6871
logger.Info("Prometheus exporter verified", logger.String("endpoint", s.config.Endpoint))
6972
return nil
7073
}
74+
75+
// AddTimestamps appends an explicit millisecond timestamp to every sample line
// of a Prometheus text-format payload that does not already carry one, so that
// all metrics are reported with the same aligned collection time.
//
// Example:
//
//	node_cpu_seconds_total{cpu="0",mode="idle"} 123.45
//	→ node_cpu_seconds_total{cpu="0",mode="idle"} 123.45 1730102400000
//
// Empty lines, comment/metadata lines (starting with '#'), samples that already
// have a timestamp, and lines that do not look like "name[{labels}] value" are
// copied through unchanged. Every output line is terminated with "\n".
//
// data is the raw exposition text; collectionTime is the instant to stamp
// (its UnixMilli value is used). The returned slice is a new buffer; if the
// input cannot be scanned, the original data is returned untouched rather
// than a truncated copy.
func AddTimestamps(data []byte, collectionTime time.Time) []byte {
	// The suffix is identical for every sample, so format it once.
	suffix := fmt.Sprintf(" %d", collectionTime.UnixMilli())

	var result bytes.Buffer
	result.Grow(len(data))

	scanner := bufio.NewScanner(bytes.NewReader(data))
	// Lift the default 64 KiB token limit: a single line can never be longer
	// than the whole payload, so this guarantees no line is rejected and the
	// remainder of the payload is never silently dropped.
	scanner.Buffer(make([]byte, 0, 4096), len(data)+1)

	for scanner.Scan() {
		line := scanner.Text()
		result.WriteString(line)
		if sampleLacksTimestamp(line) {
			result.WriteString(suffix)
		}
		result.WriteByte('\n')
	}

	// Defensive: never hand back a partially processed payload.
	if err := scanner.Err(); err != nil {
		return data
	}
	return result.Bytes()
}

// sampleLacksTimestamp reports whether line is a sample of the form
// "name[{labels}] value" with no trailing timestamp. Whitespace inside quoted
// label values is not mistaken for a field separator.
func sampleLacksTimestamp(line string) bool {
	sample := strings.TrimLeft(line, " \t")
	if sample == "" || sample[0] == '#' {
		return false
	}

	// The metric name ends at the first blank or at the opening brace.
	nameEnd := strings.IndexAny(sample, "{ \t")
	if nameEnd < 0 {
		return false // name only, no value
	}

	rest := strings.TrimLeft(sample[nameEnd:], " \t")
	if strings.HasPrefix(rest, "{") {
		closing := labelSetEnd(rest)
		if closing < 0 {
			return false // unterminated label set: leave the line alone
		}
		rest = rest[closing+1:]
	}

	// Exactly one remaining field means "value" without a timestamp. Zero
	// fields is malformed; two or more means a timestamp (or something we do
	// not understand) is already present.
	return len(strings.Fields(rest)) == 1
}

// labelSetEnd returns the index of the '}' that closes the label set starting
// at s[0] == '{', skipping braces and escaped quotes inside quoted label
// values. It returns -1 if the label set is not closed.
func labelSetEnd(s string) int {
	inQuotes := false
	for i := 1; i < len(s); i++ {
		switch c := s[i]; {
		case inQuotes && c == '\\':
			i++ // skip the escaped character
		case c == '"':
			inQuotes = !inQuotes
		case c == '}' && !inQuotes:
			return i
		}
	}
	return -1
}

0 commit comments

Comments
 (0)