Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Suppressor from channel-based to mutex-based, add tests. #4

Merged
merged 1 commit into from
Jul 22, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 0 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,8 @@ import (
func main() {
flag.Parse()

log.Print("Starting event suppressor...")
suppressor := manager.NewSuppressor()
defer suppressor.Close()
go suppressor.Dispatch()
log.Println("Done.")

log.Println("Starting event aggregator...")
aggregator := manager.NewAggregator()
Expand Down
2 changes: 1 addition & 1 deletion manager/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (d *SummaryDispatcher) Receive(s *EventSummary) RemoteError {
}

func (d *SummaryDispatcher) dispatchSummary(r *summaryDispatchRequest, i IsInhibitedInterrogator) {
if i.IsInhibited(r.Summary.Event) {
if inhibited, _ := i.IsInhibited(r.Summary.Event); inhibited {
r.Response <- &summaryDispatchResponse{
Disposition: SUPPRESSED,
}
Expand Down
254 changes: 96 additions & 158 deletions manager/suppressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,213 +14,151 @@
package manager

import (
"container/heap"
"fmt"
"log"
"sort"
"sync"
"time"
)

type Suppression struct {
Id uint

Description string

Filters *Filters

EndsAt time.Time
type SuppressionId uint

type Suppression struct {
// The numeric ID of the suppression.
Id SuppressionId
// Name/email of the suppression creator.
CreatedBy string
// When the suppression was first created (Unix timestamp).
CreatedAt time.Time
// When the suppression expires (Unix timestamp).
EndsAt time.Time
// Additional comment about the suppression.
Comment string
// Filters that determine which events are suppressed.
Filters Filters
// Timer used to trigger the deletion of the Suppression after its expiry
// time.
expiryTimer *time.Timer
}

type suppressionRequest struct {
Suppression Suppression

Response chan *suppressionResponse
}

type suppressionResponse struct {
Err error
}

type isInhibitedRequest struct {
Event *Event

Response chan *isInhibitedResponse
}

type isInhibitedResponse struct {
Err error

Inhibited bool
InhibitingSuppression *Suppression
}

type suppressionSummaryResponse struct {
Err error

Suppressions Suppressions
}

type suppressionSummaryRequest struct {
MatchCandidates map[string]string

Response chan *suppressionSummaryResponse
}
type Suppressions []*Suppression

type Suppressor struct {
Suppressions *Suppressions
// Suppressions managed by this Suppressor.
Suppressions map[SuppressionId]*Suppression
// Used to track the next Suppression Id to allocate.
nextId SuppressionId

suppressionReqs chan *suppressionRequest
suppressionSummaryReqs chan *suppressionSummaryRequest
isInhibitedReqs chan *isInhibitedRequest
// Mutex to protect the above.
mu sync.Mutex
}

type IsInhibitedInterrogator interface {
IsInhibited(*Event) bool
IsInhibited(*Event) (bool, *Suppression)
}

func NewSuppressor() *Suppressor {
suppressions := new(Suppressions)

heap.Init(suppressions)

return &Suppressor{
Suppressions: suppressions,

suppressionReqs: make(chan *suppressionRequest),
suppressionSummaryReqs: make(chan *suppressionSummaryRequest),
isInhibitedReqs: make(chan *isInhibitedRequest),
Suppressions: make(map[SuppressionId]*Suppression),
}
}

type Suppressions []Suppression

func (s Suppressions) Len() int {
return len(s)
func (s *Suppressor) nextSuppressionId() SuppressionId {
// BUG: Build proper ID management. For now, as we are only keeping
// data in memory anyways, this is enough.
s.nextId++
return s.nextId
}

func (s Suppressions) Less(i, j int) bool {
return s[i].EndsAt.Before(s[j].EndsAt)
}

func (s Suppressions) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
func (s *Suppressor) setupExpiryTimer(sup *Suppression) {
if sup.expiryTimer != nil {
sup.expiryTimer.Stop()
}
expDuration := sup.EndsAt.Sub(time.Now())
sup.expiryTimer = time.AfterFunc(expDuration, func() {
if err := s.DelSuppression(sup.Id); err != nil {
log.Printf("Failed to delete suppression %d: %s", sup.Id, err)
}
})
}

func (s *Suppressions) Push(v interface{}) {
*s = append(*s, v.(Suppression))
}
func (s *Suppressor) AddSuppression(sup *Suppression) SuppressionId {
s.mu.Lock()
defer s.mu.Unlock()

func (s *Suppressions) Pop() interface{} {
old := *s
n := len(old)
item := old[n-1]
*s = old[0 : n-1]
return item
sup.Id = s.nextSuppressionId()
s.setupExpiryTimer(sup)
s.Suppressions[sup.Id] = sup
return sup.Id
}

func (s *Suppressor) dispatchSuppression(r *suppressionRequest) {
log.Println("dispatching suppression", r)
func (s *Suppressor) UpdateSuppression(sup *Suppression) error {
s.mu.Lock()
defer s.mu.Unlock()

heap.Push(s.Suppressions, r.Suppression)
r.Response <- &suppressionResponse{}
close(r.Response)
origSup, ok := s.Suppressions[sup.Id]
if !ok {
return fmt.Errorf("Suppression with ID %d doesn't exist", sup.Id)
}
if sup.EndsAt != origSup.EndsAt {
origSup.expiryTimer.Stop()
}
*origSup = *sup
s.setupExpiryTimer(origSup)
return nil
}

func (s *Suppressor) reapSuppressions(t time.Time) {
log.Println("reaping suppression...")
func (s *Suppressor) GetSuppression(id SuppressionId) (*Suppression, error) {
s.mu.Lock()
defer s.mu.Unlock()

i := sort.Search(len(*s.Suppressions), func(i int) bool {
return (*s.Suppressions)[i].EndsAt.After(t)
})

*s.Suppressions = (*s.Suppressions)[i:]

// BUG(matt): Validate if strictly necessary.
heap.Init(s.Suppressions)
sup, ok := s.Suppressions[id]
if !ok {
return nil, fmt.Errorf("Suppression with ID %d doesn't exist", id)
}
return sup, nil
}

func (s *Suppressor) generateSummary(r *suppressionSummaryRequest) {
log.Println("Generating summary", r)
response := new(suppressionSummaryResponse)
func (s *Suppressor) DelSuppression(id SuppressionId) error {
s.mu.Lock()
defer s.mu.Unlock()

for _, suppression := range *s.Suppressions {
response.Suppressions = append(response.Suppressions, suppression)
if _, ok := s.Suppressions[id]; !ok {
return fmt.Errorf("Suppression with ID %d doesn't exist", id)
}

r.Response <- response
close(r.Response)
delete(s.Suppressions, id)
return nil
}

func (s *Suppressor) IsInhibited(e *Event) bool {
req := &isInhibitedRequest{
Event: e,
Response: make(chan *isInhibitedResponse),
}

s.isInhibitedReqs <- req
func (s *Suppressor) SuppressionSummary() Suppressions {
s.mu.Lock()
defer s.mu.Unlock()

resp := <-req.Response

return resp.Inhibited
suppressions := make(Suppressions, 0, len(s.Suppressions))
for _, sup := range s.Suppressions {
suppressions = append(suppressions, sup)
}
return suppressions
}

func (s *Suppressor) queryInhibit(q *isInhibitedRequest) {
response := new(isInhibitedResponse)
func (s *Suppressor) IsInhibited(e *Event) (bool, *Suppression) {
s.mu.Lock()
defer s.mu.Unlock()

for _, s := range *s.Suppressions {
if s.Filters.Handles(q.Event) {
response.Inhibited = true
response.InhibitingSuppression = &s

break
for _, s := range s.Suppressions {
if s.Filters.Handles(e) {
return true, s
}
}

q.Response <- response
close(q.Response)
return false, nil
}

func (s *Suppressor) Close() {
close(s.suppressionReqs)
close(s.suppressionSummaryReqs)
close(s.isInhibitedReqs)
}

func (s *Suppressor) Dispatch() {
// BUG: Accomplish this more intelligently by creating a timer for the least-
// likely-to-tenure item.
reaper := time.NewTicker(30 * time.Second)
defer reaper.Stop()

closed := 0

for closed < 2 {
select {
case suppression, open := <-s.suppressionReqs:
s.dispatchSuppression(suppression)

if !open {
closed++
}

case query, open := <-s.isInhibitedReqs:
s.queryInhibit(query)

if !open {
closed++
}

case summary, open := <-s.suppressionSummaryReqs:
s.generateSummary(summary)

if !open {
closed++
}
s.mu.Lock()
defer s.mu.Unlock()

case time := <-reaper.C:
s.reapSuppressions(time)
for _, sup := range s.Suppressions {
if sup.expiryTimer != nil {
sup.expiryTimer.Stop()
}
}
}