Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove ability to configure collection interval per scraper #1947

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 2 additions & 20 deletions receiver/receiverhelper/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package receiverhelper

import (
"context"
"time"

"go.opentelemetry.io/collector/consumer/pdata"
)
Expand All @@ -38,25 +37,11 @@ type Close func(ctx context.Context) error
// ScraperOption apply changes to internal options.
type ScraperOption func(*baseScraper)

// ScraperSettings defines common settings for a scraper configuration.
// Specific scrapers can embed this struct and extend it with more fields if needed.
type ScraperSettings struct {
CollectionIntervalVal time.Duration `mapstructure:"collection_interval"`
}

// CollectionInterval gets the scraper collection interval.
func (ss *ScraperSettings) CollectionInterval() time.Duration {
return ss.CollectionIntervalVal
}

type BaseScraper interface {
// Initialize performs any timely initialization tasks such as
// setting up performance counters for initial collection.
Initialize(ctx context.Context) error

// CollectionInterval gets the scraper collection interval.
CollectionInterval() time.Duration

// Close should clean up any unmanaged resources such as
// performance counter handles.
Close(ctx context.Context) error
Expand All @@ -77,7 +62,6 @@ type ResourceMetricsScraper interface {
var _ BaseScraper = (*baseScraper)(nil)

type baseScraper struct {
ScraperSettings
initialize Initialize
close Close
}
Expand Down Expand Up @@ -121,12 +105,11 @@ var _ MetricsScraper = (*metricsScraper)(nil)
// collection interval, reports observability information, and passes the
// scraped metrics to the next consumer.
func NewMetricsScraper(
cfg ScraperSettings,
scrape ScrapeMetrics,
options ...ScraperOption,
) MetricsScraper {
ms := &metricsScraper{
baseScraper: baseScraper{ScraperSettings: cfg},
baseScraper: baseScraper{},
ScrapeMetrics: scrape,
}

Expand All @@ -152,12 +135,11 @@ var _ ResourceMetricsScraper = (*resourceMetricsScraper)(nil)
// specified collection interval, reports observability information, and
// passes the scraped resource metrics to the next consumer.
func NewResourceMetricsScraper(
cfg ScraperSettings,
scrape ScrapeResourceMetrics,
options ...ScraperOption,
) ResourceMetricsScraper {
rms := &resourceMetricsScraper{
baseScraper: baseScraper{ScraperSettings: cfg},
baseScraper: baseScraper{},
ScrapeResourceMetrics: scrape,
}

Expand Down
171 changes: 96 additions & 75 deletions receiver/receiverhelper/scrapercontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package receiverhelper

import (
"context"
"errors"
"time"

"go.opentelemetry.io/collector/component"
Expand All @@ -25,96 +26,121 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
)

// ScraperControllerOption apply changes to internal options.
type ScraperControllerOption func(*scraperController)
// ScraperControllerConfig is the configuration of a scraper controller.
// Specific scrapers must implement this interface and will typically embed
// ScraperControllerSettings struct or a struct that extends it.
type ScraperControllerConfig interface {
CollectionInterval() time.Duration
james-bebbington marked this conversation as resolved.
Show resolved Hide resolved
}

// WithDefaultCollectionInterval overrides the default collection
// interval (1 minute) that will be applied to all scrapers if not
// overridden by the individual scraper.
func WithDefaultCollectionInterval(defaultCollectionInterval time.Duration) ScraperControllerOption {
return func(o *scraperController) {
o.defaultCollectionInterval = defaultCollectionInterval
}
// ScraperControllerSettings defines common settings for a scraper controller
// configuration. Scraper controller receivers can embed this struct, instead
// of configmodels.ReceiverSettings, and extend it with more fields if needed.
type ScraperControllerSettings struct {
configmodels.ReceiverSettings
CollectionIntervalVal time.Duration `mapstructure:"collection_interval"`
}

// DefaultScraperControllerSettings returns default scraper controller
// settings with a collection interval of one minute.
func DefaultScraperControllerSettings(receiverSettings configmodels.ReceiverSettings) *ScraperControllerSettings {
james-bebbington marked this conversation as resolved.
Show resolved Hide resolved
return &ScraperControllerSettings{ReceiverSettings: receiverSettings, CollectionIntervalVal: time.Minute}
}

// CollectionInterval gets the scraper controller collection interval.
func (scs *ScraperControllerSettings) CollectionInterval() time.Duration {
return scs.CollectionIntervalVal
}

// ScraperControllerOption apply changes to internal options.
type ScraperControllerOption func(*scraperController)

// AddMetricsScraper configures the provided scrape function to be called
// with the specified options, and at the specified collection interval
// (one minute by default).
// with the specified options, and at the specified collection interval.
//
// Observability information will be reported, and the scraped metrics
// will be passed to the next consumer.
func AddMetricsScraper(scraper MetricsScraper) ScraperControllerOption {
return func(o *scraperController) {
// TODO: Instead of creating one wrapper per MetricsScraper, combine all with same collection interval.
o.metricScrapers = append(o.metricScrapers, &multiMetricScraper{scrapers: []MetricsScraper{scraper}})
o.metricsScrapers.scrapers = append(o.metricsScrapers.scrapers, scraper)
}
}

// AddResourceMetricsScraper configures the provided scrape function to
// be called with the specified options, and at the specified collection
// interval (one minute by default).
// interval.
//
// Observability information will be reported, and the scraped resource
// metrics will be passed to the next consumer.
func AddResourceMetricsScraper(scrape ResourceMetricsScraper) ScraperControllerOption {
return func(o *scraperController) {
o.metricScrapers = append(o.metricScrapers, scrape)
o.resourceMetricScrapers = append(o.resourceMetricScrapers, scrape)
}
}

type scraperController struct {
defaultCollectionInterval time.Duration
nextConsumer consumer.MetricsConsumer
collectionInterval time.Duration
nextConsumer consumer.MetricsConsumer

metricScrapers []ResourceMetricsScraper
done chan struct{}
metricsScrapers *multiMetricScraper
resourceMetricScrapers []ResourceMetricsScraper
done chan struct{}
}

// NewScraperControllerReceiver creates a Receiver with the configured options, that can control multiple scrapers.
func NewScraperControllerReceiver(_ configmodels.Receiver, nextConsumer consumer.MetricsConsumer, options ...ScraperControllerOption) (component.Receiver, error) {
func NewScraperControllerReceiver(cfg ScraperControllerConfig, nextConsumer consumer.MetricsConsumer, options ...ScraperControllerOption) (component.Receiver, error) {
if nextConsumer == nil {
return nil, componenterror.ErrNilNextConsumer
}

mr := &scraperController{
defaultCollectionInterval: time.Minute,
nextConsumer: nextConsumer,
done: make(chan struct{}),
if cfg.CollectionInterval() <= 0 {
return nil, errors.New("collection_interval must be a positive duration")
}

sc := &scraperController{
collectionInterval: cfg.CollectionInterval(),
nextConsumer: nextConsumer,
metricsScrapers: &multiMetricScraper{},
done: make(chan struct{}),
}

for _, op := range options {
op(mr)
op(sc)
}

if len(sc.metricsScrapers.scrapers) > 0 {
sc.resourceMetricScrapers = append(sc.resourceMetricScrapers, sc.metricsScrapers)
}

return mr, nil
return sc, nil
}

// Start the receiver, invoked during service start.
func (mr *scraperController) Start(ctx context.Context, _ component.Host) error {
if err := mr.initializeScrapers(ctx); err != nil {
func (sc *scraperController) Start(ctx context.Context, _ component.Host) error {
if err := sc.initializeScrapers(ctx); err != nil {
return err
}

mr.startScraping()
sc.startScraping()
return nil
}

// Shutdown the receiver, invoked during service shutdown.
func (mr *scraperController) Shutdown(ctx context.Context) error {
mr.stopScraping()
func (sc *scraperController) Shutdown(ctx context.Context) error {
sc.stopScraping()

var errors []error

if err := mr.closeScrapers(ctx); err != nil {
if err := sc.closeScrapers(ctx); err != nil {
errors = append(errors, err)
}

return componenterror.CombineErrors(errors)
}

// initializeScrapers initializes all the scrapers
func (mr *scraperController) initializeScrapers(ctx context.Context) error {
for _, scraper := range mr.metricScrapers {
func (sc *scraperController) initializeScrapers(ctx context.Context) error {
for _, scraper := range sc.resourceMetricScrapers {
if err := scraper.Initialize(ctx); err != nil {
return err
}
Expand All @@ -125,60 +151,59 @@ func (mr *scraperController) initializeScrapers(ctx context.Context) error {

// startScraping initiates a ticker that calls Scrape based on the configured
// collection interval.
func (mr *scraperController) startScraping() {
// TODO: use one ticker for each set of scrapers that have the same collection interval.

for i := 0; i < len(mr.metricScrapers); i++ {
scraper := mr.metricScrapers[i]
go func() {
collectionInterval := mr.defaultCollectionInterval
if scraper.CollectionInterval() != 0 {
collectionInterval = scraper.CollectionInterval()
}

ticker := time.NewTicker(collectionInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
mr.scrapeMetricsAndReport(context.Background(), scraper)
case <-mr.done:
return
func (sc *scraperController) startScraping() {
go func() {
ticker := time.NewTicker(sc.collectionInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
for i := 0; i < len(sc.resourceMetricScrapers); i++ {
sc.scrapeMetricsAndReport(context.Background(), sc.resourceMetricScrapers[i])
}
case <-sc.done:
return
}
}()
}
}
}()
}

// scrapeMetricsAndReport calls the Scrape function of the provided Scraper, records observability information,
// and passes the scraped metrics to the next component.
func (mr *scraperController) scrapeMetricsAndReport(ctx context.Context, rms ResourceMetricsScraper) {
// TODO: Add observability metrics support
metrics, err := rms.Scrape(ctx)
if err != nil {
return
func (sc *scraperController) scrapeMetricsAndReport(ctx context.Context, rms ResourceMetricsScraper) {
// TODO: add observability metrics support
var errs []error

metrics := pdata.NewMetrics()

for i := 0; i < len(sc.resourceMetricScrapers); i++ {
resourceMetrics, err := rms.Scrape(ctx)
if err != nil {
errs = append(errs, err) // TODO: handle partial errors
continue
}

resourceMetrics.MoveAndAppendTo(metrics.ResourceMetrics())
}

mr.nextConsumer.ConsumeMetrics(ctx, resourceMetricsSliceToMetricData(metrics))
}
if len(errs) > 0 {
// TODO: report error
}

func resourceMetricsSliceToMetricData(resourceMetrics pdata.ResourceMetricsSlice) pdata.Metrics {
md := pdata.NewMetrics()
resourceMetrics.MoveAndAppendTo(md.ResourceMetrics())
return md
sc.nextConsumer.ConsumeMetrics(ctx, metrics)
}

// stopScraping stops the ticker
func (mr *scraperController) stopScraping() {
close(mr.done)
func (sc *scraperController) stopScraping() {
close(sc.done)
}

// closeScrapers closes all the scrapers
func (mr *scraperController) closeScrapers(ctx context.Context) error {
func (sc *scraperController) closeScrapers(ctx context.Context) error {
var errs []error

for _, scraper := range mr.metricScrapers {
for _, scraper := range sc.resourceMetricScrapers {
if err := scraper.Close(ctx); err != nil {
errs = append(errs, err)
}
Expand All @@ -202,10 +227,6 @@ func (mms *multiMetricScraper) Initialize(ctx context.Context) error {
return nil
}

func (mms *multiMetricScraper) CollectionInterval() time.Duration {
return mms.scrapers[0].CollectionInterval()
}

func (mms *multiMetricScraper) Close(ctx context.Context) error {
var errs []error
for _, scraper := range mms.scrapers {
Expand Down