forked from 33cn/plugin
/
paracommitmsg.go
545 lines (476 loc) · 16 KB
/
paracommitmsg.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package para
import (
"bytes"
"context"
"time"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/crypto"
"github.com/33cn/chain33/types"
paracross "github.com/33cn/plugin/plugin/dapp/paracross/types"
pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
"github.com/pkg/errors"
)
// consensusInterval is the period, in seconds, between two consensus-height
// queries; it is roughly the time it takes for one new block to appear.
var consensusInterval = 16
// commitMsgClient builds, signs and sends parachain commit transactions and
// tracks them until they are seen in a main-chain block.
type commitMsgClient struct {
// paraClient is the owning client; it supplies the gRPC client, the API,
// the queue client, the auth account and the shared wait group.
paraClient *client
// waitMainBlocks is the number of main blocks handler() lets pass without
// seeing currentTx before it rebuilds and resends the commit tx.
waitMainBlocks int32
// commitMsgNotify carries heights pushed by onBlockAdded.
commitMsgNotify chan int64
// delMsgNotify carries heights pushed by onBlockDeleted.
delMsgNotify chan int64
// mainBlockAdd carries main-chain blocks pushed by onMainBlockAdded.
mainBlockAdd chan *types.BlockDetail
// currentTx is the commit tx that has been sent and is awaiting confirmation;
// nil when nothing is in flight.
currentTx *types.Transaction
// checkTxCommitTimes counts the main blocks checked since currentTx was
// (re)sent without finding it.
checkTxCommitTimes int32
// privateKey signs commit transactions; it is set once fetchPrivacyKey delivers it.
privateKey crypto.PrivKey
// quit is selected on by every loop of this client as the stop signal.
quit chan struct{}
}
// handler is the event loop of the commit-message client.
//
// It starts three helper goroutines (getConsensusHeight, fetchPrivacyKey and
// sendCommitMsg), registering each with the shared wait group, and then
// multiplexes over:
//   - commitMsgNotify / delMsgNotify: parachain block added / deleted heights,
//   - mainBlockAdd: main-chain blocks, used to confirm or resend currentTx,
//   - readTick: periodic trigger that builds and sends the next commit tx,
//   - consensusCh: consensus status reported by getConsensusHeight,
//   - priKeyCh: the signing key delivered by fetchPrivacyKey,
//   - quit: stop signal.
//
// readTick stays nil until the private key has been received, so no commit tx
// is built before a key is available. The function calls wg.Done() once on exit;
// NOTE(review): the matching wg.Add for handler itself is presumably done by the caller — confirm.
func (client *commitMsgClient) handler() {
var isSync bool
var notification []int64 // records the min ([0]) and current ([1]) height seen since each system restart
var finishHeight int64
var sendingHeight int64 // highest height covered by the tx currently being sent
var sendingMsgs []*pt.ParacrossNodeStatus
var readTick <-chan time.Time
client.paraClient.wg.Add(1)
consensusCh := make(chan *pt.ParacrossStatus, 1)
go client.getConsensusHeight(consensusCh)
client.paraClient.wg.Add(1)
priKeyCh := make(chan crypto.PrivKey, 1)
go client.fetchPrivacyKey(priKeyCh)
client.paraClient.wg.Add(1)
sendMsgCh := make(chan *types.Transaction, 1)
go client.sendCommitMsg(sendMsgCh)
out:
for {
select {
case height := <-client.commitMsgNotify:
if notification == nil {
// first notification: both min and current start at this height
notification = append(notification, height)
notification = append(notification, height)
finishHeight = height - 1
} else {
//[0] need update to min value if any, [1] always get current height, as for fork case, the height may lower than before
if height < notification[0] {
notification[0] = height
finishHeight = height - 1
}
notification[1] = height
if finishHeight >= notification[1] {
finishHeight = notification[1] - 1
}
}
case height := <-client.delMsgNotify:
if len(notification) > 0 && height <= notification[1] {
notification[1] = height - 1
}
// a deleted block is covered by the in-flight tx: drop it so a new one is built
if height <= sendingHeight && client.currentTx != nil {
sendingMsgs = nil
client.currentTx = nil
}
case block := <-client.mainBlockAdd:
if client.currentTx != nil && client.paraClient.isCaughtUp {
exist := checkTxInMainBlock(client.currentTx, block)
if exist {
finishHeight = sendingHeight
sendingMsgs = nil
client.currentTx = nil
} else {
client.checkTxCommitTimes++
if client.checkTxCommitTimes > client.waitMainBlocks {
// the tx must be rebuilt from the raw tx so that the nonce changes; otherwise it would be treated as a duplicate transaction
signTx, _, err := client.calcCommitMsgTxs(sendingMsgs)
if err != nil || signTx == nil {
continue
}
client.currentTx = signTx
client.checkTxCommitTimes = 0
sendMsgCh <- client.currentTx
}
}
}
case <-readTick:
if notification != nil && finishHeight < notification[1] && client.currentTx == nil && isSync {
count := notification[1] - finishHeight
if count > types.TxGroupMaxCount {
count = types.TxGroupMaxCount
}
status, err := client.getNodeStatus(finishHeight+1, finishHeight+count)
if err != nil {
plog.Error("para commit msg read tick", "err", err.Error())
continue
}
signTx, count, err := client.calcCommitMsgTxs(status)
if err != nil || signTx == nil {
continue
}
sendingHeight = finishHeight + count
sendingMsgs = status[:count]
client.currentTx = signTx
client.checkTxCommitTimes = 0
sendMsgCh <- client.currentTx
for i, msg := range sendingMsgs {
plog.Info("paracommitmsg sending", "idx", i, "height", msg.Height, "mainheight", msg.MainBlockHeight,
"blockhash", common.HashHex(msg.BlockHash), "mainHash", common.HashHex(msg.MainBlockHash),
"from", client.paraClient.authAccount)
}
}
// Receive the height currently agreed by consensus. "Synced" has two meanings here: the main chain has finished
// syncing with the other nodes, and this parachain node's height has caught up with the consensus height.
case rsp := <-consensusCh:
consensusHeight := rsp.Height
plog.Info("para consensus rcv", "notify", notification, "sending", len(sendingMsgs),
"consens heigt", rsp.Height, "consens blockhash", common.HashHex(rsp.BlockHash), "sync", isSync)
// When no node has reached consensus yet, or a new node is catching up, only start sending once the received block height exceeds the consensus height
if consensusHeight == -1 || (notification != nil && notification[1] > consensusHeight) {
isSync = true
}
// Uncommitted blocks below the current consensus height do not need to take part in consensus.
// A new node waits until its synced blocks reach the consensus height before it is marked synced and joins consensus.
if notification != nil && finishHeight < consensusHeight {
finishHeight = consensusHeight
}
// If the height being sent is not above the height already agreed by consensus, cancel the send. Consider a new node that is
// catching up and whose joining lets 2/3 consensus be reached, sending at most 20 txs each time: because addblock is still
// catching up the tx cannot be confirmed, yet the tx has produced a new consensus height, so sendingMsgs must be reset to nil
// in order to send the next commit tx.
if sendingHeight <= consensusHeight && client.currentTx != nil {
sendingMsgs = nil
client.currentTx = nil
continue
}
// Consensus is checked once after every restart. If the consensus height lags behind the first (or minimum) height completed
// after start-up, there may be a consensus gap, so everything from the current consensus height up to the highest completed
// height is resent until confirmed. Heights that were already sent are resent too, because the gap made consensus
// non-contiguous: even with 2/3 of the nodes agreeing it will not advance, and a resend is needed to trigger commit.
// This also covers the case where the current consensus height is -1.
nextConsensHeight := consensusHeight + 1
if notification != nil && nextConsensHeight < notification[0] {
notification[0] = nextConsensHeight
finishHeight = nextConsensHeight - 1
sendingMsgs = nil
client.currentTx = nil
}
case key, ok := <-priKeyCh:
if !ok {
// channel closed: stop selecting on it (a nil channel never becomes ready)
priKeyCh = nil
continue
}
client.privateKey = key
// the key is available: start the periodic read tick that drives sending
readTick = time.Tick(time.Second * 2)
case <-client.quit:
break out
}
}
client.paraClient.wg.Done()
}
// calcCommitMsgTxs builds one signed commit transaction for the given node statuses.
//
// It first tries to pack every status into a single tx (group) via
// batchCalcTxGroup. If that fails it falls back to a single tx covering only
// the first status.
//
// Returns the signed transaction, the number of leading statuses it covers,
// and an error. An empty input yields an error instead of panicking on
// notifications[0].
func (client *commitMsgClient) calcCommitMsgTxs(notifications []*pt.ParacrossNodeStatus) (*types.Transaction, int64, error) {
	if len(notifications) == 0 {
		return nil, 0, errors.New("para commit msg: no node status to commit")
	}

	txs, count, err := client.batchCalcTxGroup(notifications)
	if err == nil {
		return txs, int64(count), nil
	}

	// Group creation failed: commit just the first height so progress is still made.
	txs, err = client.singleCalcTx(notifications[0])
	if err != nil {
		plog.Error("single calc tx", "height", notifications[0].Height)
		return nil, 0, err
	}
	return txs, 1, nil
}
// getTxsGroup turns the raw transactions in txsArr into one signed transaction.
//
// A single raw tx is signed directly and returned. Two or more are combined
// into a tx group, the group is checked against the configured "MinFee", every
// member is signed with the client's private key, and the group's merged tx is
// returned. An empty or nil list is rejected with an error.
func (client *commitMsgClient) getTxsGroup(txsArr *types.Transactions) (*types.Transaction, error) {
	if txsArr == nil || len(txsArr.Txs) == 0 {
		return nil, errors.New("para getTxsGroup: empty tx list")
	}
	if len(txsArr.Txs) == 1 {
		tx := txsArr.Txs[0]
		tx.Sign(types.SECP256K1, client.privateKey)
		return tx, nil
	}

	group, err := types.CreateTxGroup(txsArr.Txs)
	if err != nil {
		plog.Error("para CreateTxGroup", "err", err.Error())
		return nil, err
	}
	err = group.Check(0, types.GInt("MinFee"))
	if err != nil {
		plog.Error("para CheckTxGroup", "err", err.Error())
		return nil, err
	}
	for i := range group.Txs {
		group.SignN(i, int32(types.SECP256K1), client.privateKey)
	}
	return group.Tx(), nil
}
// batchCalcTxGroup creates one raw main-chain commit tx per node status and
// hands them to getTxsGroup to be combined and signed.
//
// On success it returns the signed tx and len(notifications); if any raw tx
// cannot be created, or grouping fails, it returns that error.
func (client *commitMsgClient) batchCalcTxGroup(notifications []*pt.ParacrossNodeStatus) (*types.Transaction, int, error) {
	rawTxs := &types.Transactions{}
	for idx := range notifications {
		nodeStatus := notifications[idx]
		rawTx, createErr := paracross.CreateRawCommitTx4MainChain(nodeStatus, pt.ParaX, 0)
		if createErr != nil {
			plog.Error("para get commit tx", "block height", nodeStatus.Height)
			return nil, 0, createErr
		}
		rawTxs.Txs = append(rawTxs.Txs, rawTx)
	}

	groupTx, groupErr := client.getTxsGroup(rawTxs)
	if groupErr != nil {
		return nil, 0, groupErr
	}
	return groupTx, len(notifications), nil
}
// singleCalcTx creates the raw main-chain commit tx for one node status and
// signs it with the client's private key.
func (client *commitMsgClient) singleCalcTx(status *pt.ParacrossNodeStatus) (*types.Transaction, error) {
	rawTx, createErr := paracross.CreateRawCommitTx4MainChain(status, pt.ParaX, 0)
	if createErr == nil {
		rawTx.Sign(types.SECP256K1, client.privateKey)
		return rawTx, nil
	}
	plog.Error("para get commit tx", "block height", status.Height)
	return nil, createErr
}
// sendCommitMsg is the sender goroutine: it receives signed commit txs from ch
// and submits them, retrying the most recent one after a failure.
//
// Txs arrive on ch from handler(), which writes to it from its read-tick case
// and from its main-block-added case (resend after a timeout).
//   - While sendCommitMsgTx is blocked, handler()'s write to ch can block too,
//     so no newer tx is queued until the RPC returns.
//   - Whatever the outcome of an earlier send, a newer tx received from ch
//     replaces the previous one.
//   - If a send fails, the same tx is retried after 1s unless a newer tx
//     arrives first.
//
// It returns when client.quit becomes ready, after calling wg.Done().
func (client *commitMsgClient) sendCommitMsg(ch chan *types.Transaction) {
	var (
		pending *types.Transaction
		lastErr error
	)
	retry := time.After(time.Second * 1)

	// submit sends the pending tx and re-arms the retry timer on failure.
	submit := func() {
		lastErr = client.sendCommitMsgTx(pending)
		if lastErr != nil {
			retry = time.After(time.Second * 1)
		}
	}

	for {
		select {
		case <-client.quit:
			client.paraClient.wg.Done()
			return
		case pending = <-ch:
			submit()
		case <-retry:
			// Only resend when the previous attempt actually failed.
			if lastErr == nil || pending == nil {
				continue
			}
			submit()
		}
	}
}
// sendCommitMsgTx submits tx through the gRPC client.
//
// A nil tx is a no-op. It returns the transport error if the call fails, or
// an error built from the reply message when the reply is not OK.
func (client *commitMsgClient) sendCommitMsgTx(tx *types.Transaction) error {
	if tx == nil {
		return nil
	}

	reply, sendErr := client.paraClient.grpcClient.SendTransaction(context.Background(), tx)
	switch {
	case sendErr != nil:
		plog.Error("sendCommitMsgTx send tx", "tx", tx, "err", sendErr.Error())
		return sendErr
	case !reply.GetIsOk():
		reason := string(reply.GetMsg())
		plog.Error("sendCommitMsgTx send tx Nok", "tx", tx, "err", reason)
		return errors.New(reason)
	}
	return nil
}
// checkTxInMainBlock reports whether targetTx appears in the given main-chain
// block with a receipt of type types.ExecOk.
//
// Transactions are matched by hash. A transaction without a corresponding
// (non-nil) receipt is treated as not confirmed rather than causing an
// index-out-of-range or nil-pointer panic.
func checkTxInMainBlock(targetTx *types.Transaction, detail *types.BlockDetail) bool {
	if targetTx == nil || detail == nil || detail.Block == nil {
		return false
	}

	targetHash := targetTx.Hash()
	for i, tx := range detail.Block.Txs {
		if i >= len(detail.Receipts) {
			// No receipts beyond this point, so nothing further can be confirmed.
			break
		}
		receipt := detail.Receipts[i]
		if receipt == nil {
			continue
		}
		if bytes.Equal(targetHash, tx.Hash()) && receipt.Ty == types.ExecOk {
			return true
		}
	}
	return false
}
// getNodeStatus returns the node status for every height in [start, end], in
// ascending height order.
//
// Height 0 is served by getGenesisNodeStatus. The remaining heights are read
// from the local DB (one miner-height key per height) and then completed with
// the block hash and state hash of the matching block.
//
// A mismatch between the number of requested and returned keys or blocks is
// reported as an error; it never yields a nil slice together with a nil error.
//
// Notes carried over from the original author: repeated failures to fetch the
// keys are not handled specially; fetching many heights makes the block reply
// large, but it is released after use; if needed, callers could fetch at most
// 20 heights (one tx group) per call and loop.
func (client *commitMsgClient) getNodeStatus(start, end int64) ([]*pt.ParacrossNodeStatus, error) {
	var ret []*pt.ParacrossNodeStatus
	if start == 0 {
		geneStatus, err := client.getGenesisNodeStatus()
		if err != nil {
			return nil, err
		}
		ret = append(ret, geneStatus)
		start++
	}
	if end < start {
		return ret, nil
	}

	req := &types.ReqBlocks{Start: start, End: end}
	count := req.End - req.Start + 1
	nodeList := make(map[int64]*pt.ParacrossNodeStatus, count+1)

	// Read the stored node status of each height from the local DB.
	keys := &types.LocalDBGet{}
	for i := int64(0); i < count; i++ {
		key := paracross.CalcMinerHeightKey(types.GetTitle(), req.Start+i)
		keys.Keys = append(keys.Keys, key)
	}
	r, err := client.paraClient.GetAPI().LocalGet(keys)
	if err != nil {
		return nil, err
	}
	if count != int64(len(r.Values)) {
		plog.Error("paracommitmsg get node status key", "expect count", count, "actual count", len(r.Values))
		return nil, errors.New("paracommitmsg wrong key count result")
	}
	for _, val := range r.Values {
		status := &pt.ParacrossNodeStatus{}
		err = types.Decode(val, status)
		if err != nil {
			return nil, err
		}
		if !(status.Height >= req.Start && status.Height <= req.End) {
			plog.Error("paracommitmsg decode node status", "height", status.Height, "expect start", req.Start,
				"end", req.End, "status", status)
			return nil, errors.New("paracommitmsg wrong key result")
		}
		nodeList[status.Height] = status
	}
	// Every requested height must be present; duplicates would leave a gap.
	for i := int64(0); i < count; i++ {
		if nodeList[req.Start+i] == nil {
			plog.Error("paracommitmsg get node status key nil", "height", req.Start+i)
			return nil, errors.New("paracommitmsg wrong key status result")
		}
	}

	// Fill in the block hash and state hash from the blocks themselves.
	v, err := client.paraClient.GetAPI().GetBlocks(req)
	if err != nil {
		return nil, err
	}
	if count != int64(len(v.Items)) {
		plog.Error("paracommitmsg get node status block", "expect count", count, "actual count", len(v.Items))
		return nil, errors.New("paracommitmsg wrong block count result")
	}
	for _, block := range v.Items {
		if !(block.Block.Height >= req.Start && block.Block.Height <= req.End) {
			plog.Error("paracommitmsg get node status block", "height", block.Block.Height, "expect start", req.Start, "end", req.End)
			return nil, errors.New("paracommitmsg wrong block result")
		}
		nodeList[block.Block.Height].BlockHash = block.Block.Hash()
		nodeList[block.Block.Height].StateHash = block.Block.StateHash
	}

	for i := int64(0); i < count; i++ {
		ret = append(ret, nodeList[req.Start+i])
	}
	return ret, nil
}
// getGenesisNodeStatus builds the node status for height 0 from the genesis
// block: the title comes from types.GetTitle(), the previous block/state
// hashes are zeroHash, and the block/state hashes come from the block itself.
//
// It returns an error if the block query fails, returns no usable block, or
// returns a block whose height is not 0.
func (client *commitMsgClient) getGenesisNodeStatus() (*pt.ParacrossNodeStatus, error) {
	req := &types.ReqBlocks{Start: 0, End: 0}
	v, err := client.paraClient.GetAPI().GetBlocks(req)
	if err != nil {
		return nil, err
	}
	// Guard against an empty reply instead of panicking on Items[0].
	if v == nil || len(v.Items) == 0 || v.Items[0] == nil || v.Items[0].Block == nil {
		return nil, errors.New("block chain not return genesis block")
	}
	block := v.Items[0].Block
	if block.Height != 0 {
		return nil, errors.New("block chain not return 0 height block")
	}

	var status pt.ParacrossNodeStatus
	status.Title = types.GetTitle()
	status.Height = block.Height
	status.PreBlockHash = zeroHash[:]
	status.BlockHash = block.Hash()
	status.PreStateHash = zeroHash[:]
	status.StateHash = block.StateHash
	return &status, nil
}
// onBlockAdded forwards the height of a newly added block to handler() via
// commitMsgNotify. It gives up if the client is quitting and always returns nil.
func (client *commitMsgClient) onBlockAdded(height int64) error {
	select {
	case <-client.quit:
		// shutting down: drop the notification
	case client.commitMsgNotify <- height:
	}
	return nil
}
// onBlockDeleted forwards the height of a deleted block to handler() via
// delMsgNotify, unless the client is quitting.
func (client *commitMsgClient) onBlockDeleted(height int64) {
	select {
	case <-client.quit:
		// shutting down: drop the notification
	case client.delMsgNotify <- height:
	}
}
// onMainBlockAdded forwards a newly received main-chain block to handler() via
// mainBlockAdd, unless the client is quitting.
func (client *commitMsgClient) onMainBlockAdded(block *types.BlockDetail) {
	select {
	case <-client.quit:
		// shutting down: drop the block
	case client.mainBlockAdd <- block:
	}
}
// mainSync asks the main chain, over gRPC, whether it is synced.
//
// It returns nil only when the main chain reports that it is synced. An RPC
// failure returns that error, and a "not OK" reply returns a dedicated error,
// so callers never mistake an unsynced main chain for success.
//
// The caller only needs the first successful result after start-up, since the
// main chain normally stays synced afterwards.
func (client *commitMsgClient) mainSync() error {
	req := &types.ReqNil{}
	reply, err := client.paraClient.grpcClient.IsSync(context.Background(), req)
	if err != nil {
		plog.Error("Paracross main is syncing", "err", err.Error())
		return err
	}
	if !reply.IsOk {
		plog.Error("Paracross main reply not ok")
		// err is nil on this path, so an explicit error is required.
		return errors.New("paracross main chain is not synced")
	}

	plog.Info("Paracross main sync succ")
	return nil
}
// getConsensusHeight periodically queries the main chain for this title's
// paracross consensus status and delivers each result on consensusRst.
//
// Every consensusInterval seconds it:
//  1. makes sure the main chain has reported synced once (mainSync),
//  2. queries the "paracross" driver's "GetTitle" function,
//  3. decodes the reply and sends it to consensusRst.
//
// Failed queries and undecodable replies are logged and skipped. The goroutine
// exits when client.quit becomes ready, even while blocked on delivering a
// result, and calls wg.Done() on the way out.
func (client *commitMsgClient) getConsensusHeight(consensusRst chan *pt.ParacrossStatus) {
	defer client.paraClient.wg.Done()

	ticker := time.NewTicker(time.Second * time.Duration(consensusInterval))
	defer ticker.Stop()

	isSync := false
	for {
		select {
		case <-client.quit:
			return
		case <-ticker.C:
			if !isSync {
				if err := client.mainSync(); err != nil {
					continue
				}
				isSync = true
			}

			var req types.ChainExecutor
			req.Driver = "paracross"
			req.FuncName = "GetTitle"
			req.Param = types.Encode(&types.ReqString{Data: types.GetTitle()})

			ret, err := client.paraClient.grpcClient.QueryChain(context.Background(), &req)
			if err != nil {
				plog.Error("getConsensusHeight ", "err", err.Error())
				continue
			}
			if !ret.GetIsOk() {
				plog.Info("getConsensusHeight", "error", ret.GetMsg())
				continue
			}

			var result pt.ParacrossStatus
			if err := types.Decode(ret.Msg, &result); err != nil {
				// Do not forward a half-decoded status to the handler.
				plog.Error("getConsensusHeight decode", "err", err.Error())
				continue
			}

			// Do not block forever on delivery if the receiver is already gone.
			select {
			case consensusRst <- &result:
			case <-client.quit:
				return
			}
		}
	}
}
// fetchPrivacyKey obtains the private key of the configured auth account from
// the wallet module and delivers it once on ch, then closes ch.
//
// If no auth account is configured, ch is closed immediately without a key.
// Otherwise the wallet is asked every 2s until it answers; failures to send,
// to receive, or an unexpected reply type are logged and retried. A key that
// cannot be decoded or loaded is treated as fatal (panic), as before.
// The goroutine also exits when client.quit becomes ready.
func (client *commitMsgClient) fetchPrivacyKey(ch chan crypto.PrivKey) {
	defer client.paraClient.wg.Done()
	if client.paraClient.authAccount == "" {
		close(ch)
		return
	}

	req := &types.ReqString{Data: client.paraClient.authAccount}
	for {
		select {
		case <-client.quit:
			return
		case <-time.NewTimer(time.Second * 2).C:
			msg := client.paraClient.GetQueueClient().NewMessage("wallet", types.EventDumpPrivkey, req)
			if err := client.paraClient.GetQueueClient().Send(msg, true); err != nil {
				plog.Error("para commit msg send to wallet", "err", err.Error())
				continue
			}
			resp, err := client.paraClient.GetQueueClient().Wait(msg)
			if err != nil {
				plog.Error("para commit msg sign to wallet", "err", err.Error())
				continue
			}
			reply, ok := resp.GetData().(*types.ReplyString)
			if !ok || reply == nil {
				plog.Error("para commit msg sign to wallet", "err", "unexpected reply type")
				continue
			}

			pk, err := common.FromHex(reply.Data)
			if err != nil {
				// Any decode error is fatal; the key would be unusable anyway.
				panic(err)
			}
			secp, err := crypto.New(types.GetSignName("", types.SECP256K1))
			if err != nil {
				panic(err)
			}
			priKey, err := secp.PrivKeyFromBytes(pk)
			if err != nil {
				panic(err)
			}

			ch <- priKey
			close(ch)
			return
		}
	}
}