process scrape loops reloading in parallel (#4526)
The scrape manager's receiving channel now just saves the target sets
and another background runner updates the scrape loops every 5 seconds.
This is so that the scrape manager doesn't block the receiving channel
while it does the long background reloading of the scrape loops.
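
To make the mechanism concrete, here is a minimal, self-contained sketch of
that pattern (hypothetical names such as Reloader, Run and reloadLoop; the
real implementation is in scrape/manager.go below): the receiver only stores
the latest update and does a non-blocking send on a one-slot trigger channel,
while a separate goroutine performs the slow reload at most once per tick.

package sketch

import (
	"fmt"
	"sync"
	"time"
)

// Reloader sketches the "save the update, trigger a reload" pattern.
type Reloader struct {
	mtx           sync.Mutex
	latest        map[string][]string // stands in for map[string][]*targetgroup.Group
	triggerReload chan struct{}       // capacity 1: sends never block, extra pokes coalesce
}

func NewReloader() *Reloader {
	return &Reloader{triggerReload: make(chan struct{}, 1)}
}

// Run stores every incoming update and pokes the trigger channel without blocking.
func (r *Reloader) Run(updates <-chan map[string][]string, stop <-chan struct{}) {
	go r.reloadLoop(stop)
	for {
		select {
		case u := <-updates:
			r.mtx.Lock()
			r.latest = u
			r.mtx.Unlock()
			select {
			case r.triggerReload <- struct{}{}:
			default: // a reload is already pending; drop the extra poke
			}
		case <-stop:
			return
		}
	}
}

// reloadLoop waits for the next tick and a pending trigger before doing the
// slow work, so many updates are batched into at most one reload per tick.
func (r *Reloader) reloadLoop(stop <-chan struct{}) {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			select {
			case <-r.triggerReload:
				r.mtx.Lock()
				fmt.Printf("reloading %d target sets\n", len(r.latest))
				r.mtx.Unlock()
			case <-stop:
				return
			}
		case <-stop:
			return
		}
	}
}

The one-slot channel is what keeps the receiver responsive: if reloads are
slower than updates arrive, the single pending trigger simply absorbs them.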

Active and dropped targets are now saved in each scrape pool instead of
the scrape manager. This is mainly to avoid races when getting the
targets via the web API.

Reloading the scrape loops now happens in parallel, which speeds up
reaching the final desired state and also speeds up Prometheus's
shutdown.
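
The parallel part boils down to fanning the per-pool Sync calls out to
goroutines and waiting on a sync.WaitGroup, so a full reload takes roughly as
long as the slowest pool rather than the sum of all pools. A standalone sketch
under assumed names (pool, Sync and reloadAll are simplified stand-ins for
scrapePool and its Sync method, not the actual types):

package main

import (
	"fmt"
	"sync"
	"time"
)

// pool stands in for a scrape pool; Sync is the slow per-pool reload step.
type pool struct{ name string }

func (p *pool) Sync(groups []string) {
	time.Sleep(100 * time.Millisecond) // simulate slow work
	fmt.Println(p.name, "synced", len(groups), "groups")
}

// reloadAll runs every pool's Sync concurrently and waits for all of them.
func reloadAll(pools map[string]*pool, sets map[string][]string) {
	var wg sync.WaitGroup
	for name, p := range pools {
		wg.Add(1)
		go func(p *pool, groups []string) {
			defer wg.Done()
			p.Sync(groups)
		}(p, sets[name])
	}
	wg.Wait()
}

func main() {
	pools := map[string]*pool{"node": {name: "node"}, "kubelet": {name: "kubelet"}}
	sets := map[string][]string{"node": {"a", "b"}, "kubelet": {"c"}}
	reloadAll(pools, sets) // finishes in roughly one Sync's time, since pools sync in parallel
}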

Also updated some function signatures in the web package for consistency.

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
krasi-georgiev committed Sep 26, 2018
1 parent abf6fe0 commit 47a673c
Showing 7 changed files with 286 additions and 190 deletions.
151 changes: 94 additions & 57 deletions scrape/manager.go
@@ -14,9 +14,9 @@
package scrape

import (
"fmt"
"reflect"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
@@ -33,13 +33,16 @@ type Appendable interface {

// NewManager is the Manager constructor
func NewManager(logger log.Logger, app Appendable) *Manager {
if logger == nil {
logger = log.NewNopLogger()
}
return &Manager{
append: app,
logger: logger,
scrapeConfigs: make(map[string]*config.ScrapeConfig),
scrapePools: make(map[string]*scrapePool),
graceShut: make(chan struct{}),
-		targetsAll:    make(map[string][]*Target),
+		triggerReload: make(chan struct{}, 1),
}
}

@@ -50,28 +53,83 @@ type Manager struct {
append Appendable
graceShut chan struct{}

-	mtxTargets     sync.Mutex // Guards the fields below.
-	targetsActive  []*Target
-	targetsDropped []*Target
-	targetsAll     map[string][]*Target
-
	mtxScrape     sync.Mutex // Guards the fields below.
	scrapeConfigs map[string]*config.ScrapeConfig
	scrapePools   map[string]*scrapePool
+	targetSets    map[string][]*targetgroup.Group
+
+	triggerReload chan struct{}
}

-// Run starts background processing to handle target updates and reload the scraping loops.
+// Run receives and saves target set updates and triggers the scraping loops reloading.
+// Reloading happens in the background so that it doesn't block receiving targets updates.
func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
go m.reloader()
for {
select {
case ts := <-tsets:
-			m.reload(ts)
+			m.updateTsets(ts)

select {
case m.triggerReload <- struct{}{}:
default:
}

case <-m.graceShut:
return nil
}
}
}

func (m *Manager) reloader() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-m.graceShut:
return
case <-ticker.C:
select {
case <-m.triggerReload:
m.reload()
case <-m.graceShut:
return
}
}
}
}

func (m *Manager) reload() {
m.mtxScrape.Lock()
var wg sync.WaitGroup
for setName, groups := range m.targetSets {
var sp *scrapePool
existing, ok := m.scrapePools[setName]
if !ok {
scrapeConfig, ok := m.scrapeConfigs[setName]
if !ok {
level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName)
return
}
sp = newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", setName))
m.scrapePools[setName] = sp
} else {
sp = existing
}

wg.Add(1)
// Run the sync in parallel as these take a while and at high load can't catch up.
go func(sp *scrapePool, groups []*targetgroup.Group) {
sp.Sync(groups)
wg.Done()
}(sp, groups)

}
m.mtxScrape.Unlock()
wg.Wait()
}

// Stop cancels all running scrape pools and blocks until all have exited.
func (m *Manager) Stop() {
m.mtxScrape.Lock()
@@ -83,6 +141,12 @@ func (m *Manager) Stop() {
close(m.graceShut)
}

func (m *Manager) updateTsets(tsets map[string][]*targetgroup.Group) {
m.mtxScrape.Lock()
m.targetSets = tsets
m.mtxScrape.Unlock()
}

// ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg.
func (m *Manager) ApplyConfig(cfg *config.Config) error {
m.mtxScrape.Lock()
@@ -109,64 +173,37 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error {

// TargetsAll returns active and dropped targets grouped by job_name.
func (m *Manager) TargetsAll() map[string][]*Target {
-	m.mtxTargets.Lock()
-	defer m.mtxTargets.Unlock()
-	return m.targetsAll
+	m.mtxScrape.Lock()
+	defer m.mtxScrape.Unlock()
+
+	targets := make(map[string][]*Target, len(m.scrapePools))
+	for tset, sp := range m.scrapePools {
+		targets[tset] = append(sp.ActiveTargets(), sp.DroppedTargets()...)
+	}
+	return targets
}

// TargetsActive returns the active targets currently being scraped.
-func (m *Manager) TargetsActive() []*Target {
-	m.mtxTargets.Lock()
-	defer m.mtxTargets.Unlock()
-	return m.targetsActive
-}
+func (m *Manager) TargetsActive() map[string][]*Target {
+	m.mtxScrape.Lock()
+	defer m.mtxScrape.Unlock()
+
+	targets := make(map[string][]*Target, len(m.scrapePools))
+	for tset, sp := range m.scrapePools {
+		targets[tset] = sp.ActiveTargets()
+	}
+	return targets
+}

// TargetsDropped returns the dropped targets during relabelling.
-func (m *Manager) TargetsDropped() []*Target {
-	m.mtxTargets.Lock()
-	defer m.mtxTargets.Unlock()
-	return m.targetsDropped
-}
-
-func (m *Manager) targetsUpdate(active, dropped map[string][]*Target) {
-	m.mtxTargets.Lock()
-	defer m.mtxTargets.Unlock()
-
-	m.targetsAll = make(map[string][]*Target)
-	m.targetsActive = nil
-	m.targetsDropped = nil
-	for jobName, targets := range active {
-		m.targetsAll[jobName] = append(m.targetsAll[jobName], targets...)
-		m.targetsActive = append(m.targetsActive, targets...)
-	}
-	for jobName, targets := range dropped {
-		m.targetsAll[jobName] = append(m.targetsAll[jobName], targets...)
-		m.targetsDropped = append(m.targetsDropped, targets...)
-	}
-}
-
-func (m *Manager) reload(t map[string][]*targetgroup.Group) {
-	m.mtxScrape.Lock()
-	defer m.mtxScrape.Unlock()
-
-	tDropped := make(map[string][]*Target)
-	tActive := make(map[string][]*Target)
-
-	for tsetName, tgroup := range t {
-		var sp *scrapePool
-		if existing, ok := m.scrapePools[tsetName]; !ok {
-			scrapeConfig, ok := m.scrapeConfigs[tsetName]
-			if !ok {
-				level.Error(m.logger).Log("msg", "error reloading target set", "err", fmt.Sprintf("invalid config id:%v", tsetName))
-				continue
-			}
-			sp = newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName))
-			m.scrapePools[tsetName] = sp
-		} else {
-			sp = existing
-		}
-		tActive[tsetName], tDropped[tsetName] = sp.Sync(tgroup)
-	}
-	m.targetsUpdate(tActive, tDropped)
-}
+func (m *Manager) TargetsDropped() map[string][]*Target {
+	m.mtxScrape.Lock()
+	defer m.mtxScrape.Unlock()
+
+	targets := make(map[string][]*Target, len(m.scrapePools))
+	for tset, sp := range m.scrapePools {
+		targets[tset] = sp.DroppedTargets()
+	}
+	return targets
+}
43 changes: 41 additions & 2 deletions scrape/manager_test.go
@@ -15,10 +15,13 @@ package scrape

import (
"fmt"
"strconv"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/util/testutil"

@@ -252,8 +255,8 @@ scrape_configs:
}

	sp := &scrapePool{
-		appendable: &nopAppendable{},
-		targets:    map[uint64]*Target{},
+		appendable:    &nopAppendable{},
+		activeTargets: map[uint64]*Target{},
loops: map[uint64]loop{
1: &testLoop{},
},
@@ -267,3 +270,39 @@

scrapeManager.ApplyConfig(cfg)
}

func TestManagerTargetsUpdates(t *testing.T) {
m := NewManager(nil, nil)

ts := make(chan map[string][]*targetgroup.Group)
go m.Run(ts)

tgSent := make(map[string][]*targetgroup.Group)
for x := 0; x < 10; x++ {

tgSent[strconv.Itoa(x)] = []*targetgroup.Group{
&targetgroup.Group{
Source: strconv.Itoa(x),
},
}

select {
case ts <- tgSent:
case <-time.After(10 * time.Millisecond):
t.Error("Scrape manager's channel remained blocked after the set threshold.")
}
}

m.mtxScrape.Lock()
tsetActual := m.targetSets
m.mtxScrape.Unlock()

// Make sure all updates have been received.
testutil.Equals(t, tgSent, tsetActual)

select {
case <-m.triggerReload:
default:
t.Error("No scrape loops reload was triggered after targets update.")
}
}
