-
Notifications
You must be signed in to change notification settings - Fork 25
/
manager.go
99 lines (90 loc) · 2.91 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package aggregator
import (
"sync"
"time"
"github.com/ncarlier/feedpushr/v3/pkg/model"
"github.com/ncarlier/feedpushr/v3/pkg/output"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
// Manager owns the set of running feed aggregators and the shared settings
// used to create them.
//
// It embeds a sync.WaitGroup by value, so a Manager must be handled through a
// pointer and never copied.
type Manager struct {
// feedAggregators holds the registered aggregators, keyed by feed ID.
feedAggregators map[string]*FeedAggregator
// shutdownWaitGroup is incremented for every registered aggregator and
// decremented when one is unregistered; Shutdown waits on it.
shutdownWaitGroup sync.WaitGroup
// outputs is handed to every feed aggregator created by this manager.
outputs *output.Manager
// log is the logger tagged with component=aggregator.
log zerolog.Logger
// delay is passed to NewFeedAggregator for every aggregator created.
delay time.Duration
// timeout is passed to NewFeedAggregator for every aggregator created.
timeout time.Duration
// callbackURL is passed to NewFeedAggregator for every aggregator created.
callbackURL string
}
// NewAggregatorManager builds an empty Manager.
//
// outputs, delay, timeout and callbackURL are stored as-is and later handed
// to every feed aggregator the manager creates. The manager logs with the
// "component" field set to "aggregator".
func NewAggregatorManager(outputs *output.Manager, delay time.Duration, timeout time.Duration, callbackURL string) *Manager {
	logger := log.With().Str("component", "aggregator").Logger()

	manager := new(Manager)
	manager.feedAggregators = make(map[string]*FeedAggregator)
	manager.outputs = outputs
	manager.log = logger
	manager.delay = delay
	manager.timeout = timeout
	manager.callbackURL = callbackURL
	return manager
}
// GetFeedAggregator looks up the feed aggregator registered under id.
// It returns nil when no aggregator is registered for that id.
func (m *Manager) GetFeedAggregator(id string) *FeedAggregator {
	if registered, found := m.feedAggregators[id]; found {
		return registered
	}
	return nil
}
// RegisterFeedAggregator creates, records and starts the aggregator for feed.
//
// If an aggregator is already registered under feed.ID it is returned
// unchanged and nothing is started. Otherwise a new aggregator is built from
// the manager's settings, stored under feed.ID, counted in the shutdown wait
// group and started: via StartWithDelay when delay is positive, via Start
// otherwise. The delay argument only affects how the aggregator is started;
// the manager's own delay setting is what gets passed to NewFeedAggregator.
func (m *Manager) RegisterFeedAggregator(feed *model.FeedDef, delay time.Duration) *FeedAggregator {
	if existing := m.GetFeedAggregator(feed.ID); existing != nil {
		m.log.Debug().Str("source", feed.ID).Msg("feed aggregator already registered")
		return existing
	}

	created := NewFeedAggregator(feed, m.outputs, m.delay, m.timeout, m.callbackURL)
	m.feedAggregators[feed.ID] = created
	m.shutdownWaitGroup.Add(1)

	switch {
	case delay > 0:
		created.StartWithDelay(delay)
	default:
		created.Start()
	}

	m.log.Debug().Str("source", feed.ID).Dur("delay", delay).Msg("feed aggregator registered")
	return created
}
// UnRegisterFeedAggregator stops the feed aggregator registered under id and
// removes it from the manager.
//
// When no aggregator is registered under id a warning is logged and nothing
// else happens. Otherwise the aggregator is stopped, its entry is deleted and
// the shutdown wait group is decremented, balancing the increment performed
// when the aggregator was registered.
func (m *Manager) UnRegisterFeedAggregator(id string) {
	fa := m.GetFeedAggregator(id)
	if fa == nil {
		m.log.Warn().Str("feed", id).Msg("unable to deregister feed aggregator: not found")
		return
	}
	fa.Stop()
	// delete drops the entry outright; there is no need to nil the value first.
	delete(m.feedAggregators, id)
	m.shutdownWaitGroup.Done()
	m.log.Debug().Str("feed", id).Msg("feed aggregator unregistered")
}
// RestartFeedAggregator stops the feed aggregator registered under id and
// starts it again through StartWithDelay(delay).
// A warning is logged and nothing else is done when id is not registered.
func (m *Manager) RestartFeedAggregator(id string, delay time.Duration) {
	if target := m.GetFeedAggregator(id); target != nil {
		target.Stop()
		target.StartWithDelay(delay)
		return
	}
	m.log.Warn().Str("feed", id).Msg("unable to restart feed aggregator: not found")
}
// Shutdown stops and unregisters every feed aggregator, returning once all of
// them have been stopped.
//
// The aggregators are stopped in parallel, but the registry map and the
// shutdown wait group are only touched from the calling goroutine. Go maps
// are not safe for concurrent writes, so unregistering from several
// goroutines at once would be a data race on feedAggregators.
func (m *Manager) Shutdown() {
	m.log.Debug().Msg("shutting down all aggregators")

	// Snapshot the registered aggregators first: the registry is mutated
	// below, after the aggregators have been stopped.
	registered := make(map[string]*FeedAggregator, len(m.feedAggregators))
	for id, fa := range m.feedAggregators {
		if fa != nil {
			registered[id] = fa
		}
	}

	// Stop all aggregators concurrently so that a slow one does not delay
	// the others.
	// NOTE(review): assumes FeedAggregator.Stop is safe to call from its own
	// goroutine, as the previous implementation already did — confirm.
	var stopping sync.WaitGroup
	for _, fa := range registered {
		stopping.Add(1)
		go func(fa *FeedAggregator) {
			defer stopping.Done()
			fa.Stop()
		}(fa)
	}
	stopping.Wait()

	// Unregister serially: map writes must not happen concurrently.
	for id := range registered {
		delete(m.feedAggregators, id)
		m.shutdownWaitGroup.Done()
		m.log.Debug().Str("feed", id).Msg("feed aggregator unregistered")
	}

	m.shutdownWaitGroup.Wait()
	m.log.Debug().Msg("all aggregators stopped")
}