Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

discovery: add metrics + send updates from one goroutine only #4667

Merged
Merged
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.
+57 −20
Diff settings

Always

Just for now

@@ -47,10 +47,34 @@ var (
Help: "Total number of service discovery configurations that failed to load.",
},
)
discoveredTargets = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "prometheus_sd_discovered_targets",
Help: "Current number of discovered targets.",
},
)
receivedUpdates = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "prometheus_sd_received_updates_total",
Help: "Total number of update events received from the SD providers.",
},
)
delayedUpdates = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "prometheus_sd_updates_delayed_total",
Help: "Total number of update events that couldn't be sent immediately.",
},
)
sentUpdates = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "prometheus_sd_updates_total",
Help: "Total number of update events sent to the SD consumers.",
},
)
)

// init registers all service discovery metrics with the default registry.
// Each metric must be registered exactly once; a duplicate registration
// (e.g. registering failedConfigs twice) makes MustRegister panic.
func init() {
	prometheus.MustRegister(failedConfigs, discoveredTargets, receivedUpdates, delayedUpdates, sentUpdates)
}

// Discoverer provides information about target groups. It maintains a set
@@ -62,7 +86,7 @@ func init() {
//
// Discoverers should initially send a full set of all discoverable TargetGroups.
type Discoverer interface {
// Run hands a channel to the discovery provider(consul,dns etc) through which it can send
// Run hands a channel to the discovery provider (Consul, DNS etc) through which it can send
// updated target groups.
// Must return when the context gets canceled. It should not close the update
// channel on returning.
@@ -94,6 +118,7 @@ func NewManager(ctx context.Context, logger log.Logger) *Manager {
discoverCancel: []context.CancelFunc{},
ctx: ctx,
updatert: 5 * time.Second,
trigger: make(chan struct{}, 1),
}
}

@@ -110,24 +135,28 @@ type Manager struct {
targets map[poolKey]map[string]*targetgroup.Group
// providers keeps track of SD providers.
providers []*provider
// The sync channels sends the updates in map[targetSetName] where targetSetName is the job value from the scrape config.
// The sync channel sends the updates as a map where the key is the job value from the scrape config.
syncCh chan map[string][]*targetgroup.Group

// How long to wait before sending updates to the channel. The variable
// should only be modified in unit tests.
updatert time.Duration

// The trigger channel signals to the manager that new updates have been received from providers.
trigger chan struct{}

This comment has been minimized.

Copy link
@krasi-georgiev

krasi-georgiev Sep 27, 2018

Member

how about triggerSend

}

// Run starts the background processing
// Run starts the background processing and blocks until the manager's
// context is canceled, at which point it stops all discoverers and
// returns the context's error.
func (m *Manager) Run() error {
	go m.sendUpdates()
	// Block until the context is canceled. The original `for range
	// m.ctx.Done()` form is a bug: Done() is closed without ever carrying a
	// value, so the range loop terminates immediately without running its
	// body — cancelDiscoverers was never called and nil was returned.
	<-m.ctx.Done()
	m.cancelDiscoverers()
	return m.ctx.Err()
}

// SyncCh returns a read only channel used by all the clients to receive target updates.
func (m *Manager) SyncCh() <-chan map[string][]*targetgroup.Group {
	return m.syncCh
}
@@ -171,43 +200,48 @@ func (m *Manager) startProvider(ctx context.Context, p *provider) {
}

// updater receives target group updates from a single SD provider, records
// them in the manager's shared state, and signals the sender goroutine via
// the trigger channel that new data is available to push to consumers.
// It returns when ctx is canceled or the provider closes its channel.
func (m *Manager) updater(ctx context.Context, p *provider, updates chan []*targetgroup.Group) {
	for {
		select {
		case <-ctx.Done():
			return
		case tgs, ok := <-updates:
			receivedUpdates.Inc()
			if !ok {
				level.Debug(m.logger).Log("msg", "discoverer channel closed", "provider", p.name)
				return
			}

			for _, s := range p.subs {
				m.updateGroup(poolKey{setName: s, provider: p.name}, tgs)
			}

			// Non-blocking send: the trigger channel has capacity 1, so a
			// pending signal is sufficient and further sends can be dropped.
			select {
			case m.trigger <- struct{}{}:
			default:
			}
		}
	}
}

func (m *Manager) sendUpdates() {

This comment has been minimized.

Copy link
@krasi-georgiev

krasi-georgiev Sep 27, 2018

Member

how about just sender() ?

ticker := time.NewTicker(m.updatert)
defer ticker.Stop()

for {
select {
case <-m.ctx.Done():
return
case <-ticker.C: // Some discoverers send updates too often so we throttle these with the ticker.
select {
case <-triggerUpdate:
case <-m.trigger:
sentUpdates.Inc()
select {
case m.syncCh <- m.allGroups():
default:
level.Debug(m.logger).Log("msg", "discovery receiver's channel was full so will retry the next cycle", "provider", p.name)
delayedUpdates.Inc()
level.Debug(m.logger).Log("msg", "discovery receiver's channel was full so will retry the next cycle")
select {
case triggerUpdate <- struct{}{}:
case m.trigger <- struct{}{}:
default:
}
}
@@ -245,13 +279,16 @@ func (m *Manager) allGroups() map[string][]*targetgroup.Group {
defer m.mtx.Unlock()

tSets := map[string][]*targetgroup.Group{}
var n int
for pkey, tsets := range m.targets {
for _, tg := range tsets {
// Even if the target group 'tg' is empty we still need to send it to the 'Scrape manager'
// to signal that it needs to stop all scrape loops for this target set.
tSets[pkey.setName] = append(tSets[pkey.setName], tg)
n += len(tg.Targets)
}
}
discoveredTargets.Set(float64(n))
return tSets
}

ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.