@@ -120,6 +120,7 @@ type NotificationManager struct {
120120 webhookHistory []WebhookDelivery // Keep last 100 webhook deliveries for debugging
121121 webhookRateLimits map [string ]* webhookRateLimit // Track rate limits per webhook URL
122122 appriseExec appriseExecFunc
123+ queue * NotificationQueue // Persistent notification queue
123124}
124125
125126type appriseExecFunc func (ctx context.Context , path string , args []string ) ([]byte , error )
@@ -317,7 +318,15 @@ func NewNotificationManager(publicURL string) *NotificationManager {
317318 } else {
318319 log .Info ().Msg ("NotificationManager initialized without public URL - webhook links may not work" )
319320 }
320- return & NotificationManager {
321+
322+ // Initialize persistent queue
323+ queue , err := NewNotificationQueue ("" )
324+ if err != nil {
325+ log .Error ().Err (err ).Msg ("Failed to initialize notification queue, notifications will be in-memory only" )
326+ queue = nil
327+ }
328+
329+ nm := & NotificationManager {
321330 enabled : true ,
322331 cooldown : 5 * time .Minute ,
323332 lastNotified : make (map [string ]notificationRecord ),
@@ -338,7 +347,15 @@ func NewNotificationManager(publicURL string) *NotificationManager {
338347 webhookRateLimits : make (map [string ]* webhookRateLimit ),
339348 publicURL : cleanURL ,
340349 appriseExec : defaultAppriseExec ,
350+ queue : queue ,
351+ }
352+
353+ // Wire up queue processor if queue is available
354+ if queue != nil {
355+ queue .SetProcessor (nm .ProcessQueuedNotification )
341356 }
357+
358+ return nm
342359}
343360
344361// SetPublicURL updates the public URL used for webhook payloads.
@@ -600,6 +617,90 @@ func (n *NotificationManager) sendGroupedAlerts() {
600617 webhooks := copyWebhookConfigs (n .webhooks )
601618 appriseConfig := copyAppriseConfig (n .appriseConfig )
602619
620+ // Use persistent queue if available, otherwise send directly
621+ if n .queue != nil {
622+ n .enqueueNotifications (emailConfig , webhooks , appriseConfig , alertsToSend )
623+ } else {
624+ n .sendNotificationsDirect (emailConfig , webhooks , appriseConfig , alertsToSend )
625+ }
626+
627+ // Update last notified time for all alerts
628+ now := time .Now ()
629+ for _ , alert := range alertsToSend {
630+ n .lastNotified [alert .ID ] = notificationRecord {
631+ lastSent : now ,
632+ alertStart : alert .StartTime ,
633+ }
634+ }
635+ }
636+
637+ // enqueueNotifications adds notifications to the persistent queue
638+ func (n * NotificationManager ) enqueueNotifications (emailConfig EmailConfig , webhooks []WebhookConfig , appriseConfig AppriseConfig , alertsToSend []* alerts.Alert ) {
639+ // Enqueue email notification
640+ if emailConfig .Enabled {
641+ configJSON , err := json .Marshal (emailConfig )
642+ if err != nil {
643+ log .Error ().Err (err ).Msg ("Failed to marshal email config for queue" )
644+ } else {
645+ notif := & QueuedNotification {
646+ Type : "email" ,
647+ Alerts : alertsToSend ,
648+ Config : configJSON ,
649+ MaxAttempts : 3 ,
650+ }
651+ if err := n .queue .Enqueue (notif ); err != nil {
652+ log .Error ().Err (err ).Msg ("Failed to enqueue email notification" )
653+ } else {
654+ log .Debug ().Int ("alertCount" , len (alertsToSend )).Msg ("Enqueued email notification" )
655+ }
656+ }
657+ }
658+
659+ // Enqueue webhook notifications
660+ for _ , webhook := range webhooks {
661+ if webhook .Enabled {
662+ configJSON , err := json .Marshal (webhook )
663+ if err != nil {
664+ log .Error ().Err (err ).Str ("webhookName" , webhook .Name ).Msg ("Failed to marshal webhook config for queue" )
665+ } else {
666+ notif := & QueuedNotification {
667+ Type : "webhook" ,
668+ Alerts : alertsToSend ,
669+ Config : configJSON ,
670+ MaxAttempts : 3 ,
671+ }
672+ if err := n .queue .Enqueue (notif ); err != nil {
673+ log .Error ().Err (err ).Str ("webhookName" , webhook .Name ).Msg ("Failed to enqueue webhook notification" )
674+ } else {
675+ log .Debug ().Str ("webhookName" , webhook .Name ).Int ("alertCount" , len (alertsToSend )).Msg ("Enqueued webhook notification" )
676+ }
677+ }
678+ }
679+ }
680+
681+ // Enqueue apprise notification
682+ if appriseConfig .Enabled {
683+ configJSON , err := json .Marshal (appriseConfig )
684+ if err != nil {
685+ log .Error ().Err (err ).Msg ("Failed to marshal apprise config for queue" )
686+ } else {
687+ notif := & QueuedNotification {
688+ Type : "apprise" ,
689+ Alerts : alertsToSend ,
690+ Config : configJSON ,
691+ MaxAttempts : 3 ,
692+ }
693+ if err := n .queue .Enqueue (notif ); err != nil {
694+ log .Error ().Err (err ).Msg ("Failed to enqueue apprise notification" )
695+ } else {
696+ log .Debug ().Int ("alertCount" , len (alertsToSend )).Msg ("Enqueued apprise notification" )
697+ }
698+ }
699+ }
700+ }
701+
702+ // sendNotificationsDirect sends notifications without using the queue (fallback)
703+ func (n * NotificationManager ) sendNotificationsDirect (emailConfig EmailConfig , webhooks []WebhookConfig , appriseConfig AppriseConfig , alertsToSend []* alerts.Alert ) {
603704 // Send notifications using the captured snapshots outside the lock to avoid blocking writers
604705 if emailConfig .Enabled {
605706 log .Info ().
@@ -625,15 +726,6 @@ func (n *NotificationManager) sendGroupedAlerts() {
625726 if appriseConfig .Enabled {
626727 go n .sendGroupedApprise (appriseConfig , alertsToSend )
627728 }
628-
629- // Update last notified time for all alerts
630- now := time .Now ()
631- for _ , alert := range alertsToSend {
632- n .lastNotified [alert .ID ] = notificationRecord {
633- lastSent : now ,
634- alertStart : alert .StartTime ,
635- }
636- }
637729}
638730
639731// sendGroupedEmail sends a grouped email notification
@@ -2146,11 +2238,61 @@ func (n *NotificationManager) SendTestNotificationWithConfig(method string, conf
21462238 }
21472239}
21482240
2241+ // GetQueue returns the notification queue (if available)
2242+ func (n * NotificationManager ) GetQueue () * NotificationQueue {
2243+ n .mu .RLock ()
2244+ defer n .mu .RUnlock ()
2245+ return n .queue
2246+ }
2247+
2248+ // ProcessQueuedNotification processes a notification from the persistent queue
2249+ func (n * NotificationManager ) ProcessQueuedNotification (notif * QueuedNotification ) error {
2250+ log .Debug ().
2251+ Str ("notificationID" , notif .ID ).
2252+ Str ("type" , notif .Type ).
2253+ Int ("alertCount" , len (notif .Alerts )).
2254+ Msg ("Processing queued notification" )
2255+
2256+ switch notif .Type {
2257+ case "email" :
2258+ var emailConfig EmailConfig
2259+ if err := json .Unmarshal (notif .Config , & emailConfig ); err != nil {
2260+ return fmt .Errorf ("failed to unmarshal email config: %w" , err )
2261+ }
2262+ n .sendGroupedEmail (emailConfig , notif .Alerts )
2263+ return nil
2264+
2265+ case "webhook" :
2266+ var webhookConfig WebhookConfig
2267+ if err := json .Unmarshal (notif .Config , & webhookConfig ); err != nil {
2268+ return fmt .Errorf ("failed to unmarshal webhook config: %w" , err )
2269+ }
2270+ n .sendGroupedWebhook (webhookConfig , notif .Alerts )
2271+ return nil
2272+
2273+ case "apprise" :
2274+ var appriseConfig AppriseConfig
2275+ if err := json .Unmarshal (notif .Config , & appriseConfig ); err != nil {
2276+ return fmt .Errorf ("failed to unmarshal apprise config: %w" , err )
2277+ }
2278+ n .sendGroupedApprise (appriseConfig , notif .Alerts )
2279+ return nil
2280+
2281+ default :
2282+ return fmt .Errorf ("unknown notification type: %s" , notif .Type )
2283+ }
2284+ }
2285+
21492286// Stop gracefully stops the notification manager
21502287func (n * NotificationManager ) Stop () {
21512288 n .mu .Lock ()
21522289 defer n .mu .Unlock ()
21532290
2291+ // Stop the notification queue if it exists
2292+ if n .queue != nil {
2293+ n .queue .Stop ()
2294+ }
2295+
21542296 // Cancel any pending group timer
21552297 if n .groupTimer != nil {
21562298 n .groupTimer .Stop ()
0 commit comments