From ec8137aa669b0f0d83bc09d064604b42d2fde9dd Mon Sep 17 00:00:00 2001 From: Guido Trotter Date: Thu, 5 Feb 2026 12:35:46 -0500 Subject: [PATCH 1/2] [dispatcher] increase concurrency of alert ingestion We create multiple goroutines on Run(), one for maintenance, one for start timer, and N for ingestion. Signed-off-by: Guido Trotter --- dispatch/dispatch.go | 111 ++++++++++++++++++++++++++++--------------- 1 file changed, 73 insertions(+), 38 deletions(-) diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index 131e62ac1b..ac259596fa 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "log/slog" + "runtime" "sort" "sync" "sync/atomic" @@ -105,6 +106,7 @@ type Dispatcher struct { aggrGroupsNum atomic.Int32 maintenanceInterval time.Duration + concurrency int // Number of goroutines for alert ingestion logger *slog.Logger @@ -142,6 +144,9 @@ func NewDispatcher( limits = nilLimits{} } + // Calculate concurrency: GOMAXPROC/2, minimum 4, maximum 64 + concurrency := min(max(runtime.GOMAXPROCS(0)/2, 4), 64) + disp := &Dispatcher{ alerts: alerts, stage: stage, @@ -149,6 +154,7 @@ func NewDispatcher( marker: marker, timeout: timeout, maintenanceInterval: maintenanceInterval, + concurrency: concurrency, logger: logger.With("component", "dispatcher"), metrics: metrics, limits: limits, @@ -191,53 +197,82 @@ func (d *Dispatcher) Run(dispatchStartTime time.Time) { } func (d *Dispatcher) run(it provider.AlertIterator) { - maintenance := time.NewTicker(d.maintenanceInterval) - defer maintenance.Stop() - defer it.Close() - for { - select { - case alert, ok := <-it.Next(): - if !ok { - // Iterator exhausted for some reason. - if err := it.Err(); err != nil { - d.logger.Error("Error on alert update", "err", err) - } + // Start maintenance goroutine + d.finished.Add(1) + go func() { + defer d.finished.Done() + ticker := time.NewTicker(d.maintenanceInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + d.doMaintenance() + case <-d.ctx.Done(): return } + } + }() - // Log errors but keep trying. - if err := it.Err(); err != nil { - d.logger.Error("Error on alert update", "err", err) - continue - } - - ctx := d.ctx - if alert.Header != nil { - ctx = d.propagator.Extract(ctx, propagation.MapCarrier(alert.Header)) + // Start timer goroutine + d.finished.Add(1) + go func() { + defer d.finished.Done() + <-d.startTimer.C + + if d.state.CompareAndSwap(DispatcherStateWaitingToStart, DispatcherStateRunning) { + d.logger.Debug("started", "state", "running") + d.logger.Debug("Starting all existing aggregation groups") + for rg := range d.routeGroupsSlice { + d.routeGroupsSlice[rg].groups.Range(func(_, ag any) bool { + d.runAG(ag.(*aggrGroup)) + return true + }) } - - d.routeAlert(ctx, alert.Data) - - case <-d.startTimer.C: - if d.state.CompareAndSwap(DispatcherStateWaitingToStart, DispatcherStateRunning) { - d.logger.Debug("started", "state", "running") - d.logger.Debug("Starting all existing aggregation groups") - for rg := range d.routeGroupsSlice { - d.routeGroupsSlice[rg].groups.Range(func(_, ag any) bool { - d.runAG(ag.(*aggrGroup)) - return true - }) + } + }() + + // Start multiple alert ingestion goroutines + alertCh := it.Next() + for i := 0; i < d.concurrency; i++ { + d.finished.Add(1) + go func(workerID int) { + defer d.finished.Done() + d.logger.Debug("starting alert ingestion worker", "workerID", workerID) + + for { + select { + case alert, ok := <-alertCh: + if !ok { + // Iterator exhausted for some reason. + if err := it.Err(); err != nil { + d.logger.Error("Error on alert update", "err", err, "workerID", workerID) + } + return + } + + // Log errors but keep trying. + if err := it.Err(); err != nil { + d.logger.Error("Error on alert update", "err", err, "workerID", workerID) + continue + } + + ctx := d.ctx + if alert.Header != nil { + ctx = d.propagator.Extract(ctx, propagation.MapCarrier(alert.Header)) + } + + d.routeAlert(ctx, alert.Data) + + case <-d.ctx.Done(): + return } } - - case <-maintenance.C: - d.doMaintenance() - case <-d.ctx.Done(): - return - } + }(i) } + <-d.ctx.Done() } func (d *Dispatcher) routeAlert(ctx context.Context, alert *types.Alert) { From 3e66608bdd0012cb5860c66aa869e5fbe45174fe Mon Sep 17 00:00:00 2001 From: Guido Trotter Date: Mon, 9 Feb 2026 04:34:10 -0500 Subject: [PATCH 2/2] decrease concurrency to avoid some contention on channels and ag locks Signed-off-by: Guido Trotter --- dispatch/dispatch.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index ac259596fa..2078cdfba1 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -144,8 +144,8 @@ func NewDispatcher( limits = nilLimits{} } - // Calculate concurrency: GOMAXPROC/2, minimum 4, maximum 64 - concurrency := min(max(runtime.GOMAXPROCS(0)/2, 4), 64) + // Calculate concurrency for ingestion. + concurrency := min(max(runtime.GOMAXPROCS(0)/2, 2), 8) disp := &Dispatcher{ alerts: alerts,