From 64f56af8d44350e4e1f4c70eda0e03cad317b920 Mon Sep 17 00:00:00 2001 From: shalper2 <99686388+shalper2@users.noreply.github.com> Date: Thu, 4 Apr 2024 10:49:18 -0500 Subject: [PATCH] [chore][receiver/snowflakereceiver] (#31260) **Description:** Refactored the scrape function in scrape.go to take better advantage of go's language features. Namely, utilizing go routines and channels to parallelize the individual metric scrapes which make up the overall scrape function. This greatly improves the performance of the receiver. --- receiver/snowflakereceiver/scraper.go | 89 +++++++++++++++++++-------- 1 file changed, 62 insertions(+), 27 deletions(-) diff --git a/receiver/snowflakereceiver/scraper.go b/receiver/snowflakereceiver/scraper.go index cb890dff73eae..4b0e3f5960b0f 100644 --- a/receiver/snowflakereceiver/scraper.go +++ b/receiver/snowflakereceiver/scraper.go @@ -5,6 +5,7 @@ package snowflakereceiver // import "github.com/open-telemetry/opentelemetry-col import ( "context" + "sync" "time" "go.opentelemetry.io/collector/component" @@ -48,34 +49,68 @@ func (s *snowflakeMetricsScraper) shutdown(_ context.Context) (err error) { return err } +func errorListener(eQueue <-chan error, eOut chan<- *scrapererror.ScrapeErrors) { + errs := &scrapererror.ScrapeErrors{} + + for err := range eQueue { + errs.Add(err) + } + + eOut <- errs +} + // wrapper for all of the sub-scraping tasks, implements the scraper interface for // snowflakeMetricsScraper func (s *snowflakeMetricsScraper) scrape(ctx context.Context) (pmetric.Metrics, error) { - errs := &scrapererror.ScrapeErrors{} + var wg sync.WaitGroup + var errs *scrapererror.ScrapeErrors - now := pcommon.NewTimestampFromTime(time.Now()) + metricScrapes := []func(context.Context, pcommon.Timestamp, chan<- error){ + s.scrapeBillingMetrics, + s.scrapeWarehouseBillingMetrics, + s.scrapeLoginMetrics, + s.scrapeHighLevelQueryMetrics, + s.scrapeDBMetrics, + s.scrapeSessionMetrics, + s.scrapeSnowpipeMetrics, + s.scrapeStorageMetrics, + } - // each client call has its own scrape function + errChan := make(chan error, len(metricScrapes)) + errOut := make(chan *scrapererror.ScrapeErrors) - s.scrapeBillingMetrics(ctx, now, *errs) - s.scrapeWarehouseBillingMetrics(ctx, now, *errs) - s.scrapeLoginMetrics(ctx, now, *errs) - s.scrapeHighLevelQueryMetrics(ctx, now, *errs) - s.scrapeDBMetrics(ctx, now, *errs) - s.scrapeSessionMetrics(ctx, now, *errs) - s.scrapeSnowpipeMetrics(ctx, now, *errs) - s.scrapeStorageMetrics(ctx, now, *errs) + go func() { + errorListener(errChan, errOut) + }() + now := pcommon.NewTimestampFromTime(time.Now()) + + for _, fn := range metricScrapes { + wg.Add(1) + go func( + fn func(context.Context, pcommon.Timestamp, chan<- error), + ctx context.Context, + now pcommon.Timestamp, + errs chan<- error, + ) { + defer wg.Done() + fn(ctx, now, errs) + }(fn, ctx, now, errChan) + } + + wg.Wait() + close(errChan) + errs = <-errOut rb := s.mb.NewResourceBuilder() rb.SetSnowflakeAccountName(s.conf.Account) return s.mb.Emit(metadata.WithResource(rb.Emit())), errs.Combine() } -func (s *snowflakeMetricsScraper) scrapeBillingMetrics(ctx context.Context, t pcommon.Timestamp, errs scrapererror.ScrapeErrors) { +func (s *snowflakeMetricsScraper) scrapeBillingMetrics(ctx context.Context, t pcommon.Timestamp, errs chan<- error) { billingMetrics, err := s.client.FetchBillingMetrics(ctx) if err != nil { - errs.Add(err) + errs <- err return } @@ -86,11 +121,11 @@ func (s *snowflakeMetricsScraper) scrapeBillingMetrics(ctx context.Context, t pc } } -func (s *snowflakeMetricsScraper) scrapeWarehouseBillingMetrics(ctx context.Context, t pcommon.Timestamp, errs scrapererror.ScrapeErrors) { +func (s *snowflakeMetricsScraper) scrapeWarehouseBillingMetrics(ctx context.Context, t pcommon.Timestamp, errs chan<- error) { warehouseBillingMetrics, err := s.client.FetchWarehouseBillingMetrics(ctx) if err != nil { - errs.Add(err) + errs <- err return } @@ -101,11 +136,11 @@ func (s *snowflakeMetricsScraper) scrapeWarehouseBillingMetrics(ctx context.Cont } } -func (s *snowflakeMetricsScraper) scrapeLoginMetrics(ctx context.Context, t pcommon.Timestamp, errs scrapererror.ScrapeErrors) { +func (s *snowflakeMetricsScraper) scrapeLoginMetrics(ctx context.Context, t pcommon.Timestamp, errs chan<- error) { loginMetrics, err := s.client.FetchLoginMetrics(ctx) if err != nil { - errs.Add(err) + errs <- err return } @@ -114,11 +149,11 @@ func (s *snowflakeMetricsScraper) scrapeLoginMetrics(ctx context.Context, t pcom } } -func (s *snowflakeMetricsScraper) scrapeHighLevelQueryMetrics(ctx context.Context, t pcommon.Timestamp, errs scrapererror.ScrapeErrors) { +func (s *snowflakeMetricsScraper) scrapeHighLevelQueryMetrics(ctx context.Context, t pcommon.Timestamp, errs chan<- error) { highLevelQueryMetrics, err := s.client.FetchHighLevelQueryMetrics(ctx) if err != nil { - errs.Add(err) + errs <- err return } @@ -130,11 +165,11 @@ func (s *snowflakeMetricsScraper) scrapeHighLevelQueryMetrics(ctx context.Contex } } -func (s *snowflakeMetricsScraper) scrapeDBMetrics(ctx context.Context, t pcommon.Timestamp, errs scrapererror.ScrapeErrors) { +func (s *snowflakeMetricsScraper) scrapeDBMetrics(ctx context.Context, t pcommon.Timestamp, errs chan<- error) { DBMetrics, err := s.client.FetchDbMetrics(ctx) if err != nil { - errs.Add(err) + errs <- err return } @@ -161,11 +196,11 @@ func (s *snowflakeMetricsScraper) scrapeDBMetrics(ctx context.Context, t pcommon } } -func (s *snowflakeMetricsScraper) scrapeSessionMetrics(ctx context.Context, t pcommon.Timestamp, errs scrapererror.ScrapeErrors) { +func (s *snowflakeMetricsScraper) scrapeSessionMetrics(ctx context.Context, t pcommon.Timestamp, errs chan<- error) { sessionMetrics, err := s.client.FetchSessionMetrics(ctx) if err != nil { - errs.Add(err) + errs <- err return } @@ -174,11 +209,11 @@ func (s *snowflakeMetricsScraper) scrapeSessionMetrics(ctx context.Context, t pc } } -func (s *snowflakeMetricsScraper) scrapeSnowpipeMetrics(ctx context.Context, t pcommon.Timestamp, errs scrapererror.ScrapeErrors) { +func (s *snowflakeMetricsScraper) scrapeSnowpipeMetrics(ctx context.Context, t pcommon.Timestamp, errs chan<- error) { snowpipeMetrics, err := s.client.FetchSnowpipeMetrics(ctx) if err != nil { - errs.Add(err) + errs <- err return } @@ -187,11 +222,11 @@ func (s *snowflakeMetricsScraper) scrapeSnowpipeMetrics(ctx context.Context, t p } } -func (s *snowflakeMetricsScraper) scrapeStorageMetrics(ctx context.Context, t pcommon.Timestamp, errs scrapererror.ScrapeErrors) { +func (s *snowflakeMetricsScraper) scrapeStorageMetrics(ctx context.Context, t pcommon.Timestamp, errs chan<- error) { storageMetrics, err := s.client.FetchStorageMetrics(ctx) if err != nil { - errs.Add(err) + errs <- err return }