-
Notifications
You must be signed in to change notification settings - Fork 211
/
handler.go
154 lines (135 loc) · 4.34 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package blocks
import (
"context"
"errors"
"fmt"
"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/p2p/pubsub"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/blocks"
"github.com/spacemeshos/go-spacemesh/system"
)
var (
// errMalformedData is returned by HandleSyncedBlock when the received bytes
// cannot be decoded into a block. It wraps pubsub.ErrValidationReject.
errMalformedData = fmt.Errorf("%w: malformed data", pubsub.ErrValidationReject)
// errWrongHash is wrapped into the error HandleSyncedBlock returns when the
// decoded block's ID does not match the expected hash. It wraps
// pubsub.ErrValidationReject.
errWrongHash = fmt.Errorf("%w: incorrect hash", pubsub.ErrValidationReject)
// errDuplicateTX is returned by checkTransactions when a block lists the same
// transaction ID more than once.
// NOTE(review): the message says "proposal" although the check runs on a
// block's TxIDs, and unlike the errors above it does not wrap
// pubsub.ErrValidationReject - confirm both are intended.
errDuplicateTX = errors.New("duplicate TxID in proposal")
)
// Handler validates and stores Blocks fetched from peers during sync.
type Handler struct {
// logger is the base logger; HandleSyncedBlock derives a per-call logger from it.
logger log.Log
// fetcher is used to register peer hashes and to fetch the ATXs and
// transactions a block refers to.
fetcher system.Fetcher
// db is queried (via blocks.Has) to check whether a block is already stored.
db *sql.Database
// tortoise is asked, through GetMissingActiveSet, which of a block's reward
// ATXs still have to be fetched.
tortoise tortoiseProvider
// mesh receives the block through AddBlockWithTXs once all checks passed.
mesh meshProvider
}
// Opt is a functional option that configures a Handler; options are applied
// by NewHandler after the defaults have been set.
type Opt func(*Handler)
// WithLogger returns an Opt that sets the Handler's logger to the given one.
func WithLogger(logger log.Log) Opt {
	setLogger := func(handler *Handler) {
		handler.logger = logger
	}
	return setLogger
}
// NewHandler builds a Handler around the given fetcher, database, tortoise
// and mesh. The logger defaults to log.NewNop(); opts are applied afterwards,
// in the order given, and may override it.
func NewHandler(f system.Fetcher, db *sql.Database, tortoise tortoiseProvider, m meshProvider, opts ...Opt) *Handler {
	handler := new(Handler)
	handler.logger = log.NewNop()
	handler.fetcher = f
	handler.db = db
	handler.tortoise = tortoise
	handler.mesh = m

	for i := range opts {
		opts[i](handler)
	}
	return handler
}
// HandleSyncedBlock validates and stores a block received from a peer during
// sync.
//
// data is decoded into a types.Block whose ID must match expHash, whose layer
// must be after the effective genesis and whose rewards must pass
// ValidateRewards; failures of these checks wrap pubsub.ErrValidationReject.
// A block already present in the database is accepted without further work.
// Otherwise the ATXs reported missing by the tortoise and the block's
// transactions are fetched (peer is registered with the fetcher for those
// hashes) before the block is handed to the mesh.
func (h *Handler) HandleSyncedBlock(ctx context.Context, expHash types.Hash32, peer p2p.Peer, data []byte) error {
	lg := h.logger.WithContext(ctx)

	var blk types.Block
	if decodeErr := codec.Decode(data, &blk); decodeErr != nil {
		lg.With().Debug("malformed block", log.Err(decodeErr))
		return errMalformedData
	}
	// Initialize sets the block ID, which every check below relies on.
	blk.Initialize()

	if blk.ID().AsHash32() != expHash {
		return fmt.Errorf("%w: block want %s, got %s", errWrongHash, expHash.ShortString(), blk.ID().String())
	}
	if genesis := types.GetEffectiveGenesis(); blk.LayerIndex <= genesis {
		return fmt.Errorf("%w: block before effective genesis: layer %v", pubsub.ErrValidationReject, blk.LayerIndex)
	}
	if rewardsErr := ValidateRewards(blk.Rewards); rewardsErr != nil {
		return fmt.Errorf("%w: %s", pubsub.ErrValidationReject, rewardsErr.Error())
	}

	lg = lg.WithFields(blk.ID(), blk.LayerIndex)

	known, hasErr := blocks.Has(h.db, blk.ID())
	switch {
	case hasErr != nil:
		// A failed lookup is only logged; processing continues as for an
		// unknown block.
		lg.With().Error("failed to check block exist", log.Err(hasErr))
	case known:
		lg.Debug("known block")
		return nil
	}
	lg.With().Info("new block")

	// Fetch the ATXs the tortoise reports as missing before going further.
	missing := h.tortoise.GetMissingActiveSet(blk.LayerIndex.GetEpoch(), toAtxIDs(blk.Rewards))
	if len(missing) > 0 {
		h.fetcher.RegisterPeerHashes(peer, types.ATXIDsToHashes(missing))
		if fetchErr := h.fetcher.GetAtxs(ctx, missing); fetchErr != nil {
			return fetchErr
		}
	}

	if len(blk.TxIDs) > 0 {
		if txErr := h.checkTransactions(ctx, peer, &blk); txErr != nil {
			lg.With().Warning("failed to fetch block TXs", log.Err(txErr))
			return txErr
		}
	}

	if saveErr := h.mesh.AddBlockWithTXs(ctx, &blk); saveErr != nil {
		lg.With().Error("failed to save block", log.Err(saveErr))
		return fmt.Errorf("save block: %w", saveErr)
	}
	return nil
}
// ValidateRewards performs a syntactic check of a block's rewards. It returns
// an error when the list is empty, when a reward has a zero weight numerator
// or denominator, or when two rewards refer to the same ATX; otherwise nil.
func ValidateRewards(rewards []types.AnyReward) error {
	if len(rewards) == 0 {
		return fmt.Errorf("empty rewards")
	}

	seen := make(map[types.ATXID]struct{})
	for i := range rewards {
		weight, atx := rewards[i].Weight, rewards[i].AtxID
		// The weight check comes first so that it wins when both problems apply.
		switch _, dup := seen[atx]; {
		case weight.Num == 0 || weight.Denom == 0:
			return fmt.Errorf("reward with invalid (zeroed) weight (%d/%d) included into the block for %v", weight.Num, weight.Denom, atx)
		case dup:
			return fmt.Errorf("multiple rewards for the same atx %v", atx)
		}
		seen[atx] = struct{}{}
	}
	return nil
}
// checkTransactions returns errDuplicateTX when the block lists the same
// transaction ID more than once. Otherwise it registers peer with the fetcher
// for the block's transaction hashes and fetches the transactions, wrapping
// any fetch error. A block without transaction IDs is accepted as is.
func (h *Handler) checkTransactions(ctx context.Context, peer p2p.Peer, b *types.Block) error {
	txIDs := b.TxIDs
	if len(txIDs) == 0 {
		return nil
	}

	seen := make(map[types.TransactionID]struct{}, len(txIDs))
	for i := range txIDs {
		seen[txIDs[i]] = struct{}{}
		// After i+1 insertions the set holds i+1 entries unless txIDs[i]
		// was already present.
		if len(seen) != i+1 {
			return errDuplicateTX
		}
	}

	h.fetcher.RegisterPeerHashes(peer, types.TransactionIDsToHashes(txIDs))
	fetchErr := h.fetcher.GetBlockTxs(ctx, txIDs)
	if fetchErr != nil {
		return fmt.Errorf("block get TXs: %w", fetchErr)
	}
	return nil
}
// toAtxIDs returns the ATX ID of every reward, in the same order as rewards.
func toAtxIDs(rewards []types.AnyReward) []types.ATXID {
	ids := make([]types.ATXID, len(rewards))
	for i := range rewards {
		ids[i] = rewards[i].AtxID
	}
	return ids
}