Skip to content

Commit

Permalink
Fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
james-bebbington committed Oct 13, 2020
1 parent 78b6fc6 commit 8a9fcf5
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 20 deletions.
48 changes: 31 additions & 17 deletions receiver/receiverhelper/scrapercontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ type ScraperControllerConfig interface {
// configuration. Scraper controller receivers can embed this struct, instead
// of configmodels.ReceiverSettings, and extend it with more fields if needed.
type ScraperControllerSettings struct {
	// The embedded ReceiverSettings must carry the "squash" tag so its
	// fields (name, type, endpoint, ...) are decoded at the same config
	// level as CollectionIntervalVal rather than under a nested key.
	configmodels.ReceiverSettings `mapstructure:"squash"`
	// CollectionIntervalVal is how often the controller invokes the
	// configured scrapers.
	CollectionIntervalVal time.Duration `mapstructure:"collection_interval"`
}

// DefaultScraperControllerSettings returns default scraper controller
// settings with a collection interval of one minute.
func DefaultScraperControllerSettings(cfgType configmodels.Type) *ScraperControllerSettings {
return &ScraperControllerSettings{
func DefaultScraperControllerSettings(cfgType configmodels.Type) ScraperControllerSettings {
return ScraperControllerSettings{
ReceiverSettings: configmodels.ReceiverSettings{
NameVal: string(cfgType),
TypeVal: cfgType,
Expand Down Expand Up @@ -84,13 +84,24 @@ func AddResourceMetricsScraper(scrape ResourceMetricsScraper) ScraperControllerO
}
}

// WithTickerChannel replaces the scraper controller's internally created
// ticker channel with the supplied one, giving the caller direct control
// over when scrapes fire. Intended for use in tests only.
func WithTickerChannel(tickerCh <-chan time.Time) ScraperControllerOption {
	return func(sc *scraperController) {
		sc.tickerCh = tickerCh
	}
}

// scraperController drives a set of scrapers on a periodic schedule and
// forwards the collected metrics to the next consumer in the pipeline.
type scraperController struct {
	// collectionInterval is the period between scrapes when no custom
	// ticker channel has been injected.
	collectionInterval time.Duration
	nextConsumer       consumer.MetricsConsumer

	metricsScrapers        *multiMetricScraper
	resourceMetricScrapers []ResourceMetricsScraper

	// tickerCh signals each scrape; nil until Start unless overridden
	// via WithTickerChannel (tests).
	tickerCh <-chan time.Time

	// done is closed to stop the scraping goroutine.
	done chan struct{}
}

// NewScraperControllerReceiver creates a Receiver with the configured options, that can control multiple scrapers.
Expand Down Expand Up @@ -159,42 +170,45 @@ func (sc *scraperController) initializeScrapers(ctx context.Context) error {
// collection interval.
// startScraping launches a goroutine that triggers a scrape-and-report
// cycle on every tick of tickerCh, until done is closed. If no ticker
// channel was injected (the normal, non-test path), a time.Ticker with
// the configured collection interval is created and owned by the goroutine.
func (sc *scraperController) startScraping() {
	go func() {
		if sc.tickerCh == nil {
			ticker := time.NewTicker(sc.collectionInterval)
			defer ticker.Stop()

			sc.tickerCh = ticker.C
		}

		for {
			select {
			case <-sc.tickerCh:
				sc.scrapeMetricsAndReport(context.Background())
			case <-sc.done:
				return
			}
		}
	}()
}

// scrapeMetricsAndReport calls the Scrape function of the provided Scraper, records observability information,
// and passes the scraped metrics to the next component.
func (sc *scraperController) scrapeMetricsAndReport(ctx context.Context, rms ResourceMetricsScraper) {
// scrapeMetricsAndReport calls the Scrape function for each of the configured
// Scrapers, records observability information, and passes the scraped metrics
// to the next component.
func (sc *scraperController) scrapeMetricsAndReport(ctx context.Context) {
// TODO: add observability metrics support
var errs []error

metrics := pdata.NewMetrics()

for i := 0; i < len(sc.resourceMetricScrapers); i++ {
for _, rms := range sc.resourceMetricScrapers {
resourceMetrics, err := rms.Scrape(ctx)
if err != nil {
errs = append(errs, err) // TODO: handle partial errors
continue
}

resourceMetrics.MoveAndAppendTo(metrics.ResourceMetrics())
}

// TODO: report error
if len(errs) > 0 {
// TODO: report error
componenterror.CombineErrors(errs)
}

sc.nextConsumer.ConsumeMetrics(ctx, metrics)
Expand Down
45 changes: 42 additions & 3 deletions receiver/receiverhelper/scrapercontroller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ type metricsTestCase struct {
closeErr error
}

func TestMetricReceiver(t *testing.T) {
func TestScrapeController(t *testing.T) {
testCases := []metricsTestCase{
{
name: "Standard",
Expand Down Expand Up @@ -191,7 +191,8 @@ func TestMetricReceiver(t *testing.T) {
if !test.nilNextConsumer {
nextConsumer = sink
}
cfg := DefaultScraperControllerSettings("")
defaultCfg := DefaultScraperControllerSettings("")
cfg := &defaultCfg
if test.scraperControllerSettings != nil {
cfg = test.scraperControllerSettings
}
Expand All @@ -215,7 +216,7 @@ func TestMetricReceiver(t *testing.T) {
// i.e. if test.scrapeErr != nil { ... }

// validate that scrape is called at least 5 times for each configured scraper
if test.expectScraped {
if test.expectScraped || test.scrapeErr != nil {
for _, ch := range scrapeMetricsChs {
require.Eventually(t, func() bool { return (<-ch) > 5 }, 500*time.Millisecond, time.Millisecond)
}
Expand Down Expand Up @@ -325,3 +326,41 @@ func singleResourceMetric() pdata.ResourceMetricsSlice {
singleMetric().MoveAndAppendTo(ilm.Metrics())
return rms
}

// TestSingleScrapePerTick verifies that one tick on the injected ticker
// channel produces exactly one call to each configured scraper.
func TestSingleScrapePerTick(t *testing.T) {
	metricsCh := make(chan int, 10)
	resourceMetricsCh := make(chan int, 10)

	metricsScraper := &testScrapeMetrics{ch: metricsCh}
	resourceMetricsScraper := &testScrapeResourceMetrics{ch: resourceMetricsCh}

	settings := DefaultScraperControllerSettings("")
	tick := make(chan time.Time)

	receiver, err := NewScraperControllerReceiver(
		&settings,
		&exportertest.SinkMetricsExporter{},
		AddMetricsScraper(NewMetricsScraper(metricsScraper.scrape)),
		AddResourceMetricsScraper(NewResourceMetricsScraper(resourceMetricsScraper.scrape)),
		WithTickerChannel(tick),
	)
	require.NoError(t, err)

	require.NoError(t, receiver.Start(context.Background(), componenttest.NewNopHost()))

	// Fire exactly one tick; each scraper must report one invocation.
	tick <- time.Now()

	assert.Equal(t, 1, <-metricsCh)
	assert.Equal(t, 1, <-resourceMetricsCh)

	// With no further ticks, no additional scrapes may occur within the
	// grace period.
	select {
	case <-metricsCh:
		assert.Fail(t, "Scrape was called more than once")
	case <-resourceMetricsCh:
		assert.Fail(t, "Scrape was called more than once")
	case <-time.After(100 * time.Millisecond):
		return
	}
}

0 comments on commit 8a9fcf5

Please sign in to comment.