diff --git a/inhibit/inhibit.go b/inhibit/inhibit.go index 42e20242e4..d6d37d5446 100644 --- a/inhibit/inhibit.go +++ b/inhibit/inhibit.go @@ -95,7 +95,18 @@ func (ih *Inhibitor) Run() { runCtx, runCancel := context.WithCancel(ctx) for _, rule := range ih.rules { - go rule.scache.Run(runCtx, 15*time.Minute) + go func(rule *InhibitRule) { + ticker := time.NewTicker(15 * time.Minute) + defer ticker.Stop() + for { + select { + case <-ticker.C: + rule.scache.DeleteResolved() + case <-runCtx.Done(): + return + } + } + }(rule) } g.Add(func() error { diff --git a/provider/mem/mem.go b/provider/mem/mem.go index cfc3bfc36f..b01295484f 100644 --- a/provider/mem/mem.go +++ b/provider/mem/mem.go @@ -100,33 +100,23 @@ func NewAlerts(ctx context.Context, m types.AlertMarker, intervalGC time.Duratio logger: log.With(l, "component", "provider"), callback: alertCallback, } - a.alerts.SetGCCallback(func(alerts []types.Alert) { - for _, alert := range alerts { - // As we don't persist alerts, we no longer consider them after - // they are resolved. Alerts waiting for resolved notifications are - // held in memory in aggregation groups redundantly. - m.Delete(alert.Fingerprint()) - a.callback.PostDelete(&alert) - } - - a.mtx.Lock() - for i, l := range a.listeners { - select { - case <-l.done: - delete(a.listeners, i) - close(l.alerts) - default: - // listener is not closed yet, hence proceed. - } - } - a.mtx.Unlock() - }) if r != nil { a.registerMetrics(r) } - go a.alerts.Run(ctx, intervalGC) + go func() { + ticker := time.NewTicker(intervalGC) + defer ticker.Stop() + for { + select { + case <-ticker.C: + a.doMaintenance() + case <-ctx.Done(): + return + } + } + }() return a, nil } @@ -151,11 +141,10 @@ func max(a, b int) int { func (a *Alerts) Subscribe() provider.AlertIterator { a.mtx.Lock() defer a.mtx.Unlock() - var ( - done = make(chan struct{}) - alerts = a.alerts.List() - ch = make(chan *types.Alert, max(len(alerts), alertChannelLength)) - ) + + done := make(chan struct{}) + alerts := a.alerts.List() + ch := make(chan *types.Alert, max(len(alerts), alertChannelLength)) for _, a := range alerts { ch <- a @@ -170,15 +159,16 @@ func (a *Alerts) Subscribe() provider.AlertIterator { // GetPending returns an iterator over all the alerts that have // pending notifications. func (a *Alerts) GetPending() provider.AlertIterator { - var ( - ch = make(chan *types.Alert, alertChannelLength) - done = make(chan struct{}) - ) + a.mtx.Lock() + defer a.mtx.Unlock() + alerts := a.alerts.List() + + ch := make(chan *types.Alert, alertChannelLength) + done := make(chan struct{}) go func() { defer close(ch) - - for _, a := range a.alerts.List() { + for _, a := range alerts { select { case ch <- a: case <-done: @@ -192,11 +182,15 @@ func (a *Alerts) GetPending() provider.AlertIterator { // Get returns the alert for a given fingerprint. func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) { + a.mtx.Lock() + defer a.mtx.Unlock() return a.alerts.Get(fp) } // Put adds the given alert to the set. func (a *Alerts) Put(alerts ...*types.Alert) error { + a.mtx.Lock() + defer a.mtx.Unlock() for _, alert := range alerts { fp := alert.Fingerprint() @@ -226,14 +220,12 @@ func (a *Alerts) Put(alerts ...*types.Alert) error { a.callback.PostStore(alert, existing) - a.mtx.Lock() for _, l := range a.listeners { select { case l.alerts <- alert: case <-l.done: } } - a.mtx.Unlock() } return nil @@ -241,6 +233,8 @@ func (a *Alerts) Put(alerts ...*types.Alert) error { // count returns the number of non-resolved alerts we currently have stored filtered by the provided state. func (a *Alerts) count(state types.AlertState) int { + a.mtx.Lock() + defer a.mtx.Unlock() var count int for _, alert := range a.alerts.List() { if alert.Resolved() { @@ -258,6 +252,32 @@ func (a *Alerts) count(state types.AlertState) int { return count } +func (a *Alerts) doMaintenance() { + a.mtx.Lock() + defer a.mtx.Unlock() + for _, alert := range a.alerts.List() { + if alert.Resolved() { + // TODO(grobinson-grafana): See if we can use a single method instead of calling List() and then Delete(). + a.alerts.Delete(alert.Fingerprint()) + // As we don't persist alerts, we no longer consider them after + // they are resolved. Alerts waiting for resolved notifications are + // held in memory in aggregation groups redundantly. + a.marker.Delete(alert.Fingerprint()) + a.callback.PostDelete(alert) + } + } + + for i, l := range a.listeners { + select { + case <-l.done: + delete(a.listeners, i) + close(l.alerts) + default: + // listener is not closed yet, hence proceed. + } + } +} + type noopCallback struct{} func (n noopCallback) PreStore(_ *types.Alert, _ bool) error { return nil } diff --git a/store/store.go b/store/store.go index 5de0c3eebe..56a6a6a979 100644 --- a/store/store.go +++ b/store/store.go @@ -14,10 +14,8 @@ package store import ( - "context" "errors" "sync" - "time" "github.com/prometheus/common/model" @@ -28,70 +26,19 @@ import ( var ErrNotFound = errors.New("alert not found") // Alerts provides lock-coordinated to an in-memory map of alerts, keyed by -// their fingerprint. Resolved alerts are removed from the map based on -// gcInterval. An optional callback can be set which receives a slice of all -// resolved alerts that have been removed. +// their fingerprint. type Alerts struct { sync.Mutex - c map[model.Fingerprint]*types.Alert - cb func([]types.Alert) + c map[model.Fingerprint]*types.Alert } // NewAlerts returns a new Alerts struct. func NewAlerts() *Alerts { - a := &Alerts{ - c: make(map[model.Fingerprint]*types.Alert), - cb: func(_ []types.Alert) {}, - } - - return a -} - -// SetGCCallback sets a GC callback to be executed after each GC. -func (a *Alerts) SetGCCallback(cb func([]types.Alert)) { - a.Lock() - defer a.Unlock() - - a.cb = cb -} - -// Run starts the GC loop. The interval must be greater than zero; if not, the function will panic. -func (a *Alerts) Run(ctx context.Context, interval time.Duration) { - t := time.NewTicker(interval) - defer t.Stop() - for { - select { - case <-ctx.Done(): - return - case <-t.C: - a.gc() - } + return &Alerts{ + c: make(map[model.Fingerprint]*types.Alert), } } -func (a *Alerts) gc() { - a.Lock() - var resolved []types.Alert - for fp, alert := range a.c { - if alert.Resolved() { - delete(a.c, fp) - resolved = append(resolved, types.Alert{ - Alert: model.Alert{ - Labels: alert.Labels.Clone(), - Annotations: alert.Annotations.Clone(), - StartsAt: alert.StartsAt, - EndsAt: alert.EndsAt, - GeneratorURL: alert.GeneratorURL, - }, - UpdatedAt: alert.UpdatedAt, - Timeout: alert.Timeout, - }) - } - } - a.Unlock() - a.cb(resolved) -} - // Get returns the Alert with the matching fingerprint, or an error if it is // not found. func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) { @@ -114,6 +61,12 @@ func (a *Alerts) Set(alert *types.Alert) error { return nil } +func (a *Alerts) Delete(fp model.Fingerprint) { + a.Lock() + defer a.Unlock() + delete(a.c, fp) +} + // DeleteIfNotModified deletes the slice of Alerts from the store if not // modified. func (a *Alerts) DeleteIfNotModified(alerts types.AlertSlice) error { @@ -128,6 +81,20 @@ func (a *Alerts) DeleteIfNotModified(alerts types.AlertSlice) error { return nil } +// DeleteResolved deletes all resolved alerts. +func (a *Alerts) DeleteResolved() []model.Fingerprint { + a.Lock() + defer a.Unlock() + var fps []model.Fingerprint + for fp, alert := range a.c { + if alert.Resolved() { + fps = append(fps, fp) + delete(a.c, fp) + } + } + return fps +} + // List returns a slice of Alerts currently held in memory. func (a *Alerts) List() []*types.Alert { a.Lock() diff --git a/store/store_test.go b/store/store_test.go index fe1cd0a8ae..7b5af2af67 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -14,7 +14,6 @@ package store import ( - "context" "testing" "time" @@ -138,60 +137,42 @@ func TestDeleteIfNotModified(t *testing.T) { }) } -func TestGC(t *testing.T) { - now := time.Now() - newAlert := func(key string, start, end time.Duration) *types.Alert { - return &types.Alert{ +func TestDeleteResolved(t *testing.T) { + t.Run("active alert should not be deleted", func(t *testing.T) { + a := NewAlerts() + a1 := &types.Alert{ Alert: model.Alert{ - Labels: model.LabelSet{model.LabelName(key): "b"}, - StartsAt: now.Add(start * time.Minute), - EndsAt: now.Add(end * time.Minute), + Labels: model.LabelSet{ + "foo": "bar", + }, + StartsAt: time.Now(), + EndsAt: time.Now().Add(5 * time.Minute), }, } - } - active := []*types.Alert{ - newAlert("b", 10, 20), - newAlert("c", -10, 10), - } - resolved := []*types.Alert{ - newAlert("a", -10, -5), - newAlert("d", -10, -1), - } - s := NewAlerts() - var ( - n int - done = make(chan struct{}) - ctx, cancel = context.WithCancel(context.Background()) - ) - s.SetGCCallback(func(a []types.Alert) { - n += len(a) - if n >= len(resolved) { - cancel() - } + require.NoError(t, a.Set(a1)) + require.Empty(t, a.DeleteResolved()) + // a1 should not have been deleted. + got, err := a.Get(a1.Fingerprint()) + require.NoError(t, err) + require.Equal(t, a1, got) }) - for _, alert := range append(active, resolved...) { - require.NoError(t, s.Set(alert)) - } - go func() { - s.Run(ctx, 10*time.Millisecond) - close(done) - }() - select { - case <-done: - break - case <-time.After(1 * time.Second): - t.Fatal("garbage collection didn't complete in time") - } - for _, alert := range active { - if _, err := s.Get(alert.Fingerprint()); err != nil { - t.Errorf("alert %v should not have been gc'd", alert) - } - } - for _, alert := range resolved { - if _, err := s.Get(alert.Fingerprint()); err == nil { - t.Errorf("alert %v should have been gc'd", alert) + t.Run("resolved alert should not be deleted", func(t *testing.T) { + a := NewAlerts() + a1 := &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{ + "foo": "bar", + }, + StartsAt: time.Now().Add(-5 * time.Minute), + EndsAt: time.Now().Add(-time.Second), + }, } - } - require.Len(t, resolved, n) + require.NoError(t, a.Set(a1)) + require.Equal(t, []model.Fingerprint{a1.Fingerprint()}, a.DeleteResolved()) + // a1 should have been deleted. + got, err := a.Get(a1.Fingerprint()) + require.Equal(t, ErrNotFound, err) + require.Nil(t, got) + }) }