Skip to content

Commit

Permalink
Remove ability to configure collection interval per scraper (#1947)
Browse files Browse the repository at this point in the history
* Remove ability to configure collection interval per scraper and combine (batch) scrapers

* Review comments

* Add test for invalid collection interval config

* Change DefaultScraperControllerSettings definition

* Fix bugs
  • Loading branch information
james-bebbington committed Oct 14, 2020
1 parent ec9d37d commit 015827f
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 128 deletions.
22 changes: 2 additions & 20 deletions receiver/receiverhelper/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package receiverhelper

import (
"context"
"time"

"go.opentelemetry.io/collector/consumer/pdata"
)
Expand All @@ -38,25 +37,11 @@ type Close func(ctx context.Context) error
// ScraperOption apply changes to internal options.
type ScraperOption func(*baseScraper)

// ScraperSettings defines common settings for a scraper configuration.
// Specific scrapers can embed this struct and extend it with more fields if needed.
type ScraperSettings struct {
CollectionIntervalVal time.Duration `mapstructure:"collection_interval"`
}

// CollectionInterval gets the scraper collection interval.
func (ss *ScraperSettings) CollectionInterval() time.Duration {
return ss.CollectionIntervalVal
}

type BaseScraper interface {
// Initialize performs any timely initialization tasks such as
// setting up performance counters for initial collection.
Initialize(ctx context.Context) error

// CollectionInterval gets the scraper collection interval.
CollectionInterval() time.Duration

// Close should clean up any unmanaged resources such as
// performance counter handles.
Close(ctx context.Context) error
Expand All @@ -77,7 +62,6 @@ type ResourceMetricsScraper interface {
var _ BaseScraper = (*baseScraper)(nil)

type baseScraper struct {
ScraperSettings
initialize Initialize
close Close
}
Expand Down Expand Up @@ -121,12 +105,11 @@ var _ MetricsScraper = (*metricsScraper)(nil)
// collection interval, reports observability information, and passes the
// scraped metrics to the next consumer.
func NewMetricsScraper(
cfg ScraperSettings,
scrape ScrapeMetrics,
options ...ScraperOption,
) MetricsScraper {
ms := &metricsScraper{
baseScraper: baseScraper{ScraperSettings: cfg},
baseScraper: baseScraper{},
ScrapeMetrics: scrape,
}

Expand All @@ -152,12 +135,11 @@ var _ ResourceMetricsScraper = (*resourceMetricsScraper)(nil)
// specified collection interval, reports observability information, and
// passes the scraped resource metrics to the next consumer.
func NewResourceMetricsScraper(
cfg ScraperSettings,
scrape ScrapeResourceMetrics,
options ...ScraperOption,
) ResourceMetricsScraper {
rms := &resourceMetricsScraper{
baseScraper: baseScraper{ScraperSettings: cfg},
baseScraper: baseScraper{},
ScrapeResourceMetrics: scrape,
}

Expand Down
179 changes: 104 additions & 75 deletions receiver/receiverhelper/scrapercontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package receiverhelper

import (
"context"
"errors"
"time"

"go.opentelemetry.io/collector/component"
Expand All @@ -25,96 +26,126 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
)

// ScraperControllerOption apply changes to internal options.
type ScraperControllerOption func(*scraperController)
// ScraperControllerSettings defines common settings for a scraper controller
// configuration. Scraper controller receivers can embed this struct, instead
// of configmodels.ReceiverSettings, and extend it with more fields if needed.
type ScraperControllerSettings struct {
configmodels.ReceiverSettings `mapstructure:"squash"`
CollectionInterval time.Duration `mapstructure:"collection_interval"`
}

// WithDefaultCollectionInterval overrides the default collection
// interval (1 minute) that will be applied to all scrapers if not
// overridden by the individual scraper.
func WithDefaultCollectionInterval(defaultCollectionInterval time.Duration) ScraperControllerOption {
return func(o *scraperController) {
o.defaultCollectionInterval = defaultCollectionInterval
// DefaultScraperControllerSettings returns default scraper controller
// settings with a collection interval of one minute.
func DefaultScraperControllerSettings(cfgType configmodels.Type) ScraperControllerSettings {
return ScraperControllerSettings{
ReceiverSettings: configmodels.ReceiverSettings{
NameVal: string(cfgType),
TypeVal: cfgType,
},
CollectionInterval: time.Minute,
}
}

// ScraperControllerOption apply changes to internal options.
type ScraperControllerOption func(*scraperController)

// AddMetricsScraper configures the provided scrape function to be called
// with the specified options, and at the specified collection interval
// (one minute by default).
// with the specified options, and at the specified collection interval.
//
// Observability information will be reported, and the scraped metrics
// will be passed to the next consumer.
func AddMetricsScraper(scraper MetricsScraper) ScraperControllerOption {
return func(o *scraperController) {
// TODO: Instead of creating one wrapper per MetricsScraper, combine all with same collection interval.
o.metricScrapers = append(o.metricScrapers, &multiMetricScraper{scrapers: []MetricsScraper{scraper}})
o.metricsScrapers.scrapers = append(o.metricsScrapers.scrapers, scraper)
}
}

// AddResourceMetricsScraper configures the provided scrape function to
// be called with the specified options, and at the specified collection
// interval (one minute by default).
// interval.
//
// Observability information will be reported, and the scraped resource
// metrics will be passed to the next consumer.
func AddResourceMetricsScraper(scrape ResourceMetricsScraper) ScraperControllerOption {
return func(o *scraperController) {
o.metricScrapers = append(o.metricScrapers, scrape)
o.resourceMetricScrapers = append(o.resourceMetricScrapers, scrape)
}
}

// WithTickerChannel allows you to override the scraper controllers ticker
// channel to specify when scrape is called. This is only expected to be
// used by tests.
func WithTickerChannel(tickerCh <-chan time.Time) ScraperControllerOption {
return func(o *scraperController) {
o.tickerCh = tickerCh
}
}

type scraperController struct {
defaultCollectionInterval time.Duration
nextConsumer consumer.MetricsConsumer
collectionInterval time.Duration
nextConsumer consumer.MetricsConsumer

metricsScrapers *multiMetricScraper
resourceMetricScrapers []ResourceMetricsScraper

metricScrapers []ResourceMetricsScraper
done chan struct{}
tickerCh <-chan time.Time
done chan struct{}
}

// NewScraperControllerReceiver creates a Receiver with the configured options, that can control multiple scrapers.
func NewScraperControllerReceiver(_ configmodels.Receiver, nextConsumer consumer.MetricsConsumer, options ...ScraperControllerOption) (component.Receiver, error) {
func NewScraperControllerReceiver(cfg *ScraperControllerSettings, nextConsumer consumer.MetricsConsumer, options ...ScraperControllerOption) (component.Receiver, error) {
if nextConsumer == nil {
return nil, componenterror.ErrNilNextConsumer
}

mr := &scraperController{
defaultCollectionInterval: time.Minute,
nextConsumer: nextConsumer,
done: make(chan struct{}),
if cfg.CollectionInterval <= 0 {
return nil, errors.New("collection_interval must be a positive duration")
}

sc := &scraperController{
collectionInterval: cfg.CollectionInterval,
nextConsumer: nextConsumer,
metricsScrapers: &multiMetricScraper{},
done: make(chan struct{}),
}

for _, op := range options {
op(mr)
op(sc)
}

if len(sc.metricsScrapers.scrapers) > 0 {
sc.resourceMetricScrapers = append(sc.resourceMetricScrapers, sc.metricsScrapers)
}

return mr, nil
return sc, nil
}

// Start the receiver, invoked during service start.
func (mr *scraperController) Start(ctx context.Context, _ component.Host) error {
if err := mr.initializeScrapers(ctx); err != nil {
func (sc *scraperController) Start(ctx context.Context, _ component.Host) error {
if err := sc.initializeScrapers(ctx); err != nil {
return err
}

mr.startScraping()
sc.startScraping()
return nil
}

// Shutdown the receiver, invoked during service shutdown.
func (mr *scraperController) Shutdown(ctx context.Context) error {
mr.stopScraping()
func (sc *scraperController) Shutdown(ctx context.Context) error {
sc.stopScraping()

var errors []error

if err := mr.closeScrapers(ctx); err != nil {
if err := sc.closeScrapers(ctx); err != nil {
errors = append(errors, err)
}

return componenterror.CombineErrors(errors)
}

// initializeScrapers initializes all the scrapers
func (mr *scraperController) initializeScrapers(ctx context.Context) error {
for _, scraper := range mr.metricScrapers {
func (sc *scraperController) initializeScrapers(ctx context.Context) error {
for _, scraper := range sc.resourceMetricScrapers {
if err := scraper.Initialize(ctx); err != nil {
return err
}
Expand All @@ -125,60 +156,62 @@ func (mr *scraperController) initializeScrapers(ctx context.Context) error {

// startScraping initiates a ticker that calls Scrape based on the configured
// collection interval.
func (mr *scraperController) startScraping() {
// TODO: use one ticker for each set of scrapers that have the same collection interval.

for i := 0; i < len(mr.metricScrapers); i++ {
scraper := mr.metricScrapers[i]
go func() {
collectionInterval := mr.defaultCollectionInterval
if scraper.CollectionInterval() != 0 {
collectionInterval = scraper.CollectionInterval()
}

ticker := time.NewTicker(collectionInterval)
func (sc *scraperController) startScraping() {
go func() {
if sc.tickerCh == nil {
ticker := time.NewTicker(sc.collectionInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
mr.scrapeMetricsAndReport(context.Background(), scraper)
case <-mr.done:
return
}
sc.tickerCh = ticker.C
}

for {
select {
case <-sc.tickerCh:
sc.scrapeMetricsAndReport(context.Background())
case <-sc.done:
return
}
}()
}
}
}()
}

// scrapeMetricsAndReport calls the Scrape function of the provided Scraper, records observability information,
// and passes the scraped metrics to the next component.
func (mr *scraperController) scrapeMetricsAndReport(ctx context.Context, rms ResourceMetricsScraper) {
// TODO: Add observability metrics support
metrics, err := rms.Scrape(ctx)
if err != nil {
return
// scrapeMetricsAndReport calls the Scrape function for each of the configured
// Scrapers, records observability information, and passes the scraped metrics
// to the next component.
func (sc *scraperController) scrapeMetricsAndReport(ctx context.Context) {
// TODO: add observability metrics support
var errs []error

metrics := pdata.NewMetrics()

for _, rms := range sc.resourceMetricScrapers {
resourceMetrics, err := rms.Scrape(ctx)
if err != nil {
errs = append(errs, err) // TODO: handle partial errors
}

resourceMetrics.MoveAndAppendTo(metrics.ResourceMetrics())
}

mr.nextConsumer.ConsumeMetrics(ctx, resourceMetricsSliceToMetricData(metrics))
}
// TODO: report error
if len(errs) > 0 {
componenterror.CombineErrors(errs)
}

func resourceMetricsSliceToMetricData(resourceMetrics pdata.ResourceMetricsSlice) pdata.Metrics {
md := pdata.NewMetrics()
resourceMetrics.MoveAndAppendTo(md.ResourceMetrics())
return md
sc.nextConsumer.ConsumeMetrics(ctx, metrics)
}

// stopScraping stops the ticker
func (mr *scraperController) stopScraping() {
close(mr.done)
func (sc *scraperController) stopScraping() {
close(sc.done)
}

// closeScrapers closes all the scrapers
func (mr *scraperController) closeScrapers(ctx context.Context) error {
func (sc *scraperController) closeScrapers(ctx context.Context) error {
var errs []error

for _, scraper := range mr.metricScrapers {
for _, scraper := range sc.resourceMetricScrapers {
if err := scraper.Close(ctx); err != nil {
errs = append(errs, err)
}
Expand All @@ -202,10 +235,6 @@ func (mms *multiMetricScraper) Initialize(ctx context.Context) error {
return nil
}

func (mms *multiMetricScraper) CollectionInterval() time.Duration {
return mms.scrapers[0].CollectionInterval()
}

func (mms *multiMetricScraper) Close(ctx context.Context) error {
var errs []error
for _, scraper := range mms.scrapers {
Expand Down

0 comments on commit 015827f

Please sign in to comment.