/
handle_sync.go
78 lines (67 loc) · 1.78 KB
/
handle_sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package sync
import (
"context"
"encoding/hex"
"fmt"
"github.com/protolambda/rumor/chain"
bdb "github.com/protolambda/rumor/chain/db/blocks"
"github.com/protolambda/zrnt/eth2/beacon"
"github.com/sirupsen/logrus"
)
// SyncFn performs a sync request and is expected to send the resulting blocks
// to blocksCh. handleSync.handle calls it in its own goroutine, closes
// blocksCh after it returns, and returns its error once every block that was
// sent has been handled.
type SyncFn func(blocksCh chan<- *beacon.SignedBeaconBlock) error
// handleSync bundles the dependencies and options that handle uses to consume
// the blocks sent by a SyncFn.
type handleSync struct {
// Log receives a debug entry for every processed and every stored block.
Log logrus.FieldLogger
// Blocks supplies the spec passed to bdb.WithRoot and, when Store is set,
// is the database the blocks are stored in.
Blocks bdb.DB
// Chain is where blocks are added (AddBlock) when Process is set.
Chain chain.FullChain
// ExpectedCount is the buffer capacity of the channel the blocks arrive on.
ExpectedCount uint64
// Store enables storing every received block in Blocks.
Store bool
// Process enables adding every received block to Chain.
Process bool
}
// handle runs runSync in a background goroutine and consumes the blocks it
// sends, in arrival order, until the channel is closed or processingCtx is
// done.
//
// For each block: when c.Process is set the block is added to c.Chain, and
// when c.Store is set it is stored in c.Blocks. Each successful step is logged
// at debug level together with the block counter, slot and root.
//
// It returns:
//   - the error returned by runSync (possibly nil) once every sent block has
//     been handled,
//   - an error wrapping the cause as soon as processing or storing fails,
//   - an error wrapping processingCtx.Err() if the context ends first.
//
// On the early-return paths the remaining blocks are drained and discarded in
// the background, so runSync cannot block forever on a send to a full channel.
// handle does not cancel runSync itself.
func (c handleSync) handle(processingCtx context.Context, runSync SyncFn) error {
	blocksCh := make(chan *beacon.SignedBeaconBlock, c.ExpectedCount)

	var syncErr error
	go func() {
		// After the sync request is done, close the channel to stop the loop
		// below. syncErr is written before the close, so the reader that
		// observes the closed channel also observes the final syncErr.
		defer close(blocksCh)
		syncErr = runSync(blocksCh)
	}()

	// abandon is used on every early return: nobody reads blocksCh after
	// handle returns, so keep receiving (and dropping) blocks until the sync
	// goroutine closes the channel, instead of leaving it stuck on a send.
	abandon := func(err error) error {
		go func() {
			for range blocksCh {
			}
		}()
		return err
	}

	spec := c.Blocks.Spec()
	// TODO: option to buffer batches of blocks, to then process as aggregate.
	i := 0
	for {
		select {
		case block, ok := <-blocksCh:
			if !ok {
				// Channel closed: the sync request finished and all blocks
				// were handled.
				return syncErr
			}
			i++
			withRoot := bdb.WithRoot(spec, block)
			rootHex := hex.EncodeToString(withRoot.Root[:])

			if c.Process {
				if err := c.Chain.AddBlock(processingCtx, block); err != nil {
					return abandon(fmt.Errorf("failed to process block: %w", err))
				}
				c.Log.WithFields(logrus.Fields{
					"i":    i,
					"slot": block.Message.Slot,
					"root": rootHex,
				}).Debug("processed block")
			}

			if c.Store {
				exists, err := c.Blocks.Store(processingCtx, withRoot)
				if err != nil {
					return abandon(fmt.Errorf("failed to store block: %w", err))
				}
				c.Log.WithFields(logrus.Fields{
					"known": exists,
					"i":     i,
					"slot":  block.Message.Slot,
					"root":  rootHex,
				}).Debug("stored block")
			}
		case <-processingCtx.Done():
			return abandon(fmt.Errorf(
				"block processing stopped early, only processed %d blocks: %w",
				i, processingCtx.Err()))
		}
	}
}