diff --git a/receiver/snowflakereceiver/scraper.go b/receiver/snowflakereceiver/scraper.go index cb890dff73eae..4b0e3f5960b0f 100644 --- a/receiver/snowflakereceiver/scraper.go +++ b/receiver/snowflakereceiver/scraper.go @@ -5,6 +5,7 @@ package snowflakereceiver // import "github.com/open-telemetry/opentelemetry-col import ( "context" + "sync" "time" "go.opentelemetry.io/collector/component" @@ -48,34 +49,68 @@ func (s *snowflakeMetricsScraper) shutdown(_ context.Context) (err error) { return err } +func errorListener(eQueue <-chan error, eOut chan<- *scrapererror.ScrapeErrors) { + errs := &scrapererror.ScrapeErrors{} + + for err := range eQueue { + errs.Add(err) + } + + eOut <- errs +} + // wrapper for all of the sub-scraping tasks, implements the scraper interface for // snowflakeMetricsScraper func (s *snowflakeMetricsScraper) scrape(ctx context.Context) (pmetric.Metrics, error) { - errs := &scrapererror.ScrapeErrors{} + var wg sync.WaitGroup + var errs *scrapererror.ScrapeErrors - now := pcommon.NewTimestampFromTime(time.Now()) + metricScrapes := []func(context.Context, pcommon.Timestamp, chan<- error){ + s.scrapeBillingMetrics, + s.scrapeWarehouseBillingMetrics, + s.scrapeLoginMetrics, + s.scrapeHighLevelQueryMetrics, + s.scrapeDBMetrics, + s.scrapeSessionMetrics, + s.scrapeSnowpipeMetrics, + s.scrapeStorageMetrics, + } - // each client call has its own scrape function + errChan := make(chan error, len(metricScrapes)) + errOut := make(chan *scrapererror.ScrapeErrors) - s.scrapeBillingMetrics(ctx, now, *errs) - s.scrapeWarehouseBillingMetrics(ctx, now, *errs) - s.scrapeLoginMetrics(ctx, now, *errs) - s.scrapeHighLevelQueryMetrics(ctx, now, *errs) - s.scrapeDBMetrics(ctx, now, *errs) - s.scrapeSessionMetrics(ctx, now, *errs) - s.scrapeSnowpipeMetrics(ctx, now, *errs) - s.scrapeStorageMetrics(ctx, now, *errs) + go func() { + errorListener(errChan, errOut) + }() + now := pcommon.NewTimestampFromTime(time.Now()) + + for _, fn := range metricScrapes { + wg.Add(1) + go func( + fn func(context.Context, pcommon.Timestamp, chan<- error), + ctx context.Context, + now pcommon.Timestamp, + errs chan<- error, + ) { + defer wg.Done() + fn(ctx, now, errs) + }(fn, ctx, now, errChan) + } + + wg.Wait() + close(errChan) + errs = <-errOut rb := s.mb.NewResourceBuilder() rb.SetSnowflakeAccountName(s.conf.Account) return s.mb.Emit(metadata.WithResource(rb.Emit())), errs.Combine() } -func (s *snowflakeMetricsScraper) scrapeBillingMetrics(ctx context.Context, t pcommon.Timestamp, errs scrapererror.ScrapeErrors) { +func (s *snowflakeMetricsScraper) scrapeBillingMetrics(ctx context.Context, t pcommon.Timestamp, errs chan<- error) { billingMetrics, err := s.client.FetchBillingMetrics(ctx) if err != nil { - errs.Add(err) + errs <- err return } @@ -86,11 +121,11 @@ func (s *snowflakeMetricsScraper) scrapeBillingMetrics(ctx context.Context, t pc } } -func (s *snowflakeMetricsScraper) scrapeWarehouseBillingMetrics(ctx context.Context, t pcommon.Timestamp, errs scrapererror.ScrapeErrors) { +func (s *snowflakeMetricsScraper) scrapeWarehouseBillingMetrics(ctx context.Context, t pcommon.Timestamp, errs chan<- error) { warehouseBillingMetrics, err := s.client.FetchWarehouseBillingMetrics(ctx) if err != nil { - errs.Add(err) + errs <- err return } @@ -101,11 +136,11 @@ func (s *snowflakeMetricsScraper) scrapeWarehouseBillingMetrics(ctx context.Cont } } -func (s *snowflakeMetricsScraper) scrapeLoginMetrics(ctx context.Context, t pcommon.Timestamp, errs scrapererror.ScrapeErrors) { +func (s *snowflakeMetricsScraper) scrapeLoginMetrics(ctx context.Context, t pcommon.Timestamp, errs chan<- error) { loginMetrics, err := s.client.FetchLoginMetrics(ctx) if err != nil { - errs.Add(err) + errs <- err return } @@ -114,11 +149,11 @@ func (s *snowflakeMetricsScraper) scrapeLoginMetrics(ctx context.Context, t pcom } } -func (s *snowflakeMetricsScraper) scrapeHighLevelQueryMetrics(ctx context.Context, t pcommon.Timestamp, errs scrapererror.ScrapeErrors) { +func (s *snowflakeMetricsScraper) scrapeHighLevelQueryMetrics(ctx context.Context, t pcommon.Timestamp, errs chan<- error) { highLevelQueryMetrics, err := s.client.FetchHighLevelQueryMetrics(ctx) if err != nil { - errs.Add(err) + errs <- err return } @@ -130,11 +165,11 @@ func (s *snowflakeMetricsScraper) scrapeHighLevelQueryMetrics(ctx context.Contex } } -func (s *snowflakeMetricsScraper) scrapeDBMetrics(ctx context.Context, t pcommon.Timestamp, errs scrapererror.ScrapeErrors) { +func (s *snowflakeMetricsScraper) scrapeDBMetrics(ctx context.Context, t pcommon.Timestamp, errs chan<- error) { DBMetrics, err := s.client.FetchDbMetrics(ctx) if err != nil { - errs.Add(err) + errs <- err return } @@ -161,11 +196,11 @@ func (s *snowflakeMetricsScraper) scrapeDBMetrics(ctx context.Context, t pcommon } } -func (s *snowflakeMetricsScraper) scrapeSessionMetrics(ctx context.Context, t pcommon.Timestamp, errs scrapererror.ScrapeErrors) { +func (s *snowflakeMetricsScraper) scrapeSessionMetrics(ctx context.Context, t pcommon.Timestamp, errs chan<- error) { sessionMetrics, err := s.client.FetchSessionMetrics(ctx) if err != nil { - errs.Add(err) + errs <- err return } @@ -174,11 +209,11 @@ func (s *snowflakeMetricsScraper) scrapeSessionMetrics(ctx context.Context, t pc } } -func (s *snowflakeMetricsScraper) scrapeSnowpipeMetrics(ctx context.Context, t pcommon.Timestamp, errs scrapererror.ScrapeErrors) { +func (s *snowflakeMetricsScraper) scrapeSnowpipeMetrics(ctx context.Context, t pcommon.Timestamp, errs chan<- error) { snowpipeMetrics, err := s.client.FetchSnowpipeMetrics(ctx) if err != nil { - errs.Add(err) + errs <- err return } @@ -187,11 +222,11 @@ func (s *snowflakeMetricsScraper) scrapeSnowpipeMetrics(ctx context.Context, t p } } -func (s *snowflakeMetricsScraper) scrapeStorageMetrics(ctx context.Context, t pcommon.Timestamp, errs scrapererror.ScrapeErrors) { +func (s *snowflakeMetricsScraper) scrapeStorageMetrics(ctx context.Context, t pcommon.Timestamp, errs chan<- error) { storageMetrics, err := s.client.FetchStorageMetrics(ctx) if err != nil { - errs.Add(err) + errs <- err return }