/
bus.go
130 lines (112 loc) · 3.16 KB
/
bus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package dean
import (
"fmt"
)
// defaultMaxSockets is the socket limit a bus gets from NewBus: it is the
// capacity of the socketQ channel that NewBus creates.
var defaultMaxSockets = 200
// Bus is a logical msg broadcast bus. Msgs arrive on sockets connected to the
// bus. A received msg can be broadcast to the other sockets, or replied back
// to sender. A socket has a tag, and the bus segregates the sockets by tag.
// Msgs arriving on a tagged socket will be broadcast only to other sockets
// with the same tag. Think of a tag as a VLAN. The empty tag "" is the default
// tag on the bus.
type Bus struct {
// name is the bus name passed to NewBus and returned by Name.
name string
// socketsMu guards sockets.
socketsMu rwMutex
// sockets is the set of sockets currently plugged into the bus.
sockets map[Socketer]bool
// socketQ limits the number of plugged-in sockets: plugin sends one value
// per socket (blocking while the channel is full) and unplug receives one.
socketQ chan bool
// handlersMu guards handlers.
handlersMu rwMutex
// handlers maps a socket tag to the handler that receive calls for msgs
// arriving on sockets with that tag.
handlers map[string]func(*Msg)
// connect is called by plugin after a socket has been added to the bus.
connect func(Socketer)
// disconnect is called by unplug after a socket has been removed from the
// bus.
disconnect func(Socketer)
}
// NewBus creates a bus called name. The connect and disconnect callbacks are
// invoked when a socket is plugged into or unplugged from the bus; passing nil
// for either one installs a callback that does nothing. The new bus admits up
// to defaultMaxSockets sockets.
func NewBus(name string, connect, disconnect func(Socketer)) *Bus {
	// Start with silent callbacks and override them only when the caller
	// supplied real ones.
	bus := &Bus{
		name:       name,
		sockets:    make(map[Socketer]bool),
		socketQ:    make(chan bool, defaultMaxSockets),
		handlers:   make(map[string]func(*Msg)),
		connect:    func(Socketer) { /* don't notify */ },
		disconnect: func(Socketer) { /* don't notify */ },
	}
	if connect != nil {
		bus.connect = connect
	}
	if disconnect != nil {
		bus.disconnect = disconnect
	}
	return bus
}
// Handle registers handler as the msg handler for tag. It reports true when
// the handler was installed, and false when tag already has a handler (the
// existing handler is left in place). Handle panics if handler is nil.
func (b *Bus) Handle(tag string, handler func(*Msg)) bool {
	if handler == nil {
		panic("handler is nil")
	}

	b.handlersMu.Lock()
	defer b.handlersMu.Unlock()

	// First registration wins; never overwrite an existing handler.
	if _, taken := b.handlers[tag]; taken {
		return false
	}
	b.handlers[tag] = handler
	return true
}
// Unhandle removes the msg handler registered for tag, if any. Removing a tag
// that has no handler is a no-op.
func (b *Bus) Unhandle(tag string) {
	b.handlersMu.Lock()
	delete(b.handlers, tag)
	b.handlersMu.Unlock()
}
// Name returns the name of the bus.
func (b *Bus) Name() string {
return b.name
}
// MaxSockets sets the maximum number of sockets that may be connected to the
// bus at once. Once the limit is reached, further connection attempts block
// until a connected socket drops.
//
// The limit is applied by replacing the bus's slot channel with a new, empty
// one, without taking any lock; slots held on the previous channel are not
// carried over.
// NOTE(review): this looks intended to be called before any socket is plugged
// in — confirm with callers.
func (b *Bus) MaxSockets(maxSockets int) {
	slots := make(chan bool, maxSockets)
	b.socketQ = slots
}
// plugin attaches socket s to the bus. It first claims a slot in socketQ,
// then records s in the socket set, and finally invokes the connect callback
// after the sockets lock has been released.
func (b *Bus) plugin(s Socketer) {
	// Claim a slot; this send blocks for as long as socketQ is full.
	b.socketQ <- true

	b.socketsMu.Lock()
	b.sockets[s] = true
	b.socketsMu.Unlock()

	// Notify the owner that s is now on the bus.
	b.connect(s)
}
// unplug detaches socket s from the bus. It removes s from the socket set,
// invokes the disconnect callback after the sockets lock has been released,
// and then frees one slot in socketQ.
func (b *Bus) unplug(s Socketer) {
	b.socketsMu.Lock()
	delete(b.sockets, s)
	b.socketsMu.Unlock()

	// Notify the owner that s has left the bus.
	b.disconnect(s)

	// Give the slot back so a plugin blocked on a full socketQ can proceed.
	<-b.socketQ
}
// broadcast sends msg to every socket on the bus that is not the msg's source
// socket (msg.src), has the same tag as the source, and has SocketFlagBcast
// set. Each delivery is logged to stdout. The sockets read lock is held for
// the whole iteration, including the Send calls.
func (b *Bus) broadcast(msg *Msg) {
	b.socketsMu.RLock()
	defer b.socketsMu.RUnlock()

	for dst := range b.sockets {
		// Never echo a msg back to the socket it came from.
		if msg.src == dst {
			continue
		}
		// Stay within the source socket's tag.
		if msg.src.Tag() != dst.Tag() {
			continue
		}
		// Only sockets that opted in to broadcasts receive them.
		if !dst.TestFlag(SocketFlagBcast) {
			continue
		}
		fmt.Printf("Broadcast: src %s dst %s msg %s\r\n", msg.src, dst, msg)
		dst.Send(msg)
	}
}
// receive dispatches msg to the handler registered for the tag of the socket
// the msg arrived on (msg.src). A msg whose tag has no handler is dropped.
//
// The handler is looked up under the handlers read lock but is invoked only
// after that lock has been released. Handler code therefore never runs with
// handlersMu held: a handler that calls Handle or Unhandle on this bus does
// not request the write lock while its own goroutine still holds the read
// lock, and a slow handler does not hold up handler registration.
func (b *Bus) receive(msg *Msg) {
	tag := msg.src.Tag()

	b.handlersMu.RLock()
	handler, ok := b.handlers[tag]
	b.handlersMu.RUnlock()

	if ok {
		handler(msg)
	}
}