Skip to content

Commit

Permalink
Remove GC and callback from store.go
Browse files Browse the repository at this point in the history
This commit removes the GC and callback function from store.go
to address a number of data races that have occurred in the past
(#2040 and #3648). The store is no longer responsible for removing
resolved alerts after some elapsed period of time; that responsibility is instead
deferred to the consumer of the store (as done in #2040 and #3648).

Signed-off-by: George Robinson <george.robinson@grafana.com>
  • Loading branch information
grobinson-grafana committed May 13, 2024
1 parent c4a763c commit 90dfc8f
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 144 deletions.
13 changes: 12 additions & 1 deletion inhibit/inhibit.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,18 @@ func (ih *Inhibitor) Run() {
runCtx, runCancel := context.WithCancel(ctx)

for _, rule := range ih.rules {
go rule.scache.Run(runCtx, 15*time.Minute)
go func(rule *InhibitRule) {
ticker := time.NewTicker(15 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
rule.scache.DeleteResolved()
case <-runCtx.Done():
return
}
}
}(rule)
}

g.Add(func() error {
Expand Down
90 changes: 55 additions & 35 deletions provider/mem/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,33 +100,23 @@ func NewAlerts(ctx context.Context, m types.AlertMarker, intervalGC time.Duratio
logger: log.With(l, "component", "provider"),
callback: alertCallback,
}
a.alerts.SetGCCallback(func(alerts []types.Alert) {
for _, alert := range alerts {
// As we don't persist alerts, we no longer consider them after
// they are resolved. Alerts waiting for resolved notifications are
// held in memory in aggregation groups redundantly.
m.Delete(alert.Fingerprint())
a.callback.PostDelete(&alert)
}

a.mtx.Lock()
for i, l := range a.listeners {
select {
case <-l.done:
delete(a.listeners, i)
close(l.alerts)
default:
// listener is not closed yet, hence proceed.
}
}
a.mtx.Unlock()
})

if r != nil {
a.registerMetrics(r)
}

go a.alerts.Run(ctx, intervalGC)
go func() {
ticker := time.NewTicker(intervalGC)
defer ticker.Stop()
for {
select {
case <-ticker.C:
a.doMaintenance()
case <-ctx.Done():
return
}
}
}()

return a, nil
}
Expand All @@ -151,11 +141,10 @@ func max(a, b int) int {
func (a *Alerts) Subscribe() provider.AlertIterator {
a.mtx.Lock()
defer a.mtx.Unlock()
var (
done = make(chan struct{})
alerts = a.alerts.List()
ch = make(chan *types.Alert, max(len(alerts), alertChannelLength))
)

done := make(chan struct{})
alerts := a.alerts.List()
ch := make(chan *types.Alert, max(len(alerts), alertChannelLength))

for _, a := range alerts {
ch <- a
Expand All @@ -170,15 +159,16 @@ func (a *Alerts) Subscribe() provider.AlertIterator {
// GetPending returns an iterator over all the alerts that have
// pending notifications.
func (a *Alerts) GetPending() provider.AlertIterator {
var (
ch = make(chan *types.Alert, alertChannelLength)
done = make(chan struct{})
)
a.mtx.Lock()
defer a.mtx.Unlock()
alerts := a.alerts.List()

ch := make(chan *types.Alert, alertChannelLength)
done := make(chan struct{})

go func() {
defer close(ch)

for _, a := range a.alerts.List() {
for _, a := range alerts {
select {
case ch <- a:
case <-done:
Expand All @@ -192,11 +182,15 @@ func (a *Alerts) GetPending() provider.AlertIterator {

// Get looks up the alert stored under the fingerprint fp. The provider mutex
// is held for the duration of the lookup, and whatever the underlying store's
// Get yields (alert and error) is passed back to the caller unchanged.
func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) {
	a.mtx.Lock()
	defer a.mtx.Unlock()

	alert, err := a.alerts.Get(fp)
	return alert, err
}

// Put adds the given alert to the set.
func (a *Alerts) Put(alerts ...*types.Alert) error {
a.mtx.Lock()
defer a.mtx.Unlock()
for _, alert := range alerts {
fp := alert.Fingerprint()

Expand Down Expand Up @@ -226,21 +220,21 @@ func (a *Alerts) Put(alerts ...*types.Alert) error {

a.callback.PostStore(alert, existing)

a.mtx.Lock()
for _, l := range a.listeners {
select {
case l.alerts <- alert:
case <-l.done:
}
}
a.mtx.Unlock()
}

return nil
}

// count returns the number of non-resolved alerts we currently have stored filtered by the provided state.
func (a *Alerts) count(state types.AlertState) int {
a.mtx.Lock()
defer a.mtx.Unlock()
var count int
for _, alert := range a.alerts.List() {
if alert.Resolved() {
Expand All @@ -258,6 +252,32 @@ func (a *Alerts) count(state types.AlertState) int {
return count
}

// doMaintenance performs a single clean-up pass while holding the provider
// mutex.
//
// First, every alert in the store that reports itself as resolved is deleted
// from the store, removed from the marker, and handed to the PostDelete
// callback. Second, every listener whose done channel is ready to receive is
// dropped from the listener map and has its alerts channel closed.
func (a *Alerts) doMaintenance() {
	a.mtx.Lock()
	defer a.mtx.Unlock()

	for _, alert := range a.alerts.List() {
		if !alert.Resolved() {
			continue
		}
		fp := alert.Fingerprint()
		// TODO(grobinson-grafana): See if we can use a single method instead of calling List() and then Delete().
		a.alerts.Delete(fp)
		// As we don't persist alerts, we no longer consider them after
		// they are resolved. Alerts waiting for resolved notifications are
		// held in memory in aggregation groups redundantly.
		a.marker.Delete(fp)
		a.callback.PostDelete(alert)
	}

	for id, listener := range a.listeners {
		select {
		case <-listener.done:
			delete(a.listeners, id)
			close(listener.alerts)
		default:
			// The listener has not signalled done yet; keep it registered.
		}
	}
}

type noopCallback struct{}

func (n noopCallback) PreStore(_ *types.Alert, _ bool) error { return nil }
Expand Down
81 changes: 24 additions & 57 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@
package store

import (
"context"
"errors"
"sync"
"time"

"github.com/prometheus/common/model"

Expand All @@ -28,70 +26,19 @@ import (
var ErrNotFound = errors.New("alert not found")

// Alerts provides lock-coordinated access to an in-memory map of alerts, keyed by
// their fingerprint. Resolved alerts are removed from the map based on
// gcInterval. An optional callback can be set which receives a slice of all
// resolved alerts that have been removed.
// their fingerprint.
type Alerts struct {
sync.Mutex
c map[model.Fingerprint]*types.Alert
cb func([]types.Alert)
c map[model.Fingerprint]*types.Alert
}

// NewAlerts returns a new Alerts struct.
func NewAlerts() *Alerts {
a := &Alerts{
c: make(map[model.Fingerprint]*types.Alert),
cb: func(_ []types.Alert) {},
}

return a
}

// SetGCCallback sets a GC callback to be executed after each GC.
func (a *Alerts) SetGCCallback(cb func([]types.Alert)) {
a.Lock()
defer a.Unlock()

a.cb = cb
}

// Run starts the GC loop. The interval must be greater than zero; if not, the function will panic.
func (a *Alerts) Run(ctx context.Context, interval time.Duration) {
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
a.gc()
}
return &Alerts{
c: make(map[model.Fingerprint]*types.Alert),
}
}

func (a *Alerts) gc() {
a.Lock()
var resolved []types.Alert
for fp, alert := range a.c {
if alert.Resolved() {
delete(a.c, fp)
resolved = append(resolved, types.Alert{
Alert: model.Alert{
Labels: alert.Labels.Clone(),
Annotations: alert.Annotations.Clone(),
StartsAt: alert.StartsAt,
EndsAt: alert.EndsAt,
GeneratorURL: alert.GeneratorURL,
},
UpdatedAt: alert.UpdatedAt,
Timeout: alert.Timeout,
})
}
}
a.Unlock()
a.cb(resolved)
}

// Get returns the Alert with the matching fingerprint, or an error if it is
// not found.
func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) {
Expand All @@ -114,6 +61,12 @@ func (a *Alerts) Set(alert *types.Alert) error {
return nil
}

// Delete removes the alert stored under the fingerprint fp while holding the
// store's lock. Deleting a fingerprint that is not present is a no-op.
func (a *Alerts) Delete(fp model.Fingerprint) {
	a.Lock()
	defer a.Unlock()

	delete(a.c, fp)
}

// DeleteIfNotModified deletes the slice of Alerts from the store if not
// modified.
func (a *Alerts) DeleteIfNotModified(alerts types.AlertSlice) error {
Expand All @@ -128,6 +81,20 @@ func (a *Alerts) DeleteIfNotModified(alerts types.AlertSlice) error {
return nil
}

// DeleteResolved removes every alert that reports itself as resolved and
// returns the fingerprints of the alerts it removed. The returned slice is
// nil when nothing was removed. The store's lock is held for the whole pass.
func (a *Alerts) DeleteResolved() []model.Fingerprint {
	a.Lock()
	defer a.Unlock()

	var removed []model.Fingerprint
	for fp, alert := range a.c {
		if !alert.Resolved() {
			continue
		}
		// Deleting the current key while ranging over a map is safe in Go.
		delete(a.c, fp)
		removed = append(removed, fp)
	}
	return removed
}

// List returns a slice of Alerts currently held in memory.
func (a *Alerts) List() []*types.Alert {
a.Lock()
Expand Down
83 changes: 32 additions & 51 deletions store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package store

import (
"context"
"testing"
"time"

Expand Down Expand Up @@ -138,60 +137,42 @@ func TestDeleteIfNotModified(t *testing.T) {
})
}

func TestGC(t *testing.T) {
now := time.Now()
newAlert := func(key string, start, end time.Duration) *types.Alert {
return &types.Alert{
func TestDeleteResolved(t *testing.T) {
t.Run("active alert should not be deleted", func(t *testing.T) {
a := NewAlerts()
a1 := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{model.LabelName(key): "b"},
StartsAt: now.Add(start * time.Minute),
EndsAt: now.Add(end * time.Minute),
Labels: model.LabelSet{
"foo": "bar",
},
StartsAt: time.Now(),
EndsAt: time.Now().Add(5 * time.Minute),
},
}
}
active := []*types.Alert{
newAlert("b", 10, 20),
newAlert("c", -10, 10),
}
resolved := []*types.Alert{
newAlert("a", -10, -5),
newAlert("d", -10, -1),
}
s := NewAlerts()
var (
n int
done = make(chan struct{})
ctx, cancel = context.WithCancel(context.Background())
)
s.SetGCCallback(func(a []types.Alert) {
n += len(a)
if n >= len(resolved) {
cancel()
}
require.NoError(t, a.Set(a1))
require.Empty(t, a.DeleteResolved())
// a1 should not have been deleted.
got, err := a.Get(a1.Fingerprint())
require.NoError(t, err)
require.Equal(t, a1, got)
})
for _, alert := range append(active, resolved...) {
require.NoError(t, s.Set(alert))
}
go func() {
s.Run(ctx, 10*time.Millisecond)
close(done)
}()
select {
case <-done:
break
case <-time.After(1 * time.Second):
t.Fatal("garbage collection didn't complete in time")
}

for _, alert := range active {
if _, err := s.Get(alert.Fingerprint()); err != nil {
t.Errorf("alert %v should not have been gc'd", alert)
}
}
for _, alert := range resolved {
if _, err := s.Get(alert.Fingerprint()); err == nil {
t.Errorf("alert %v should have been gc'd", alert)
t.Run("resolved alert should be deleted", func(t *testing.T) {
a := NewAlerts()
a1 := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{
"foo": "bar",
},
StartsAt: time.Now().Add(-5 * time.Minute),
EndsAt: time.Now().Add(-time.Second),
},
}
}
require.Len(t, resolved, n)
require.NoError(t, a.Set(a1))
require.Equal(t, []model.Fingerprint{a1.Fingerprint()}, a.DeleteResolved())
// a1 should have been deleted.
got, err := a.Get(a1.Fingerprint())
require.Equal(t, ErrNotFound, err)
require.Nil(t, got)
})
}

0 comments on commit 90dfc8f

Please sign in to comment.