-
Notifications
You must be signed in to change notification settings - Fork 249
/
waku.go
285 lines (232 loc) · 8.15 KB
/
waku.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
package gethbridge
import (
"context"
"crypto/ecdsa"
"errors"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/ethereum/go-ethereum/common"
"github.com/status-im/status-go/connection"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/waku"
wakucommon "github.com/status-im/status-go/waku/common"
)
type gethWakuWrapper struct {
waku *waku.Waku
}
// NewGethWakuWrapper returns an object that wraps Geth's Waku in a types interface
func NewGethWakuWrapper(w *waku.Waku) types.Waku {
if w == nil {
panic("waku cannot be nil")
}
return &gethWakuWrapper{
waku: w,
}
}
// GetGethWhisperFrom retrieves the underlying whisper Whisper struct from a wrapped Whisper interface
func GetGethWakuFrom(m types.Waku) *waku.Waku {
return m.(*gethWakuWrapper).waku
}
func (w *gethWakuWrapper) PublicWakuAPI() types.PublicWakuAPI {
return NewGethPublicWakuAPIWrapper(waku.NewPublicWakuAPI(w.waku))
}
func (w *gethWakuWrapper) Version() uint {
return 1
}
// Added for compatibility with waku V2
func (w *gethWakuWrapper) PeerCount() int {
return -1
}
// Added for compatibility with waku V2
func (w *gethWakuWrapper) StartDiscV5() error {
return errors.New("not available in WakuV1")
}
// Added for compatibility with waku V2
func (w *gethWakuWrapper) StopDiscV5() error {
return errors.New("not available in WakuV1")
}
// PeerCount function only added for compatibility with waku V2
func (w *gethWakuWrapper) AddStorePeer(address string) (peer.ID, error) {
return "", errors.New("not available in WakuV1")
}
// AddRelayPeer function only added for compatibility with waku V2
func (w *gethWakuWrapper) AddRelayPeer(address string) (peer.ID, error) {
return "", errors.New("not available in WakuV1")
}
// DialPeer function only added for compatibility with waku V2
func (w *gethWakuWrapper) DialPeer(address string) error {
return errors.New("not available in WakuV1")
}
// DialPeerByID function only added for compatibility with waku V2
func (w *gethWakuWrapper) DialPeerByID(peerID string) error {
return errors.New("not available in WakuV1")
}
// ListenAddresses function only added for compatibility with waku V2
func (w *gethWakuWrapper) ListenAddresses() ([]string, error) {
return nil, errors.New("not available in WakuV1")
}
// PeerCount function only added for compatibility with waku V2
func (w *gethWakuWrapper) DropPeer(peerID string) error {
return errors.New("not available in WakuV1")
}
func (w *gethWakuWrapper) SubscribeToConnStatusChanges() (*types.ConnStatusSubscription, error) {
return nil, errors.New("not available in WakuV1")
}
// Peers function only added for compatibility with waku V2
func (w *gethWakuWrapper) Peers() map[string]types.WakuV2Peer {
p := make(map[string]types.WakuV2Peer)
return p
}
// MinPow returns the PoW value required by this node.
func (w *gethWakuWrapper) MinPow() float64 {
return w.waku.MinPow()
}
// MaxMessageSize returns the MaxMessageSize set
func (w *gethWakuWrapper) MaxMessageSize() uint32 {
return w.waku.MaxMessageSize()
}
// BloomFilter returns the aggregated bloom filter for all the topics of interest.
// The nodes are required to send only messages that match the advertised bloom filter.
// If a message does not match the bloom, it will tantamount to spam, and the peer will
// be disconnected.
func (w *gethWakuWrapper) BloomFilter() []byte {
return w.waku.BloomFilter()
}
// GetCurrentTime returns current time.
func (w *gethWakuWrapper) GetCurrentTime() time.Time {
return w.waku.CurrentTime()
}
func (w *gethWakuWrapper) SubscribeEnvelopeEvents(eventsProxy chan<- types.EnvelopeEvent) types.Subscription {
events := make(chan wakucommon.EnvelopeEvent, 100) // must be buffered to prevent blocking whisper
go func() {
for e := range events {
eventsProxy <- *NewWakuEnvelopeEventWrapper(&e)
}
}()
return NewGethSubscriptionWrapper(w.waku.SubscribeEnvelopeEvents(events))
}
func (w *gethWakuWrapper) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) {
return w.waku.GetPrivateKey(id)
}
// AddKeyPair imports a asymmetric private key and returns a deterministic identifier.
func (w *gethWakuWrapper) AddKeyPair(key *ecdsa.PrivateKey) (string, error) {
return w.waku.AddKeyPair(key)
}
// DeleteKeyPair deletes the key with the specified ID if it exists.
func (w *gethWakuWrapper) DeleteKeyPair(keyID string) bool {
return w.waku.DeleteKeyPair(keyID)
}
func (w *gethWakuWrapper) AddSymKeyDirect(key []byte) (string, error) {
return w.waku.AddSymKeyDirect(key)
}
func (w *gethWakuWrapper) AddSymKeyFromPassword(password string) (string, error) {
return w.waku.AddSymKeyFromPassword(password)
}
func (w *gethWakuWrapper) DeleteSymKey(id string) bool {
return w.waku.DeleteSymKey(id)
}
func (w *gethWakuWrapper) GetSymKey(id string) ([]byte, error) {
return w.waku.GetSymKey(id)
}
func (w *gethWakuWrapper) Subscribe(opts *types.SubscriptionOptions) (string, error) {
var (
err error
keyAsym *ecdsa.PrivateKey
keySym []byte
)
if opts.SymKeyID != "" {
keySym, err = w.GetSymKey(opts.SymKeyID)
if err != nil {
return "", err
}
}
if opts.PrivateKeyID != "" {
keyAsym, err = w.GetPrivateKey(opts.PrivateKeyID)
if err != nil {
return "", err
}
}
f, err := w.createFilterWrapper("", keyAsym, keySym, opts.PoW, opts.Topics)
if err != nil {
return "", err
}
id, err := w.waku.Subscribe(GetWakuFilterFrom(f))
if err != nil {
return "", err
}
f.(*wakuFilterWrapper).id = id
return id, nil
}
func (w *gethWakuWrapper) GetStats() types.StatsSummary {
return w.waku.GetStats()
}
func (w *gethWakuWrapper) GetFilter(id string) types.Filter {
return NewWakuFilterWrapper(w.waku.GetFilter(id), id)
}
func (w *gethWakuWrapper) Unsubscribe(ctx context.Context, id string) error {
return w.waku.Unsubscribe(id)
}
func (w *gethWakuWrapper) UnsubscribeMany(ids []string) error {
return w.waku.UnsubscribeMany(ids)
}
func (w *gethWakuWrapper) createFilterWrapper(id string, keyAsym *ecdsa.PrivateKey, keySym []byte, pow float64, topics [][]byte) (types.Filter, error) {
return NewWakuFilterWrapper(&wakucommon.Filter{
KeyAsym: keyAsym,
KeySym: keySym,
PoW: pow,
AllowP2P: true,
Topics: topics,
Messages: wakucommon.NewMemoryMessageStore(),
}, id), nil
}
func (w *gethWakuWrapper) SendMessagesRequest(peerID []byte, r types.MessagesRequest) error {
return w.waku.SendMessagesRequest(peerID, wakucommon.MessagesRequest{
ID: r.ID,
From: r.From,
To: r.To,
Limit: r.Limit,
Cursor: r.Cursor,
Bloom: r.Bloom,
Topics: r.Topics,
})
}
// RequestHistoricMessages sends a message with p2pRequestCode to a specific peer,
// which is known to implement MailServer interface, and is supposed to process this
// request and respond with a number of peer-to-peer messages (possibly expired),
// which are not supposed to be forwarded any further.
// The whisper protocol is agnostic of the format and contents of envelope.
func (w *gethWakuWrapper) RequestHistoricMessagesWithTimeout(peerID []byte, envelope types.Envelope, timeout time.Duration) error {
return w.waku.RequestHistoricMessagesWithTimeout(peerID, envelope.Unwrap().(*wakucommon.Envelope), timeout)
}
func (w *gethWakuWrapper) ProcessingP2PMessages() bool {
return w.waku.ProcessingP2PMessages()
}
func (w *gethWakuWrapper) MarkP2PMessageAsProcessed(hash common.Hash) {
w.waku.MarkP2PMessageAsProcessed(hash)
}
func (w *gethWakuWrapper) RequestStoreMessages(ctx context.Context, peerID []byte, r types.MessagesRequest) (*types.StoreRequestCursor, error) {
return nil, errors.New("not implemented")
}
func (w *gethWakuWrapper) ConnectionChanged(_ connection.State) {}
type wakuFilterWrapper struct {
filter *wakucommon.Filter
id string
}
// NewWakuFilterWrapper returns an object that wraps Geth's Filter in a types interface
func NewWakuFilterWrapper(f *wakucommon.Filter, id string) types.Filter {
if f.Messages == nil {
panic("Messages should not be nil")
}
return &wakuFilterWrapper{
filter: f,
id: id,
}
}
// GetWakuFilterFrom retrieves the underlying whisper Filter struct from a wrapped Filter interface
func GetWakuFilterFrom(f types.Filter) *wakucommon.Filter {
return f.(*wakuFilterWrapper).filter
}
// ID returns the filter ID
func (w *wakuFilterWrapper) ID() string {
return w.id
}