Remove ability to configure collection interval per scraper #1947

Merged
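In short, this change removes the per-scraper `collection_interval` (the `ScraperSettings` struct in `scraper.go`) and replaces it with a single `collection_interval` on the new `ScraperControllerSettings` in `scrapercontroller.go`, which applies to every scraper owned by a scraper controller. A minimal sketch of how a receiver configuration might embed the new settings struct — the `examplereceiver` package and its type name are hypothetical, not part of this PR:

```go
package examplereceiver // hypothetical package, for illustration only

import (
	"go.opentelemetry.io/collector/receiver/receiverhelper"
)

// Config embeds ScraperControllerSettings instead of
// configmodels.ReceiverSettings, so the receiver's YAML section exposes a
// single `collection_interval` that applies to all of its scrapers.
type Config struct {
	receiverhelper.ScraperControllerSettings `mapstructure:"squash"`

	// Receiver-specific fields would be added here.
}

// createDefaultConfig seeds the config via DefaultScraperControllerSettings:
// name/type set to the receiver type and a one-minute collection interval.
func createDefaultConfig() *Config {
	return &Config{
		ScraperControllerSettings: receiverhelper.DefaultScraperControllerSettings("example"),
	}
}
```

The embedded settings reuse the `mapstructure` tags shown in the diff, so the receiver picks up `collection_interval` from its configuration section without extra plumbing.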
22 changes: 2 additions & 20 deletions receiver/receiverhelper/scraper.go
@@ -16,7 +16,6 @@ package receiverhelper

import (
"context"
"time"

"go.opentelemetry.io/collector/consumer/pdata"
)
@@ -38,25 +37,11 @@ type Close func(ctx context.Context) error
// ScraperOption apply changes to internal options.
type ScraperOption func(*baseScraper)

- // ScraperSettings defines common settings for a scraper configuration.
- // Specific scrapers can embed this struct and extend it with more fields if needed.
- type ScraperSettings struct {
- CollectionIntervalVal time.Duration `mapstructure:"collection_interval"`
- }
-
- // CollectionInterval gets the scraper collection interval.
- func (ss *ScraperSettings) CollectionInterval() time.Duration {
- return ss.CollectionIntervalVal
- }

type BaseScraper interface {
// Initialize performs any timely initialization tasks such as
// setting up performance counters for initial collection.
Initialize(ctx context.Context) error

- // CollectionInterval gets the scraper collection interval.
- CollectionInterval() time.Duration

// Close should clean up any unmanaged resources such as
// performance counter handles.
Close(ctx context.Context) error
@@ -77,7 +62,6 @@ type ResourceMetricsScraper interface {
var _ BaseScraper = (*baseScraper)(nil)

type baseScraper struct {
- ScraperSettings
initialize Initialize
close Close
}
@@ -121,12 +105,11 @@ var _ MetricsScraper = (*metricsScraper)(nil)
// collection interval, reports observability information, and passes the
// scraped metrics to the next consumer.
func NewMetricsScraper(
- cfg ScraperSettings,
scrape ScrapeMetrics,
options ...ScraperOption,
) MetricsScraper {
ms := &metricsScraper{
- baseScraper: baseScraper{ScraperSettings: cfg},
+ baseScraper: baseScraper{},
ScrapeMetrics: scrape,
}

@@ -152,12 +135,11 @@ var _ ResourceMetricsScraper = (*resourceMetricsScraper)(nil)
// specified collection interval, reports observability information, and
// passes the scraped resource metrics to the next consumer.
func NewResourceMetricsScraper(
- cfg ScraperSettings,
scrape ScrapeResourceMetrics,
options ...ScraperOption,
) ResourceMetricsScraper {
rms := &resourceMetricsScraper{
- baseScraper: baseScraper{ScraperSettings: cfg},
+ baseScraper: baseScraper{},
ScrapeResourceMetrics: scrape,
}

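With `ScraperSettings` gone from `scraper.go`, `NewMetricsScraper` and `NewResourceMetricsScraper` take only the scrape function plus options. A minimal sketch against the new signatures — the scrape stubs are placeholders, and the `ScrapeMetrics`/`ScrapeResourceMetrics` signatures (returning `pdata.MetricSlice` and `pdata.ResourceMetricsSlice`) are assumed from this version of the package rather than shown in the diff:

```go
package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/collector/consumer/pdata"
	"go.opentelemetry.io/collector/receiver/receiverhelper"
)

func main() {
	// No per-scraper ScraperSettings or collection_interval any more:
	// a scraper only knows how to scrape.
	ms := receiverhelper.NewMetricsScraper(
		func(ctx context.Context) (pdata.MetricSlice, error) {
			return pdata.NewMetricSlice(), nil // stub scrape
		},
	)

	rms := receiverhelper.NewResourceMetricsScraper(
		func(ctx context.Context) (pdata.ResourceMetricsSlice, error) {
			return pdata.NewResourceMetricsSlice(), nil // stub scrape
		},
	)

	fmt.Println(ms, rms) // the collection interval now lives on the scraper controller
}
```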
179 changes: 104 additions & 75 deletions receiver/receiverhelper/scrapercontroller.go
@@ -16,6 +16,7 @@ package receiverhelper

import (
"context"
"errors"
"time"

"go.opentelemetry.io/collector/component"
@@ -25,96 +26,126 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
)

- // ScraperControllerOption apply changes to internal options.
- type ScraperControllerOption func(*scraperController)
+ // ScraperControllerSettings defines common settings for a scraper controller
+ // configuration. Scraper controller receivers can embed this struct, instead
+ // of configmodels.ReceiverSettings, and extend it with more fields if needed.
+ type ScraperControllerSettings struct {
+ configmodels.ReceiverSettings `mapstructure:"squash"`
+ CollectionInterval time.Duration `mapstructure:"collection_interval"`
+ }

- // WithDefaultCollectionInterval overrides the default collection
- // interval (1 minute) that will be applied to all scrapers if not
- // overridden by the individual scraper.
- func WithDefaultCollectionInterval(defaultCollectionInterval time.Duration) ScraperControllerOption {
- return func(o *scraperController) {
- o.defaultCollectionInterval = defaultCollectionInterval
+ // DefaultScraperControllerSettings returns default scraper controller
+ // settings with a collection interval of one minute.
+ func DefaultScraperControllerSettings(cfgType configmodels.Type) ScraperControllerSettings {
+ return ScraperControllerSettings{
+ ReceiverSettings: configmodels.ReceiverSettings{
+ NameVal: string(cfgType),
+ TypeVal: cfgType,
+ },
+ CollectionInterval: time.Minute,
}
}

+ // ScraperControllerOption apply changes to internal options.
+ type ScraperControllerOption func(*scraperController)

// AddMetricsScraper configures the provided scrape function to be called
- // with the specified options, and at the specified collection interval
- // (one minute by default).
+ // with the specified options, and at the specified collection interval.
//
// Observability information will be reported, and the scraped metrics
// will be passed to the next consumer.
func AddMetricsScraper(scraper MetricsScraper) ScraperControllerOption {
return func(o *scraperController) {
- // TODO: Instead of creating one wrapper per MetricsScraper, combine all with same collection interval.
- o.metricScrapers = append(o.metricScrapers, &multiMetricScraper{scrapers: []MetricsScraper{scraper}})
+ o.metricsScrapers.scrapers = append(o.metricsScrapers.scrapers, scraper)
}
}

// AddResourceMetricsScraper configures the provided scrape function to
// be called with the specified options, and at the specified collection
- // interval (one minute by default).
+ // interval.
//
// Observability information will be reported, and the scraped resource
// metrics will be passed to the next consumer.
func AddResourceMetricsScraper(scrape ResourceMetricsScraper) ScraperControllerOption {
return func(o *scraperController) {
- o.metricScrapers = append(o.metricScrapers, scrape)
+ o.resourceMetricScrapers = append(o.resourceMetricScrapers, scrape)
}
}

+ // WithTickerChannel allows you to override the scraper controllers ticker
+ // channel to specify when scrape is called. This is only expected to be
+ // used by tests.
+ func WithTickerChannel(tickerCh <-chan time.Time) ScraperControllerOption {
+ return func(o *scraperController) {
+ o.tickerCh = tickerCh
+ }
+ }

type scraperController struct {
- defaultCollectionInterval time.Duration
- nextConsumer consumer.MetricsConsumer
+ collectionInterval time.Duration
+ nextConsumer consumer.MetricsConsumer

+ metricsScrapers *multiMetricScraper
+ resourceMetricScrapers []ResourceMetricsScraper

- metricScrapers []ResourceMetricsScraper
- done chan struct{}
+ tickerCh <-chan time.Time
+ done chan struct{}
}

// NewScraperControllerReceiver creates a Receiver with the configured options, that can control multiple scrapers.
- func NewScraperControllerReceiver(_ configmodels.Receiver, nextConsumer consumer.MetricsConsumer, options ...ScraperControllerOption) (component.Receiver, error) {
+ func NewScraperControllerReceiver(cfg *ScraperControllerSettings, nextConsumer consumer.MetricsConsumer, options ...ScraperControllerOption) (component.Receiver, error) {
if nextConsumer == nil {
return nil, componenterror.ErrNilNextConsumer
}

- mr := &scraperController{
- defaultCollectionInterval: time.Minute,
- nextConsumer: nextConsumer,
- done: make(chan struct{}),
+ if cfg.CollectionInterval <= 0 {
+ return nil, errors.New("collection_interval must be a positive duration")
}

+ sc := &scraperController{
+ collectionInterval: cfg.CollectionInterval,
+ nextConsumer: nextConsumer,
+ metricsScrapers: &multiMetricScraper{},
+ done: make(chan struct{}),
+ }

for _, op := range options {
- op(mr)
+ op(sc)
}

+ if len(sc.metricsScrapers.scrapers) > 0 {
+ sc.resourceMetricScrapers = append(sc.resourceMetricScrapers, sc.metricsScrapers)
+ }

- return mr, nil
+ return sc, nil
}

// Start the receiver, invoked during service start.
- func (mr *scraperController) Start(ctx context.Context, _ component.Host) error {
- if err := mr.initializeScrapers(ctx); err != nil {
+ func (sc *scraperController) Start(ctx context.Context, _ component.Host) error {
+ if err := sc.initializeScrapers(ctx); err != nil {
return err
}

- mr.startScraping()
+ sc.startScraping()
return nil
}

// Shutdown the receiver, invoked during service shutdown.
- func (mr *scraperController) Shutdown(ctx context.Context) error {
- mr.stopScraping()
+ func (sc *scraperController) Shutdown(ctx context.Context) error {
+ sc.stopScraping()

var errors []error

- if err := mr.closeScrapers(ctx); err != nil {
+ if err := sc.closeScrapers(ctx); err != nil {
errors = append(errors, err)
}

return componenterror.CombineErrors(errors)
}

// initializeScrapers initializes all the scrapers
- func (mr *scraperController) initializeScrapers(ctx context.Context) error {
- for _, scraper := range mr.metricScrapers {
+ func (sc *scraperController) initializeScrapers(ctx context.Context) error {
+ for _, scraper := range sc.resourceMetricScrapers {
if err := scraper.Initialize(ctx); err != nil {
return err
}
@@ -125,60 +156,62 @@ func (mr *scraperController) initializeScrapers(ctx context.Context) error {

// startScraping initiates a ticker that calls Scrape based on the configured
// collection interval.
- func (mr *scraperController) startScraping() {
- // TODO: use one ticker for each set of scrapers that have the same collection interval.
-
- for i := 0; i < len(mr.metricScrapers); i++ {
- scraper := mr.metricScrapers[i]
- go func() {
- collectionInterval := mr.defaultCollectionInterval
- if scraper.CollectionInterval() != 0 {
- collectionInterval = scraper.CollectionInterval()
- }
-
- ticker := time.NewTicker(collectionInterval)
+ func (sc *scraperController) startScraping() {
+ go func() {
+ if sc.tickerCh == nil {
+ ticker := time.NewTicker(sc.collectionInterval)
defer ticker.Stop()

- for {
- select {
- case <-ticker.C:
- mr.scrapeMetricsAndReport(context.Background(), scraper)
- case <-mr.done:
- return
- }
+ sc.tickerCh = ticker.C
+ }

+ for {
+ select {
+ case <-sc.tickerCh:
+ sc.scrapeMetricsAndReport(context.Background())
+ case <-sc.done:
+ return
+ }
- }
- }()
- }
+ }
+ }()
}

- // scrapeMetricsAndReport calls the Scrape function of the provided Scraper, records observability information,
- // and passes the scraped metrics to the next component.
- func (mr *scraperController) scrapeMetricsAndReport(ctx context.Context, rms ResourceMetricsScraper) {
- // TODO: Add observability metrics support
- metrics, err := rms.Scrape(ctx)
- if err != nil {
- return
+ // scrapeMetricsAndReport calls the Scrape function for each of the configured
+ // Scrapers, records observability information, and passes the scraped metrics
+ // to the next component.
+ func (sc *scraperController) scrapeMetricsAndReport(ctx context.Context) {
+ // TODO: add observability metrics support
+ var errs []error
+
+ metrics := pdata.NewMetrics()
+
+ for _, rms := range sc.resourceMetricScrapers {
+ resourceMetrics, err := rms.Scrape(ctx)
+ if err != nil {
+ errs = append(errs, err) // TODO: handle partial errors
}

+ resourceMetrics.MoveAndAppendTo(metrics.ResourceMetrics())
+ }

- mr.nextConsumer.ConsumeMetrics(ctx, resourceMetricsSliceToMetricData(metrics))
- }
+ // TODO: report error
+ if len(errs) > 0 {
+ componenterror.CombineErrors(errs)
+ }

- func resourceMetricsSliceToMetricData(resourceMetrics pdata.ResourceMetricsSlice) pdata.Metrics {
- md := pdata.NewMetrics()
- resourceMetrics.MoveAndAppendTo(md.ResourceMetrics())
- return md
+ sc.nextConsumer.ConsumeMetrics(ctx, metrics)
}

// stopScraping stops the ticker
- func (mr *scraperController) stopScraping() {
- close(mr.done)
+ func (sc *scraperController) stopScraping() {
+ close(sc.done)
}

// closeScrapers closes all the scrapers
- func (mr *scraperController) closeScrapers(ctx context.Context) error {
+ func (sc *scraperController) closeScrapers(ctx context.Context) error {
var errs []error

- for _, scraper := range mr.metricScrapers {
+ for _, scraper := range sc.resourceMetricScrapers {
if err := scraper.Close(ctx); err != nil {
errs = append(errs, err)
}
@@ -202,10 +235,6 @@ func (mms *multiMetricScraper) Initialize(ctx context.Context) error {
return nil
}

- func (mms *multiMetricScraper) CollectionInterval() time.Duration {
- return mms.scrapers[0].CollectionInterval()
- }

func (mms *multiMetricScraper) Close(ctx context.Context) error {
var errs []error
for _, scraper := range mms.scrapers {
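Putting the controller side together: `NewScraperControllerReceiver` now takes a `*ScraperControllerSettings` (whose `collection_interval` must be positive), the next consumer, and `Add*Scraper` options, while `WithTickerChannel` exists for tests that want to trigger scrapes manually. A sketch under the same assumptions as above — the inline nop consumer is illustrative, and `componenttest.NewNopHost` is assumed to be available as in the collector's other receiver tests of this version:

```go
package main

import (
	"context"
	"log"
	"time"

	"go.opentelemetry.io/collector/component/componenttest"
	"go.opentelemetry.io/collector/consumer/pdata"
	"go.opentelemetry.io/collector/receiver/receiverhelper"
)

// nopConsumer satisfies consumer.MetricsConsumer and drops everything it receives.
type nopConsumer struct{}

func (nopConsumer) ConsumeMetrics(context.Context, pdata.Metrics) error { return nil }

func main() {
	scraper := receiverhelper.NewMetricsScraper(
		func(ctx context.Context) (pdata.MetricSlice, error) {
			return pdata.NewMetricSlice(), nil // stub scrape
		},
	)

	// One interval for every scraper owned by this controller.
	cfg := receiverhelper.DefaultScraperControllerSettings("example")
	cfg.CollectionInterval = 30 * time.Second

	recv, err := receiverhelper.NewScraperControllerReceiver(
		&cfg,
		nopConsumer{},
		receiverhelper.AddMetricsScraper(scraper),
		// In tests, receiverhelper.WithTickerChannel(ch) replaces the internal
		// ticker so scrapes can be triggered manually.
	)
	if err != nil {
		log.Fatal(err)
	}

	if err := recv.Start(context.Background(), componenttest.NewNopHost()); err != nil {
		log.Fatal(err)
	}
	defer func() { _ = recv.Shutdown(context.Background()) }()
}
```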