Skip to content

Commit

Permalink
feat: cache last scrape results
Browse files Browse the repository at this point in the history
  • Loading branch information
eli-jordan committed Mar 11, 2020
1 parent e5659ac commit dea3b3b
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 36 deletions.
124 changes: 99 additions & 25 deletions collectors/monitoring_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,59 @@ type MonitoringCollector struct {
lastScrapeDurationSecondsMetric prometheus.Gauge
collectorFillMissingLabels bool
monitoringDropDelegatedProjects bool

cache *CollectionCache
}

// CollectionCache keeps two sets of collected metrics: the snapshot produced
// by the last finished collection run, and the working set being filled in by
// the run that is currently in progress (if any).
type CollectionCache struct {
	// Read-only result of the most recently completed collection run. It is
	// served from the Prometheus scrape endpoint until the next collection
	// completes and replaces it.
	cachedTimeSeries map[string]*TimeSeriesMetrics

	// Guards `activeTimeSeries` and `collectionActive`.
	mu sync.Mutex

	// Metrics gathered so far by the collection in progress; potentially
	// incomplete. Once the collection completes this map replaces
	// `cachedTimeSeries` and starts being served.
	activeTimeSeries map[string]*TimeSeriesMetrics

	// True while a collection is running and populating `activeTimeSeries`.
	collectionActive bool
}

// markCollectionStarted flags the cache as having a collection in progress.
func (c *CollectionCache) markCollectionStarted() {
	log.Debugf("markCollectionStarted")

	c.mu.Lock()
	defer c.mu.Unlock()

	c.collectionActive = true
}

// markCollectionCompleted publishes the metrics gathered by the collection
// that just finished and resets the cache for the next run: the active map
// becomes the cached (served) map, a fresh empty active map is installed, and
// the in-progress flag is cleared.
func (c *CollectionCache) markCollectionCompleted() {
	log.Debugf("markCollectionCompleted")

	c.mu.Lock()
	// Promote the working set and start a new, empty one in a single step.
	c.cachedTimeSeries, c.activeTimeSeries = c.activeTimeSeries, make(map[string]*TimeSeriesMetrics)
	c.collectionActive = false
	c.mu.Unlock()
}

// isCollectionActive reports whether a collection is currently running in the
// background.
func (c *CollectionCache) isCollectionActive() bool {
	c.mu.Lock()
	active := c.collectionActive
	c.mu.Unlock()

	return active
}

// putMetric records the time series collected for metricType during the
// collection that is currently in progress.
//
// The collector calls this once per page of time-series results, each time
// with a freshly built TimeSeriesMetrics, so a metric type whose results span
// several pages arrives in several calls. Entries for the same metricType are
// therefore merged into the existing entry; overwriting it would keep only
// the last page and silently drop the rest.
//
// A nil timeSeries is ignored so that a nil entry can never end up in the map
// that is later served.
func (c *CollectionCache) putMetric(metricType string, timeSeries *TimeSeriesMetrics) {
	if timeSeries == nil {
		return
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	// Tolerate a zero-value cache: writing to a nil map would panic.
	if c.activeTimeSeries == nil {
		c.activeTimeSeries = make(map[string]*TimeSeriesMetrics)
	}

	existing := c.activeTimeSeries[metricType]
	if existing == nil {
		c.activeTimeSeries[metricType] = timeSeries
		return
	}

	// Fold the new page into what was already collected for this metric type.
	if existing.constMetrics == nil {
		existing.constMetrics = timeSeries.constMetrics
	} else {
		for fqName, metrics := range timeSeries.constMetrics {
			existing.constMetrics[fqName] = append(existing.constMetrics[fqName], metrics...)
		}
	}

	if existing.histogramMetrics == nil {
		existing.histogramMetrics = timeSeries.histogramMetrics
	} else {
		for fqName, metrics := range timeSeries.histogramMetrics {
			existing.histogramMetrics[fqName] = append(existing.histogramMetrics[fqName], metrics...)
		}
	}
}

func NewMonitoringCollector(
Expand Down Expand Up @@ -114,6 +167,11 @@ func NewMonitoringCollector(
lastScrapeDurationSecondsMetric: lastScrapeDurationSecondsMetric,
collectorFillMissingLabels: collectorFillMissingLabels,
monitoringDropDelegatedProjects: monitoringDropDelegatedProjects,
cache: &CollectionCache{
cachedTimeSeries: make(map[string]*TimeSeriesMetrics),
activeTimeSeries: make(map[string]*TimeSeriesMetrics),
collectionActive: false,
},
}

return monitoringCollector, nil
Expand All @@ -129,32 +187,41 @@ func (c *MonitoringCollector) Describe(ch chan<- *prometheus.Desc) {
}

func (c *MonitoringCollector) Collect(ch chan<- prometheus.Metric) {
var begun = time.Now()

errorMetric := float64(0)
if err := c.reportMonitoringMetrics(ch); err != nil {
errorMetric = float64(1)
c.scrapeErrorsTotalMetric.Inc()
log.Errorf("Error while getting Google Stackdriver Monitoring metrics: %s", err)
for _, timeSeries := range c.cache.cachedTimeSeries {
timeSeries.Complete(ch)
}
c.scrapeErrorsTotalMetric.Collect(ch)

c.scrapeErrorsTotalMetric.Collect(ch)
c.apiCallsTotalMetric.Collect(ch)

c.scrapesTotalMetric.Inc()
c.scrapesTotalMetric.Collect(ch)

c.lastScrapeErrorMetric.Set(errorMetric)
c.lastScrapeErrorMetric.Collect(ch)

c.lastScrapeTimestampMetric.Set(float64(time.Now().Unix()))
c.lastScrapeTimestampMetric.Collect(ch)

c.lastScrapeDurationSecondsMetric.Set(time.Since(begun).Seconds())
c.lastScrapeDurationSecondsMetric.Collect(ch)

if !c.cache.isCollectionActive() {
go func() {
start := time.Now()
errorMetric := float64(0)

c.cache.markCollectionStarted()
if err := c.updateMetricsCache(); err != nil {
errorMetric = float64(1)
c.scrapeErrorsTotalMetric.Inc()
log.Errorf("Error while getting Google Stackdriver Monitoring metrics: %s", err)
}

c.scrapesTotalMetric.Inc()
c.lastScrapeErrorMetric.Set(errorMetric)
c.lastScrapeTimestampMetric.Set(float64(time.Now().Unix()))
c.lastScrapeDurationSecondsMetric.Set(time.Since(start).Seconds())

c.cache.markCollectionCompleted()
}()
}
}

func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metric) error {
func (c *MonitoringCollector) updateMetricsCache() error {
metricDescriptorsFunction := func(page *monitoring.ListMetricDescriptorsResponse) error {
var wg = &sync.WaitGroup{}

Expand All @@ -181,7 +248,7 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri

for _, metricDescriptor := range uniqueDescriptors {
wg.Add(1)
go func(metricDescriptor *monitoring.MetricDescriptor, ch chan<- prometheus.Metric) {
go func(metricDescriptor *monitoring.MetricDescriptor) {
defer wg.Done()
log.Debugf("Retrieving Google Stackdriver Monitoring metrics for descriptor `%s`...", metricDescriptor.Type)
filter := fmt.Sprintf("metric.type=\"%s\"", metricDescriptor.Type)
Expand All @@ -193,9 +260,13 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri
}
timeSeriesListCall := c.monitoringService.Projects.TimeSeries.List(utils.ProjectResource(c.projectID)).
Filter(filter).
PageSize(100000).
IntervalStartTime(startTime.Format(time.RFC3339Nano)).
IntervalEndTime(endTime.Format(time.RFC3339Nano))

pageNumber := 0

start := time.Now()
for {
c.apiCallsTotalMetric.Inc()
page, err := timeSeriesListCall.Do()
Expand All @@ -204,20 +275,26 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri
errChannel <- err
break
}

if page == nil {
break
}
if err := c.reportTimeSeriesMetrics(page, metricDescriptor, ch); err != nil {
if err := c.updateMetricsCacheForMetric(page, metricDescriptor); err != nil {
log.Errorf("Error reporting Time Series metrics for descriptor `%s`: %v", metricDescriptor.Type, err)
errChannel <- err
break
}
if page.NextPageToken == "" {
break
}
pageNumber++
timeSeriesListCall.PageToken(page.NextPageToken)
}
}(metricDescriptor, ch)

elapsed := time.Since(start)
log.Debugf("Took %s to retrieve %v pages for metric descriptor %s", elapsed, pageNumber+1, metricDescriptor.Type)

}(metricDescriptor)
}

wg.Wait()
Expand Down Expand Up @@ -257,18 +334,15 @@ func (c *MonitoringCollector) reportMonitoringMetrics(ch chan<- prometheus.Metri
return <-errChannel
}

func (c *MonitoringCollector) reportTimeSeriesMetrics(
func (c *MonitoringCollector) updateMetricsCacheForMetric(
page *monitoring.ListTimeSeriesResponse,
metricDescriptor *monitoring.MetricDescriptor,
ch chan<- prometheus.Metric,
) error {
metricDescriptor *monitoring.MetricDescriptor) error {
var metricValue float64
var metricValueType prometheus.ValueType
var newestTSPoint *monitoring.Point

timeSeriesMetrics := &TimeSeriesMetrics{
metricDescriptor: metricDescriptor,
ch: ch,
fillMissingLabels: c.collectorFillMissingLabels,
constMetrics: make(map[string][]ConstMetric),
histogramMetrics: make(map[string][]HistogramMetric),
Expand Down Expand Up @@ -354,7 +428,7 @@ func (c *MonitoringCollector) reportTimeSeriesMetrics(

timeSeriesMetrics.CollectNewConstMetric(timeSeries, labelKeys, metricValueType, metricValue, labelValues)
}
timeSeriesMetrics.Complete()
c.cache.putMetric(metricDescriptor.Type, timeSeriesMetrics)
return nil
}

Expand Down
22 changes: 11 additions & 11 deletions collectors/monitoring_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import (
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/api/monitoring/v3"

"github.com/frodenas/stackdriver_exporter/utils"
"sort"

"github.com/frodenas/stackdriver_exporter/utils"
)

func buildFQName(timeSeries *monitoring.TimeSeries) string {
Expand All @@ -18,7 +19,6 @@ func buildFQName(timeSeries *monitoring.TimeSeries) string {

type TimeSeriesMetrics struct {
metricDescriptor *monitoring.MetricDescriptor
ch chan<- prometheus.Metric

fillMissingLabels bool
constMetrics map[string][]ConstMetric
Expand Down Expand Up @@ -74,7 +74,7 @@ func (t *TimeSeriesMetrics) CollectNewConstHistogram(timeSeries *monitoring.Time
t.histogramMetrics[fqName] = append(vs, v)
return
}
t.ch <- t.newConstHistogram(fqName, labelKeys, dist, buckets, labelValues)
// t.ch <- t.newConstHistogram(fqName, labelKeys, dist, buckets, labelValues)
}

func (t *TimeSeriesMetrics) newConstHistogram(fqName string, labelKeys []string, dist *monitoring.Distribution, buckets map[float64]uint64, labelValues []string) prometheus.Metric {
Expand Down Expand Up @@ -107,7 +107,7 @@ func (t *TimeSeriesMetrics) CollectNewConstMetric(timeSeries *monitoring.TimeSer
t.constMetrics[fqName] = append(vs, v)
return
}
t.ch <- t.newConstMetric(fqName, labelKeys, metricValueType, metricValue, labelValues)
// t.ch <- t.newConstMetric(fqName, labelKeys, metricValueType, metricValue, labelValues)
}

func (t *TimeSeriesMetrics) newConstMetric(fqName string, labelKeys []string, metricValueType prometheus.ValueType, metricValue float64, labelValues []string) prometheus.Metric {
Expand All @@ -131,12 +131,12 @@ func hashLabelKeys(labelKeys []string) uint64 {
return dh
}

func (t *TimeSeriesMetrics) Complete() {
t.completeConstMetrics()
t.completeHistogramMetrics()
func (t *TimeSeriesMetrics) Complete(ch chan<- prometheus.Metric) {
t.completeConstMetrics(ch)
t.completeHistogramMetrics(ch)
}

func (t *TimeSeriesMetrics) completeConstMetrics() {
func (t *TimeSeriesMetrics) completeConstMetrics(ch chan<- prometheus.Metric) {
for _, vs := range t.constMetrics {
if len(vs) > 1 {
var needFill bool
Expand All @@ -151,12 +151,12 @@ func (t *TimeSeriesMetrics) completeConstMetrics() {
}

for _, v := range vs {
t.ch <- t.newConstMetric(v.fqName, v.labelKeys, v.valueType, v.value, v.labelValues)
ch <- t.newConstMetric(v.fqName, v.labelKeys, v.valueType, v.value, v.labelValues)
}
}
}

func (t *TimeSeriesMetrics) completeHistogramMetrics() {
func (t *TimeSeriesMetrics) completeHistogramMetrics(ch chan<- prometheus.Metric) {
for _, vs := range t.histogramMetrics {
if len(vs) > 1 {
var needFill bool
Expand All @@ -170,7 +170,7 @@ func (t *TimeSeriesMetrics) completeHistogramMetrics() {
}
}
for _, v := range vs {
t.ch <- t.newConstHistogram(v.fqName, v.labelKeys, v.dist, v.buckets, v.labelValues)
ch <- t.newConstHistogram(v.fqName, v.labelKeys, v.dist, v.buckets, v.labelValues)
}
}
}
Expand Down

0 comments on commit dea3b3b

Please sign in to comment.