Skip to content

Commit

Permalink
Split notifier select in 2 to ensure newer targets are used. (#10948)
Browse files Browse the repository at this point in the history
* Split notifier select in 2 to ensure newer targets are used.

Signed-off-by: Julien Pivotto <roidelapluie@o11y.eu>
  • Loading branch information
roidelapluie committed Jul 1, 2022
1 parent 4aa693d commit 02f3297
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 1 deletion.
12 changes: 11 additions & 1 deletion notifier/notifier.go
Expand Up @@ -302,12 +302,22 @@ func (n *Manager) nextBatch() []*Alert {
// Run dispatches notifications continuously.
func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
for {
// The select is split in two parts, such as we will first try to read
// new alertmanager targets if they are available, before sending new
// alerts.
select {
case <-n.ctx.Done():
return
case ts := <-tsets:
n.reload(ts)
case <-n.more:
default:
select {
case <-n.ctx.Done():
return
case ts := <-tsets:
n.reload(ts)
case <-n.more:
}
}
alerts := n.nextBatch()

Expand Down
111 changes: 111 additions & 0 deletions notifier/notifier_test.go
Expand Up @@ -22,6 +22,7 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -571,3 +572,113 @@ func makeInputTargetGroup() *targetgroup.Group {
func TestLabelsToOpenAPILabelSet(t *testing.T) {
require.Equal(t, models.LabelSet{"aaa": "111", "bbb": "222"}, labelsToOpenAPILabelSet(labels.Labels{{Name: "aaa", Value: "111"}, {Name: "bbb", Value: "222"}}))
}

// TestHangingNotifier validates that targets updates happen even when there are
// queued alerts.
func TestHangingNotifier(t *testing.T) {
// Note: When targets are not updated in time, this test is flaky because go
// selects are not deterministic. Therefore we run 10 subtests to run into the issue.
for i := 0; i < 10; i++ {
t.Run(strconv.Itoa(i), func(t *testing.T) {
var (
done = make(chan struct{})
changed = make(chan struct{})
syncCh = make(chan map[string][]*targetgroup.Group)
)

defer func() {
close(done)
}()

// Setting up a bad server. This server hangs for 2 seconds.
badServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
select {
case <-done:
case <-time.After(2 * time.Second):
}
}))
badURL, err := url.Parse(badServer.URL)
require.NoError(t, err)
badAddress := badURL.Host // Used for __name__ label in targets.

// Setting up a bad server. This server returns fast, signaling requests on
// by closing the changed channel.
goodServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
close(changed)
}))
goodURL, err := url.Parse(goodServer.URL)
require.NoError(t, err)
goodAddress := goodURL.Host // Used for __name__ label in targets.

h := NewManager(
&Options{
QueueCapacity: 20 * maxBatchSize,
},
nil,
)

h.alertmanagers = make(map[string]*alertmanagerSet)

am1Cfg := config.DefaultAlertmanagerConfig
am1Cfg.Timeout = model.Duration(200 * time.Millisecond)

h.alertmanagers["config-0"] = &alertmanagerSet{
ams: []alertmanager{},
cfg: &am1Cfg,
metrics: h.metrics,
}
go h.Run(syncCh)
defer h.Stop()

var alerts []*Alert
for i := range make([]struct{}, 20*maxBatchSize) {
alerts = append(alerts, &Alert{
Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)),
})
}

// Injecting the hanging server URL.
syncCh <- map[string][]*targetgroup.Group{
"config-0": {
{
Targets: []model.LabelSet{
{
model.AddressLabel: model.LabelValue(badAddress),
},
},
},
},
}

// Queing alerts.
h.Send(alerts...)

// Updating with a working alertmanager target.
go func() {
select {
case syncCh <- map[string][]*targetgroup.Group{
"config-0": {
{
Targets: []model.LabelSet{
{
model.AddressLabel: model.LabelValue(goodAddress),
},
},
},
},
}:
case <-done:
}
}()

select {
case <-time.After(300 * time.Millisecond):
t.Fatalf("Timeout after 300 milliseconds, targets not synced in time.")
case <-changed:
// The good server has been hit in less than 3 seconds, therefore
// targets have been updated before a second call could be made to the
// bad server.
}
})
}
}

0 comments on commit 02f3297

Please sign in to comment.