-
Notifications
You must be signed in to change notification settings - Fork 211
/
find_fork.go
307 lines (278 loc) · 9.14 KB
/
find_fork.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
package syncer
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/fetch"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/layers"
)
var (
	// ErrPeerMeshChangedMidSession is returned when the hashes a peer reports for the boundary
	// layers of a fork-finding request no longer match the boundary hashes the request was built from.
	ErrPeerMeshChangedMidSession = errors.New("peer mesh changed mid session")
	// ErrNodeMeshChangedMidSession is returned when, within a requested range, the node's own
	// hashes do not contain both an agreeing and a disagreeing layer; FindFork attributes this
	// to the node's local hashes changing while the request was in flight.
	ErrNodeMeshChangedMidSession = errors.New("node mesh changed mid session")
)
// layerHash pairs a layer with a hash for that layer, plus the time the pair was recorded.
type layerHash struct {
	layer types.LayerID
	hash  types.Hash32
	// created is when the pair was recorded. Purge compares it against maxStaleDuration for
	// cached agreements. Not every constructor in this file sets it.
	created time.Time
}
// boundary is used to define the to and from layers in the hash mesh requests to peers.
// the hashes of the boundary layers are known to the node and are used to double-check that
// the peer has not changed its opinions on those layers.
// if the boundary hashes change during a fork-finding session, the session is aborted.
type boundary struct {
	// from is a layer the node and peer agree on; to is a layer they disagree on.
	// FindFork expects to.layer to be after from.layer.
	from, to *layerHash
}
// MarshalLogObject encodes the boundary for structured logging (used via log.Object). It emits
// the layer number and hash of each end.
//
// The receiver or either end may be nil: sendRequest logs the boundary precisely when it fails
// validation (e.g. a nil end). Missing ends are therefore skipped rather than dereferenced, so
// the log call does not panic.
func (b *boundary) MarshalLogObject(encoder log.ObjectEncoder) error {
	if b == nil {
		return nil
	}
	if b.from != nil {
		encoder.AddUint32("from", b.from.layer.Uint32())
		encoder.AddString("from_hash", b.from.hash.String())
	}
	if b.to != nil {
		encoder.AddUint32("to", b.to.layer.Uint32())
		encoder.AddString("to_hash", b.to.hash.String())
	}
	return nil
}
// ForkFinder locates the layer at which the node's mesh and a peer's mesh diverge. It caches,
// per peer, the latest layer hash the peer was found to agree with the node on, so later
// sessions can start their search from there instead of from genesis.
type ForkFinder struct {
	logger           log.Log
	db               sql.Executor
	fetcher          fetcher
	maxStaleDuration time.Duration

	// mu guards agreedPeers and resynced; every method touching them locks it.
	mu sync.Mutex
	// agreedPeers maps a peer to the latest layer hash it was found to agree with the node on.
	agreedPeers map[p2p.Peer]*layerHash
	// used to make sure we only resync based on the same layer hash once across runs.
	resynced map[types.LayerID]map[types.Hash32]time.Time
}
// NewForkFinder returns a ForkFinder that reads local hashes from db and queries peers through f.
// It starts with empty agreement and resync caches. maxStale is the age at which Purge treats
// cached entries as stale.
func NewForkFinder(lg log.Log, db sql.Executor, f fetcher, maxStale time.Duration) *ForkFinder {
	finder := new(ForkFinder)
	finder.logger = lg
	finder.db = db
	finder.fetcher = f
	finder.maxStaleDuration = maxStale
	finder.agreedPeers = make(map[p2p.Peer]*layerHash)
	finder.resynced = make(map[types.LayerID]map[types.Hash32]time.Time)
	return finder
}
// Purge drops cached state about peers.
//
// With all set, every cached agreement is discarded. Otherwise, when specific peers are given,
// only their agreements are discarded. With neither, two kinds of entries are discarded:
//   - agreements for peers the fetcher no longer reports, once they are at least
//     maxStaleDuration old;
//   - resync markers at least maxStaleDuration old.
func (ff *ForkFinder) Purge(all bool, toPurge ...p2p.Peer) {
	ff.mu.Lock()
	defer ff.mu.Unlock()

	switch {
	case all:
		ff.agreedPeers = make(map[p2p.Peer]*layerHash)
		return
	case len(toPurge) > 0:
		for _, p := range toPurge {
			delete(ff.agreedPeers, p)
		}
		return
	}

	connected := make(map[p2p.Peer]struct{})
	for _, p := range ff.fetcher.GetPeers() {
		connected[p] = struct{}{}
	}
	for p, agreed := range ff.agreedPeers {
		if _, ok := connected[p]; ok {
			// still reported by the fetcher; keep regardless of age
			continue
		}
		if time.Since(agreed.created) >= ff.maxStaleDuration {
			delete(ff.agreedPeers, p)
		}
	}

	for lid, byHash := range ff.resynced {
		for hash, created := range byHash {
			if time.Since(created) < ff.maxStaleDuration {
				continue
			}
			delete(byHash, hash)
			if len(byHash) == 0 {
				delete(ff.resynced, lid)
			}
		}
	}
}
// NumPeersCached reports how many peers currently have a cached agreement.
func (ff *ForkFinder) NumPeersCached() int {
	ff.mu.Lock()
	n := len(ff.agreedPeers)
	ff.mu.Unlock()
	return n
}
// AddResynced records, with the current time, that a resync was done for hash at layer lid.
// NeedResync reports false for the pair until Purge expires the record.
func (ff *ForkFinder) AddResynced(lid types.LayerID, hash types.Hash32) {
	ff.mu.Lock()
	defer ff.mu.Unlock()

	byHash, found := ff.resynced[lid]
	if !found {
		byHash = make(map[types.Hash32]time.Time)
		ff.resynced[lid] = byHash
	}
	byHash[hash] = time.Now()
}
// NeedResync reports whether no resync is currently recorded for hash at layer lid
// (either AddResynced was never called for the pair, or Purge has since expired it).
func (ff *ForkFinder) NeedResync(lid types.LayerID, hash types.Hash32) bool {
	ff.mu.Lock()
	defer ff.mu.Unlock()

	// Indexing a missing layer yields a nil inner map, and looking up a key in a nil map
	// reports not-found, so one lookup covers both cases.
	_, seen := ff.resynced[lid][hash]
	return !seen
}
// FindFork finds the point of divergence in layer opinions between the node and the specified peer
// from a given disagreed layer.
//
// The search keeps a boundary whose `from` end is a layer both sides agree on and whose `to` end
// is a layer they disagree on. `from` is the peer's cached agreement or the effective genesis;
// `to` is initially diffLid/diffHash. Each round:
//   - requests a range of aggregated hashes from the peer;
//   - compares it with the node's own hashes for the same range;
//   - records the latest agreeing layer as the peer's agreement;
//   - narrows the boundary to end at the oldest disagreeing layer.
//
// It returns the agreeing layer once the layer right after it is a disagreeing one.
//
// Errors:
//   - ErrPeerMeshChangedMidSession if the peer's boundary hashes change;
//   - ErrNodeMeshChangedMidSession if the node's own hashes appear to change during a round;
//   - an error for request, database, or consistency failures.
func (ff *ForkFinder) FindFork(ctx context.Context, peer p2p.Peer, diffLid types.LayerID, diffHash types.Hash32) (types.LayerID, error) {
	logger := ff.logger.WithContext(ctx).WithFields(
		log.Stringer("diff_layer", diffLid),
		log.Stringer("diff_hash", diffHash),
		log.Stringer("peer", peer),
	)
	logger.With().Info("begin fork finding with peer")

	bnd, err := ff.setupBoundary(peer, &layerHash{layer: diffLid, hash: diffHash})
	if err != nil {
		return 0, err
	}
	// setupBoundary falls back to the effective genesis when there is no usable agreement. If the
	// disagreed layer is not after that, there is no range to search. sendRequest would treat
	// such a boundary as an invariant violation and call Fatal, so report it as an error here.
	if !bnd.to.layer.After(bnd.from.layer) {
		return 0, fmt.Errorf("find fork: disagreed layer %v is not after agreed layer %v",
			bnd.to.layer, bnd.from.layer)
	}

	numReqs := 0
	if bnd.from.layer.Add(1) == bnd.to.layer {
		logger.With().Info("found hash fork with peer",
			log.Stringer("fork", bnd.from.layer),
			log.Stringer("fork_hash", bnd.from.hash),
			log.Stringer("after_fork", bnd.to.layer),
			log.Stringer("after_fork_hash", bnd.to.hash),
		)
		return bnd.from.layer, nil
	}

	for {
		lg := logger.WithFields(log.Object("boundary", bnd))
		mh, err := ff.sendRequest(ctx, lg, peer, bnd)
		numReqs++
		if err != nil {
			lg.With().Error("failed hash request", log.Err(err))
			return 0, err
		}

		// Rebuild the same request sendRequest sent, so the node's hashes are read at the same
		// layers the peer's hashes correspond to.
		req := fetch.NewMeshHashRequest(bnd.from.layer, bnd.to.layer)
		ownHashes, err := layers.GetAggHashes(ff.db, req.From, req.To, req.Step)
		if err != nil {
			lg.With().Error("failed own hashes lookup", log.Err(err))
			return 0, err
		}
		// sendRequest only validates the peer's hash count. Check the node's side as well, so a
		// short local result yields an error rather than an index-out-of-range panic below.
		if len(ownHashes) != len(mh.Hashes) {
			err = fmt.Errorf("find fork: node has %d hashes for %d peer hashes in [%v, %v]",
				len(ownHashes), len(mh.Hashes), req.From, req.To)
			lg.With().Error("failed own hashes lookup", log.Err(err))
			return 0, err
		}

		lid := req.From
		var latestSame, oldestDiff *layerHash
		for i, hash := range mh.Hashes {
			if ownHashes[i] != hash {
				if latestSame != nil && lid == latestSame.layer.Add(1) {
					lg.With().Info("found hash fork with peer",
						log.Int("num_reqs", numReqs),
						log.Stringer("fork", latestSame.layer),
						log.Stringer("fork_hash", latestSame.hash),
						log.Stringer("after_fork", lid),
						log.Stringer("after_fork_hash", hash),
					)
					return latestSame.layer, nil
				}
				oldestDiff = &layerHash{layer: lid, hash: hash}
				break
			}
			latestSame = &layerHash{layer: lid, hash: hash}
			ff.updateAgreement(peer, latestSame, time.Now())
			// hashes are spaced req.Step layers apart; the last one is always req.To
			lid = lid.Add(req.Step)
			if lid.After(req.To) {
				lid = req.To
			}
		}
		if latestSame == nil || oldestDiff == nil {
			// every layer hash is different/same from node's. this can only happen when
			// the node's local hashes change while the mesh hash request is running
			ff.Purge(true)
			return 0, ErrNodeMeshChangedMidSession
		}

		bnd, err = ff.setupBoundary(peer, oldestDiff)
		if err != nil {
			return 0, err
		}
	}
}
// UpdateAgreement records that peer agreed with the node on hash at layer lid, stamping the
// cached entry with created. Any previous agreement for peer is replaced.
func (ff *ForkFinder) UpdateAgreement(peer p2p.Peer, lid types.LayerID, hash types.Hash32, created time.Time) {
	agreed := &layerHash{layer: lid, hash: hash}
	ff.updateAgreement(peer, agreed, created)
}
// updateAgreement stores a copy of update's layer and hash as peer's agreement, with created as
// its timestamp. A nil update is treated as a programming error and reported through Fatal.
func (ff *ForkFinder) updateAgreement(peer p2p.Peer, update *layerHash, created time.Time) {
	if update == nil {
		ff.logger.With().Fatal("invalid arg", log.Stringer("peer", peer))
	}
	// Store a fresh value rather than update itself, so the cache never aliases the caller's struct.
	entry := &layerHash{layer: update.layer, hash: update.hash, created: created}

	ff.mu.Lock()
	defer ff.mu.Unlock()
	// unconditional update instead of comparing layers because peers can change its opinions on historical layers.
	ff.agreedPeers[peer] = entry
}
// setupBoundary builds the boundary for the next hash request with peer.
//
// boundary.from is peer's cached agreement when that agreement is before oldestDiff and the node
// still has the same hash for it. A cached agreement that fails either check is dropped. In that
// case, or when nothing is cached, from is the effective genesis layer with the node's hash for it.
// boundary.to is oldestDiff, the oldest layer known to be disagreed on.
func (ff *ForkFinder) setupBoundary(peer p2p.Peer, oldestDiff *layerHash) (*boundary, error) {
	ff.mu.Lock()
	defer ff.mu.Unlock()

	var from *layerHash
	if lastAgreed := ff.agreedPeers[peer]; lastAgreed != nil {
		usable := false
		if lastAgreed.layer.Before(oldestDiff.layer) {
			// double check if the node still has the same hash
			nodeHash, err := layers.GetAggregatedHash(ff.db, lastAgreed.layer)
			if err != nil {
				return nil, fmt.Errorf("find fork get boundary hash %v: %w", lastAgreed.layer, err)
			}
			usable = nodeHash == lastAgreed.hash
		}
		if usable {
			from = lastAgreed
		} else {
			delete(ff.agreedPeers, peer)
		}
	}

	if from == nil {
		glid := types.GetEffectiveGenesis()
		ghash, err := layers.GetAggregatedHash(ff.db, glid)
		if err != nil {
			return nil, fmt.Errorf("find fork get genesis hash: %w", err)
		}
		from = &layerHash{layer: glid, hash: ghash}
	}
	return &boundary{from: from, to: oldestDiff}, nil
}
// sendRequest gets layer hashes from peer for the range defined by the boundary.
//
// The layers sampled in that range are chosen by fetch.NewMeshHashRequest. Per the original
// design, that is every layer when the range is small, and otherwise an evenly stepped subset
// that still includes both boundary layers (TODO confirm against fetch.NewMeshHashRequest).
//
// The response is validated in three ways:
//   - it must be non-nil;
//   - it must contain exactly the requested number of hashes;
//   - its first and last hashes must equal the boundary hashes.
//
// If the boundary hashes differ, the peer's cached agreement is purged and
// ErrPeerMeshChangedMidSession is returned. An invalid boundary is a programming error and is
// reported through Fatal.
func (ff *ForkFinder) sendRequest(ctx context.Context, logger log.Log, peer p2p.Peer, bnd *boundary) (*fetch.MeshHashes, error) {
	if bnd == nil {
		logger.Fatal("invalid args")
	} else if bnd.from == nil || bnd.to == nil || !bnd.to.layer.After(bnd.from.layer) {
		logger.With().Fatal("invalid args", log.Object("boundary", bnd))
	}

	req := fetch.NewMeshHashRequest(bnd.from.layer, bnd.to.layer)
	count := int(req.Count())
	logger.With().Debug("sending request", log.Object("req", req))
	mh, err := ff.fetcher.PeerMeshHashes(ctx, peer, req)
	if err != nil {
		return nil, fmt.Errorf("find fork hash req: %w", err)
	}
	if mh == nil {
		// guard against a (nil, nil) result, which would otherwise panic on mh.Hashes below
		return nil, errors.New("find fork hash req: nil response")
	}
	logger.With().Debug("received response",
		log.Int("num_hashes", len(mh.Hashes)),
	)
	if len(mh.Hashes) != count {
		return nil, fmt.Errorf("inconsistent layers for mesh hashes: requested %d, received %d",
			count, len(mh.Hashes))
	}

	peerFrom, peerTo := mh.Hashes[0], mh.Hashes[count-1]
	if peerFrom != bnd.from.hash || peerTo != bnd.to.hash {
		logger.With().Warning("peer boundary hashes have changed",
			log.Stringer("hash_from", bnd.from.hash),
			log.Stringer("hash_to", bnd.to.hash),
			log.Stringer("peer_hash_from", peerFrom),
			log.Stringer("peer_hash_to", peerTo),
		)
		ff.Purge(false, peer)
		return nil, ErrPeerMeshChangedMidSession
	}
	return mh, nil
}