Skip to content

Commit

Permalink
feat(topics): make topic queue length configurable (#2661)
Browse files Browse the repository at this point in the history
  • Loading branch information
docmerlin committed Dec 9, 2021
1 parent a353257 commit af204c7
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 31 deletions.
59 changes: 34 additions & 25 deletions alert/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,25 @@ import (
)

const (
// eventBufferSize is the number of events to buffer to each handler per topic.
eventBufferSize = 5000
// DefaultEventBufferSize is the default number of events to buffer to each handler per topic.
DefaultEventBufferSize = 5000
MinimumEventBufferSize = 1000
)

type Topics struct {
mu sync.RWMutex

topics map[string]*Topic
mu sync.RWMutex
eventBufferSize int
topics map[string]*Topic
}

func NewTopics() *Topics {
// NewTopics creates a new Topics struct with a minimum bufferSize of 500.
func NewTopics(bufferSize int) *Topics {
if bufferSize < MinimumEventBufferSize {
bufferSize = DefaultEventBufferSize
}
s := &Topics{
topics: make(map[string]*Topic),
eventBufferSize: bufferSize,
topics: make(map[string]*Topic),
}
return s
}
Expand Down Expand Up @@ -54,7 +60,7 @@ func (s *Topics) RestoreTopic(id string, eventStates map[string]EventState) {
defer s.mu.Unlock()
t, ok := s.topics[id]
if !ok {
t = newTopic(id)
t = s.newTopic(id)
s.topics[id] = t
}
t.restoreEventStates(eventStates)
Expand All @@ -65,7 +71,7 @@ func (s *Topics) UpdateEvent(id string, event EventState) {
defer s.mu.Unlock()
t, ok := s.topics[id]
if !ok {
t = newTopic(id)
t = s.newTopic(id)
s.topics[id] = t
}
t.updateEvent(event)
Expand Down Expand Up @@ -93,7 +99,7 @@ func (s *Topics) Collect(event Event) error {
// Check again if the topic was created, now that we have the write lock
topic = s.topics[event.Topic]
if topic == nil {
topic = newTopic(event.Topic)
topic = s.newTopic(event.Topic)
s.topics[event.Topic] = topic
}
s.mu.Unlock()
Expand Down Expand Up @@ -122,7 +128,7 @@ func (s *Topics) RegisterHandler(topic string, h Handler) {

t, ok := s.topics[topic]
if !ok {
t = newTopic(topic)
t = s.newTopic(topic)
s.topics[topic] = t
}
t.addHandler(h)
Expand All @@ -147,7 +153,7 @@ func (s *Topics) ReplaceHandler(topic string, oldH, newH Handler) {

t, ok := s.topics[topic]
if !ok {
t = newTopic(topic)
t = s.newTopic(topic)
s.topics[topic] = t
}

Expand Down Expand Up @@ -185,24 +191,24 @@ func PatternMatch(pattern, id string) bool {
}

type Topic struct {
id string

mu sync.RWMutex

events map[string]*EventState
sorted []*EventState
id string
mu sync.RWMutex
bufferLength int
events map[string]*EventState
sorted []*EventState

collected *expvar.Int
statsKey string

handlers []*bufHandler
}

func newTopic(id string) *Topic {
func (s *Topics) newTopic(id string) *Topic {
t := &Topic{
id: id,
events: make(map[string]*EventState),
collected: new(expvar.Int),
id: id,
events: make(map[string]*EventState),
collected: new(expvar.Int),
bufferLength: s.eventBufferSize,
}
statsKey, statsMap := vars.NewStatistic("topics", map[string]string{
"id": id,
Expand Down Expand Up @@ -240,7 +246,7 @@ func (t *Topic) addHandler(h Handler) {
return
}
}
hdlr := newHandler(h)
hdlr := newHandler(h, t.bufferLength)
t.handlers = append(t.handlers, hdlr)
}

Expand Down Expand Up @@ -385,10 +391,13 @@ type bufHandler struct {
wg sync.WaitGroup
}

func newHandler(h Handler) *bufHandler {
func newHandler(h Handler, bufferSize int) *bufHandler {
if bufferSize < MinimumEventBufferSize {
bufferSize = DefaultEventBufferSize
}
hdlr := &bufHandler{
h: h,
events: make(chan Event, eventBufferSize),
events: make(chan Event, bufferSize),
aborting: make(chan struct{}),
}
hdlr.wg.Add(1)
Expand Down
2 changes: 1 addition & 1 deletion integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13865,7 +13865,7 @@ func createTaskMaster(name string) (*kapacitor.TaskMaster, error) {
tm.TaskStore = taskStore{}
tm.DeadmanService = deadman{}
tm.HTTPPostService, _ = httppost.NewService(nil, diagService.NewHTTPPostHandler())
as := alertservice.NewService(diagService.NewAlertServiceHandler(), nil)
as := alertservice.NewService(diagService.NewAlertServiceHandler(), nil, 0)
as.StorageService = storagetest.New()
as.HTTPDService = httpdService
if err := as.Open(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func (s *Server) appendConfigOverrideService() {

func (s *Server) initAlertService() {
d := s.DiagService.NewAlertServiceHandler()
srv := alert.NewService(d, s.DisabledHandlers)
srv := alert.NewService(d, s.DisabledHandlers, s.config.Alert.TopicBufferLength)

srv.Commander = s.Commander
srv.HTTPDService = s.HTTPDService
Expand Down
7 changes: 5 additions & 2 deletions services/alert/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

"github.com/influxdata/influxdb/toml"
"github.com/influxdata/kapacitor/alert"
)

const (
Expand All @@ -12,12 +13,14 @@ const (

type Config struct {
// Whether we persist the alert topics to BoltDB or not
PersistTopics bool `toml:"persist-topics"`
PersistTopics bool `toml:"persist-topics"`
TopicBufferLength int `toml:"topic-buffer-length"`
}

func NewConfig() Config {
return Config{
PersistTopics: true,
PersistTopics: true,
TopicBufferLength: alert.DefaultEventBufferSize,
}
}

Expand Down
4 changes: 2 additions & 2 deletions services/alert/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,12 @@ type Service struct {
}
}

func NewService(d Diagnostic, disabled map[string]struct{}) *Service {
func NewService(d Diagnostic, disabled map[string]struct{}, topicBufLen int) *Service {
s := &Service{
disabled: disabled,
handlers: make(map[string]map[string]handler),
closedTopics: make(map[string]bool),
topics: alert.NewTopics(),
topics: alert.NewTopics(topicBufLen),
diag: d,
inhibitorLookup: alert.NewInhibitorLookup(),
}
Expand Down

0 comments on commit af204c7

Please sign in to comment.