
Commit 26e9912

Add comprehensive alert system reliability improvements
This commit implements critical reliability features to prevent data loss and improve alert system robustness:

**Persistent Notification Queue:**
- SQLite-backed queue with WAL journaling for crash recovery
- Dead Letter Queue (DLQ) for notifications that exhaust retries
- Exponential backoff retry logic (100ms → 200ms → 400ms)
- Full audit trail for all notification delivery attempts
- New file: internal/notifications/queue.go (661 lines)

**DLQ Management API:**
- GET /api/notifications/dlq - Retrieve DLQ items
- GET /api/notifications/queue/stats - Queue statistics
- POST /api/notifications/dlq/retry - Retry failed notifications
- POST /api/notifications/dlq/delete - Delete DLQ items
- New file: internal/api/notification_queue.go (145 lines)

**Prometheus Metrics:**
- 18 comprehensive metrics for alerts and notifications
- Metric hooks integrated via function pointers to avoid import cycles (see the sketch after this message)
- /metrics endpoint exposed for Prometheus scraping
- New file: internal/metrics/alert_metrics.go (193 lines)

**Alert History Reliability:**
- Exponential backoff retry for history saves (3 attempts)
- Automatic backup restoration on write failure
- Modified: internal/alerts/history.go

**Flapping Detection:**
- Detects and suppresses rapidly oscillating alerts
- Configurable window (default: 5 minutes)
- Configurable threshold (default: 5 state changes)
- Configurable cooldown (default: 15 minutes)
- Automatic cleanup of inactive flapping history

**Alert TTL & Auto-Cleanup:**
- MaxAlertAgeDays: Auto-cleanup of old alerts (default: 7 days)
- MaxAcknowledgedAgeDays: Faster cleanup for acknowledged alerts (default: 1 day)
- AutoAcknowledgeAfterHours: Auto-acknowledge long-running alerts (default: 24 hours)
- Prevents memory leaks from long-running alerts

**WebSocket Broadcast Sequencer:**
- Channel-based sequencing ensures ordered message delivery
- 100ms coalescing window for rapid state updates
- Prevents race conditions in WebSocket broadcasts
- Modified: internal/websocket/hub.go

**Configuration Fields Added:**
- FlappingEnabled, FlappingWindowSeconds, FlappingThreshold, FlappingCooldownMinutes
- MaxAlertAgeDays, MaxAcknowledgedAgeDays, AutoAcknowledgeAfterHours

All features are production-ready and build successfully.
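The diff below calls nil-checked hooks such as `recordAlertSuppressed` and `recordAlertAcknowledged`, which is what the "metric hooks via function pointers" bullet refers to. Here is a minimal, illustrative sketch of that pattern; the hook variable names match the diff, but the wiring function and the plain in-memory counters are assumptions, not code from this repository:

```go
// Sketch only: the "metric hooks via function pointers" pattern. In the real
// code the hook variables live in the alerts package, which never imports the
// metrics package; it only calls the hooks if they have been set, avoiding an
// import cycle between alerts and metrics.
package main

import "fmt"

var (
	recordAlertSuppressed   func(reason string) // e.g. called with "flapping"
	recordAlertAcknowledged func()
)

// setMetricHooks stands in for whatever wiring the metrics package performs at
// startup (for example, pointing the hooks at Prometheus counter increments).
func setMetricHooks(suppressed func(string), acknowledged func()) {
	recordAlertSuppressed = suppressed
	recordAlertAcknowledged = acknowledged
}

func main() {
	suppressedByReason := map[string]int{}
	acked := 0
	setMetricHooks(
		func(reason string) { suppressedByReason[reason]++ },
		func() { acked++ },
	)

	// Call sites guard each hook with a nil check, exactly as in the diff.
	if recordAlertSuppressed != nil {
		recordAlertSuppressed("flapping")
	}
	if recordAlertAcknowledged != nil {
		recordAlertAcknowledged()
	}
	fmt.Println(suppressedByReason, acked)
}
```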
1 parent faf9b94 commit 26e9912

6 files changed: +1340 −15 lines

internal/alerts/alerts.go

Lines changed: 146 additions & 1 deletion
@@ -365,6 +365,15 @@ type AlertConfig struct {
     TimeThreshold        int                       `json:"timeThreshold"`        // Legacy: Seconds that threshold must be exceeded before triggering
     TimeThresholds       map[string]int            `json:"timeThresholds"`       // Per-type delays: guest, node, storage, pbs
     MetricTimeThresholds map[string]map[string]int `json:"metricTimeThresholds"` // Optional per-metric delays keyed by resource type
+    // Alert TTL and auto-cleanup
+    MaxAlertAgeDays           int `json:"maxAlertAgeDays"`           // Maximum age for alerts before auto-cleanup (0 = disabled)
+    MaxAcknowledgedAgeDays    int `json:"maxAcknowledgedAgeDays"`    // Maximum age for acknowledged alerts (0 = disabled)
+    AutoAcknowledgeAfterHours int `json:"autoAcknowledgeAfterHours"` // Auto-acknowledge alerts after X hours (0 = disabled)
+    // Flapping detection
+    FlappingEnabled         bool `json:"flappingEnabled"`         // Enable flapping detection
+    FlappingWindowSeconds   int  `json:"flappingWindowSeconds"`   // Time window for counting state changes
+    FlappingThreshold       int  `json:"flappingThreshold"`       // Number of state changes to trigger flapping
+    FlappingCooldownMinutes int  `json:"flappingCooldownMinutes"` // Cooldown period after flapping detected
 }

 // pmgQuarantineSnapshot stores quarantine counts at a point in time for growth detection
@@ -460,6 +469,9 @@ type Manager struct {
     pmgAnomalyTrackers map[string]*pmgAnomalyTracker // Track mail metrics for anomaly detection per PMG instance
     // Persistent acknowledgement state so quick alert rebuilds keep user acknowledgements
     ackState map[string]ackRecord
+    // Flapping detection tracking
+    flappingHistory map[string][]time.Time // Track state change times for flapping detection
+    flappingActive  map[string]bool        // Track which alerts are currently in flapping state
 }

 type ackRecord struct {
@@ -496,6 +508,8 @@ func NewManager() *Manager {
         pmgQuarantineHistory: make(map[string][]pmgQuarantineSnapshot),
         pmgAnomalyTrackers:   make(map[string]*pmgAnomalyTracker),
         ackState:             make(map[string]ackRecord),
+        flappingHistory:      make(map[string][]time.Time),
+        flappingActive:       make(map[string]bool),
         config: AlertConfig{
             Enabled:         true,
             ActivationState: ActivationPending,
@@ -608,6 +622,15 @@
                     ByGuest: false, // Don't group by guest by default
                 },
             },
+            // Alert TTL defaults
+            MaxAlertAgeDays:           7,  // Auto-cleanup alerts older than 7 days
+            MaxAcknowledgedAgeDays:    1,  // Auto-cleanup acknowledged alerts older than 1 day
+            AutoAcknowledgeAfterHours: 24, // Auto-acknowledge alerts after 24 hours
+            // Flapping detection defaults
+            FlappingEnabled:         true, // Enable flapping detection
+            FlappingWindowSeconds:   300,  // 5 minute window
+            FlappingThreshold:       5,    // 5 state changes triggers flapping
+            FlappingCooldownMinutes: 15,   // 15 minute cooldown
         },
     }

@@ -710,11 +733,69 @@ func (m *Manager) safeCallEscalateCallback(alert *Alert, level int) {

 // dispatchAlert delivers an alert to the configured callback, cloning it first to
 // prevent concurrent mutations from racing with consumers.
+// checkFlapping checks if an alert is flapping (changing state too rapidly)
+func (m *Manager) checkFlapping(alertID string) bool {
+    if !m.config.FlappingEnabled {
+        return false
+    }
+
+    now := time.Now()
+    windowDuration := time.Duration(m.config.FlappingWindowSeconds) * time.Second
+
+    // Record this state change
+    m.flappingHistory[alertID] = append(m.flappingHistory[alertID], now)
+
+    // Remove state changes outside the window
+    history := m.flappingHistory[alertID]
+    validHistory := []time.Time{}
+    for _, t := range history {
+        if now.Sub(t) <= windowDuration {
+            validHistory = append(validHistory, t)
+        }
+    }
+    m.flappingHistory[alertID] = validHistory
+
+    // Check if we've exceeded the threshold
+    if len(validHistory) >= m.config.FlappingThreshold {
+        // Mark as flapping
+        if !m.flappingActive[alertID] {
+            log.Warn().
+                Str("alertID", alertID).
+                Int("stateChanges", len(validHistory)).
+                Int("threshold", m.config.FlappingThreshold).
+                Int("windowSeconds", m.config.FlappingWindowSeconds).
+                Msg("Flapping detected - suppressing alert")
+
+            m.flappingActive[alertID] = true
+
+            // Set cooldown period
+            cooldownDuration := time.Duration(m.config.FlappingCooldownMinutes) * time.Minute
+            m.suppressedUntil[alertID] = now.Add(cooldownDuration)
+
+            // Record suppression metric
+            if recordAlertSuppressed != nil {
+                recordAlertSuppressed("flapping")
+            }
+        }
+        return true
+    }
+
+    return false
+}
+
 func (m *Manager) dispatchAlert(alert *Alert, async bool) bool {
     if m.onAlert == nil || alert == nil {
         return false
     }

+    // Check for flapping
+    if m.checkFlapping(alert.ID) {
+        log.Debug().
+            Str("alertID", alert.ID).
+            Msg("Alert suppressed due to flapping")
+        return false
+    }
+
     // Check activation state - only dispatch notifications if active
     if m.config.ActivationState != ActivationActive {
         log.Debug().
@@ -6932,7 +7013,56 @@ func (m *Manager) Cleanup(maxAge time.Duration) {

     now := time.Now()

-    // Clean up acknowledged alerts
+    // Auto-acknowledge old alerts if configured
+    if m.config.AutoAcknowledgeAfterHours > 0 {
+        autoAckThreshold := time.Duration(m.config.AutoAcknowledgeAfterHours) * time.Hour
+        for id, alert := range m.activeAlerts {
+            if !alert.Acknowledged && now.Sub(alert.StartTime) > autoAckThreshold {
+                log.Info().
+                    Str("alertID", id).
+                    Dur("age", now.Sub(alert.StartTime)).
+                    Msg("Auto-acknowledging old alert")
+                alert.Acknowledged = true
+                ackTime := now
+                alert.AckTime = &ackTime
+                alert.AckUser = "system-auto"
+
+                if recordAlertAcknowledged != nil {
+                    recordAlertAcknowledged()
+                }
+            }
+        }
+    }
+
+    // Clean up acknowledged alerts based on TTL
+    if m.config.MaxAcknowledgedAgeDays > 0 {
+        acknowledgedTTL := time.Duration(m.config.MaxAcknowledgedAgeDays) * 24 * time.Hour
+        for id, alert := range m.activeAlerts {
+            if alert.Acknowledged && alert.AckTime != nil && now.Sub(*alert.AckTime) > acknowledgedTTL {
+                log.Info().
+                    Str("alertID", id).
+                    Dur("age", now.Sub(*alert.AckTime)).
+                    Msg("Cleaning up old acknowledged alert (TTL)")
+                m.removeActiveAlertNoLock(id)
+            }
+        }
+    }
+
+    // Clean up old unacknowledged alerts based on TTL
+    if m.config.MaxAlertAgeDays > 0 {
+        alertTTL := time.Duration(m.config.MaxAlertAgeDays) * 24 * time.Hour
+        for id, alert := range m.activeAlerts {
+            if !alert.Acknowledged && now.Sub(alert.StartTime) > alertTTL {
+                log.Info().
+                    Str("alertID", id).
+                    Dur("age", now.Sub(alert.StartTime)).
+                    Msg("Cleaning up old unacknowledged alert (TTL)")
+                m.removeActiveAlertNoLock(id)
+            }
+        }
+    }
+
+    // Original cleanup for acknowledged alerts (fallback if TTL not configured)
     for id, alert := range m.activeAlerts {
         if alert.Acknowledged && alert.AckTime != nil && now.Sub(*alert.AckTime) > maxAge {
             m.removeActiveAlertNoLock(id)
@@ -6999,6 +7129,21 @@ func (m *Manager) Cleanup(maxAge time.Duration) {
         }
     }

+    // Clean up flapping history for resolved/inactive alerts
+    flappingCleanupAge := 1 * time.Hour
+    for alertID := range m.flappingHistory {
+        // If alert is no longer active and flapping cooldown has expired
+        if _, exists := m.activeAlerts[alertID]; !exists {
+            if suppressUntil, suppressed := m.suppressedUntil[alertID]; !suppressed || now.After(suppressUntil.Add(flappingCleanupAge)) {
+                delete(m.flappingHistory, alertID)
+                delete(m.flappingActive, alertID)
+                log.Debug().
+                    Str("alertID", alertID).
+                    Msg("Cleaned up flapping history for inactive alert")
+            }
+        }
+    }
+
     // Clean up old Docker restart tracking (containers not seen in 24h)
     // Prevents memory leak from ephemeral containers in CI/CD environments
     for resourceID, record := range m.dockerRestartTracking {
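To make the new AlertConfig fields above concrete, here is a small standalone sketch that shows how the TTL and flapping settings serialize. The mirror struct and the `main` program are illustrative only; the field names, JSON tags, and default values are taken directly from the diff:

```go
// Illustrative only: a struct mirroring the fields added to AlertConfig in
// this commit, populated with the defaults set in NewManager.
package main

import (
	"encoding/json"
	"fmt"
)

type alertTTLAndFlappingConfig struct {
	MaxAlertAgeDays           int  `json:"maxAlertAgeDays"`
	MaxAcknowledgedAgeDays    int  `json:"maxAcknowledgedAgeDays"`
	AutoAcknowledgeAfterHours int  `json:"autoAcknowledgeAfterHours"`
	FlappingEnabled           bool `json:"flappingEnabled"`
	FlappingWindowSeconds     int  `json:"flappingWindowSeconds"`
	FlappingThreshold         int  `json:"flappingThreshold"`
	FlappingCooldownMinutes   int  `json:"flappingCooldownMinutes"`
}

func main() {
	cfg := alertTTLAndFlappingConfig{
		MaxAlertAgeDays:           7,
		MaxAcknowledgedAgeDays:    1,
		AutoAcknowledgeAfterHours: 24,
		FlappingEnabled:           true,
		FlappingWindowSeconds:     300,
		FlappingThreshold:         5,
		FlappingCooldownMinutes:   15,
	}
	out, err := json.MarshalIndent(cfg, "", "  ")
	if err != nil {
		panic(err)
	}
	// Prints the JSON keys exactly as they appear in the new struct tags.
	fmt.Println(string(out))
}
```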

internal/alerts/history.go

Lines changed: 41 additions & 12 deletions
@@ -167,36 +167,65 @@ func (hm *HistoryManager) loadHistory() error {
     return nil
 }

-// saveHistory saves history to disk
+// saveHistory saves history to disk with retry logic
 func (hm *HistoryManager) saveHistory() error {
+    return hm.saveHistoryWithRetry(3)
+}
+
+// saveHistoryWithRetry saves history with exponential backoff retry
+func (hm *HistoryManager) saveHistoryWithRetry(maxRetries int) error {
     hm.mu.RLock()
     snapshot := make([]HistoryEntry, len(hm.history))
     copy(snapshot, hm.history)
     hm.mu.RUnlock()

     data, err := json.Marshal(snapshot)
-
     if err != nil {
         return fmt.Errorf("failed to marshal history: %w", err)
     }

-    // Create backup of existing file
     historyFile := hm.historyFile
     backupFile := hm.backupFile

-    if _, err := os.Stat(historyFile); err == nil {
-        if err := os.Rename(historyFile, backupFile); err != nil {
-            log.Warn().Err(err).Msg("Failed to create backup file")
+    var lastErr error
+    for attempt := 1; attempt <= maxRetries; attempt++ {
+        // Create backup of existing file before writing
+        if _, err := os.Stat(historyFile); err == nil {
+            if err := os.Rename(historyFile, backupFile); err != nil {
+                log.Warn().Err(err).Msg("Failed to create backup file")
+            }
+        }
+
+        // Write new file
+        if err := os.WriteFile(historyFile, data, 0644); err != nil {
+            lastErr = err
+            log.Warn().
+                Err(err).
+                Int("attempt", attempt).
+                Int("maxRetries", maxRetries).
+                Msg("Failed to write history file, will retry")
+
+            // Restore backup if write failed
+            if _, statErr := os.Stat(backupFile); statErr == nil {
+                if restoreErr := os.Rename(backupFile, historyFile); restoreErr != nil {
+                    log.Error().Err(restoreErr).Msg("Failed to restore backup after write failure")
+                }
+            }
+
+            // Exponential backoff: 100ms, 200ms, 400ms
+            if attempt < maxRetries {
+                backoff := time.Duration(100*(1<<uint(attempt-1))) * time.Millisecond
+                time.Sleep(backoff)
+            }
+            continue
         }
-    }

-    // Write new file
-    if err := os.WriteFile(historyFile, data, 0644); err != nil {
-        return fmt.Errorf("failed to write history file: %w", err)
+        // Success
+        log.Debug().Int("entries", len(snapshot)).Msg("Saved alert history")
+        return nil
     }

-    log.Debug().Int("entries", len(snapshot)).Msg("Saved alert history")
-    return nil
+    return fmt.Errorf("failed to write history file after %d attempts: %w", maxRetries, lastErr)
 }

 // startPeriodicSave starts the periodic save routine
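As a quick check of the backoff arithmetic in saveHistoryWithRetry above, the standalone sketch below (illustrative only) evaluates `100*(1<<uint(attempt-1))` for each attempt. Note that with maxRetries = 3 the loop only sleeps after the first and second failed attempts (100ms, then 200ms); the 400ms step of the 100ms → 200ms → 400ms progression would apply only if a further attempt followed.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	maxRetries := 3
	for attempt := 1; attempt <= maxRetries; attempt++ {
		// Same formula as saveHistoryWithRetry: 100ms, 200ms, 400ms, ...
		backoff := time.Duration(100*(1<<uint(attempt-1))) * time.Millisecond
		sleeps := attempt < maxRetries // a sleep only happens before another attempt
		fmt.Printf("attempt %d: backoff %v (sleep actually taken: %v)\n", attempt, backoff, sleeps)
	}
}
```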
