/
E2_PssRouting.go
157 lines (134 loc) · 4.45 KB
/
E2_PssRouting.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// pss send-to-self hello world
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/swarm"
bzzapi "github.com/ethereum/go-ethereum/swarm/api"
"github.com/ethereum/go-ethereum/swarm/pss"
demo "./common"
)
func newService(bzzdir string, bzzport int, bzznetworkid uint64) func(ctx *node.ServiceContext) (node.Service, error) {
return func(ctx *node.ServiceContext) (node.Service, error) {
// generate a new private key
privkey, err := crypto.GenerateKey()
if err != nil {
demo.Log.Crit("private key generate servicenode 'left' fail: %v")
}
// create necessary swarm params
bzzconfig := bzzapi.NewConfig()
bzzconfig.Path = bzzdir
bzzconfig.Init(privkey)
if err != nil {
demo.Log.Crit("unable to configure swarm", "err", err)
}
bzzconfig.Port = fmt.Sprintf("%d", bzzport)
// shortcut to setting up a swarm node
return swarm.NewSwarm(bzzconfig, nil)
}
}
func main() {
// create three nodes
l_stack, err := demo.NewServiceNode(demo.P2pPort, 0, 0)
if err != nil {
demo.Log.Crit(err.Error())
}
r_stack, err := demo.NewServiceNode(demo.P2pPort+1, 0, 0)
if err != nil {
demo.Log.Crit(err.Error())
}
c_stack, err := demo.NewServiceNode(demo.P2pPort+2, 0, 0)
if err != nil {
demo.Log.Crit(err.Error())
}
// register the pss activated bzz services
l_svc := newService(l_stack.InstanceDir(), demo.BzzDefaultPort, demo.BzzDefaultNetworkId)
err = l_stack.Register(l_svc)
if err != nil {
demo.Log.Crit("servicenode 'left' pss register fail", "err", err)
}
r_svc := newService(r_stack.InstanceDir(), demo.BzzDefaultPort+1, demo.BzzDefaultNetworkId)
err = r_stack.Register(r_svc)
if err != nil {
demo.Log.Crit("servicenode 'right' pss register fail", "err", err)
}
c_svc := newService(c_stack.InstanceDir(), demo.BzzDefaultPort+2, demo.BzzDefaultNetworkId)
err = c_stack.Register(c_svc)
if err != nil {
demo.Log.Crit("servicenode 'right' pss register fail", "err", err)
}
// start the nodes
err = l_stack.Start()
if err != nil {
demo.Log.Crit("servicenode start failed", "err", err)
}
defer os.RemoveAll(l_stack.DataDir())
err = r_stack.Start()
if err != nil {
demo.Log.Crit("servicenode start failed", "err", err)
}
defer os.RemoveAll(r_stack.DataDir())
err = c_stack.Start()
if err != nil {
demo.Log.Crit("servicenode start failed", "err", err)
}
defer os.RemoveAll(c_stack.DataDir())
// connect the nodes to the middle
c_stack.Server().AddPeer(l_stack.Server().Self())
c_stack.Server().AddPeer(r_stack.Server().Self())
// get the rpc clients
l_rpcclient, err := l_stack.Attach()
r_rpcclient, err := r_stack.Attach()
// wait until the state of the swarm overlay network is ready
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
err = demo.WaitHealthy(ctx, 2, l_rpcclient, r_rpcclient)
if err != nil {
demo.Log.Crit("health check fail", "err", err)
}
time.Sleep(time.Second) // because the healthy does not work
// get a valid topic byte
var topic string
err = l_rpcclient.Call(&topic, "pss_stringToTopic", "foo")
if err != nil {
demo.Log.Crit("pss string to topic fail", "err", err)
}
// subscribe to incoming messages on the receiving sevicenode
// this will register a message handler on the specified topic
msgC := make(chan pss.APIMsg)
sub, err := r_rpcclient.Subscribe(context.Background(), "pss", msgC, "receive", topic, false, false)
// supply no address for routing
r_bzzaddr := "0x"
// get the receiver's public key
var r_pubkey string
err = r_rpcclient.Call(&r_pubkey, "pss_getPublicKey")
if err != nil {
demo.Log.Crit("pss get pubkey fail", "err", err)
}
// make the sender aware of the receiver's public key
err = l_rpcclient.Call(nil, "pss_setPeerPublicKey", r_pubkey, topic, r_bzzaddr)
if err != nil {
demo.Log.Crit("pss get pubkey fail", "err", err)
}
// send message using asymmetric encryption
// since it's sent to ourselves, it will not go through pss forwarding
err = l_rpcclient.Call(nil, "pss_sendAsym", r_pubkey, topic, common.ToHex([]byte("bar")))
if err != nil {
demo.Log.Crit("pss send fail", "err", err)
}
// get the incoming message
inmsg := <-msgC
demo.Log.Info("pss received", "msg", string(inmsg.Msg), "from", fmt.Sprintf("%x", inmsg.Key))
// bring down the servicenodes
sub.Unsubscribe()
r_rpcclient.Close()
l_rpcclient.Close()
c_stack.Stop()
r_stack.Stop()
l_stack.Stop()
}