diff --git a/receiver/receiverhelper/receiver.go b/receiver/receiverhelper/receiver.go new file mode 100644 index 00000000000..ad0b3881aec --- /dev/null +++ b/receiver/receiverhelper/receiver.go @@ -0,0 +1,265 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package receiverhelper + +import ( + "context" + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/consumer" +) + +// Start specifies the function invoked when the receiver is being started. +type Start func(context.Context, component.Host) error + +// Shutdown specifies the function invoked when the receiver is being shutdown. +type Shutdown func(context.Context) error + +// Option apply changes to internal options. +type Option func(*baseReceiver) + +// WithStart overrides the default Start function for a receiver. +// The default shutdown function does nothing and always returns nil. +func WithStart(start Start) Option { + return func(o *baseReceiver) { + o.start = start + } +} + +// WithShutdown overrides the default Shutdown function for a receiver. +// The default shutdown function does nothing and always returns nil. +func WithShutdown(shutdown Shutdown) Option { + return func(o *baseReceiver) { + o.shutdown = shutdown + } +} + +type baseReceiver struct { + fullName string + start Start + shutdown Shutdown +} + +// Construct the internalOptions from multiple Option. +func newBaseReceiver(fullName string, options ...Option) baseReceiver { + br := baseReceiver{fullName: fullName} + + for _, op := range options { + op(&br) + } + + return br +} + +// Start the receiver, invoked during service start. +func (br *baseReceiver) Start(ctx context.Context, host component.Host) error { + if br.start != nil { + return br.start(ctx, host) + } + return nil +} + +// Shutdown the receiver, invoked during service shutdown. +func (br *baseReceiver) Shutdown(ctx context.Context) error { + if br.shutdown != nil { + return br.shutdown(ctx) + } + return nil +} + +// MetricOption apply changes to internal options. +type MetricOption func(*metricsReceiver) + +// WithBaseOptions applies any base options to a metrics receiver. +func WithBaseOptions(options ...Option) MetricOption { + return func(o *metricsReceiver) { + for _, option := range options { + option(&o.baseReceiver) + } + } +} + +// WithDefaultCollectionInterval overrides the default collection +// interval (1 minute) that will be applied to all scrapers if not +// overridden by the individual scraper. +func WithDefaultCollectionInterval(defaultCollectionInterval time.Duration) MetricOption { + return func(o *metricsReceiver) { + o.defaultCollectionInterval = defaultCollectionInterval + } +} + +// AddScraper configures the provided scrape function to be called with +// the specified options, and at the specified collection interval (one +// minute by default). +// +// Observability information will be reported, and the scraped metrics +// will be passed to the next consumer. +func AddScraper(cfg ScraperConfig, scrape Scrape, options ...ScraperOption) MetricOption { + return func(o *metricsReceiver) { + o.scrapers = append(o.scrapers, newScraper(cfg, scrape, options...)) + } +} + +type metricsReceiver struct { + baseReceiver + defaultCollectionInterval time.Duration + nextConsumer consumer.MetricsConsumer + + scrapers []*scraper + done chan struct{} +} + +// NewMetricReceiver creates a Receiver with the configured options. +func NewMetricReceiver(config configmodels.Receiver, nextConsumer consumer.MetricsConsumer, options ...MetricOption) (component.Receiver, error) { + if nextConsumer == nil { + return nil, componenterror.ErrNilNextConsumer + } + + mr := &metricsReceiver{ + baseReceiver: newBaseReceiver(config.Name()), + defaultCollectionInterval: time.Minute, + nextConsumer: nextConsumer, + done: make(chan struct{}), + } + + for _, op := range options { + op(mr) + } + + // wrap the start function with a call to initialize scrapers + // and start scraping + start := mr.start + mr.start = func(ctx context.Context, host component.Host) error { + if start != nil { + if err := start(ctx, host); err != nil { + return err + } + } + + if err := mr.initializeScrapers(ctx); err != nil { + return err + } + + mr.startScraping() + return nil + } + + // wrap the shutdown function with a call to close scrapers + // and stop scraping + shutdown := mr.shutdown + mr.shutdown = func(ctx context.Context) error { + mr.stopScraping() + + var errors []error + + if err := mr.closeScrapers(ctx); err != nil { + errors = append(errors, err) + } + + if shutdown != nil { + if err := shutdown(ctx); err != nil { + errors = append(errors, err) + } + } + + return componenterror.CombineErrors(errors) + } + + return mr, nil +} + +// initializeScrapers initializes all the scrapers +func (mr *metricsReceiver) initializeScrapers(ctx context.Context) error { + for _, scraper := range mr.scrapers { + if scraper.initialize == nil { + continue + } + + if err := scraper.initialize(ctx); err != nil { + return err + } + } + + return nil +} + +// startScraping initiates a ticker that calls Scrape based on the configured +// collection interval. +func (mr *metricsReceiver) startScraping() { + // TODO1: use one ticker for each set of scrapers that have the same collection interval. + // TODO2: consider allowing different "Scrape" functions to be configured, i.e. functions + // that return MetricsSlice or ResourceMetricsSlice (similar to the existing Scraper + // & ResourceScraper interfaces in the host metrics receiver). That will allow data + // from multiple scrapers (that have the same collection interval) to be batched. + + for i := 0; i < len(mr.scrapers); i++ { + scraper := mr.scrapers[i] + go func() { + collectionInterval := mr.defaultCollectionInterval + if scraper.cfg.CollectionInterval() != 0 { + collectionInterval = scraper.cfg.CollectionInterval() + } + + ticker := time.NewTicker(collectionInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + mr.scrapeAndReport(context.Background(), scraper) + case <-mr.done: + return + } + } + }() + } +} + +// scrapeAndReport calls the Scrape function of the provided Scraper, records +// observability information, and passes the scraped metrics to the next component. +func (mr *metricsReceiver) scrapeAndReport(ctx context.Context, scraper *scraper) { + // TODO: Add observability metrics support + metrics, err := scraper.scrape(ctx) + if err != nil { + return + } + + mr.nextConsumer.ConsumeMetrics(ctx, metrics) +} + +// stopScraping stops the ticker +func (mr *metricsReceiver) stopScraping() { + close(mr.done) +} + +// closeScrapers closes all the scrapers +func (mr *metricsReceiver) closeScrapers(ctx context.Context) error { + var errors []error + + for _, scraper := range mr.scrapers { + if scraper.close == nil { + continue + } + + if err := scraper.close(ctx); err != nil { + errors = append(errors, err) + } + } + + return componenterror.CombineErrors(errors) +} diff --git a/receiver/receiverhelper/receiver_test.go b/receiver/receiverhelper/receiver_test.go new file mode 100644 index 00000000000..fdf8fd8e0cc --- /dev/null +++ b/receiver/receiverhelper/receiver_test.go @@ -0,0 +1,320 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package receiverhelper + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/exporter/exportertest" +) + +type testStart struct { + ch chan bool + err error +} + +func (ts *testStart) start(context.Context, component.Host) error { + ts.ch <- true + return ts.err +} + +type testShutdown struct { + ch chan bool + err error +} + +func (ts *testShutdown) shutdown(context.Context) error { + ts.ch <- true + return ts.err +} + +type baseTestCase struct { + name string + start bool + shutdown bool + startErr error + shutdownErr error +} + +func TestBaseReceiver(t *testing.T) { + testCases := []baseTestCase{ + { + name: "Standard", + }, + { + name: "WithStartAndShutdown", + start: true, + shutdown: true, + }, + { + name: "WithStartAndShutdownErrors", + start: true, + shutdown: true, + startErr: errors.New("err1"), + shutdownErr: errors.New("err2"), + }, + } + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + startCh := make(chan bool, 1) + shutdownCh := make(chan bool, 1) + options := configureBaseOptions(test.start, test.shutdown, test.startErr, test.shutdownErr, startCh, shutdownCh) + + bp := newBaseReceiver("", options...) + + err := bp.Start(context.Background(), componenttest.NewNopHost()) + if test.startErr != nil { + assert.Equal(t, test.startErr, err) + } else { + assert.NoError(t, err) + if test.start { + assertChannelCalled(t, startCh, "start was not called") + } + } + + err = bp.Shutdown(context.Background()) + if test.shutdownErr != nil { + assert.Equal(t, test.shutdownErr, err) + } else { + assert.NoError(t, err) + if test.shutdown { + assertChannelCalled(t, shutdownCh, "shutdown was not called") + } + } + }) + } +} + +type testInitialize struct { + ch chan bool + err error +} + +func (ts *testInitialize) initialize(context.Context) error { + ts.ch <- true + return ts.err +} + +type testClose struct { + ch chan bool + err error +} + +func (ts *testClose) close(context.Context) error { + ts.ch <- true + return ts.err +} + +type testScrape struct { + ch chan int + timesScrapeCalled int +} + +func (ts *testScrape) scrape(ctx context.Context) (pdata.Metrics, error) { + ts.timesScrapeCalled++ + ts.ch <- ts.timesScrapeCalled + return pdata.NewMetrics(), nil +} + +type metricsTestCase struct { + name string + + start bool + shutdown bool + + scrapers int + defaultCollectionInterval time.Duration + scraperSettings ScraperSettings + nilNextConsumer bool + expectedNewErr string + expectScraped bool + + initialize bool + close bool + initializeErr error + closeErr error +} + +func TestMetricReceiver(t *testing.T) { + testCases := []metricsTestCase{ + { + name: "Standard", + }, + { + name: "WithBaseOptions", + start: true, + shutdown: true, + }, + { + name: "AddScrapersWithDefaultCollectionInterval", + scrapers: 2, + defaultCollectionInterval: time.Millisecond, + expectScraped: true, + }, + { + name: "AddScrapersWithCollectionInterval", + scrapers: 2, + scraperSettings: ScraperSettings{CollectionIntervalVal: time.Millisecond}, + expectScraped: true, + }, + { + name: "AddScrapers_NewError", + scrapers: 2, + nilNextConsumer: true, + expectedNewErr: "nil nextConsumer", + }, + { + name: "AddScrapersWithInitializeAndClose", + scrapers: 2, + initialize: true, + close: true, + }, + { + name: "AddScrapersWithInitializeAndCloseErrors", + scrapers: 2, + initialize: true, + close: true, + initializeErr: errors.New("err1"), + closeErr: errors.New("err2"), + }, + } + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + startCh := make(chan bool, 1) + shutdownCh := make(chan bool, 1) + baseOptions := configureBaseOptions(test.start, test.shutdown, nil, nil, startCh, shutdownCh) + + initializeChs := make([]chan bool, test.scrapers) + scrapeChs := make([]chan int, test.scrapers) + closeChs := make([]chan bool, test.scrapers) + options := configureMetricOptions(baseOptions, test, initializeChs, scrapeChs, closeChs) + + nextConsumer := exportertest.NewNopMetricsExporter() + if test.nilNextConsumer { + nextConsumer = nil + } + mr, err := NewMetricReceiver(&configmodels.ReceiverSettings{}, nextConsumer, options...) + if test.expectedNewErr != "" { + assert.EqualError(t, err, test.expectedNewErr) + return + } + require.NoError(t, err) + + err = mr.Start(context.Background(), componenttest.NewNopHost()) + if test.initializeErr != nil { + assert.Equal(t, test.initializeErr, err) + } else { + if test.start { + assertChannelCalled(t, startCh, "start was not called") + } + if test.initialize { + assertChannelsCalled(t, initializeChs, "initialize was not called") + } + } + + // validate that scrape is called at least 5 times for each configured scraper + if test.expectScraped { + for _, ch := range scrapeChs { + require.Eventually(t, func() bool { return (<-ch) > 5 }, 500*time.Millisecond, time.Millisecond) + } + } + + err = mr.Shutdown(context.Background()) + if test.closeErr != nil { + var errors []error + for i := 0; i < test.scrapers; i++ { + errors = append(errors, test.closeErr) + } + assert.EqualError(t, err, componenterror.CombineErrors(errors).Error()) + } else { + if test.shutdown { + assertChannelCalled(t, shutdownCh, "shutdown was not called") + } + if test.close { + assertChannelsCalled(t, closeChs, "clost was not called") + } + } + }) + } +} + +func configureBaseOptions(start, shutdown bool, startErr, shutdownErr error, startCh, shutdownCh chan bool) []Option { + baseOptions := []Option{} + if start { + ts := &testStart{ch: startCh, err: startErr} + baseOptions = append(baseOptions, WithStart(ts.start)) + } + if shutdown { + ts := &testShutdown{ch: shutdownCh, err: shutdownErr} + baseOptions = append(baseOptions, WithShutdown(ts.shutdown)) + } + return baseOptions +} + +func configureMetricOptions(baseOptions []Option, test metricsTestCase, initializeChs []chan bool, scrapeChs []chan int, closeChs []chan bool) []MetricOption { + metricOptions := []MetricOption{} + metricOptions = append(metricOptions, WithBaseOptions(baseOptions...)) + + if test.defaultCollectionInterval != 0 { + metricOptions = append(metricOptions, WithDefaultCollectionInterval(test.defaultCollectionInterval)) + } + + for i := 0; i < test.scrapers; i++ { + scraperOptions := []ScraperOption{} + if test.initialize { + initializeChs[i] = make(chan bool, 1) + ti := &testInitialize{ch: initializeChs[i], err: test.initializeErr} + scraperOptions = append(scraperOptions, WithInitialize(ti.initialize)) + } + if test.close { + closeChs[i] = make(chan bool, 1) + tc := &testClose{ch: closeChs[i], err: test.closeErr} + scraperOptions = append(scraperOptions, WithClose(tc.close)) + } + + scrapeChs[i] = make(chan int, 10) + ts := &testScrape{ch: scrapeChs[i]} + metricOptions = append(metricOptions, AddScraper(&test.scraperSettings, ts.scrape, scraperOptions...)) + } + + return metricOptions +} + +func assertChannelsCalled(t *testing.T, chs []chan bool, message string) { + for _, ic := range chs { + assertChannelCalled(t, ic, message) + } +} + +func assertChannelCalled(t *testing.T, ch chan bool, message string) { + select { + case <-ch: + default: + assert.Fail(t, message) + } +} diff --git a/receiver/receiverhelper/scraper.go b/receiver/receiverhelper/scraper.go new file mode 100644 index 00000000000..dee3e45e945 --- /dev/null +++ b/receiver/receiverhelper/scraper.go @@ -0,0 +1,97 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package receiverhelper + +import ( + "context" + "time" + + "go.opentelemetry.io/collector/consumer/pdata" +) + +// Scraper provides a function to scrape metrics. +type Scrape func(context.Context) (pdata.Metrics, error) + +// Initialize performs any timely initialization tasks such as +// setting up performance counters for initial collection. +type Initialize func(ctx context.Context) error + +// Close should clean up any unmanaged resources such as +// performance counter handles. +type Close func(ctx context.Context) error + +// ScraperOption apply changes to internal options. +type ScraperOption func(*scraper) + +// ScraperConfig is the configuration of a scraper. Specific scrapers must implement this +// interface and will typically embed ScraperSettings struct or a struct that extends it. +type ScraperConfig interface { + CollectionInterval() time.Duration + SetCollectionInterval(collectionInterval time.Duration) +} + +// ScraperSettings defines common settings for a scraper configuration. +// Specific scrapers can embed this struct and extend it with more fields if needed. +type ScraperSettings struct { + CollectionIntervalVal time.Duration `mapstructure:"collection_interval"` +} + +// CollectionInterval gets the scraper collection interval. +func (ss *ScraperSettings) CollectionInterval() time.Duration { + return ss.CollectionIntervalVal +} + +// SetCollectionInterval sets the scraper collection interval. +func (ss *ScraperSettings) SetCollectionInterval(collectionInterval time.Duration) { + ss.CollectionIntervalVal = collectionInterval +} + +type scraper struct { + cfg ScraperConfig + scrape Scrape + initialize Initialize + close Close +} + +// NewScraper creates a Scraper that calls Scrape at the specified collection +// interval, reports observability information, and passes the scraped metrics +// to the next consumer. +func newScraper( + cfg ScraperConfig, + scrape Scrape, + options ...ScraperOption, +) *scraper { + bs := &scraper{cfg: cfg, scrape: scrape} + + for _, op := range options { + op(bs) + } + + return bs +} + +// WithInitialize sets the function that will be called on startup. +func WithInitialize(initialize Initialize) ScraperOption { + return func(o *scraper) { + o.initialize = initialize + } +} + +// WithClose sets the function that will be called on shutdown. +func WithClose(close Close) ScraperOption { + return func(o *scraper) { + o.close = close + } +}