Skip to content

Commit

Permalink
Change scraperhelper to follow the recommended append model for pdata (
Browse files Browse the repository at this point in the history
…#4202)

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Oct 15, 2021
1 parent 028fc46 commit bd87fb6
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 209 deletions.
135 changes: 29 additions & 106 deletions receiver/scraperhelper/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,38 @@ package scraperhelper // import "go.opentelemetry.io/collector/receiver/scraperh

import (
"context"
"errors"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenthelper"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/receiver/scrapererror"
)

// errNilFunc is returned by NewScraper when the caller passes a nil ScrapeFunc.
var errNilFunc = errors.New("nil scrape func")
// ScrapeFunc scrapes metrics.
type ScrapeFunc func(context.Context) (pdata.Metrics, error)

type baseSettings struct {
componentOptions []componenthelper.Option
func (sf ScrapeFunc) Scrape(ctx context.Context) (pdata.Metrics, error) {
return sf(ctx)
}

// ScraperOption apply changes to internal options.
type ScraperOption func(*baseSettings)

// Scraper is the base interface for scrapers.
type Scraper interface {
component.Component

// ID returns the scraper id.
ID() config.ComponentID
Scrape(context.Context, config.ComponentID, component.ReceiverCreateSettings) (pdata.Metrics, error)
Scrape(context.Context) (pdata.Metrics, error)
}

type baseScraper struct {
component.Component
id config.ComponentID
type baseSettings struct {
componentOptions []componenthelper.Option
}

func (b baseScraper) ID() config.ComponentID {
return b.id
}
// ScraperOption apply changes to internal options.
type ScraperOption func(*baseSettings)

// WithStart sets the function that will be called on startup.
func WithStart(start componenthelper.StartFunc) ScraperOption {
Expand All @@ -70,104 +63,34 @@ func WithShutdown(shutdown componenthelper.ShutdownFunc) ScraperOption {
}
}

type metricsScraper struct {
baseScraper
ScrapeMetrics
}

var _ Scraper = (*metricsScraper)(nil)

// NewMetricsScraper creates a Scraper that calls Scrape at the specified
// collection interval, reports observability information, and passes the
// scraped metrics to the next consumer.
func NewMetricsScraper(
name string,
scrape ScrapeMetrics,
options ...ScraperOption,
) Scraper {
set := &baseSettings{}
for _, op := range options {
op(set)
}

ms := &metricsScraper{
baseScraper: baseScraper{
Component: componenthelper.New(set.componentOptions...),
id: config.NewComponentID(config.Type(name)),
},
ScrapeMetrics: scrape,
}
var _ Scraper = (*baseScraper)(nil)

return ms
}

func (ms metricsScraper) Scrape(ctx context.Context, receiverID config.ComponentID, set component.ReceiverCreateSettings) (pdata.Metrics, error) {
scrp := obsreport.NewScraper(obsreport.ScraperSettings{
ReceiverID: receiverID,
Scraper: ms.ID(),
ReceiverCreateSettings: set,
})
ctx = scrp.StartMetricsOp(ctx)
metrics, err := ms.ScrapeMetrics(ctx)
count := 0
md := pdata.Metrics{}
if err == nil || scrapererror.IsPartialScrapeError(err) {
md = pdata.NewMetrics()
metrics.MoveAndAppendTo(md.ResourceMetrics().AppendEmpty().InstrumentationLibraryMetrics().AppendEmpty().Metrics())
count = md.MetricCount()
}
scrp.EndMetricsOp(ctx, count, err)
return md, err
type baseScraper struct {
component.Component
ScrapeFunc
id config.ComponentID
}

type resourceMetricsScraper struct {
baseScraper
ScrapeResourceMetrics
func (b *baseScraper) ID() config.ComponentID {
return b.id
}

var _ Scraper = (*resourceMetricsScraper)(nil)

// NewResourceMetricsScraper creates a Scraper that calls Scrape at the
// specified collection interval, reports observability information, and
// passes the scraped resource metrics to the next consumer.
func NewResourceMetricsScraper(
id config.ComponentID,
scrape ScrapeResourceMetrics,
options ...ScraperOption,
) Scraper {
// NewScraper creates a Scraper that calls Scrape at the specified collection interval,
// reports observability information, and passes the scraped metrics to the next consumer.
func NewScraper(name string, scrape ScrapeFunc, options ...ScraperOption) (Scraper, error) {
if scrape == nil {
return nil, errNilFunc
}
set := &baseSettings{}
for _, op := range options {
op(set)
}

rms := &resourceMetricsScraper{
baseScraper: baseScraper{
Component: componenthelper.New(set.componentOptions...),
id: id,
},
ScrapeResourceMetrics: scrape,
}

return rms
}

func (rms resourceMetricsScraper) Scrape(ctx context.Context, receiverID config.ComponentID, set component.ReceiverCreateSettings) (pdata.Metrics, error) {
scrp := obsreport.NewScraper(obsreport.ScraperSettings{
ReceiverID: receiverID,
Scraper: rms.ID(),
ReceiverCreateSettings: set,
})
ctx = scrp.StartMetricsOp(ctx)
resourceMetrics, err := rms.ScrapeResourceMetrics(ctx)

count := 0
md := pdata.Metrics{}
if err == nil || scrapererror.IsPartialScrapeError(err) {
md = pdata.NewMetrics()
resourceMetrics.MoveAndAppendTo(md.ResourceMetrics())
count = md.MetricCount()
ms := &baseScraper{
Component: componenthelper.New(set.componentOptions...),
ScrapeFunc: scrape,
id: config.NewComponentID(config.Type(name)),
}

scrp.EndMetricsOp(ctx, count, err)
return md, err
return ms, nil
}
12 changes: 10 additions & 2 deletions receiver/scraperhelper/scrapercontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,22 @@ func (sc *controller) scrapeMetricsAndReport(ctx context.Context) {
metrics := pdata.NewMetrics()

for _, scraper := range sc.scrapers {
md, err := scraper.Scrape(ctx, sc.id, sc.recvSettings)
scrp := obsreport.NewScraper(obsreport.ScraperSettings{
ReceiverID: sc.id,
Scraper: scraper.ID(),
ReceiverCreateSettings: sc.recvSettings,
})
ctx = scrp.StartMetricsOp(ctx)
md, err := scraper.Scrape(ctx)

if err != nil {
sc.logger.Error("Error scraping metrics", zap.Error(err), zap.Stringer("scraper", scraper.ID()))

if !scrapererror.IsPartialScrapeError(err) {
scrp.EndMetricsOp(ctx, 0, err)
continue
}
}
scrp.EndMetricsOp(ctx, md.MetricCount(), err)
md.ResourceMetrics().MoveAndAppendTo(metrics.ResourceMetrics())
}

Expand Down

0 comments on commit bd87fb6

Please sign in to comment.