/
notifications.go
96 lines (79 loc) · 1.94 KB
/
notifications.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package main
import "sync"
const (
NEW_CONSENSUS NotificationType = "New Consensus Request"
EXECUTION_DONE NotificationType = "Execution done"
)
type NotificationService interface {
GetConsumer() func(*Message)
}
type NotificationType string
type NotificationServiceConfig interface {
IsValid() error
IsEnabled() bool
GetService() (NotificationService, error)
GetName() string
}
type Message struct {
Content string
Type NotificationType
Url string
}
func newMessage(content string, nType NotificationType) *Message {
return &Message{Content: content, Type: nType}
}
type MessageReceiver struct {
channel chan *Message
consumer func(*Message)
}
func (r *MessageReceiver) run() {
for {
select {
case msg := <-r.channel:
r.consumer(msg)
default:
}
}
}
type NotificationManager struct {
muxReceivers sync.RWMutex
listeners []*MessageReceiver
sendOut chan *Message
registerReceiver chan *MessageReceiver
}
func newNotificationManager() *NotificationManager {
b := &NotificationManager{
listeners: []*MessageReceiver{},
sendOut: make(chan *Message, 100),
}
go b.dispatcher()
return b
}
func (m *NotificationManager) dispatcher() {
for {
select {
case value := <-m.sendOut:
for _, listener := range m.listeners {
select {
case listener.channel <- value:
default:
log.Printf("Ommit passing notification to listener, channel full")
}
}
default:
}
}
}
func (m *NotificationManager) registerWithChannelSize(service NotificationService, channelSize int) {
msgReceiver := &MessageReceiver{channel: make(chan *Message, channelSize), consumer: service.GetConsumer()}
go msgReceiver.run()
m.muxReceivers.Lock()
defer m.muxReceivers.Unlock()
m.listeners = append(m.listeners, msgReceiver)
}
func (m *NotificationManager) register(service NotificationService) {
m.registerWithChannelSize(service, 1000)
}
func (m *NotificationManager) Notify(msg *Message) {
m.sendOut <- msg
}