-
Notifications
You must be signed in to change notification settings - Fork 246
/
api.go
173 lines (154 loc) · 4.42 KB
/
api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package wakuext
import (
"context"
"crypto/ecdsa"
"encoding/hex"
"fmt"
"time"
"github.com/ethereum/go-ethereum/log"
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/services/ext"
waku "github.com/status-im/status-go/waku/common"
)
const (
// defaultWorkTime is a work time reported in messages sent to MailServer nodes.
defaultWorkTime = 5
)
// PublicAPI extends waku public API.
type PublicAPI struct {
*ext.PublicAPI
service *Service
publicAPI types.PublicWakuAPI
log log.Logger
}
// NewPublicAPI returns instance of the public API.
func NewPublicAPI(s *Service) *PublicAPI {
return &PublicAPI{
PublicAPI: ext.NewPublicAPI(s.Service, s.w),
service: s,
publicAPI: s.w.PublicWakuAPI(),
log: log.New("package", "status-go/services/wakuext.PublicAPI"),
}
}
// makeEnvelop makes an envelop for a historic messages request.
// Symmetric key is used to authenticate to MailServer.
// PK is the current node ID.
// DEPRECATED
func makeEnvelop(
payload []byte,
symKey []byte,
publicKey *ecdsa.PublicKey,
nodeID *ecdsa.PrivateKey,
pow float64,
now time.Time,
) (types.Envelope, error) {
params := waku.MessageParams{
PoW: pow,
Payload: payload,
WorkTime: defaultWorkTime,
Src: nodeID,
}
// Either symKey or public key is required.
// This condition is verified in `message.Wrap()` method.
if len(symKey) > 0 {
params.KeySym = symKey
} else if publicKey != nil {
params.Dst = publicKey
}
message, err := waku.NewSentMessage(¶ms)
if err != nil {
return nil, err
}
envelope, err := message.Wrap(¶ms, now)
if err != nil {
return nil, err
}
return gethbridge.NewWakuEnvelope(envelope), nil
}
// RequestMessages sends a request for historic messages to a MailServer.
func (api *PublicAPI) RequestMessages(_ context.Context, r ext.MessagesRequest) (types.HexBytes, error) {
api.log.Info("RequestMessages", "request", r)
now := api.service.w.GetCurrentTime()
r.SetDefaults(now)
if r.From > r.To {
return nil, fmt.Errorf("Query range is invalid: from > to (%d > %d)", r.From, r.To)
}
mailServerNode, err := api.service.GetPeer(r.MailServerPeer)
if err != nil {
return nil, fmt.Errorf("%v: %v", ext.ErrInvalidMailServerPeer, err)
}
var (
symKey []byte
publicKey *ecdsa.PublicKey
)
if r.SymKeyID != "" {
symKey, err = api.service.w.GetSymKey(r.SymKeyID)
if err != nil {
return nil, fmt.Errorf("%v: %v", ext.ErrInvalidSymKeyID, err)
}
} else {
publicKey = mailServerNode.Pubkey()
}
payload, err := ext.MakeMessagesRequestPayload(r)
if err != nil {
return nil, err
}
envelope, err := makeEnvelop(
payload,
symKey,
publicKey,
api.service.NodeID(),
api.service.w.MinPow(),
now,
)
if err != nil {
return nil, err
}
hash := envelope.Hash()
if !r.Force {
err = api.service.RequestsRegistry().Register(hash, r.Topics)
if err != nil {
return nil, err
}
}
if err := api.service.w.RequestHistoricMessagesWithTimeout(mailServerNode.ID().Bytes(), envelope, r.Timeout*time.Second); err != nil {
if !r.Force {
api.service.RequestsRegistry().Unregister(hash)
}
return nil, err
}
return hash[:], nil
}
// RequestMessagesSync repeats MessagesRequest using configuration in retry conf.
func (api *PublicAPI) RequestMessagesSync(conf ext.RetryConfig, r ext.MessagesRequest) (ext.MessagesResponse, error) {
var resp ext.MessagesResponse
events := make(chan types.EnvelopeEvent, 10)
var (
requestID types.HexBytes
err error
retries int
)
for retries <= conf.MaxRetries {
sub := api.service.w.SubscribeEnvelopeEvents(events)
r.Timeout = conf.BaseTimeout + conf.StepTimeout*time.Duration(retries)
timeout := r.Timeout
// FIXME this weird conversion is required because MessagesRequest expects seconds but defines time.Duration
r.Timeout = time.Duration(int(r.Timeout.Seconds()))
requestID, err = api.RequestMessages(context.Background(), r)
if err != nil {
sub.Unsubscribe()
return resp, err
}
mailServerResp, err := ext.WaitForExpiredOrCompleted(types.BytesToHash(requestID), events, timeout)
sub.Unsubscribe()
if err == nil {
resp.Cursor = hex.EncodeToString(mailServerResp.Cursor)
resp.Error = mailServerResp.Error
return resp, nil
}
retries++
api.log.Error("[RequestMessagesSync] failed", "err", err, "retries", retries)
}
return resp, fmt.Errorf("failed to request messages after %d retries", retries)
}