From 81485d8820a40ef687acb2f83691c5f39429e4c8 Mon Sep 17 00:00:00 2001 From: James Bebbington Date: Wed, 12 Aug 2020 14:00:03 +1000 Subject: [PATCH] Process scraper: Use same scrape time for all data points coming from same process --- .../scraper/processscraper/process_scraper.go | 45 ++++++++++--------- .../processscraper/process_scraper_linux.go | 14 +++--- .../processscraper/process_scraper_others.go | 2 +- .../processscraper/process_scraper_test.go | 14 +++++- .../processscraper/process_scraper_windows.go | 11 +++-- 5 files changed, 48 insertions(+), 38 deletions(-) diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper.go index 64a5a7b67ec..02c6a28d836 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper.go @@ -26,6 +26,7 @@ import ( "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/internal/processor/filterset" + "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" ) // scraper for Process Metrics @@ -98,15 +99,17 @@ func (s *scraper) ScrapeMetrics(_ context.Context) (pdata.ResourceMetricsSlice, ilms.Resize(1) metrics := ilms.At(0).Metrics() - if err = scrapeAndAppendCPUTimeMetric(metrics, s.startTime, md.handle); err != nil { + now := internal.TimeToUnixNano(time.Now()) + + if err = scrapeAndAppendCPUTimeMetric(metrics, s.startTime, now, md.handle); err != nil { errs = append(errs, errors.Wrapf(err, "error reading cpu times for process %q (pid %v)", md.executable.name, md.pid)) } - if err = scrapeAndAppendMemoryUsageMetrics(metrics, md.handle); err != nil { + if err = scrapeAndAppendMemoryUsageMetrics(metrics, now, md.handle); err != nil { errs = append(errs, errors.Wrapf(err, "error reading memory info for process %q (pid %v)", md.executable.name, md.pid)) } - if err = scrapeAndAppendDiskIOMetric(metrics, s.startTime, md.handle); err != nil { + if err = scrapeAndAppendDiskIOMetric(metrics, s.startTime, now, md.handle); err != nil { errs = append(errs, errors.Wrapf(err, "error reading disk usage for process %q (pid %v)", md.executable.name, md.pid)) } } @@ -166,7 +169,7 @@ func (s *scraper) getProcessMetadata() ([]*processMetadata, error) { return metadata, componenterror.CombineErrors(errs) } -func scrapeAndAppendCPUTimeMetric(metrics pdata.MetricSlice, startTime pdata.TimestampUnixNano, handle processHandle) error { +func scrapeAndAppendCPUTimeMetric(metrics pdata.MetricSlice, startTime, now pdata.TimestampUnixNano, handle processHandle) error { times, err := handle.Times() if err != nil { return err @@ -174,19 +177,19 @@ func scrapeAndAppendCPUTimeMetric(metrics pdata.MetricSlice, startTime pdata.Tim startIdx := metrics.Len() metrics.Resize(startIdx + 1) - initializeCPUTimeMetric(metrics.At(startIdx), startTime, times) + initializeCPUTimeMetric(metrics.At(startIdx), startTime, now, times) return nil } -func initializeCPUTimeMetric(metric pdata.Metric, startTime pdata.TimestampUnixNano, times *cpu.TimesStat) { +func initializeCPUTimeMetric(metric pdata.Metric, startTime, now pdata.TimestampUnixNano, times *cpu.TimesStat) { cpuTimeDescriptor.CopyTo(metric.MetricDescriptor()) ddps := metric.DoubleDataPoints() ddps.Resize(cpuStatesLen) - appendCPUTimeStateDataPoints(ddps, startTime, times) + appendCPUTimeStateDataPoints(ddps, startTime, now, times) } -func scrapeAndAppendMemoryUsageMetrics(metrics pdata.MetricSlice, handle processHandle) error { +func scrapeAndAppendMemoryUsageMetrics(metrics pdata.MetricSlice, now pdata.TimestampUnixNano, handle processHandle) error { mem, err := handle.MemoryInfo() if err != nil { return err @@ -194,25 +197,25 @@ func scrapeAndAppendMemoryUsageMetrics(metrics pdata.MetricSlice, handle process startIdx := metrics.Len() metrics.Resize(startIdx + 2) - initializeMemoryUsageMetric(metrics.At(startIdx+0), physicalMemoryUsageDescriptor, int64(mem.RSS)) - initializeMemoryUsageMetric(metrics.At(startIdx+1), virtualMemoryUsageDescriptor, int64(mem.VMS)) + initializeMemoryUsageMetric(metrics.At(startIdx+0), physicalMemoryUsageDescriptor, now, int64(mem.RSS)) + initializeMemoryUsageMetric(metrics.At(startIdx+1), virtualMemoryUsageDescriptor, now, int64(mem.VMS)) return nil } -func initializeMemoryUsageMetric(metric pdata.Metric, descriptor pdata.MetricDescriptor, usage int64) { +func initializeMemoryUsageMetric(metric pdata.Metric, descriptor pdata.MetricDescriptor, now pdata.TimestampUnixNano, usage int64) { descriptor.CopyTo(metric.MetricDescriptor()) idps := metric.Int64DataPoints() idps.Resize(1) - initializeMemoryUsageDataPoint(idps.At(0), usage) + initializeMemoryUsageDataPoint(idps.At(0), now, usage) } -func initializeMemoryUsageDataPoint(dataPoint pdata.Int64DataPoint, usage int64) { - dataPoint.SetTimestamp(pdata.TimestampUnixNano(uint64(time.Now().UnixNano()))) +func initializeMemoryUsageDataPoint(dataPoint pdata.Int64DataPoint, now pdata.TimestampUnixNano, usage int64) { + dataPoint.SetTimestamp(now) dataPoint.SetValue(usage) } -func scrapeAndAppendDiskIOMetric(metrics pdata.MetricSlice, startTime pdata.TimestampUnixNano, handle processHandle) error { +func scrapeAndAppendDiskIOMetric(metrics pdata.MetricSlice, startTime, now pdata.TimestampUnixNano, handle processHandle) error { io, err := handle.IOCounters() if err != nil { return err @@ -220,23 +223,23 @@ func scrapeAndAppendDiskIOMetric(metrics pdata.MetricSlice, startTime pdata.Time startIdx := metrics.Len() metrics.Resize(startIdx + 1) - initializeDiskIOMetric(metrics.At(startIdx), startTime, io) + initializeDiskIOMetric(metrics.At(startIdx), startTime, now, io) return nil } -func initializeDiskIOMetric(metric pdata.Metric, startTime pdata.TimestampUnixNano, io *process.IOCountersStat) { +func initializeDiskIOMetric(metric pdata.Metric, startTime, now pdata.TimestampUnixNano, io *process.IOCountersStat) { diskIODescriptor.CopyTo(metric.MetricDescriptor()) idps := metric.Int64DataPoints() idps.Resize(2) - initializeDiskIODataPoint(idps.At(0), startTime, int64(io.ReadBytes), readDirectionLabelValue) - initializeDiskIODataPoint(idps.At(1), startTime, int64(io.WriteBytes), writeDirectionLabelValue) + initializeDiskIODataPoint(idps.At(0), startTime, now, int64(io.ReadBytes), readDirectionLabelValue) + initializeDiskIODataPoint(idps.At(1), startTime, now, int64(io.WriteBytes), writeDirectionLabelValue) } -func initializeDiskIODataPoint(dataPoint pdata.Int64DataPoint, startTime pdata.TimestampUnixNano, value int64, directionLabel string) { +func initializeDiskIODataPoint(dataPoint pdata.Int64DataPoint, startTime, now pdata.TimestampUnixNano, value int64, directionLabel string) { labelsMap := dataPoint.LabelsMap() labelsMap.Insert(directionLabelName, directionLabel) dataPoint.SetStartTime(startTime) - dataPoint.SetTimestamp(pdata.TimestampUnixNano(uint64(time.Now().UnixNano()))) + dataPoint.SetTimestamp(now) dataPoint.SetValue(value) } diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_linux.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_linux.go index 180281c419b..36afef9b25a 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_linux.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_linux.go @@ -17,8 +17,6 @@ package processscraper import ( - "time" - "github.com/shirou/gopsutil/cpu" "go.opentelemetry.io/collector/consumer/pdata" @@ -26,17 +24,17 @@ import ( const cpuStatesLen = 3 -func appendCPUTimeStateDataPoints(ddps pdata.DoubleDataPointSlice, startTime pdata.TimestampUnixNano, cpuTime *cpu.TimesStat) { - initializeCPUTimeDataPoint(ddps.At(0), startTime, cpuTime.User, userStateLabelValue) - initializeCPUTimeDataPoint(ddps.At(1), startTime, cpuTime.System, systemStateLabelValue) - initializeCPUTimeDataPoint(ddps.At(2), startTime, cpuTime.Iowait, waitStateLabelValue) +func appendCPUTimeStateDataPoints(ddps pdata.DoubleDataPointSlice, startTime, now pdata.TimestampUnixNano, cpuTime *cpu.TimesStat) { + initializeCPUTimeDataPoint(ddps.At(0), startTime, now, cpuTime.User, userStateLabelValue) + initializeCPUTimeDataPoint(ddps.At(1), startTime, now, cpuTime.System, systemStateLabelValue) + initializeCPUTimeDataPoint(ddps.At(2), startTime, now, cpuTime.Iowait, waitStateLabelValue) } -func initializeCPUTimeDataPoint(dataPoint pdata.DoubleDataPoint, startTime pdata.TimestampUnixNano, value float64, stateLabel string) { +func initializeCPUTimeDataPoint(dataPoint pdata.DoubleDataPoint, startTime, now pdata.TimestampUnixNano, value float64, stateLabel string) { labelsMap := dataPoint.LabelsMap() labelsMap.Insert(stateLabelName, stateLabel) dataPoint.SetStartTime(startTime) - dataPoint.SetTimestamp(pdata.TimestampUnixNano(uint64(time.Now().UnixNano()))) + dataPoint.SetTimestamp(now) dataPoint.SetValue(value) } diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_others.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_others.go index 46eedad5325..bbb22705de1 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_others.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_others.go @@ -24,7 +24,7 @@ import ( const cpuStatesLen = 0 -func appendCPUTimeStateDataPoints(ddps pdata.DoubleDataPointSlice, startTime pdata.TimestampUnixNano, cpuTime *cpu.TimesStat) { +func appendCPUTimeStateDataPoints(ddps pdata.DoubleDataPointSlice, startTime, now pdata.TimestampUnixNano, cpuTime *cpu.TimesStat) { } func getProcessExecutable(proc processHandle) (*executableMetadata, error) { diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_test.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_test.go index 28443838b91..f1ccee6c85c 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_test.go @@ -69,14 +69,15 @@ func TestScrapeMetrics(t *testing.T) { } require.Greater(t, resourceMetrics.Len(), 1) - assertResourceAttributes(t, resourceMetrics) + assertProcessResourceAttributesExist(t, resourceMetrics) assertCPUTimeMetricValid(t, resourceMetrics, expectedStartTime) assertMemoryUsageMetricValid(t, physicalMemoryUsageDescriptor, resourceMetrics) assertMemoryUsageMetricValid(t, virtualMemoryUsageDescriptor, resourceMetrics) assertDiskIOMetricValid(t, resourceMetrics, expectedStartTime) + assertSameTimeStampForAllMetricsWithinResource(t, resourceMetrics) } -func assertResourceAttributes(t *testing.T, resourceMetrics pdata.ResourceMetricsSlice) { +func assertProcessResourceAttributesExist(t *testing.T, resourceMetrics pdata.ResourceMetricsSlice) { for i := 0; i < resourceMetrics.Len(); i++ { attr := resourceMetrics.At(0).Resource().Attributes() internal.AssertContainsAttribute(t, attr, conventions.AttributeProcessID) @@ -116,6 +117,15 @@ func assertDiskIOMetricValid(t *testing.T, resourceMetrics pdata.ResourceMetrics internal.AssertInt64MetricLabelHasValue(t, diskIOMetric, 1, directionLabelName, writeDirectionLabelValue) } +func assertSameTimeStampForAllMetricsWithinResource(t *testing.T, resourceMetrics pdata.ResourceMetricsSlice) { + for i := 0; i < resourceMetrics.Len(); i++ { + ilms := resourceMetrics.At(i).InstrumentationLibraryMetrics() + for j := 0; j < ilms.Len(); j++ { + internal.AssertSameTimeStampForAllMetrics(t, ilms.At(j).Metrics()) + } + } +} + func getMetric(t *testing.T, descriptor pdata.MetricDescriptor, rms pdata.ResourceMetricsSlice) pdata.Metric { for i := 0; i < rms.Len(); i++ { metrics := getMetricSlice(t, rms.At(i)) diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_windows.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_windows.go index 76f10b58171..1a00310a59a 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_windows.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_windows.go @@ -19,7 +19,6 @@ package processscraper import ( "path/filepath" "regexp" - "time" "github.com/shirou/gopsutil/cpu" @@ -28,16 +27,16 @@ import ( const cpuStatesLen = 2 -func appendCPUTimeStateDataPoints(ddps pdata.DoubleDataPointSlice, startTime pdata.TimestampUnixNano, cpuTime *cpu.TimesStat) { - initializeCPUTimeDataPoint(ddps.At(0), startTime, cpuTime.User, userStateLabelValue) - initializeCPUTimeDataPoint(ddps.At(1), startTime, cpuTime.System, systemStateLabelValue) +func appendCPUTimeStateDataPoints(ddps pdata.DoubleDataPointSlice, startTime, now pdata.TimestampUnixNano, cpuTime *cpu.TimesStat) { + initializeCPUTimeDataPoint(ddps.At(0), startTime, now, cpuTime.User, userStateLabelValue) + initializeCPUTimeDataPoint(ddps.At(1), startTime, now, cpuTime.System, systemStateLabelValue) } -func initializeCPUTimeDataPoint(dataPoint pdata.DoubleDataPoint, startTime pdata.TimestampUnixNano, value float64, stateLabel string) { +func initializeCPUTimeDataPoint(dataPoint pdata.DoubleDataPoint, startTime, now pdata.TimestampUnixNano, value float64, stateLabel string) { labelsMap := dataPoint.LabelsMap() labelsMap.Insert(stateLabelName, stateLabel) dataPoint.SetStartTime(startTime) - dataPoint.SetTimestamp(pdata.TimestampUnixNano(uint64(time.Now().UnixNano()))) + dataPoint.SetTimestamp(now) dataPoint.SetValue(value) }