Initial commit of host metrics disk scraper
james-bebbington committed May 6, 2020
1 parent 021607d commit 44e10bc
Showing 9 changed files with 427 additions and 3 deletions.
1 change: 1 addition & 0 deletions receiver/hostmetricsreceiver/example_config.yaml
@@ -9,6 +9,7 @@ receivers:
cpu:
report_per_cpu: true
memory:
disk:

exporters:
logging:
2 changes: 2 additions & 0 deletions receiver/hostmetricsreceiver/factory.go
@@ -28,6 +28,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/receiver/hostmetricsreceiver/internal"
"github.com/open-telemetry/opentelemetry-collector/receiver/hostmetricsreceiver/internal/scraper/cpuscraper"
"github.com/open-telemetry/opentelemetry-collector/receiver/hostmetricsreceiver/internal/scraper/diskscraper"
"github.com/open-telemetry/opentelemetry-collector/receiver/hostmetricsreceiver/internal/scraper/memoryscraper"
)

@@ -49,6 +50,7 @@ func NewFactory() *Factory {
return &Factory{
scraperFactories: map[string]internal.Factory{
cpuscraper.TypeStr: &cpuscraper.Factory{},
diskscraper.TypeStr: &diskscraper.Factory{},
memoryscraper.TypeStr: &memoryscraper.Factory{},
},
}
3 changes: 0 additions & 3 deletions receiver/hostmetricsreceiver/internal/scraper.go
@@ -38,9 +38,6 @@ type Scraper interface {

// Factory can create a Scraper.
type Factory interface {
// Type gets the type of the scraper created by this factory.
Type() string

// CreateDefaultConfig creates the default configuration for the Scraper.
CreateDefaultConfig() Config

@@ -0,0 +1,22 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package diskscraper

import "github.com/open-telemetry/opentelemetry-collector/receiver/hostmetricsreceiver/internal"

// Config relating to Disk Metric Scraper.
type Config struct {
internal.ConfigSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
}
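
The mapstructure ",squash" tag mentioned in the comment is what lets the embedded settings decode from the scraper's own YAML block instead of from a nested key named after the embedded field. Below is a minimal standalone sketch of that behaviour, calling the mapstructure library directly; the field names are illustrative only and the real internal.ConfigSettings fields may differ.

package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

// BaseSettings stands in for internal.ConfigSettings; its fields are hypothetical.
type BaseSettings struct {
	CollectionInterval string `mapstructure:"collection_interval"`
}

// DiskConfig embeds BaseSettings with ",squash", so the embedded fields are read
// from the top level of the input map rather than from a nested key.
type DiskConfig struct {
	BaseSettings `mapstructure:",squash"`
}

func main() {
	input := map[string]interface{}{"collection_interval": "10s"}

	var cfg DiskConfig
	if err := mapstructure.Decode(input, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.CollectionInterval) // prints "10s"
}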
@@ -0,0 +1,67 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package diskscraper

import (
"github.com/open-telemetry/opentelemetry-collector/consumer/pdata"
)

// disk metric constants

const (
deviceLabelName = "device"
directionLabelName = "direction"
)

const (
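// Disk reads are reported with the "receive" direction label, and writes with "transmit".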
receiveDirectionLabelValue = "receive"
transmitDirectionLabelValue = "transmit"
)

var metricDiskBytesDescriptor = createMetricDiskBytesDescriptor()

func createMetricDiskBytesDescriptor() pdata.MetricDescriptor {
descriptor := pdata.NewMetricDescriptor()
descriptor.InitEmpty()
descriptor.SetName("host/disk/bytes")
descriptor.SetDescription("Disk bytes transferred.")
descriptor.SetUnit("bytes")
descriptor.SetType(pdata.MetricTypeCounterInt64)
return descriptor
}

var metricDiskOpsDescriptor = createMetricDiskOpsDescriptor()

func createMetricDiskOpsDescriptor() pdata.MetricDescriptor {
descriptor := pdata.NewMetricDescriptor()
descriptor.InitEmpty()
descriptor.SetName("host/disk/ops")
descriptor.SetDescription("Disk operations count.")
descriptor.SetUnit("1")
descriptor.SetType(pdata.MetricTypeCounterInt64)
return descriptor
}

var metricDiskTimeDescriptor = createMetricDiskTimeDescriptor()

func createMetricDiskTimeDescriptor() pdata.MetricDescriptor {
descriptor := pdata.NewMetricDescriptor()
descriptor.InitEmpty()
descriptor.SetName("host/disk/time")
descriptor.SetDescription("Time spent in disk operations.")
descriptor.SetUnit("ms")
descriptor.SetType(pdata.MetricTypeGaugeInt64)
return descriptor
}
@@ -0,0 +1,168 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package diskscraper

import (
"context"
"fmt"
"time"

"github.com/shirou/gopsutil/disk"
"github.com/shirou/gopsutil/host"
"go.opencensus.io/trace"

"github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/consumer/pdata"
"github.com/open-telemetry/opentelemetry-collector/consumer/pdatautil"
"github.com/open-telemetry/opentelemetry-collector/internal/data"
"github.com/open-telemetry/opentelemetry-collector/receiver/hostmetricsreceiver/internal"
)

// Scraper for Disk Metrics
type Scraper struct {
config *Config
consumer consumer.MetricsConsumer
startTime pdata.TimestampUnixNano
cancel context.CancelFunc
}

// NewDiskScraper creates a Scraper that collects disk I/O metrics
func NewDiskScraper(ctx context.Context, cfg *Config, consumer consumer.MetricsConsumer) (*Scraper, error) {
return &Scraper{config: cfg, consumer: consumer}, nil
}

// Start begins periodic scraping of disk metrics at the configured collection interval.
func (s *Scraper) Start(ctx context.Context) error {
ctx, s.cancel = context.WithCancel(ctx)

bootTime, err := host.BootTime()
if err != nil {
return err
}

// host.BootTime returns seconds since the epoch; convert to nanoseconds.
s.startTime = pdata.TimestampUnixNano(bootTime * 1e9)

go func() {
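// Collect disk metrics on each tick until the context is cancelled.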
ticker := time.NewTicker(s.config.CollectionInterval())
defer ticker.Stop()

for {
select {
case <-ticker.C:
s.scrapeMetrics(ctx)
case <-ctx.Done():
return
}
}
}()

return nil
}

// Close stops the periodic scraping started by Start.
func (s *Scraper) Close(ctx context.Context) error {
s.cancel()
return nil
}

func (s *Scraper) scrapeMetrics(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "diskscraper.scrapeMetrics")
defer span.End()

metricData := data.NewMetricData()
metrics := internal.InitializeMetricSlice(metricData)

err := s.scrapeAndAppendMetrics(metrics)
if err != nil {
span.SetStatus(trace.Status{Code: trace.StatusCodeDataLoss, Message: fmt.Sprintf("Error(s) when scraping disk metrics: %v", err)})
return
}

if metrics.Len() > 0 {
err := s.consumer.ConsumeMetrics(ctx, pdatautil.MetricsFromInternalMetrics(metricData))
if err != nil {
span.SetStatus(trace.Status{Code: trace.StatusCodeDataLoss, Message: fmt.Sprintf("Unable to process metrics: %v", err)})
return
}
}
}

func (s *Scraper) scrapeAndAppendMetrics(metrics pdata.MetricSlice) error {
ioCounters, err := disk.IOCounters()
if err != nil {
return err
}

metric := internal.AddNewMetric(metrics)
initializeMetricDiskBytesFrom(ioCounters, s.startTime, metric)

metric = internal.AddNewMetric(metrics)
initializeMetricDiskOpsFrom(ioCounters, s.startTime, metric)

metric = internal.AddNewMetric(metrics)
initializeMetricDiskTimeFrom(ioCounters, s.startTime, metric)
return nil
}

func initializeMetricDiskBytesFrom(ioCounters map[string]disk.IOCountersStat, startTime pdata.TimestampUnixNano, metric pdata.Metric) {
metricDiskBytesDescriptor.CopyTo(metric.MetricDescriptor())

idps := metric.Int64DataPoints()
idps.Resize(2 * len(ioCounters))

idx := 0
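// Data points are written in pairs per device: the even index holds the read
// ("receive") value and the following odd index holds the write ("transmit") value.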
for device, ioCounter := range ioCounters {
initializeDataPoint(idps.At(idx+0), startTime, device, receiveDirectionLabelValue, int64(ioCounter.ReadBytes))
initializeDataPoint(idps.At(idx+1), startTime, device, transmitDirectionLabelValue, int64(ioCounter.WriteBytes))
idx += 2
}
}

func initializeMetricDiskOpsFrom(ioCounters map[string]disk.IOCountersStat, startTime pdata.TimestampUnixNano, metric pdata.Metric) {
metricDiskOpsDescriptor.CopyTo(metric.MetricDescriptor())

idps := metric.Int64DataPoints()
idps.Resize(2 * len(ioCounters))

idx := 0
for device, ioCounter := range ioCounters {
initializeDataPoint(idps.At(idx+0), startTime, device, receiveDirectionLabelValue, int64(ioCounter.ReadCount))
initializeDataPoint(idps.At(idx+1), startTime, device, transmitDirectionLabelValue, int64(ioCounter.WriteCount))
idx += 2
}
}

func initializeMetricDiskTimeFrom(ioCounters map[string]disk.IOCountersStat, startTime pdata.TimestampUnixNano, metric pdata.Metric) {
metricDiskTimeDescriptor.CopyTo(metric.MetricDescriptor())

idps := metric.Int64DataPoints()
idps.Resize(2 * len(ioCounters))

idx := 0
for device, ioCounter := range ioCounters {
initializeDataPoint(idps.At(idx+0), startTime, device, receiveDirectionLabelValue, int64(ioCounter.ReadTime))
initializeDataPoint(idps.At(idx+1), startTime, device, transmitDirectionLabelValue, int64(ioCounter.WriteTime))
idx += 2
}
}

func initializeDataPoint(dataPoint pdata.Int64DataPoint, startTime pdata.TimestampUnixNano, deviceLabel string, directionLabel string, value int64) {
labelsMap := dataPoint.LabelsMap()
labelsMap.Insert(deviceLabelName, deviceLabel)
labelsMap.Insert(directionLabelName, directionLabel)
dataPoint.SetStartTime(startTime)
dataPoint.SetTimestamp(pdata.TimestampUnixNano(uint64(time.Now().UnixNano())))
dataPoint.SetValue(value)
}
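
For reference, the raw counters the scraper consumes come straight from gopsutil. A minimal standalone sketch (platform differences and richer error handling omitted) that prints the same per-device fields the functions above turn into data points:

package main

import (
	"fmt"
	"log"

	"github.com/shirou/gopsutil/disk"
)

func main() {
	// IOCounters returns a map keyed by device name, the same map that
	// scrapeAndAppendMetrics iterates over.
	ioCounters, err := disk.IOCounters()
	if err != nil {
		log.Fatal(err)
	}

	for device, c := range ioCounters {
		fmt.Printf("%s: readBytes=%d writeBytes=%d readOps=%d writeOps=%d readTime=%dms writeTime=%dms\n",
			device, c.ReadBytes, c.WriteBytes, c.ReadCount, c.WriteCount, c.ReadTime, c.WriteTime)
	}
}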
@@ -0,0 +1,83 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package diskscraper

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector/consumer/pdata"
"github.com/open-telemetry/opentelemetry-collector/exporter/exportertest"
"github.com/open-telemetry/opentelemetry-collector/receiver/hostmetricsreceiver/internal"
)

type validationFn func(*testing.T, []pdata.Metrics)

func TestScrapeMetrics(t *testing.T) {
createScraperAndValidateScrapedMetrics(t, &Config{}, func(t *testing.T, got []pdata.Metrics) {
metrics := internal.AssertSingleMetricDataAndGetMetricsSlice(t, got)

// expect 3 metrics
assert.Equal(t, 3, metrics.Len())

// for disk bytes metric, expect a receive & transmit datapoint for at least one drive
hostDiskBytesMetric := metrics.At(0)
internal.AssertDescriptorEqual(t, metricDiskBytesDescriptor, hostDiskBytesMetric.MetricDescriptor())
assert.GreaterOrEqual(t, hostDiskBytesMetric.Int64DataPoints().Len(), 2)
internal.AssertInt64MetricLabelHasValue(t, hostDiskBytesMetric, 0, directionLabelName, receiveDirectionLabelValue)
internal.AssertInt64MetricLabelHasValue(t, hostDiskBytesMetric, 1, directionLabelName, transmitDirectionLabelValue)

// for disk operations metric, expect a receive & transmit datapoint for at least one drive
hostDiskOpsMetric := metrics.At(1)
internal.AssertDescriptorEqual(t, metricDiskOpsDescriptor, hostDiskOpsMetric.MetricDescriptor())
assert.GreaterOrEqual(t, hostDiskOpsMetric.Int64DataPoints().Len(), 2)
internal.AssertInt64MetricLabelHasValue(t, hostDiskOpsMetric, 0, directionLabelName, receiveDirectionLabelValue)
internal.AssertInt64MetricLabelHasValue(t, hostDiskOpsMetric, 1, directionLabelName, transmitDirectionLabelValue)

// for disk time metric, expect a receive & transmit datapoint for at least one drive
hostDiskTimeMetric := metrics.At(2)
internal.AssertDescriptorEqual(t, metricDiskTimeDescriptor, hostDiskTimeMetric.MetricDescriptor())
assert.GreaterOrEqual(t, hostDiskTimeMetric.Int64DataPoints().Len(), 2)
internal.AssertInt64MetricLabelHasValue(t, hostDiskTimeMetric, 0, directionLabelName, receiveDirectionLabelValue)
internal.AssertInt64MetricLabelHasValue(t, hostDiskTimeMetric, 1, directionLabelName, transmitDirectionLabelValue)
})
}

func createScraperAndValidateScrapedMetrics(t *testing.T, config *Config, assertFn validationFn) {
config.SetCollectionInterval(5 * time.Millisecond)

sink := &exportertest.SinkMetricsExporter{}

scraper, err := NewDiskScraper(context.Background(), config, sink)
require.NoError(t, err, "Failed to create disk scraper: %v", err)

err = scraper.Start(context.Background())
require.NoError(t, err, "Failed to start disk scraper: %v", err)
defer func() { assert.NoError(t, scraper.Close(context.Background())) }()

require.Eventually(t, func() bool {
got := sink.AllMetrics()
if len(got) == 0 {
return false
}

assertFn(t, got)
return true
}, time.Second, 2*time.Millisecond, "No metrics were collected")
}
