/
sync_get.go
148 lines (118 loc) · 4.43 KB
/
sync_get.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package consensus
import (
"fmt"
core "github.com/libp2p/go-libp2p/core"
"github.com/pkg/errors"
"github.com/ngchain/ngcore/ngp2p/wired"
"github.com/ngchain/ngcore/ngtypes"
)
// getRemoteStatus pings the peer identified by peerID and stores the status
// carried by its pong reply in mod.store.
//
// The ping carries the local origin height, the local latest height and the
// hash and actual difficulty of the latest local checkpoint. When SendPing
// returns a nil stream the failure is only logged and nil is returned, so a
// failed send is deliberately not reported as an error to the caller.
//
// It returns an error when the reply cannot be received or decoded, an error
// wrapping ErrMsgRejected when the remote rejects the ping, and an error
// wrapping ErrInvalidMsgType for any other reply type.
func (mod *syncModule) getRemoteStatus(peerID core.PeerID) error {
	origin := mod.pow.Chain.GetOriginBlock()
	latest := mod.pow.Chain.GetLatestBlock().(*ngtypes.FullBlock)
	cp := mod.pow.Chain.GetLatestCheckpoint()

	id, stream := mod.localNode.SendPing(peerID, origin.GetHeight(), latest.GetHeight(), cp.GetHash(), cp.GetActualDiff().Bytes())
	if stream == nil {
		log.Infof("failed to send ping, cannot get remote status from %s", peerID) // level down this
		return nil
	}

	reply, err := wired.ReceiveReply(id, stream)
	if err != nil {
		return err
	}

	switch reply.Header.Type {
	case wired.PongMsg:
		pongPayload, err := wired.DecodePongPayload(reply.Payload)
		if err != nil {
			return err
		}

		// Look the record up once: refresh it when the peer is already known,
		// otherwise register a new record through putRemote.
		if record, known := mod.store[peerID]; known {
			record.update(pongPayload.Origin, pongPayload.Latest,
				pongPayload.CheckpointHash, pongPayload.CheckpointDiff)
		} else {
			mod.putRemote(peerID, NewRemoteRecord(peerID, pongPayload.Origin, pongPayload.Latest,
				pongPayload.CheckpointHash, pongPayload.CheckpointDiff))
		}

		return nil
	case wired.RejectMsg:
		return errors.Wrapf(ErrMsgRejected, "ping is rejected by remote: %s", string(reply.Payload))
	default:
		return errors.Wrapf(ErrInvalidMsgType, "remote replies ping with invalid message type: %s", reply.Header.Type)
	}
}
// getRemoteChainFromLocalLatest requests a chain of blocks from the remote
// described by record, using the hash of the local latest block as the only
// "from" hash and no "to" hash.
//
// It returns the blocks carried by the chain reply. It returns an error when
// the request cannot be sent, when the reply cannot be received or decoded, an
// error wrapping ErrMsgRejected when the remote rejects the request, and an
// error wrapping ErrInvalidMsgType for any other reply type.
func (mod *syncModule) getRemoteChainFromLocalLatest(record *RemoteRecord) (chain []*ngtypes.FullBlock, err error) {
	latestHash := mod.pow.Chain.GetLatestBlockHash()

	id, s, err := mod.localNode.SendGetChain(record.id, [][]byte{latestHash}, nil) // nil means get MaxBlocks number blocks
	if s == nil {
		return nil, fmt.Errorf("failed to send getchain: %w", err)
	}

	reply, err := wired.ReceiveReply(id, s)
	if err != nil {
		return nil, err
	}

	switch reply.Header.Type {
	case wired.ChainMsg:
		chainPayload, err := wired.DecodeChainPayload(reply.Payload)
		if err != nil {
			// This is a decode failure of the chain reply, not a ping failure.
			return nil, fmt.Errorf("failed to decode chain payload: %w", err)
		}

		// TODO: add support for hashes etc
		return chainPayload.Blocks, nil
	case wired.RejectMsg:
		return nil, errors.Wrapf(ErrMsgRejected, "getchain is rejected by remote: %s", string(reply.Payload))
	default:
		return nil, errors.Wrapf(ErrInvalidMsgType, "remote replies getchain with invalid message type: %s", reply.Header.Type)
	}
}
// Sentinel errors wrapped by the remote request helpers in this file.
var (
// ErrMsgRejected is wrapped into the returned error when the remote answers
// a request with a reject message.
ErrMsgRejected = errors.New("message get rejected")
// ErrInvalidMsgType is wrapped into the returned error when the remote
// answers with a message type the request does not expect.
ErrInvalidMsgType = errors.New("invalid message type")
)
// getRemoteChain requests a chain of blocks from the peer identified by
// peerID, passing from and to through to SendGetChain unchanged.
//
// It returns the blocks carried by the chain reply. It returns an error when
// the request cannot be sent, when the reply cannot be received or decoded, an
// error wrapping ErrMsgRejected when the remote rejects the request, and an
// error wrapping ErrInvalidMsgType for any other reply type.
func (mod *syncModule) getRemoteChain(peerID core.PeerID, from [][]byte, to []byte) (chain []*ngtypes.FullBlock, err error) {
	id, s, err := mod.localNode.SendGetChain(peerID, from, to)
	if s == nil {
		return nil, fmt.Errorf("failed to send getchain: %w", err)
	}

	reply, err := wired.ReceiveReply(id, s)
	if err != nil {
		return nil, err
	}

	switch reply.Header.Type {
	case wired.ChainMsg:
		chainPayload, err := wired.DecodeChainPayload(reply.Payload)
		if err != nil {
			// This is a decode failure of the chain reply, not a ping failure.
			return nil, errors.Wrap(err, "failed to decode chain payload")
		}

		// TODO: add support for hashes etc
		return chainPayload.Blocks, nil
	case wired.RejectMsg:
		return nil, errors.Wrapf(ErrMsgRejected, "getchain failed with %s", string(reply.Payload))
	default:
		return nil, errors.Wrapf(ErrInvalidMsgType, "remote replies getchain with message type %s", reply.Header.Type)
	}
}
// getRemoteStateSheet requests the state sheet at the checkpoint stored in
// record (its checkpoint height and hash) from the remote described by record.
//
// It returns the sheet carried by the sheet reply. It returns an error when
// the request cannot be sent, when the reply cannot be received or decoded, an
// error wrapping ErrMsgRejected when the remote rejects the request, and an
// error wrapping ErrInvalidMsgType for any other reply type.
func (mod *syncModule) getRemoteStateSheet(record *RemoteRecord) (sheet *ngtypes.Sheet, err error) {
	id, s, err := mod.localNode.SendGetSheet(record.id, record.checkpointHeight, record.checkpointHash)
	if s == nil {
		// errors.Wrap returns nil for a nil error, which would make this
		// function return (nil, nil); always report the missing stream.
		if err == nil {
			return nil, errors.New("failed to send getsheet: no stream returned")
		}

		return nil, errors.Wrap(err, "failed to send getsheet")
	}

	reply, err := wired.ReceiveReply(id, s)
	if err != nil {
		return nil, err
	}

	switch reply.Header.Type {
	case wired.SheetMsg:
		sheetPayload, err := wired.DecodeSheetPayload(reply.Payload)
		if err != nil {
			// This is a decode failure of the sheet reply, not a ping failure.
			return nil, errors.Wrap(err, "failed to decode sheet payload")
		}

		// TODO: add support for hashes etc
		return sheetPayload.Sheet, nil
	case wired.RejectMsg:
		return nil, errors.Wrapf(ErrMsgRejected, "getsheet is rejected by remote: %s", string(reply.Payload))
	default:
		return nil, errors.Wrapf(ErrInvalidMsgType, "remote replies with invalid message type: %s", reply.Header.Type)
	}
}