/
B3_Message.go
232 lines (199 loc) · 5.33 KB
/
B3_Message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
// trigger p2p message with RPC
package main
import (
"crypto/ecdsa"
"fmt"
"net"
"os"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
demo "./common"
)
var (
protoW = &sync.WaitGroup{}
messageW = &sync.WaitGroup{}
msgC = make(chan string)
ipcpath = ".demo.ipc"
)
// create a protocol that can take care of message sending
// the Run function is invoked upon connection
// it gets passed:
// * an instance of p2p.Peer, which represents the remote peer
// * an instance of p2p.MsgReadWriter, which is the io between the node and its peer
type FooMsg struct {
Content string
}
var (
proto = p2p.Protocol{
Name: "foo",
Version: 42,
Length: 1,
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
// only one of the peers will send this
content, ok := <-msgC
if ok {
outmsg := &FooMsg{
Content: content,
}
// send the message
err := p2p.Send(rw, 0, outmsg)
if err != nil {
return fmt.Errorf("Send p2p message fail: %v", err)
}
demo.Log.Info("sending message", "peer", p, "msg", outmsg)
}
// wait for the subscriptions to end
messageW.Wait()
protoW.Done()
// terminate the protocol
return nil
},
}
)
type FooAPI struct {
sent bool
}
func (api *FooAPI) SendMsg(content string) error {
if api.sent {
return fmt.Errorf("Already sent")
}
msgC <- content
close(msgC)
api.sent = true
return nil
}
// create a server
func newP2pServer(privkey *ecdsa.PrivateKey, name string, version string, port int) *p2p.Server {
// we need to explicitly allow at least one peer, otherwise the connection attempt will be refused
// we also need to explicitly tell the server to generate events for messages
cfg := p2p.Config{
PrivateKey: privkey,
Name: common.MakeName(name, version),
MaxPeers: 1,
Protocols: []p2p.Protocol{proto},
EnableMsgEvents: true,
}
if port > 0 {
cfg.ListenAddr = fmt.Sprintf(":%d", port)
}
srv := &p2p.Server{
Config: cfg,
}
return srv
}
func newRPCServer() (*rpc.Server, error) {
// set up the RPC server
rpcsrv := rpc.NewServer()
err := rpcsrv.RegisterName("foo", &FooAPI{})
if err != nil {
return nil, fmt.Errorf("Register API method(s) fail: %v", err)
}
// create IPC endpoint
ipclistener, err := net.Listen("unix", ipcpath)
if err != nil {
return nil, fmt.Errorf("IPC endpoint create fail: %v", err)
}
// mount RPC server on IPC endpoint
// it will automatically detect and serve any valid methods
go func() {
err = rpcsrv.ServeListener(ipclistener)
if err != nil {
demo.Log.Crit("Mount RPC on IPC fail", "err", err)
}
}()
return rpcsrv, nil
}
func main() {
// we need private keys for both servers
privkey_one, err := crypto.GenerateKey()
if err != nil {
demo.Log.Crit("Generate private key #1 failed", "err", err)
}
privkey_two, err := crypto.GenerateKey()
if err != nil {
demo.Log.Crit("Generate private key #2 failed", "err", err)
}
// set up the two servers
srv_one := newP2pServer(privkey_one, "foo", "42", 0)
err = srv_one.Start()
if err != nil {
demo.Log.Crit("Start p2p.Server #1 failed", "err", err)
}
srv_two := newP2pServer(privkey_two, "bar", "666", 31234)
err = srv_two.Start()
if err != nil {
demo.Log.Crit("Start p2p.Server #2 failed", "err", err)
}
// set up the event subscriptions on both servers
// the Err() on the Subscription object returns when subscription is closed
eventOneC := make(chan *p2p.PeerEvent)
sub_one := srv_one.SubscribeEvents(eventOneC)
go func() {
for {
select {
case peerevent := <-eventOneC:
if peerevent.Type == "add" {
demo.Log.Debug("Received peer add notification on node #1", "peer", peerevent.Peer)
} else if peerevent.Type == "msgsend" {
demo.Log.Info("Received message send notification on node #1", "event", peerevent)
messageW.Done()
}
case <-sub_one.Err():
return
}
}
}()
eventTwoC := make(chan *p2p.PeerEvent)
sub_two := srv_two.SubscribeEvents(eventTwoC)
go func() {
for {
select {
case peerevent := <-eventTwoC:
if peerevent.Type == "add" {
demo.Log.Debug("Received peer add notification on node #2", "peer", peerevent.Peer)
} else if peerevent.Type == "msgsend" {
demo.Log.Info("Received message send notification on node #2", "event", peerevent)
messageW.Done()
}
case <-sub_two.Err():
return
}
}
}()
// create and start RPC server
rpcsrv, err := newRPCServer()
if err != nil {
demo.Log.Crit(err.Error())
}
defer os.Remove(ipcpath)
// get the node instance of the second server
node_two := srv_two.Self()
// add it as a peer to the first node
// the connection and crypto handshake will be performed automatically
srv_one.AddPeer(node_two)
// create an IPC client
rpcclient, err := rpc.Dial(ipcpath)
if err != nil {
demo.Log.Crit("IPC dial fail", "err", err)
}
// wait for one message be sent, and both protocols to end
messageW.Add(1)
protoW.Add(2)
// call the RPC method
err = rpcclient.Call(nil, "foo_sendMsg", "foobar")
if err != nil {
demo.Log.Crit("RPC call fail", "err", err)
}
// wait for protocols to finish
protoW.Wait()
// terminate subscription loops and unsubscribe
sub_one.Unsubscribe()
sub_two.Unsubscribe()
// stop the servers
rpcsrv.Stop()
srv_one.Stop()
srv_two.Stop()
}