/
events.go
57 lines (48 loc) · 1.26 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package service
import (
"context"
"fmt"
v1 "github.com/filecoin-project/venus/venus-shared/api/chain/v1"
"github.com/filecoin-project/venus/venus-shared/types"
)
type NodeEvents struct {
client v1.FullNode
msgService *MessageService
}
func (nd *NodeEvents) listenHeadChangesOnce(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
notifs, err := nd.client.ChainNotify(ctx)
if err != nil {
return err
}
select {
case noti := <-notifs:
if len(noti) != 1 {
return fmt.Errorf("expect hccurrent length 1 but for %d", len(noti))
}
if noti[0].Type != types.HCCurrent {
return fmt.Errorf("expect hccurrent event but got %s ", noti[0].Type)
}
// todo do some check or repaire for the first connect
if err := nd.msgService.ReconnectCheck(ctx, noti[0].Val); err != nil {
return fmt.Errorf("reconnect check error: %v", err)
}
case <-ctx.Done():
return ctx.Err()
}
for notif := range notifs {
// ascending order
var apply []*types.TipSet
for _, change := range notif {
switch change.Type {
case types.HCApply:
apply = append(apply, change.Val)
}
}
if err := nd.msgService.ProcessNewHead(ctx, apply); err != nil {
return fmt.Errorf("process new head error: %v", err)
}
}
return nil
}