-
Notifications
You must be signed in to change notification settings - Fork 2
/
listeners.go
127 lines (108 loc) · 2.87 KB
/
listeners.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package mqtt
import (
"net"
"sync"
"github.com/snple/mqtt/system"
)
// EstablishFunc is a callback function for establishing new clients. It is
// the function type a Listener receives through its Serve method. It is
// called with an id, a network connection and an Auth value, and reports
// failure through the returned error.
// NOTE(review): id is presumably the ID of the listener that produced the
// connection — confirm against the Listener implementations.
type EstablishFunc func(id string, c net.Conn, auth Auth) error
// CloseFunc is a callback function for closing all listener clients. It is
// the function type a Listener receives through its Close method.
// NOTE(review): id is presumably the ID of the listener being closed —
// confirm against the Listener implementations.
type CloseFunc func(id string)
// Listener is an interface for network listeners. A network listener listens
// for incoming client connections and adds them to the server.
type Listener interface {
	// Listen opens the network address. It is handed a pointer to the
	// system info.
	Listen(s *system.Info) error
	// Serve starts actively listening for new connections, using the given
	// EstablishFunc as the callback for establishing clients.
	Serve(EstablishFunc) error
	// ID returns the id of the listener.
	ID() string
	// Auth returns the Auth value of the listener.
	// NOTE(review): presumably the authentication controller applied to
	// clients of this listener — confirm.
	Auth() Auth
	// Close stops and closes the listener, using the given CloseFunc as the
	// callback for closing its clients.
	Close(CloseFunc)
}
// Listeners contains the network listeners for the broker. The embedded
// RWMutex guards the internal map; the methods on Listeners take it
// themselves, so callers do not need to lock.
type Listeners struct {
	sync.RWMutex
	wg       sync.WaitGroup      // a waitgroup that waits for all listeners to finish; CloseAll blocks on it.
	internal map[string]Listener // a map of active listeners, keyed on listener id.
	system   *system.Info        // pointers to system info, as passed to NewListeners.
}
// NewListeners creates a Listeners collection with an empty listener map and
// the given system info pointer stored on it.
func NewListeners(s *system.Info) *Listeners {
	ls := new(Listeners)
	ls.system = s
	ls.internal = make(map[string]Listener)
	return ls
}
// Add stores a listener in the listeners map under the id reported by its ID
// method. A listener already stored under that id is replaced.
func (l *Listeners) Add(val Listener) {
	l.Lock()
	defer l.Unlock()

	l.internal[val.ID()] = val
}
// Get looks up the listener stored under id. The boolean result reports
// whether a listener with that id exists in the map.
func (l *Listeners) Get(id string) (Listener, bool) {
	l.RLock()
	defer l.RUnlock()

	listener, found := l.internal[id]
	return listener, found
}
// Len reports how many listeners are currently stored in the map.
func (l *Listeners) Len() int {
	l.RLock()
	defer l.RUnlock()

	return len(l.internal)
}
// Delete drops the entry for id from the internal map. Only the map entry is
// removed; the listener's own Close method is not called here.
func (l *Listeners) Delete(id string) {
	l.Lock()
	defer l.Unlock()

	delete(l.internal, id)
}
// Serve starts the listener registered under id serving in its own
// goroutine, passing establisher to it as the callback for new clients.
// Serve returns immediately; the goroutine is counted in the wait group so
// that CloseAll can wait for it to finish. If no listener is registered
// under id, Serve does nothing.
func (l *Listeners) Serve(id string, establisher EstablishFunc) {
	l.RLock()
	listener, ok := l.internal[id]
	l.RUnlock()
	if !ok || listener == nil {
		// Calling Serve on a nil Listener would panic inside the goroutine
		// below, where nothing can recover it.
		return
	}

	// Add must happen before the goroutine is started: doing it inside the
	// goroutine lets a concurrent wg.Wait return before this listener has
	// been counted.
	l.wg.Add(1)
	go func() {
		defer l.wg.Done()

		// This method has no return value to carry the error, so it is
		// deliberately discarded here rather than left in an empty branch.
		_ = listener.Serve(establisher)
	}()
}
// ServeAll calls Serve for every listener id currently in the internal map,
// passing establisher to each. The ids are copied while holding the read
// lock and Serve is called after the lock has been released.
func (l *Listeners) ServeAll(establisher EstablishFunc) {
	l.RLock()
	ids := make([]string, 0, len(l.internal))
	for id := range l.internal {
		ids = append(ids, id)
	}
	l.RUnlock()

	for n := range ids {
		l.Serve(ids[n], establisher)
	}
}
// Close stops the listener registered under id by calling its Close method
// with closer. The listener is not removed from the internal map. If no
// listener is registered under id, Close does nothing.
func (l *Listeners) Close(id string, closer CloseFunc) {
	l.RLock()
	listener, ok := l.internal[id]
	l.RUnlock()
	if !ok || listener == nil {
		// A missing id yields a nil Listener; calling Close on it would panic.
		return
	}

	// Called outside the lock so the listener's Close never runs while the
	// map lock is held.
	listener.Close(closer)
}
// CloseAll calls Close for every listener id currently in the internal map,
// passing closer to each, and then blocks on the wait group until it drains.
// The ids are copied while holding the read lock and Close is called after
// the lock has been released.
func (l *Listeners) CloseAll(closer CloseFunc) {
	l.RLock()
	ids := make([]string, 0, len(l.internal))
	for id := range l.internal {
		ids = append(ids, id)
	}
	l.RUnlock()

	for n := range ids {
		l.Close(ids[n], closer)
	}

	l.wg.Wait()
}