Skip to content

Commit

Permalink
Merge pull request #4 from prometheus/refactor/mutex-based
Browse files Browse the repository at this point in the history
Change Suppressor from channel-based to mutex-based, add tests.
  • Loading branch information
juliusv committed Jul 22, 2013
2 parents 19e1ad7 + b49b7bb commit 00efa4a
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 162 deletions.
3 changes: 0 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,8 @@ import (
func main() {
flag.Parse()

log.Print("Starting event suppressor...")
suppressor := manager.NewSuppressor()
defer suppressor.Close()
go suppressor.Dispatch()
log.Println("Done.")

log.Println("Starting event aggregator...")
aggregator := manager.NewAggregator()
Expand Down
2 changes: 1 addition & 1 deletion manager/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (d *SummaryDispatcher) Receive(s *EventSummary) RemoteError {
}

func (d *SummaryDispatcher) dispatchSummary(r *summaryDispatchRequest, i IsInhibitedInterrogator) {
if i.IsInhibited(r.Summary.Event) {
if inhibited, _ := i.IsInhibited(r.Summary.Event); inhibited {
r.Response <- &summaryDispatchResponse{
Disposition: SUPPRESSED,
}
Expand Down
254 changes: 96 additions & 158 deletions manager/suppressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,213 +14,151 @@
package manager

import (
"container/heap"
"fmt"
"log"
"sort"
"sync"
"time"
)

type Suppression struct {
Id uint

Description string

Filters *Filters

EndsAt time.Time
type SuppressionId uint

type Suppression struct {
// The numeric ID of the suppression.
Id SuppressionId
// Name/email of the suppression creator.
CreatedBy string
// When the suppression was first created (Unix timestamp).
CreatedAt time.Time
// When the suppression expires (Unix timestamp).
EndsAt time.Time
// Additional comment about the suppression.
Comment string
// Filters that determine which events are suppressed.
Filters Filters
// Timer used to trigger the deletion of the Suppression after its expiry
// time.
expiryTimer *time.Timer
}

type suppressionRequest struct {
Suppression Suppression

Response chan *suppressionResponse
}

type suppressionResponse struct {
Err error
}

type isInhibitedRequest struct {
Event *Event

Response chan *isInhibitedResponse
}

type isInhibitedResponse struct {
Err error

Inhibited bool
InhibitingSuppression *Suppression
}

type suppressionSummaryResponse struct {
Err error

Suppressions Suppressions
}

type suppressionSummaryRequest struct {
MatchCandidates map[string]string

Response chan *suppressionSummaryResponse
}
type Suppressions []*Suppression

type Suppressor struct {
Suppressions *Suppressions
// Suppressions managed by this Suppressor.
Suppressions map[SuppressionId]*Suppression
// Used to track the next Suppression Id to allocate.
nextId SuppressionId

suppressionReqs chan *suppressionRequest
suppressionSummaryReqs chan *suppressionSummaryRequest
isInhibitedReqs chan *isInhibitedRequest
// Mutex to protect the above.
mu sync.Mutex
}

type IsInhibitedInterrogator interface {
IsInhibited(*Event) bool
IsInhibited(*Event) (bool, *Suppression)
}

func NewSuppressor() *Suppressor {
suppressions := new(Suppressions)

heap.Init(suppressions)

return &Suppressor{
Suppressions: suppressions,

suppressionReqs: make(chan *suppressionRequest),
suppressionSummaryReqs: make(chan *suppressionSummaryRequest),
isInhibitedReqs: make(chan *isInhibitedRequest),
Suppressions: make(map[SuppressionId]*Suppression),
}
}

type Suppressions []Suppression

func (s Suppressions) Len() int {
return len(s)
func (s *Suppressor) nextSuppressionId() SuppressionId {
// BUG: Build proper ID management. For now, as we are only keeping
// data in memory anyways, this is enough.
s.nextId++
return s.nextId
}

func (s Suppressions) Less(i, j int) bool {
return s[i].EndsAt.Before(s[j].EndsAt)
}

func (s Suppressions) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
func (s *Suppressor) setupExpiryTimer(sup *Suppression) {
if sup.expiryTimer != nil {
sup.expiryTimer.Stop()
}
expDuration := sup.EndsAt.Sub(time.Now())
sup.expiryTimer = time.AfterFunc(expDuration, func() {
if err := s.DelSuppression(sup.Id); err != nil {
log.Printf("Failed to delete suppression %d: %s", sup.Id, err)
}
})
}

func (s *Suppressions) Push(v interface{}) {
*s = append(*s, v.(Suppression))
}
func (s *Suppressor) AddSuppression(sup *Suppression) SuppressionId {
s.mu.Lock()
defer s.mu.Unlock()

func (s *Suppressions) Pop() interface{} {
old := *s
n := len(old)
item := old[n-1]
*s = old[0 : n-1]
return item
sup.Id = s.nextSuppressionId()
s.setupExpiryTimer(sup)
s.Suppressions[sup.Id] = sup
return sup.Id
}

func (s *Suppressor) dispatchSuppression(r *suppressionRequest) {
log.Println("dispatching suppression", r)
func (s *Suppressor) UpdateSuppression(sup *Suppression) error {
s.mu.Lock()
defer s.mu.Unlock()

heap.Push(s.Suppressions, r.Suppression)
r.Response <- &suppressionResponse{}
close(r.Response)
origSup, ok := s.Suppressions[sup.Id]
if !ok {
return fmt.Errorf("Suppression with ID %d doesn't exist", sup.Id)
}
if sup.EndsAt != origSup.EndsAt {
origSup.expiryTimer.Stop()
}
*origSup = *sup
s.setupExpiryTimer(origSup)
return nil
}

func (s *Suppressor) reapSuppressions(t time.Time) {
log.Println("reaping suppression...")
func (s *Suppressor) GetSuppression(id SuppressionId) (*Suppression, error) {
s.mu.Lock()
defer s.mu.Unlock()

i := sort.Search(len(*s.Suppressions), func(i int) bool {
return (*s.Suppressions)[i].EndsAt.After(t)
})

*s.Suppressions = (*s.Suppressions)[i:]

// BUG(matt): Validate if strictly necessary.
heap.Init(s.Suppressions)
sup, ok := s.Suppressions[id]
if !ok {
return nil, fmt.Errorf("Suppression with ID %d doesn't exist", id)
}
return sup, nil
}

func (s *Suppressor) generateSummary(r *suppressionSummaryRequest) {
log.Println("Generating summary", r)
response := new(suppressionSummaryResponse)
func (s *Suppressor) DelSuppression(id SuppressionId) error {
s.mu.Lock()
defer s.mu.Unlock()

for _, suppression := range *s.Suppressions {
response.Suppressions = append(response.Suppressions, suppression)
if _, ok := s.Suppressions[id]; !ok {
return fmt.Errorf("Suppression with ID %d doesn't exist", id)
}

r.Response <- response
close(r.Response)
delete(s.Suppressions, id)
return nil
}

func (s *Suppressor) IsInhibited(e *Event) bool {
req := &isInhibitedRequest{
Event: e,
Response: make(chan *isInhibitedResponse),
}

s.isInhibitedReqs <- req
func (s *Suppressor) SuppressionSummary() Suppressions {
s.mu.Lock()
defer s.mu.Unlock()

resp := <-req.Response

return resp.Inhibited
suppressions := make(Suppressions, 0, len(s.Suppressions))
for _, sup := range s.Suppressions {
suppressions = append(suppressions, sup)
}
return suppressions
}

func (s *Suppressor) queryInhibit(q *isInhibitedRequest) {
response := new(isInhibitedResponse)
func (s *Suppressor) IsInhibited(e *Event) (bool, *Suppression) {
s.mu.Lock()
defer s.mu.Unlock()

for _, s := range *s.Suppressions {
if s.Filters.Handles(q.Event) {
response.Inhibited = true
response.InhibitingSuppression = &s

break
for _, s := range s.Suppressions {
if s.Filters.Handles(e) {
return true, s
}
}

q.Response <- response
close(q.Response)
return false, nil
}

func (s *Suppressor) Close() {
close(s.suppressionReqs)
close(s.suppressionSummaryReqs)
close(s.isInhibitedReqs)
}

func (s *Suppressor) Dispatch() {
// BUG: Accomplish this more intelligently by creating a timer for the least-
// likely-to-tenure item.
reaper := time.NewTicker(30 * time.Second)
defer reaper.Stop()

closed := 0

for closed < 2 {
select {
case suppression, open := <-s.suppressionReqs:
s.dispatchSuppression(suppression)

if !open {
closed++
}

case query, open := <-s.isInhibitedReqs:
s.queryInhibit(query)

if !open {
closed++
}

case summary, open := <-s.suppressionSummaryReqs:
s.generateSummary(summary)

if !open {
closed++
}
s.mu.Lock()
defer s.mu.Unlock()

case time := <-reaper.C:
s.reapSuppressions(time)
for _, sup := range s.Suppressions {
if sup.expiryTimer != nil {
sup.expiryTimer.Stop()
}
}
}

0 comments on commit 00efa4a

Please sign in to comment.