/
client.go
421 lines (373 loc) · 11.8 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
package client
import (
"context"
"encoding/hex"
"time"
"github.com/iov-one/weave"
"github.com/iov-one/weave/app"
"github.com/iov-one/weave/errors"
"github.com/iov-one/weave/x/sigs"
cmn "github.com/tendermint/tendermint/libs/common"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/rpc/client"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
tmtypes "github.com/tendermint/tendermint/types"
)
// BroadcastTxSyncDefaultTimeOut timeout for sync tx broadcasting
const BroadcastTxSyncDefaultTimeOut = 15 * time.Second
var QueryNewBlockHeader = tmtypes.EventQueryNewBlockHeader
// Client is an interface to interact with weave apps
type Client interface {
// TendermintClient returns the underlying tendermint client
TendermintClient() client.Client
// GetUser will return nonce and public key registered
// for a given address if it was ever used.
GetUser(addr weave.Address) (*UserResponse, error)
// GetWallet will return a wallet given an address
GetWallet(addr weave.Address) (*WalletResponse, error)
// BroadcastTx serializes a signed transaction and writes to the
// blockchain. It returns when the tx is committed to the blockchain.
BroadcastTx(tx weave.Tx) BroadcastTxResponse
// BroadcastTxAsync can be run in a goroutine and will output the
// result or error to the given channel.
BroadcastTxAsync(tx weave.Tx, out chan<- BroadcastTxResponse)
// BroadcastTxSync brodcasts transactions synchronously
BroadcastTxSync(tx weave.Tx, timeout time.Duration) BroadcastTxResponse
// AbciQuery calls abci query on tendermint rpc.
AbciQuery(path string, data []byte) (AbciResponse, error)
// NextNonce queries the blockchain for the next nonce
NextNonce(client Client, addr weave.Address) (int64, error)
}
// BlogClient is a tendermint client wrapped to provide
// simple access to the data structures used in blog module.
type BlogClient struct {
conn client.Client
// subscriber is a unique identifier for subscriptions
subscriber string
}
// NewClient wraps a BlogClient around an existing
// tendermint client connection.
func NewClient(conn client.Client) *BlogClient {
return &BlogClient{
conn: conn,
subscriber: "tools-client",
}
}
// TendermintClient returns underlying tendermint client
func (cc *BlogClient) TendermintClient() client.Client {
return cc.conn
}
//************ generic (weave) functionality *************//
// Status will return the raw status from the node
func (cc *BlogClient) Status() (*ctypes.ResultStatus, error) {
return cc.conn.Status()
}
// Genesis will return the genesis directly from the node
func (cc *BlogClient) Genesis() (*tmtypes.GenesisDoc, error) {
gen, err := cc.conn.Genesis()
if err != nil {
return nil, err
}
return gen.Genesis, nil
}
// ChainID will parse out the chainID from the status result
func (cc *BlogClient) ChainID() (string, error) {
gen, err := cc.Genesis()
if err != nil {
return "", err
}
return gen.ChainID, nil
}
// Height will parse out the Height from the status result
func (cc *BlogClient) Height() (int64, error) {
status, err := cc.conn.Status()
if err != nil {
return -1, err
}
return status.SyncInfo.LatestBlockHeight, nil
}
// AbciResponse contains a query result:
// a (possibly empty) list of key-value pairs, and the height
// at which it queried
type AbciResponse struct {
// a list of key/value pairs
Models []weave.Model
Height int64
}
// AbciQuery calls abci query on tendermint rpc,
// verifies if it is an error or empty, and if there is
// data pulls out the ResultSets from keys and values into
// a useful AbciResponse struct
func (cc *BlogClient) AbciQuery(path string, data []byte) (AbciResponse, error) {
var out AbciResponse
q, err := cc.conn.ABCIQuery(path, data)
if err != nil {
return out, err
}
resp := q.Response
if resp.IsErr() {
return out, errors.ABCIError(resp.Code, resp.Log)
}
out.Height = resp.Height
if len(resp.Key) == 0 {
return out, nil
}
// assume there is data, parse the result sets
var keys, vals app.ResultSet
err = keys.Unmarshal(resp.Key)
if err != nil {
return out, err
}
err = vals.Unmarshal(resp.Value)
if err != nil {
return out, err
}
out.Models, err = app.JoinResults(&keys, &vals)
return out, err
}
// TxSearch searches transactions using underlying tendermint client
func (cc *BlogClient) TxSearch(query string, prove bool, page, perPage int) (*ctypes.ResultTxSearch, error) {
return cc.conn.TxSearch(query, prove, page, perPage)
}
// BroadcastTxResponse is the result of submitting a transaction.
type BroadcastTxResponse struct {
Error error // not-nil if there was an error sending
Response *ctypes.ResultBroadcastTxCommit // not-nil if we got response from node
}
// IsError returns the error for failure if it failed,
// or null if it succeeded
func (b BroadcastTxResponse) IsError() error {
if b.Error != nil {
return b.Error
}
if b.Response.CheckTx.IsErr() {
ctx := b.Response.CheckTx
return errors.Wrap(errors.ABCIError(ctx.Code, ctx.Log), "CheckTx error")
}
if b.Response.DeliverTx.IsErr() {
dtx := b.Response.DeliverTx
return errors.Wrap(errors.ABCIError(dtx.Code, dtx.Log), "DeliverTx error")
}
return nil
}
// BroadcastTx serializes a signed transaction and writes to the
// blockchain. It returns when the tx is committed to the
// blockchain.
//
// If you want high-performance, parallel sending, use BroadcastTxAsync
func (cc *BlogClient) BroadcastTx(tx weave.Tx) BroadcastTxResponse {
out := make(chan BroadcastTxResponse, 1)
defer close(out)
go cc.BroadcastTxAsync(tx, out)
res := <-out
return res
}
// BroadcastTxSync brodcasts transactions synchronously
func (cc *BlogClient) BroadcastTxSync(tx weave.Tx, timeout time.Duration) BroadcastTxResponse {
data, err := tx.Marshal()
if err != nil {
return BroadcastTxResponse{Error: err}
}
res, err := cc.conn.BroadcastTxSync(data)
if err != nil {
return BroadcastTxResponse{Error: err}
}
if res.Code != 0 {
err = errors.Wrap(errors.ABCIError(res.Code, res.Log), "CheckTx error")
return BroadcastTxResponse{Error: err}
}
// and wait for confirmation
evt, err := cc.WaitForTxEvent(data, tmtypes.EventTx, timeout)
if err != nil {
return BroadcastTxResponse{Error: err}
}
txe, ok := evt.(tmtypes.EventDataTx)
if !ok {
if err != nil {
err = errors.Wrap(err, "WaitForOneEvent did not return an EventDataTx object")
return BroadcastTxResponse{Error: err}
}
}
return BroadcastTxResponse{
Response: &ctypes.ResultBroadcastTxCommit{
DeliverTx: txe.Result,
Height: txe.Height,
Hash: txe.Tx.Hash(),
},
}
}
// WaitForTxEvent listens for and particular event type of evtTyp to be fired
func (cc *BlogClient) WaitForTxEvent(tx tmtypes.Tx, evtTyp string, timeout time.Duration) (tmtypes.TMEventData, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
query := tmtypes.EventQueryTxFor(tx)
uuid := hex.EncodeToString(append(tx.Hash(), cmn.RandBytes(2)...))
evts, err := cc.conn.Subscribe(ctx, uuid, query.String())
if err != nil {
return nil, errors.Wrap(err, "failed to subscribe")
}
// make sure to unregister after the test is over
defer cc.conn.UnsubscribeAll(ctx, uuid)
select {
case evt := <-evts:
return evt.Data.(tmtypes.TMEventData), nil
case <-ctx.Done():
return nil, errors.Wrap(errors.ErrTimeout, "waiting for event timed out")
}
}
// BroadcastTxAsync can be run in a goroutine and will output
// the result or error to the given channel.
// Useful if you want to send many tx in parallel
func (cc *BlogClient) BroadcastTxAsync(tx weave.Tx, out chan<- BroadcastTxResponse) {
data, err := tx.Marshal()
if err != nil {
out <- BroadcastTxResponse{Error: err}
return
}
// TODO: make this async, maybe adjust return value
res, err := cc.conn.BroadcastTxCommit(data)
msg := BroadcastTxResponse{
Error: err,
Response: res,
}
out <- msg
}
// SubscribeHeaders queries for headers and starts a goroutine
// to typecase the events into Headers. Returns a cancel
// function. If you don't want the automatic goroutine, use
// Subscribe(QueryNewBlockHeader, out)
func (cc *BlogClient) SubscribeHeaders(out chan<- *tmtypes.Header) (func(), error) {
query := tmtypes.EventQueryNewBlockHeader
pipe, cancel, err := cc.Subscribe(query)
if err != nil {
return nil, err
}
go func() {
for msg := range pipe {
evt, ok := msg.Data.(tmtypes.EventDataNewBlockHeader)
if !ok {
// TODO: something else?
panic("Unexpected event type")
}
out <- &evt.Header
}
close(out)
}()
return cancel, nil
}
// Subscribe will take an arbitrary query and push all events to
// the given channel. If there is no error,
// returns a cancel function that can be called to cancel
// the subscription
func (cc *BlogClient) Subscribe(query tmpubsub.Query) (<-chan ctypes.ResultEvent, func(), error) {
ctx := context.Background()
out, err := cc.conn.Subscribe(ctx, cc.subscriber, query.String())
if err != nil {
return out, nil, err
}
cancel := func() {
cc.conn.Unsubscribe(ctx, cc.subscriber, query.String())
}
return out, cancel, nil
}
// UnsubscribeAll cancels all subscriptions
func (cc *BlogClient) UnsubscribeAll() error {
ctx := context.Background()
return cc.conn.UnsubscribeAll(ctx, cc.subscriber)
}
// GetWallet will return a wallet given an address
// If non wallet is present, it will return (nil, nil)
// Error codes are used when the query failed on the server
func (cc *BlogClient) GetWallet(addr weave.Address) (*WalletResponse, error) {
// make sure we send a valid address to the server
err := addr.Validate()
if err != nil {
return nil, errors.Wrap(err, "invalid address")
}
resp, err := cc.AbciQuery("/wallets", addr)
if err != nil {
return nil, err
}
if len(resp.Models) == 0 { // empty list or nil
return nil, errors.Wrap(errors.ErrNotFound, "model not found")
}
// assume only one result
model := resp.Models[0]
// make sure the return value is expected
acct := walletKeyToAddr(model.Key)
if !addr.Equals(acct) {
return nil, errors.Wrapf(ErrNoMatch, "queried %s, returned %s", addr, acct)
}
out := WalletResponse{
Address: acct,
Height: resp.Height,
}
// parse the value as wallet bytes
err = out.Wallet.Unmarshal(model.Value)
if err != nil {
return nil, err
}
return &out, nil
}
// key is the address prefixed with "wallet:"
func walletKeyToAddr(key []byte) weave.Address {
return key[5:]
}
// UserResponse is a response on a query for a User
type UserResponse struct {
Address weave.Address
UserData sigs.UserData
Height int64
}
// GetUser will return nonce and public key registered
// for a given address if it was ever used.
// If it returns (nil, nil), then this address never signed
// a transaction before (and can use nonce = 0)
func (cc *BlogClient) GetUser(addr weave.Address) (*UserResponse, error) {
// make sure we send a valid address to the server
err := addr.Validate()
if err != nil {
return nil, errors.Wrap(err, "invalid address")
}
resp, err := cc.AbciQuery("/auth", addr)
if err != nil {
return nil, err
}
if len(resp.Models) == 0 { // empty list or nil
return nil, nil
}
// assume only one result
model := resp.Models[0]
// make sure the return value is expected
acct := userKeyToAddr(model.Key)
if !addr.Equals(acct) {
return nil, errors.Wrapf(ErrNoMatch, "queried %s, returned %s", addr, acct)
}
out := UserResponse{
Address: acct,
Height: resp.Height,
}
// parse the value as wallet bytes
err = out.UserData.Unmarshal(model.Value)
if err != nil {
return nil, err
}
return &out, nil
}
// key is the address prefixed with "sigs:"
func userKeyToAddr(key []byte) weave.Address {
return key[5:]
}
// NextNonce queries the blockchain for the next nonce
// returns 0 if the address never used
func (cc *BlogClient) NextNonce(addr weave.Address) (int64, error) {
user, err := cc.GetUser(addr)
if err != nil {
return 0, err
}
if user != nil {
return user.UserData.Sequence, nil
}
// new account starts at 0
return 0, nil
}