forked from justusranvier/bmd
-
Notifications
You must be signed in to change notification settings - Fork 0
/
rpcmethods.go
284 lines (252 loc) · 9.01 KB
/
rpcmethods.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
// Copyright (c) 2015 Monetas.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
import (
"crypto/sha256"
"crypto/subtle"
"encoding/base64"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/cenkalti/rpc2"
"github.com/monetas/bmd/database"
"github.com/monetas/bmutil"
"github.com/monetas/bmutil/pow"
"github.com/monetas/bmutil/wire"
)
// RPCAuthArgs contains arguments for Authenticate.
//
// handleAuth joins the two fields as "username:password" and compares the
// SHA-256 digest of that string against the credential digests held by the
// server.
type RPCAuthArgs struct {
// Username is the account name supplied by the client.
Username string `json:"username"`
// Password is the secret supplied by the client.
Password string `json:"password"`
}
// handleAuth authenticates an RPC client using the supplied username and
// password.
//
// The credentials are joined as "username:password", hashed with SHA-256 and
// compared against the server's limited-user digest first and the admin digest
// second. Each comparison uses subtle.ConstantTimeCompare. Limited credentials
// are checked first because, in deployments that have limited users, those are
// expected to produce the higher volume of calls.
//
// On a match, rpcStateIsAuthenticated is set to true in the client's state,
// rpcStateIsAdmin is set to false (limited) or true (admin), and *success is
// set to true. On a mismatch *success is set to false and a warning naming the
// remote address is logged. The returned error is always nil; callers must
// inspect *success.
func (s *rpcServer) handleAuth(client *rpc2.Client, in *RPCAuthArgs, success *bool) error {
	digest := sha256.Sum256([]byte(in.Username + ":" + in.Password))

	// grant records a successful login on the client's state with the given
	// privilege level.
	grant := func(admin bool) error {
		st := client.State
		st.Set(rpcStateIsAuthenticated, true)
		st.Set(rpcStateIsAdmin, admin)
		*success = true
		return nil
	}

	switch {
	case subtle.ConstantTimeCompare(digest[:], s.limitauthsha[:]) == 1:
		// Limited (non-admin) credentials matched.
		return grant(false)
	case subtle.ConstantTimeCompare(digest[:], s.authsha[:]) == 1:
		// Admin-level credentials matched.
		return grant(true)
	}

	*success = false
	peer := rpcConstructState(client)
	rpcLog.Warnf("RPC authentication failure from %s.", peer.remoteAddr)
	return nil
}
// sendObject sends the requested object into the Bitmessage network. in is the
// standard base64 representation of the serialized object. On success *counter
// holds the value returned by the object manager's handleInsert.
//
// An error is returned when the caller fails restrictAuth, the base64 input
// cannot be decoded, the bytes do not decode as an object, the object's expiry
// time has already passed, the stream number is not 1, the proof of work does
// not pass pow.Check with the default parameters, or handleInsert returns 0.
func (s *rpcServer) sendObject(client *rpc2.Client, in string, counter *uint64) error {
	if authErr := s.restrictAuth(client); authErr != nil {
		return authErr
	}

	raw, decodeErr := base64.StdEncoding.DecodeString(in)
	if decodeErr != nil {
		return errors.New("base64 decode failed")
	}

	// Validate the object; the first failing check decides the error.
	obj, parseErr := wire.DecodeMsgObject(raw)
	switch {
	case parseErr != nil:
		return fmt.Errorf("invalid object: %v", parseErr)
	case time.Now().After(obj.ExpiresTime): // already expired
		return errors.New("object already expired")
	case obj.StreamNumber != 1: // TODO improve
		return errors.New("invalid stream")
	case !pow.Check(obj, pow.DefaultExtraBytes, pow.DefaultNonceTrialsPerByte, time.Now()):
		return errors.New("invalid proof of work")
	}

	// Hand the object to the object manager, which takes care of insertion
	// and advertisement. A zero counter signals failure.
	*counter = s.server.objectManager.handleInsert(obj)
	if *counter == 0 {
		return errors.New("failed to insert and advertise object")
	}
	return nil
}
// RPCGetIDOut contains the output of GetIdentity.
//
// getID fills it from the identity that the database returns for the
// requested address.
type RPCGetIDOut struct {
// Address is the address string exactly as the caller passed it in.
Address string `json:"address"`
// NonceTrialsPerByte and ExtraBytes are copied from the stored identity.
NonceTrialsPerByte uint64 `json:"nonceTrialsPerByte"`
ExtraBytes uint64 `json:"extraBytes"`
// SigningKey and EncryptionKey are the standard base64 encoding of the
// uncompressed serialization of the respective public keys.
SigningKey string `json:"signingKey"`
EncryptionKey string `json:"encryptionKey"`
}
// getID returns the stored public identity associated with the given
// Bitmessage address by filling in id.
//
// An error is returned when the caller fails restrictAuth, addr cannot be
// decoded, no identity exists for the address ("identity not found"), or the
// database lookup fails for any other reason. In that last case the details
// are logged and a generic "database error" is returned to the client.
func (s *rpcServer) getID(client *rpc2.Client, addr string, id *RPCGetIDOut) error {
	if authErr := s.restrictAuth(client); authErr != nil {
		return authErr
	}

	decoded, decodeErr := bmutil.DecodeAddress(addr)
	if decodeErr != nil {
		return fmt.Errorf("address decode failed: %v", decodeErr)
	}

	pubID, fetchErr := s.server.db.FetchIdentityByAddress(decoded)
	switch fetchErr {
	case database.ErrNonexistentObject:
		return errors.New("identity not found")
	case nil:
		// Identity found; fall through to populate the result.
	default:
		rpcLog.Errorf("FetchIdentityByAddress, database error: %v", fetchErr)
		return errors.New("database error")
	}

	// Keys are returned as base64 of their uncompressed serialization.
	toBase64 := base64.StdEncoding.EncodeToString

	id.Address = addr
	id.NonceTrialsPerByte = pubID.NonceTrialsPerByte
	id.ExtraBytes = pubID.ExtraBytes
	id.EncryptionKey = toBase64(pubID.EncryptionKey.SerializeUncompressed())
	id.SigningKey = toBase64(pubID.SigningKey.SerializeUncompressed())
	return nil
}
// RPCSubscribeArgs contains the input for Subscribe methods.
type RPCSubscribeArgs struct {
// FromCounter is the counter value handed to sendOldObjects when the
// subscription is set up; stored objects are fetched starting from it.
FromCounter uint64 `json:"fromCounter"`
}
// RPCReceiveArgs contains the input for Receive methods on the client side.
type RPCReceiveArgs struct {
// Object is the standard base64 encoding of the wire-encoded object (this
// is how sendOldObjects builds it).
Object string `json:"object"`
// Counter is the counter value under which the database returned the
// object.
Counter uint64 `json:"counter"`
}
// subscribeMessages registers the client to be sent objects of type message
// as soon as bmd receives them. Delivery happens by calling the client-side
// handler named by rpcClientHandleMessage (the ReceiveMessage RPC method on
// the client). The work is delegated to handleSubscribe.
func (s *rpcServer) subscribeMessages(client *rpc2.Client, args *RPCSubscribeArgs,
	_ *struct{}) error {
	evt, handler := rpcEvtNewMessage, rpcClientHandleMessage
	return s.handleSubscribe(client, wire.ObjectTypeMsg, args, evt, handler)
}
// subscribeBroadcasts registers the client to be sent objects of type
// broadcast as soon as bmd receives them. Delivery happens by calling the
// client-side handler named by rpcClientHandleBroadcast (the ReceiveBroadcast
// RPC method on the client). The work is delegated to handleSubscribe.
func (s *rpcServer) subscribeBroadcasts(client *rpc2.Client, args *RPCSubscribeArgs,
	_ *struct{}) error {
	evt, handler := rpcEvtNewBroadcast, rpcClientHandleBroadcast
	return s.handleSubscribe(client, wire.ObjectTypeBroadcast, args, evt, handler)
}
// subscribeGetpubkeys registers the client to be sent objects of type
// getpubkey as soon as bmd receives them. Delivery happens by calling the
// client-side handler named by rpcClientHandleGetpubkey (the ReceiveGetpubkey
// RPC method on the client). The work is delegated to handleSubscribe.
func (s *rpcServer) subscribeGetpubkeys(client *rpc2.Client, args *RPCSubscribeArgs,
	_ *struct{}) error {
	evt, handler := rpcEvtNewGetpubkey, rpcClientHandleGetpubkey
	return s.handleSubscribe(client, wire.ObjectTypeGetPubKey, args, evt, handler)
}
// subscribePubkeys registers the client to be sent objects of type pubkey as
// soon as bmd receives them. Delivery happens by calling the client-side
// handler named by rpcClientHandlePubkey (the ReceivePubkey RPC method on the
// client). The work is delegated to handleSubscribe.
func (s *rpcServer) subscribePubkeys(client *rpc2.Client, args *RPCSubscribeArgs,
	_ *struct{}) error {
	evt, handler := rpcEvtNewPubkey, rpcClientHandlePubkey
	return s.handleSubscribe(client, wire.ObjectTypePubKey, args, evt, handler)
}
// subscribeUnknownObjects registers the client to be sent objects of unknown
// type as soon as bmd receives them. Delivery happens by calling the
// client-side handler named by rpcClientHandleUnknownObj (the
// ReceiveUnknownObject RPC method on the client). The work is delegated to
// handleSubscribe.
func (s *rpcServer) subscribeUnknownObjects(client *rpc2.Client, args *RPCSubscribeArgs,
	_ *struct{}) error {
	// XXX just a hack: 999999 is used as a stand-in object type for
	// "unknown" objects.
	unknownType := wire.ObjectType(999999)
	evt, handler := rpcEvtNewUnknownObj, rpcClientHandleUnknownObj
	return s.handleSubscribe(client, unknownType, args, evt, handler)
}
// handleSubscribe is the shared implementation behind the subscribe* RPC
// methods. After checking the caller with restrictAuth, it registers a
// listener for evt on the server's event manager, tagged with the client's
// eventsID, and then sends already stored objects of objType starting from
// args.FromCounter.
//
// The listener forwards each event payload to the client by calling
// clientHandler. If that call fails, the failure is logged and the client
// connection is closed.
func (s *rpcServer) handleSubscribe(client *rpc2.Client, objType wire.ObjectType,
	args *RPCSubscribeArgs, evt string, clientHandler string) error {
	// Only authenticated users may subscribe to objects.
	if authErr := s.restrictAuth(client); authErr != nil {
		return authErr
	}

	peer := rpcConstructState(client)

	// forward relays one newly received object to the subscribed client.
	forward := func(out *RPCReceiveArgs) {
		callErr := client.Call(clientHandler, out, nil)
		if callErr == nil {
			return
		}
		rpcLog.Infof("failed to call %s on client %s: %v", clientHandler,
			peer.remoteAddr, callErr)
		client.Close()
	}
	s.evtMgr.On(evt, forward, peer.eventsID)

	// The listener is registered before old objects are sent: doing it the
	// other way round could miss objects arriving in between. Duplicates are
	// preferable to misses.
	return s.sendOldObjects(client, objType, args.FromCounter, clientHandler)
}
// sendOldObjects sends stored objects of a particular type to the client,
// starting from a fixed counter value, by invoking clientHandler on the client
// once per object.
//
// Objects are fetched from the database batch by batch, with
// rpcCounterObjectsSize passed as the fetch limit. When a fetch returns exactly
// that many objects there may be more, so the next fetch continues from the
// last counter reported by the database.
//
// All calls of one batch are issued concurrently and the function waits for
// them before moving on. If any call fails, the failure is logged, the other
// goroutines of the batch stop waiting, the client is closed and an error is
// returned; no further batches are sent. A database failure is logged and
// reported to the caller as a generic "database error".
func (s *rpcServer) sendOldObjects(client *rpc2.Client, objType wire.ObjectType,
	fromCounter uint64, clientHandler string) error {
	for {
		objs, lastCount, err := s.server.db.FetchObjectsFromCounter(objType,
			fromCounter, rpcCounterObjectsSize)
		if err != nil {
			rpcLog.Errorf("FetchObjectsFromCounter, database error: %v", err)
			return errors.New("database error")
		}

		state := rpcConstructState(client)

		var (
			wg sync.WaitGroup
			// failed flips from 0 to 1 on the first failed call.
			failed uint64
			// abort is closed on the first failed call so that goroutines
			// still waiting for their own call stop waiting.
			abort = make(chan struct{})
		)

		for counter, msg := range objs {
			out := &RPCReceiveArgs{
				Object:  base64.StdEncoding.EncodeToString(wire.EncodeMessage(msg)),
				Counter: counter,
			}

			// Register with the WaitGroup before starting the goroutine;
			// otherwise Wait could return before the goroutine has run.
			wg.Add(1)
			go func(out *RPCReceiveArgs) {
				defer wg.Done()

				call := client.Go(clientHandler, out, nil, nil)
				select {
				case <-call.Done:
					if call.Error == nil {
						return
					}
					rpcLog.Infof("failed to call %s on client %s: %v",
						clientHandler, state.remoteAddr, call.Error)
					// The compare-and-swap succeeds for exactly one
					// goroutine, so abort is closed only once.
					if atomic.CompareAndSwapUint64(&failed, 0, 1) {
						close(abort)
					}
				case <-abort:
					// Another call of this batch failed; stop waiting.
				}
			}(out)
		}

		wg.Wait() // wait for all requests of this batch to finish

		if atomic.LoadUint64(&failed) == 1 { // there was an error
			client.Close()
			return errors.New("failed to send objects to client")
		}

		// A short batch means everything has been sent.
		if len(objs) != rpcCounterObjectsSize {
			return nil
		}

		// We might have more objects to send.
		fromCounter = lastCount
	}
}