Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 73 additions & 38 deletions dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"errors"
"fmt"
"log/slog"
"runtime"
"sort"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -105,6 +106,7 @@ type Dispatcher struct {
aggrGroupsNum atomic.Int32

maintenanceInterval time.Duration
concurrency int // Number of goroutines for alert ingestion

logger *slog.Logger

Expand Down Expand Up @@ -142,13 +144,17 @@ func NewDispatcher(
limits = nilLimits{}
}

// Calculate concurrency for ingestion.
concurrency := min(max(runtime.GOMAXPROCS(0)/2, 2), 8)

disp := &Dispatcher{
alerts: alerts,
stage: stage,
route: route,
marker: marker,
timeout: timeout,
maintenanceInterval: maintenanceInterval,
concurrency: concurrency,
logger: logger.With("component", "dispatcher"),
metrics: metrics,
limits: limits,
Expand Down Expand Up @@ -191,53 +197,82 @@ func (d *Dispatcher) Run(dispatchStartTime time.Time) {
}

func (d *Dispatcher) run(it provider.AlertIterator) {
maintenance := time.NewTicker(d.maintenanceInterval)
defer maintenance.Stop()

defer it.Close()

for {
select {
case alert, ok := <-it.Next():
if !ok {
// Iterator exhausted for some reason.
if err := it.Err(); err != nil {
d.logger.Error("Error on alert update", "err", err)
}
// Start maintenance goroutine
d.finished.Add(1)
go func() {
defer d.finished.Done()
ticker := time.NewTicker(d.maintenanceInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
d.doMaintenance()
case <-d.ctx.Done():
return
}
}
}()

// Log errors but keep trying.
if err := it.Err(); err != nil {
d.logger.Error("Error on alert update", "err", err)
continue
}

ctx := d.ctx
if alert.Header != nil {
ctx = d.propagator.Extract(ctx, propagation.MapCarrier(alert.Header))
// Start timer goroutine
d.finished.Add(1)
go func() {
defer d.finished.Done()
<-d.startTimer.C

if d.state.CompareAndSwap(DispatcherStateWaitingToStart, DispatcherStateRunning) {
d.logger.Debug("started", "state", "running")
d.logger.Debug("Starting all existing aggregation groups")
for rg := range d.routeGroupsSlice {
d.routeGroupsSlice[rg].groups.Range(func(_, ag any) bool {
d.runAG(ag.(*aggrGroup))
return true
})
}

d.routeAlert(ctx, alert.Data)

case <-d.startTimer.C:
if d.state.CompareAndSwap(DispatcherStateWaitingToStart, DispatcherStateRunning) {
d.logger.Debug("started", "state", "running")
d.logger.Debug("Starting all existing aggregation groups")
for rg := range d.routeGroupsSlice {
d.routeGroupsSlice[rg].groups.Range(func(_, ag any) bool {
d.runAG(ag.(*aggrGroup))
return true
})
}
}()

// Start multiple alert ingestion goroutines
alertCh := it.Next()
for i := 0; i < d.concurrency; i++ {
d.finished.Add(1)
go func(workerID int) {
defer d.finished.Done()
d.logger.Debug("starting alert ingestion worker", "workerID", workerID)

for {
select {
case alert, ok := <-alertCh:
if !ok {
// Iterator exhausted for some reason.
if err := it.Err(); err != nil {
d.logger.Error("Error on alert update", "err", err, "workerID", workerID)
}
return
}

// Log errors but keep trying.
if err := it.Err(); err != nil {
d.logger.Error("Error on alert update", "err", err, "workerID", workerID)
continue
}

ctx := d.ctx
if alert.Header != nil {
ctx = d.propagator.Extract(ctx, propagation.MapCarrier(alert.Header))
}

d.routeAlert(ctx, alert.Data)

case <-d.ctx.Done():
return
}
}

case <-maintenance.C:
d.doMaintenance()
case <-d.ctx.Done():
return
}
}(i)
}
<-d.ctx.Done()
}

func (d *Dispatcher) routeAlert(ctx context.Context, alert *types.Alert) {
Expand Down
Loading