-
Notifications
You must be signed in to change notification settings - Fork 14
/
receiver.go
119 lines (101 loc) · 2.82 KB
/
receiver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package consensus
import (
"github.com/AsynkronIT/protoactor-go/actor"
"github.com/qlcchain/go-qlc/common/event"
"github.com/qlcchain/go-qlc/common/topic"
"github.com/qlcchain/go-qlc/common/types"
"github.com/qlcchain/go-qlc/p2p"
"github.com/qlcchain/go-qlc/p2p/protos"
)
type Receiver struct {
eb event.EventBus
subscriber *event.ActorSubscriber
c *Consensus
}
func NewReceiver(eb event.EventBus) *Receiver {
r := &Receiver{
eb: eb,
}
return r
}
func (r *Receiver) init(c *Consensus) {
r.c = c
}
func (r *Receiver) start() error {
r.subscriber = event.NewActorSubscriber(event.Spawn(func(c actor.Context) {
switch msg := c.Message().(type) {
case *topic.EventPublishMsg:
r.ReceivePublish(msg.Block, msg.From)
case *topic.EventConfirmReqMsg:
r.ReceiveConfirmReq(msg.Blocks, msg.From)
case *p2p.EventConfirmAckMsg:
r.ReceiveConfirmAck(msg.Block, msg.From)
case types.StateBlockList:
r.ReceiveSyncBlock(msg)
case *types.StateBlock:
r.ReceiveGenerateBlock(msg)
}
}), r.eb)
if err := r.subscriber.Subscribe(topic.EventPublish, topic.EventConfirmReq, topic.EventConfirmAck, topic.EventSyncBlock,
topic.EventGenerateBlock); err != nil {
return err
}
return nil
}
func (r *Receiver) stop() error {
//r.cleanCacheStop()
return r.subscriber.UnsubscribeAll()
}
func (r *Receiver) ReceivePublish(blk *types.StateBlock, msgFrom string) {
r.c.logger.Debugf("receive publish block [%s] from [%s]", blk.GetHash(), msgFrom)
bs := &BlockSource{
Block: blk,
BlockFrom: types.UnSynchronized,
Type: MsgPublishReq,
}
r.c.ca.ProcessMsg(bs)
}
func (r *Receiver) ReceiveConfirmReq(blk []*types.StateBlock, msgFrom string) {
for _, b := range blk {
r.c.logger.Debugf("receive ConfirmReq block [%s] from [%s]", b.GetHash(), msgFrom)
bs := &BlockSource{
Block: b,
BlockFrom: types.UnSynchronized,
Type: MsgConfirmReq,
}
r.c.ca.ProcessMsg(bs)
}
}
func (r *Receiver) ReceiveConfirmAck(ack *protos.ConfirmAckBlock, msgFrom string) {
r.c.logger.Debugf("receive ConfirmAck for %d blocks [%s] from [%s]", len(ack.Hash), ack.Hash, msgFrom)
valid := IsAckSignValidate(ack)
if !valid {
r.c.logger.Error("ack sign err")
return
}
bs := &BlockSource{
Type: MsgConfirmAck,
Para: ack,
}
r.c.ca.ProcessMsg(bs)
}
func (r *Receiver) ReceiveSyncBlock(blocks types.StateBlockList) {
for _, blk := range blocks {
r.c.logger.Debugf("Sync Event for block:[%s]", blk.GetHash())
bs := &BlockSource{
Block: blk,
BlockFrom: types.Synchronized,
Type: MsgSync,
}
r.c.ca.ProcessMsg(bs)
}
}
func (r *Receiver) ReceiveGenerateBlock(blk *types.StateBlock) {
r.c.logger.Infof("GenerateBlock Event for block:[%s]", blk.GetHash())
bs := &BlockSource{
Block: blk,
BlockFrom: types.UnSynchronized,
Type: MsgGenerateBlock,
}
r.c.ca.ProcessMsg(bs)
}