peering3.go
//
// Broker peering simulation (part 3).
// Prototypes the full flow of status and tasks.
//
/*
One difference between peering2 and peering3: peering2 always uses Poll() and
then a helper function socketInPolled() to check whether a specific socket
returned a result, while peering3 uses PollAll() and checks the event state of
the socket at a specific index in the returned list.
*/
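// Usage: run one broker per terminal; the first argument is this broker's
// name, the remaining arguments are its peers. The names below are only
// illustrative:
//   peering3 DC1 DC2 DC3
//   peering3 DC2 DC1 DC3
//   peering3 DC3 DC1 DC2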
package main

import (
    "fmt"
    "math/rand"
    "os"
    "strconv"
    "strings"
    "time"

    zmq "github.com/pebbe/zmq4"
)
const (
    NBR_CLIENTS  = 10
    NBR_WORKERS  = 5
    WORKER_READY = "**READY**" // Signals worker is ready
)

var (
    // Our own name; in practice this would be configured per node
    self string
)
// This is the client task. It issues a burst of requests and then
// sleeps for a few seconds. This simulates sporadic activity; when
// a number of clients are active at once, the local workers should
// be overloaded. The client uses a REQ socket for requests and also
// pushes statistics to the monitor socket:
func client_task(i int) {
    client, _ := zmq.NewSocket(zmq.REQ)
    defer client.Close()
    client.Connect("ipc://" + self + "-localfe.ipc")
    monitor, _ := zmq.NewSocket(zmq.PUSH)
    defer monitor.Close()
    monitor.Connect("ipc://" + self + "-monitor.ipc")
    poller := zmq.NewPoller()
    poller.Add(client, zmq.POLLIN)
    for {
        time.Sleep(time.Duration(rand.Intn(5000)) * time.Millisecond)
        for burst := rand.Intn(15); burst > 0; burst-- {
            task_id := fmt.Sprintf("%04X-%s-%d", rand.Intn(0x10000), self, i)

            // Send request with random hex ID
            client.Send(task_id, 0)

            // Wait max ten seconds for a reply, then complain
            sockets, err := poller.Poll(10 * time.Second)
            if err != nil {
                break // Interrupted
            }
            if len(sockets) == 1 {
                reply, err := client.Recv(0)
                if err != nil {
                    break // Interrupted
                }
                // Worker is supposed to answer us with our task id
                id := strings.Fields(reply)[0]
                if id != task_id {
                    panic("id != task_id")
                }
                monitor.Send(reply, 0)
            } else {
                monitor.Send("E: CLIENT EXIT - lost task "+task_id, 0)
                return
            }
        }
    }
}
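// Note: a REQ socket enforces a strict send/recv alternation, so once a
// reply is lost the socket cannot simply send again; that is why the client
// above reports the lost task and exits rather than retrying.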
// This is the worker task, which uses a REQ socket to plug into the
// load-balancer. It's the same stub worker task you've seen in other
// examples:
func worker_task(i int) {
    worker, _ := zmq.NewSocket(zmq.REQ)
    defer worker.Close()
    worker.Connect("ipc://" + self + "-localbe.ipc")

    // Tell broker we're ready for work
    worker.SendMessage(WORKER_READY)

    // Process messages as they arrive
    for {
        msg, err := worker.RecvMessage(0)
        if err != nil {
            break // Interrupted
        }
        // Workers are busy for zero or one second
        time.Sleep(time.Duration(rand.Intn(2)) * time.Second)
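        // The last frame is the request body; the frames before it are the
        // routing envelope (identities plus empty delimiter), which must be
        // sent back unchanged so the brokers can route the reply.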
        n := len(msg) - 1
        worker.SendMessage(msg[:n], fmt.Sprintf("%s %s-%d", msg[n], self, i))
    }
}
// The main task begins by setting up all its sockets. The local frontend
// talks to clients, and our local backend talks to workers. The cloud
// frontend talks to peer brokers as if they were clients, and the cloud
// backend talks to peer brokers as if they were workers. The state
// backend publishes regular state messages, and the state frontend
// subscribes to all state backends to collect these messages. Finally,
// we use a PULL monitor socket to collect printable messages from tasks:
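//
//   clients -> localfe (ROUTER)      localbe (ROUTER) -> workers
//   peers   -> cloudfe (ROUTER)      cloudbe (ROUTER) -> peers
//   statebe (PUB) -> peer statefe    statefe (SUB) <- peer statebe
//   tasks   -> monitor (PULL)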
func main() {
    // First argument is this broker's name
    // Other arguments are our peers' names
    //
    if len(os.Args) < 2 {
        fmt.Println("syntax: peering3 me {you}...")
        os.Exit(1)
    }
    self = os.Args[1]
    fmt.Printf("I: preparing broker at %s...\n", self)
    rand.Seed(time.Now().UnixNano())

    // Prepare local frontend and backend
    localfe, _ := zmq.NewSocket(zmq.ROUTER)
    defer localfe.Close()
    localfe.Bind("ipc://" + self + "-localfe.ipc")

    localbe, _ := zmq.NewSocket(zmq.ROUTER)
    defer localbe.Close()
    localbe.Bind("ipc://" + self + "-localbe.ipc")

    // Bind cloud frontend to endpoint
    cloudfe, _ := zmq.NewSocket(zmq.ROUTER)
    defer cloudfe.Close()
    cloudfe.SetIdentity(self)
    cloudfe.Bind("ipc://" + self + "-cloud.ipc")

    // Connect cloud backend to all peers
    cloudbe, _ := zmq.NewSocket(zmq.ROUTER)
    defer cloudbe.Close()
    cloudbe.SetIdentity(self)
    for _, peer := range os.Args[2:] {
        fmt.Printf("I: connecting to cloud frontend at '%s'\n", peer)
        cloudbe.Connect("ipc://" + peer + "-cloud.ipc")
    }
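    // Note: the identity must be set before bind/connect for the ROUTER-to-
    // ROUTER link between brokers to work; peers address each other by name.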
    // Bind state backend to endpoint
    statebe, _ := zmq.NewSocket(zmq.PUB)
    defer statebe.Close()
    statebe.Bind("ipc://" + self + "-state.ipc")

    // Connect state frontend to all peers
    statefe, _ := zmq.NewSocket(zmq.SUB)
    defer statefe.Close()
    statefe.SetSubscribe("")
    for _, peer := range os.Args[2:] {
        fmt.Printf("I: connecting to state backend at '%s'\n", peer)
        statefe.Connect("ipc://" + peer + "-state.ipc")
    }

    // Prepare monitor socket
    monitor, _ := zmq.NewSocket(zmq.PULL)
    defer monitor.Close()
    monitor.Bind("ipc://" + self + "-monitor.ipc")
    // After binding and connecting all our sockets, we start our child
    // tasks - workers and clients:
    for worker_nbr := 0; worker_nbr < NBR_WORKERS; worker_nbr++ {
        go worker_task(worker_nbr)
    }

    // Start local clients
    for client_nbr := 0; client_nbr < NBR_CLIENTS; client_nbr++ {
        go client_task(client_nbr)
    }

    local_capacity := 0
    cloud_capacity := 0
    // Queue of available workers
    workers := make([]string, 0)

    primary := zmq.NewPoller()
    primary.Add(localbe, zmq.POLLIN)
    primary.Add(cloudbe, zmq.POLLIN)
    primary.Add(statefe, zmq.POLLIN)
    primary.Add(monitor, zmq.POLLIN)

    secondary1 := zmq.NewPoller()
    secondary1.Add(localfe, zmq.POLLIN)

    secondary2 := zmq.NewPoller()
    secondary2.Add(localfe, zmq.POLLIN)
    secondary2.Add(cloudfe, zmq.POLLIN)
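    // secondary1 polls only localfe: with cloud capacity alone we may take
    // work from our own clients but not from peers (we could only bounce it
    // back). secondary2 also polls cloudfe, used when local workers are free.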
    msg := make([]string, 0)
    for {
        // If we have no workers ready, wait indefinitely
        timeout := time.Second
        if local_capacity == 0 {
            timeout = -1
        }
        sockets, err := primary.PollAll(timeout)
        if err != nil {
            break // Interrupted
        }

        // Track if capacity changes during this iteration
        previous := local_capacity

        // Handle reply from local worker
        msg = msg[0:0]
        if sockets[0].Events&zmq.POLLIN != 0 { // 0 == localbe
            msg, err = localbe.RecvMessage(0)
            if err != nil {
                break // Interrupted
            }
            var identity string
            identity, msg = unwrap(msg)
            workers = append(workers, identity)
            local_capacity++
            // If it's READY, don't route the message any further
            if msg[0] == WORKER_READY {
                msg = msg[0:0]
            }
        } else if sockets[1].Events&zmq.POLLIN != 0 { // 1 == cloudbe
            // Or handle reply from peer broker
            msg, err = cloudbe.RecvMessage(0)
            if err != nil {
                break // Interrupted
            }
            // We don't use peer broker identity for anything
            _, msg = unwrap(msg)
        }
        if len(msg) > 0 {
            // Route reply to cloud if it's addressed to a broker
            to_broker := false
            for _, peer := range os.Args[2:] {
                if peer == msg[0] {
                    to_broker = true
                    break
                }
            }
            if to_broker {
                cloudfe.SendMessage(msg)
            } else {
                localfe.SendMessage(msg)
            }
        }
        // If we have input messages on our statefe or monitor sockets, we
        // can process these immediately:
        if sockets[2].Events&zmq.POLLIN != 0 { // 2 == statefe
            var status string
            m, _ := statefe.RecvMessage(0)
            _, m = unwrap(m) // peer name
            status, _ = unwrap(m)
            // Simplification: we keep only the most recently reported
            // peer capacity, not a per-peer total
            cloud_capacity, _ = strconv.Atoi(status)
        }
        if sockets[3].Events&zmq.POLLIN != 0 { // 3 == monitor
            status, _ := monitor.Recv(0)
            fmt.Println(status)
        }
        // Now route as many client requests as we can handle. If we have
        // local capacity, we poll both localfe and cloudfe. If we have cloud
        // capacity only, we poll just localfe. We route any request locally
        // if we can, else we route to the cloud.
        for local_capacity+cloud_capacity > 0 {
            var sockets []zmq.Polled
            var err error
            if local_capacity > 0 {
                sockets, err = secondary2.PollAll(0)
            } else {
                sockets, err = secondary1.PollAll(0)
            }
            if err != nil {
                panic(err)
            }
            // localfe is checked first, so local clients get priority
            if sockets[0].Events&zmq.POLLIN != 0 { // 0 == localfe
                msg, _ = localfe.RecvMessage(0)
            } else if len(sockets) > 1 && sockets[1].Events&zmq.POLLIN != 0 { // 1 == cloudfe
                msg, _ = cloudfe.RecvMessage(0)
            } else {
                break // No work, go back to primary
            }
            if local_capacity > 0 {
                localbe.SendMessage(workers[0], "", msg)
                workers = workers[1:]
                local_capacity--
            } else {
                // Route to random broker peer; peers are os.Args[2:],
                // so pick an index in [2, len(os.Args))
                random_peer := rand.Intn(len(os.Args)-2) + 2
                cloudbe.SendMessage(os.Args[random_peer], "", msg)
            }
        }
        // We broadcast capacity messages to other peers; to reduce chatter,
        // we do this only if our capacity changed.
        if local_capacity != previous {
            // We stick our own identity onto the envelope
            // Broadcast new capacity
            statebe.SendMessage(self, "", local_capacity)
        }
    }
}
// unwrap pops the frame off the front of a message and returns it as 'head'.
// If the next frame is empty, it pops that empty frame too. The remaining
// frames of the message are returned as 'tail'.
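// For example, unwrap([]string{"DC1", "", "hello"}) returns
// ("DC1", []string{"hello"}).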
func unwrap(msg []string) (head string, tail []string) {
    head = msg[0]
    if len(msg) > 1 && msg[1] == "" {
        tail = msg[2:]
    } else {
        tail = msg[1:]
    }
    return
}