diff --git a/main.go b/main.go index abd0e6a18e..15e75d6586 100644 --- a/main.go +++ b/main.go @@ -25,11 +25,8 @@ import ( func main() { flag.Parse() - log.Print("Starting event suppressor...") suppressor := manager.NewSuppressor() defer suppressor.Close() - go suppressor.Dispatch() - log.Println("Done.") log.Println("Starting event aggregator...") aggregator := manager.NewAggregator() diff --git a/manager/dispatcher.go b/manager/dispatcher.go index d2db9ce370..5028ada494 100644 --- a/manager/dispatcher.go +++ b/manager/dispatcher.go @@ -116,7 +116,7 @@ func (d *SummaryDispatcher) Receive(s *EventSummary) RemoteError { } func (d *SummaryDispatcher) dispatchSummary(r *summaryDispatchRequest, i IsInhibitedInterrogator) { - if i.IsInhibited(r.Summary.Event) { + if inhibited, _ := i.IsInhibited(r.Summary.Event); inhibited { r.Response <- &summaryDispatchResponse{ Disposition: SUPPRESSED, } diff --git a/manager/suppressor.go b/manager/suppressor.go index fb2685ac75..56aca9bdf7 100644 --- a/manager/suppressor.go +++ b/manager/suppressor.go @@ -14,213 +14,151 @@ package manager import ( - "container/heap" + "fmt" "log" - "sort" + "sync" "time" ) -type Suppression struct { - Id uint - - Description string - - Filters *Filters - - EndsAt time.Time +type SuppressionId uint +type Suppression struct { + // The numeric ID of the suppression. + Id SuppressionId + // Name/email of the suppression creator. CreatedBy string + // When the suppression was first created (Unix timestamp). CreatedAt time.Time + // When the suppression expires (Unix timestamp). + EndsAt time.Time + // Additional comment about the suppression. + Comment string + // Filters that determine which events are suppressed. + Filters Filters + // Timer used to trigger the deletion of the Suppression after its expiry + // time. + expiryTimer *time.Timer } -type suppressionRequest struct { - Suppression Suppression - - Response chan *suppressionResponse -} - -type suppressionResponse struct { - Err error -} - -type isInhibitedRequest struct { - Event *Event - - Response chan *isInhibitedResponse -} - -type isInhibitedResponse struct { - Err error - - Inhibited bool - InhibitingSuppression *Suppression -} - -type suppressionSummaryResponse struct { - Err error - - Suppressions Suppressions -} - -type suppressionSummaryRequest struct { - MatchCandidates map[string]string - - Response chan *suppressionSummaryResponse -} +type Suppressions []*Suppression type Suppressor struct { - Suppressions *Suppressions + // Suppressions managed by this Suppressor. + Suppressions map[SuppressionId]*Suppression + // Used to track the next Suppression Id to allocate. + nextId SuppressionId - suppressionReqs chan *suppressionRequest - suppressionSummaryReqs chan *suppressionSummaryRequest - isInhibitedReqs chan *isInhibitedRequest + // Mutex to protect the above. + mu sync.Mutex } type IsInhibitedInterrogator interface { - IsInhibited(*Event) bool + IsInhibited(*Event) (bool, *Suppression) } func NewSuppressor() *Suppressor { - suppressions := new(Suppressions) - - heap.Init(suppressions) - return &Suppressor{ - Suppressions: suppressions, - - suppressionReqs: make(chan *suppressionRequest), - suppressionSummaryReqs: make(chan *suppressionSummaryRequest), - isInhibitedReqs: make(chan *isInhibitedRequest), + Suppressions: make(map[SuppressionId]*Suppression), } } -type Suppressions []Suppression - -func (s Suppressions) Len() int { - return len(s) +func (s *Suppressor) nextSuppressionId() SuppressionId { + // BUG: Build proper ID management. For now, as we are only keeping + // data in memory anyways, this is enough. + s.nextId++ + return s.nextId } -func (s Suppressions) Less(i, j int) bool { - return s[i].EndsAt.Before(s[j].EndsAt) -} - -func (s Suppressions) Swap(i, j int) { - s[i], s[j] = s[j], s[i] +func (s *Suppressor) setupExpiryTimer(sup *Suppression) { + if sup.expiryTimer != nil { + sup.expiryTimer.Stop() + } + expDuration := sup.EndsAt.Sub(time.Now()) + sup.expiryTimer = time.AfterFunc(expDuration, func() { + if err := s.DelSuppression(sup.Id); err != nil { + log.Printf("Failed to delete suppression %d: %s", sup.Id, err) + } + }) } -func (s *Suppressions) Push(v interface{}) { - *s = append(*s, v.(Suppression)) -} +func (s *Suppressor) AddSuppression(sup *Suppression) SuppressionId { + s.mu.Lock() + defer s.mu.Unlock() -func (s *Suppressions) Pop() interface{} { - old := *s - n := len(old) - item := old[n-1] - *s = old[0 : n-1] - return item + sup.Id = s.nextSuppressionId() + s.setupExpiryTimer(sup) + s.Suppressions[sup.Id] = sup + return sup.Id } -func (s *Suppressor) dispatchSuppression(r *suppressionRequest) { - log.Println("dispatching suppression", r) +func (s *Suppressor) UpdateSuppression(sup *Suppression) error { + s.mu.Lock() + defer s.mu.Unlock() - heap.Push(s.Suppressions, r.Suppression) - r.Response <- &suppressionResponse{} - close(r.Response) + origSup, ok := s.Suppressions[sup.Id] + if !ok { + return fmt.Errorf("Suppression with ID %d doesn't exist", sup.Id) + } + if sup.EndsAt != origSup.EndsAt { + origSup.expiryTimer.Stop() + } + *origSup = *sup + s.setupExpiryTimer(origSup) + return nil } -func (s *Suppressor) reapSuppressions(t time.Time) { - log.Println("reaping suppression...") +func (s *Suppressor) GetSuppression(id SuppressionId) (*Suppression, error) { + s.mu.Lock() + defer s.mu.Unlock() - i := sort.Search(len(*s.Suppressions), func(i int) bool { - return (*s.Suppressions)[i].EndsAt.After(t) - }) - - *s.Suppressions = (*s.Suppressions)[i:] - - // BUG(matt): Validate if strictly necessary. - heap.Init(s.Suppressions) + sup, ok := s.Suppressions[id] + if !ok { + return nil, fmt.Errorf("Suppression with ID %d doesn't exist", id) + } + return sup, nil } -func (s *Suppressor) generateSummary(r *suppressionSummaryRequest) { - log.Println("Generating summary", r) - response := new(suppressionSummaryResponse) +func (s *Suppressor) DelSuppression(id SuppressionId) error { + s.mu.Lock() + defer s.mu.Unlock() - for _, suppression := range *s.Suppressions { - response.Suppressions = append(response.Suppressions, suppression) + if _, ok := s.Suppressions[id]; !ok { + return fmt.Errorf("Suppression with ID %d doesn't exist", id) } - - r.Response <- response - close(r.Response) + delete(s.Suppressions, id) + return nil } -func (s *Suppressor) IsInhibited(e *Event) bool { - req := &isInhibitedRequest{ - Event: e, - Response: make(chan *isInhibitedResponse), - } - - s.isInhibitedReqs <- req +func (s *Suppressor) SuppressionSummary() Suppressions { + s.mu.Lock() + defer s.mu.Unlock() - resp := <-req.Response - - return resp.Inhibited + suppressions := make(Suppressions, 0, len(s.Suppressions)) + for _, sup := range s.Suppressions { + suppressions = append(suppressions, sup) + } + return suppressions } -func (s *Suppressor) queryInhibit(q *isInhibitedRequest) { - response := new(isInhibitedResponse) +func (s *Suppressor) IsInhibited(e *Event) (bool, *Suppression) { + s.mu.Lock() + defer s.mu.Unlock() - for _, s := range *s.Suppressions { - if s.Filters.Handles(q.Event) { - response.Inhibited = true - response.InhibitingSuppression = &s - - break + for _, s := range s.Suppressions { + if s.Filters.Handles(e) { + return true, s } } - - q.Response <- response - close(q.Response) + return false, nil } func (s *Suppressor) Close() { - close(s.suppressionReqs) - close(s.suppressionSummaryReqs) - close(s.isInhibitedReqs) -} - -func (s *Suppressor) Dispatch() { - // BUG: Accomplish this more intelligently by creating a timer for the least- - // likely-to-tenure item. - reaper := time.NewTicker(30 * time.Second) - defer reaper.Stop() - - closed := 0 - - for closed < 2 { - select { - case suppression, open := <-s.suppressionReqs: - s.dispatchSuppression(suppression) - - if !open { - closed++ - } - - case query, open := <-s.isInhibitedReqs: - s.queryInhibit(query) - - if !open { - closed++ - } - - case summary, open := <-s.suppressionSummaryReqs: - s.generateSummary(summary) - - if !open { - closed++ - } + s.mu.Lock() + defer s.mu.Unlock() - case time := <-reaper.C: - s.reapSuppressions(time) + for _, sup := range s.Suppressions { + if sup.expiryTimer != nil { + sup.expiryTimer.Stop() } } } diff --git a/manager/suppressor_test.go b/manager/suppressor_test.go new file mode 100644 index 0000000000..045399114e --- /dev/null +++ b/manager/suppressor_test.go @@ -0,0 +1,139 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package manager + +import ( + "testing" + "time" +) + +type testSuppressorScenario struct { + suppressions Suppressions + inhibited Events + uninhibited Events +} + +func (sc *testSuppressorScenario) test(i int, t *testing.T) { + s := NewSuppressor() + + for j, sup := range sc.suppressions { + id := s.AddSuppression(sup) + retrievedSup, err := s.GetSuppression(id) + if err != nil { + t.Fatalf("%d.%d. Error getting suppression: %s", i, j, err) + } + if retrievedSup.Id != id { + t.Fatalf("%d.%d. Expected ID %d, got %d", i, j, id, retrievedSup.Id) + } + sup.Id = id + if sup != retrievedSup { + t.Fatalf("%d.%d. Expected suppression %v, got %v", i, j, sup, retrievedSup) + } + } + + for j, ev := range sc.inhibited { + inhibited, sup := s.IsInhibited(ev) + if !inhibited { + t.Fatalf("%d.%d. Expected %v to be inhibited", i, j, ev) + } + if sup == nil { + t.Fatalf("%d.%d. Expected non-nil Suppression for inhibited event %v", i, j, ev) + } + } + + for j, ev := range sc.uninhibited { + inhibited, sup := s.IsInhibited(ev) + if inhibited { + t.Fatalf("%d.%d. Expected %v to not be inhibited, was inhibited by %v", i, j, ev, sup) + } + } + + suppressions := s.SuppressionSummary() + if len(suppressions) != len(sc.suppressions) { + t.Fatalf("%d. Expected %d suppressions, got %d", i, len(sc.suppressions), len(suppressions)) + } + + for j, sup := range suppressions { + if err := s.DelSuppression(sup.Id); err != nil { + t.Fatalf("%d.%d. Got error while deleting suppression: %s", i, j, err) + } + + newSuppressions := s.SuppressionSummary() + if len(newSuppressions) != len(suppressions)-j-1 { + t.Fatalf("%d. Expected %d suppressions, got %d", i, len(suppressions), len(newSuppressions)) + } + } + + s.Close() +} + +func TestSuppressor(t *testing.T) { + scenarios := []testSuppressorScenario{ + { + // No suppressions, one event. + uninhibited: Events{ + &Event{ + Labels: map[string]string{ + "foo": "bar", + }, + }, + }, + }, + { + // One rule, two matching events, one non-matching. + suppressions: Suppressions{ + &Suppression{ + Filters: Filters{NewFilter("service", "test(-)?service")}, + EndsAt: time.Now().Add(time.Hour), + }, + &Suppression{ + Filters: Filters{NewFilter("testlabel", ".*")}, + EndsAt: time.Now().Add(time.Hour), + }, + }, + inhibited: Events{ + &Event{ + Labels: map[string]string{ + "service": "testservice", + "foo": "bar", + }, + }, + &Event{ + Labels: map[string]string{ + "service": "test-service", + "bar": "baz", + }, + }, + &Event{ + Labels: map[string]string{ + "service": "bar-service", + "testlabel": "testvalue", + }, + }, + }, + uninhibited: Events{ + &Event{ + Labels: map[string]string{ + "service": "testservice2", + "foo": "bar", + }, + }, + }, + }, + } + + for i, scenario := range scenarios { + scenario.test(i, t) + } +}