// C4_Full.go - 403 lines (351 loc), 9.62 KB
// Node stack with ping/pong and API reporting
package main
import (
"fmt"
"os"
"path/filepath"
"sync"
"time"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rpc"
demo "./common"
)
// package-level settings shared by both demo nodes
var (
// first p2p listen port; main starts the nodes on p2pPort and p2pPort+1
p2pPort = 30100
// file name of the IPC endpoint, assigned to the node config's IPCPath
ipcpath = ".demo.ipc"
// prefix of each node's data directory; the listen port is appended to it
datadirPrefix = ".data_"
// main waits on this group until both pinging goroutines are done
stackW = &sync.WaitGroup{}
)
// FooPingMsg is the only message of the "fooping" protocol
// the same structure is used for both the ping and the pong
type FooPingMsg struct {
// false for a ping, true for the pong sent in reply to a ping
Pong bool
// set to time.Now() by the sender when the message is built
Created time.Time
}
// the service we want to offer on the node
// it must implement the node.Service interface
type fooService struct {
// number of pongs received, incremented by the protocol's message handler
// a pointer to this field is handed to FooAPI so the count can be read over RPC
pongcount int
// one channel per peer, keyed by node id and created when the protocol starts for that peer
// a value sent on the channel requests one ping, closing the channel ends the protocol
pingC map[enode.ID]chan struct{}
}
// APIs describes the RPC interface offered by the service.
// A single public API is registered under namespace "foo"; its FooAPI
// receiver shares the pong counter and the ping channel map with the service.
func (self *fooService) APIs() []rpc.API {
	fooAPI := &FooAPI{
		running:   true,
		pongcount: &self.pongcount,
		pingC:     self.pingC,
	}
	api := rpc.API{
		Namespace: "foo",
		Version:   "42",
		Service:   fooAPI,
		Public:    true,
	}
	return []rpc.API{api}
}
// Protocols returns the p2p protocols run by the service: a single protocol
// named "fooping" with one message code.
//
// For every peer the protocol
//   - registers a fresh channel in self.pingC under the peer's id,
//   - answers every incoming ping with a pong and counts incoming pongs,
//   - sends one ping each time a value arrives on the peer's channel.
//
// Run returns nil when the peer's channel is closed (see FooAPI.Quit) and
// returns the read, decode or send error as soon as the incoming message
// loop stops, instead of staying blocked on the ping channel.
//
// NOTE(review): pongcount and the pingC map are also accessed by FooAPI from
// other goroutines without synchronization.
func (self *fooService) Protocols() []p2p.Protocol {
	return []p2p.Protocol{
		{
			Name:    "fooping",
			Version: 666,
			Length:  1,
			Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
				// create the channel when a connection is made and keep a local
				// reference, so the loop below does not depend on later map lookups
				pingC := make(chan struct{})
				self.pingC[p.ID()] = pingC
				pingcount := 0

				// handle incoming messages asynchronously; either handler or sender
				// must be asynchronous, otherwise we might deadlock.
				// the channel is buffered so the reader can always deliver its
				// error and exit, even if Run has already returned
				readErrC := make(chan error, 1)
				go func() {
					readErrC <- self.readLoop(p, rw)
				}()

				// pings are invoked through the API using a channel
				// when this channel is closed we quit the protocol
				for {
					select {
					case err := <-readErrC:
						// the reader stopped, nothing can be received anymore
						demo.Log.Debug("break protocol", "peer", p, "err", err)
						return err
					case _, ok := <-pingC:
						if !ok {
							demo.Log.Debug("break protocol", "peer", p)
							return nil
						}
						// send ping
						pingmsg := &FooPingMsg{
							Pong:    false,
							Created: time.Now(),
						}
						go func() {
							if err := p2p.Send(rw, 0, pingmsg); err != nil {
								demo.Log.Error("Send p2p message fail", "err", err)
							}
						}()
						pingcount++
						demo.Log.Info("sent ping", "peer", p, "count", pingcount)
					}
				}
			},
		},
	}
}

// readLoop handles the messages received from p until reading, decoding or
// replying fails, and returns the error that stopped it.
func (self *fooService) readLoop(p *p2p.Peer, rw p2p.MsgReadWriter) error {
	for {
		// listen for new message
		msg, err := rw.ReadMsg()
		if err != nil {
			demo.Log.Warn("Receive p2p message fail", "err", err)
			return err
		}

		// decode the message and check the contents
		var decodedmsg FooPingMsg
		if err := msg.Decode(&decodedmsg); err != nil {
			demo.Log.Error("Decode p2p message fail", "err", err)
			return err
		}

		// if we get a pong, update our pong counter
		if decodedmsg.Pong {
			self.pongcount++
			demo.Log.Debug("received pong", "peer", p, "count", self.pongcount)
			continue
		}

		// otherwise it is a ping: reply with a pong
		demo.Log.Debug("received ping", "peer", p)
		pongmsg := &FooPingMsg{
			Pong:    true,
			Created: time.Now(),
		}
		if err := p2p.Send(rw, 0, pongmsg); err != nil {
			demo.Log.Error("Send p2p message fail", "err", err)
			return err
		}
		demo.Log.Debug("sent pong", "peer", p)
	}
}
// Start does no setup work and always reports success.
// The p2p server argument is not used.
func (*fooService) Start(*p2p.Server) error {
	return nil
}
// Stop does no teardown work and always reports success.
func (*fooService) Stop() error {
	return nil
}
// FooAPI carries the RPC methods of the service
// in this example we don't care about which peer the pongs come from, we count them all
// note it is a bit fragile; we don't check for closed channels
type FooAPI struct {
// true until Quit is called; Ping only sends while it is true
running bool
// points at the service's pong counter
pongcount *int
// the service's map of per-peer ping channels, shared with the protocol
pingC map[enode.ID]chan struct{}
}
// Increment adds one to the pong counter shared with the service.
func (api *FooAPI) Increment() {
	counter := api.pongcount
	*counter++
}
// Ping asks the protocol to send a single ping to the peer with the given id.
//
// It does nothing and returns nil once Quit has been called. It returns an
// "unknown peer" error when no channel is registered for id; sending on the
// missing (nil) channel would otherwise block the call forever. The send
// itself blocks until the protocol loop of that peer picks up the signal.
func (api *FooAPI) Ping(id enode.ID) error {
	if !api.running {
		return nil
	}
	pingC, ok := api.pingC[id]
	if !ok || pingC == nil {
		return fmt.Errorf("unknown peer")
	}
	pingC <- struct{}{}
	return nil
}
// Quit ends the ping protocol for the peer with the given id by closing the
// peer's channel, and turns further Ping calls on this API into no-ops.
// It returns an "unknown peer" error when no channel is registered for id.
func (api *FooAPI) Quit(id enode.ID) error {
	demo.Log.Debug("quitting API", "peer", id)
	pingC := api.pingC[id]
	if pingC == nil {
		return fmt.Errorf("unknown peer")
	}
	api.running = false
	close(pingC)
	return nil
}
// PongCount reports the number of pongs counted so far.
// The returned error is always nil.
func (api *FooAPI) PongCount() (int, error) {
	count := *api.pongcount
	return count, nil
}
// newServiceNode sets up a local service node.
//
// port is the p2p listen port and is also appended to datadirPrefix to form
// the node's data directory. The HTTP endpoint is configured only when
// httpport > 0 and the websocket endpoint only when wsport > 0; modules are
// added to the websocket modules in that case. Discovery is disabled and
// message events are enabled on the p2p server.
//
// The configuration is built on a copy of node.DefaultConfig, so settings of
// one call (HTTP/WS hosts and ports, appended modules) never leak into the
// package-level default or into nodes created by later calls.
func newServiceNode(port int, httpport int, wsport int, modules ...string) (*node.Node, error) {
	cfg := node.DefaultConfig // copy, do not modify the shared default
	cfg.P2P.ListenAddr = fmt.Sprintf(":%d", port)
	cfg.P2P.EnableMsgEvents = true
	cfg.P2P.NoDiscovery = true
	cfg.IPCPath = ipcpath
	cfg.DataDir = fmt.Sprintf("%s%d", datadirPrefix, port)

	if httpport > 0 {
		cfg.HTTPHost = node.DefaultHTTPHost
		cfg.HTTPPort = httpport
	}
	if wsport > 0 {
		cfg.WSHost = node.DefaultWSHost
		cfg.WSPort = wsport
		cfg.WSOrigins = []string{"*"}
		// build a fresh slice so the default's backing array is never written to
		wsModules := make([]string, 0, len(cfg.WSModules)+len(modules))
		wsModules = append(wsModules, cfg.WSModules...)
		cfg.WSModules = append(wsModules, modules...)
	}

	stack, err := node.New(&cfg)
	if err != nil {
		return nil, fmt.Errorf("ServiceNode create fail: %v", err)
	}
	return stack, nil
}
// main runs the demo end to end:
//  1. create two service nodes, register the foo service on both and start them
//  2. connect to both nodes over IPC and log the initial pong counts
//  3. subscribe to peer events, connect node #1 to node #2
//  4. in two goroutines, send pings through the RPC API (4 from node #1,
//     2 from node #2) and wait for the resulting "msgrecv" events
//  5. quit the protocol on both nodes, wait for the "drop" events
//  6. log the final pong counts and stop the nodes
//
// Each pinging goroutine signals stackW exactly once through a deferred
// Done, whichever way it exits.
func main() {
	// create the two nodes
	stackOne, err := newServiceNode(p2pPort, 0, 0)
	if err != nil {
		demo.Log.Crit("Create servicenode #1 fail", "err", err)
	}
	stackTwo, err := newServiceNode(p2pPort+1, 0, 0)
	if err != nil {
		demo.Log.Crit("Create servicenode #2 fail", "err", err)
	}

	// wrapper function for servicenode to start the service
	foosvc := func(ctx *node.ServiceContext) (node.Service, error) {
		return &fooService{
			pingC: make(map[enode.ID]chan struct{}),
		}, nil
	}

	// register adds the service to the services the servicenode starts when started
	if err := stackOne.Register(foosvc); err != nil {
		demo.Log.Crit("Register service in servicenode #1 fail", "err", err)
	}
	if err := stackTwo.Register(foosvc); err != nil {
		demo.Log.Crit("Register service in servicenode #2 fail", "err", err)
	}

	// start the nodes
	if err := stackOne.Start(); err != nil {
		demo.Log.Crit("servicenode #1 start failed", "err", err)
	}
	if err := stackTwo.Start(); err != nil {
		demo.Log.Crit("servicenode #2 start failed", "err", err)
	}

	// connect to the servicenode RPCs
	// the data directories are removed when main returns
	rpcclientOne, err := rpc.Dial(filepath.Join(stackOne.DataDir(), ipcpath))
	if err != nil {
		demo.Log.Crit("connect to servicenode #1 IPC fail", "err", err)
	}
	defer os.RemoveAll(stackOne.DataDir())

	rpcclientTwo, err := rpc.Dial(filepath.Join(stackTwo.DataDir(), ipcpath))
	if err != nil {
		demo.Log.Crit("connect to servicenode #2 IPC fail", "err", err)
	}
	defer os.RemoveAll(stackTwo.DataDir())

	// display that the initial pong counts are 0
	var count int
	if err := rpcclientOne.Call(&count, "foo_pongCount"); err != nil {
		demo.Log.Crit("servicenode #1 pongcount RPC failed", "err", err)
	}
	demo.Log.Info("servicenode #1 before ping", "pongcount", count)

	if err := rpcclientTwo.Call(&count, "foo_pongCount"); err != nil {
		demo.Log.Crit("servicenode #2 pongcount RPC failed", "err", err)
	}
	demo.Log.Info("servicenode #2 before ping", "pongcount", count)

	// get the server instances
	srvOne := stackOne.Server()
	srvTwo := stackTwo.Server()

	// subscribe to peerevents
	eventOneC := make(chan *p2p.PeerEvent)
	subOne := srvOne.SubscribeEvents(eventOneC)
	eventTwoC := make(chan *p2p.PeerEvent)
	subTwo := srvTwo.SubscribeEvents(eventTwoC)

	// connect the nodes
	p2pnodeTwo := srvTwo.Self()
	srvOne.AddPeer(p2pnodeTwo)

	// fork and do the pinging
	stackW.Add(2)
	pingmaxOne := 4
	pingmaxTwo := 2

	go func() {
		// exactly one Done, whichever way this goroutine exits
		defer stackW.Done()

		// when we get the add event, we know we are connected
		ev := <-eventOneC
		if ev.Type != "add" {
			demo.Log.Error("server #1 expected peer add", "eventtype", ev.Type)
			return
		}
		demo.Log.Debug("server #1 connected", "peer", ev.Peer)

		// send the pings; give up on the first failure, the expected
		// message events cannot all arrive after that
		for i := 0; i < pingmaxOne; i++ {
			if err := rpcclientOne.Call(nil, "foo_ping", ev.Peer); err != nil {
				demo.Log.Error("server #1 RPC ping fail", "err", err)
				return
			}
		}

		// wait for all msgrecv events
		// pings we receive, and pongs we expect from pings we sent
		for i := 0; i < pingmaxTwo+pingmaxOne; {
			ev := <-eventOneC
			demo.Log.Warn("msg", "type", ev.Type, "i", i)
			if ev.Type == "msgrecv" {
				i++
			}
		}
	}()

	// mirrors the previous go func
	go func() {
		defer stackW.Done()

		ev := <-eventTwoC
		if ev.Type != "add" {
			demo.Log.Error("expected peer add", "eventtype", ev.Type)
			return
		}
		demo.Log.Debug("server #2 connected", "peer", ev.Peer)

		for i := 0; i < pingmaxTwo; i++ {
			if err := rpcclientTwo.Call(nil, "foo_ping", ev.Peer); err != nil {
				demo.Log.Error("server #2 RPC ping fail", "err", err)
				return
			}
		}

		for i := 0; i < pingmaxOne+pingmaxTwo; {
			ev := <-eventTwoC
			if ev.Type == "msgrecv" {
				demo.Log.Warn("msg", "type", ev.Type, "i", i)
				i++
			}
		}
	}()

	// wait for the two ping pong exchanges to finish
	stackW.Wait()

	// tell the API to shut down
	// this will disconnect the peers and close the channels connecting API and protocol
	if err := rpcclientOne.Call(nil, "foo_quit", srvTwo.Self().ID()); err != nil {
		demo.Log.Error("server #1 RPC quit fail", "err", err)
	}
	if err := rpcclientTwo.Call(nil, "foo_quit", srvOne.Self().ID()); err != nil {
		demo.Log.Error("server #2 RPC quit fail", "err", err)
	}

	// disconnect will generate drop events
	for {
		ev := <-eventOneC
		if ev.Type == "drop" {
			break
		}
	}
	for {
		ev := <-eventTwoC
		if ev.Type == "drop" {
			break
		}
	}

	// proudly inspect the results
	if err := rpcclientOne.Call(&count, "foo_pongCount"); err != nil {
		demo.Log.Crit("servicenode #1 pongcount RPC failed", "err", err)
	}
	demo.Log.Info("servicenode #1 after ping", "pongcount", count)

	if err := rpcclientTwo.Call(&count, "foo_pongCount"); err != nil {
		demo.Log.Crit("servicenode #2 pongcount RPC failed", "err", err)
	}
	demo.Log.Info("servicenode #2 after ping", "pongcount", count)

	// bring down the servicenodes
	subOne.Unsubscribe()
	subTwo.Unsubscribe()
	if err := stackOne.Stop(); err != nil {
		demo.Log.Error("servicenode #1 stop failed", "err", err)
	}
	if err := stackTwo.Stop(); err != nil {
		demo.Log.Error("servicenode #2 stop failed", "err", err)
	}
}