// (c) 2019 Dapper Labs - ALL RIGHTS RESERVED

package ingestion

import (
	"context"
	"errors"
	"fmt"

	"github.com/rs/zerolog"

	"github.com/onflow/flow-go/consensus/hotstuff"
	"github.com/onflow/flow-go/engine"
	"github.com/onflow/flow-go/model/flow"
	"github.com/onflow/flow-go/module"
	"github.com/onflow/flow-go/module/mempool"
	"github.com/onflow/flow-go/module/metrics"
	"github.com/onflow/flow-go/module/signature"
	"github.com/onflow/flow-go/module/trace"
	"github.com/onflow/flow-go/state/protocol"
	"github.com/onflow/flow-go/storage"
)

// Core represents the core logic of the ingestion engine. It contains the
// logic for handling individual collection guarantees, which are channeled
// to it from the engine concurrently.
type Core struct {
	log     zerolog.Logger        // used to log relevant actions with context
	tracer  module.Tracer         // used for tracing
	mempool module.MempoolMetrics // used to track mempool metrics
	state   protocol.State        // used to access the protocol state
	headers storage.Headers       // used to retrieve headers
	pool    mempool.Guarantees    // used to keep pending guarantees in pool
}
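
// NewCore instantiates the core logic of the ingestion engine with all of
// its dependencies.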
func NewCore(
	log zerolog.Logger,
	tracer module.Tracer,
	mempool module.MempoolMetrics,
	state protocol.State,
	headers storage.Headers,
	pool mempool.Guarantees,
) *Core {
	return &Core{
		log:     log.With().Str("ingestion", "core").Logger(),
		tracer:  tracer,
		mempool: mempool,
		state:   state,
		headers: headers,
		pool:    pool,
	}
}
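
// A minimal wiring sketch (for illustration only — the concrete dependencies
// shown here, such as protocolState, headersDB and guaranteePool, are
// hypothetical placeholders that a node's bootstrapping code would provide):
//
//	core := NewCore(log, tracer, mempoolMetrics, protocolState, headersDB, guaranteePool)
//	err := core.OnGuarantee(originID, guarantee)
//	if err != nil {
//		// see OnGuarantee's documentation for the expected sentinel errors
//	}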

// OnGuarantee is used to process collection guarantees received
// from nodes that are not consensus nodes (notably collection nodes).
// Returns expected errors:
// * engine.InvalidInputError if the collection violates protocol rules
// * engine.UnverifiableInputError if the reference block of the collection is unknown
// * engine.OutdatedInputError if the collection is already expired
// All other errors are unexpected and potential symptoms of internal state corruption.
func (e *Core) OnGuarantee(originID flow.Identifier, guarantee *flow.CollectionGuarantee) error {

	span, _, isSampled := e.tracer.StartCollectionSpan(context.Background(), guarantee.CollectionID, trace.CONIngOnCollectionGuarantee)
	if isSampled {
		span.LogKV("originID", originID.String())
	}
	defer span.Finish()

	guaranteeID := guarantee.ID()
	log := e.log.With().
		Hex("origin_id", originID[:]).
		Hex("collection_id", guaranteeID[:]).
		Hex("signers", guarantee.SignerIndices).
		Logger()
	log.Info().Msg("collection guarantee received")

	// skip collection guarantees that are already in our memory pool
	exists := e.pool.Has(guaranteeID)
	if exists {
		log.Debug().Msg("skipping known collection guarantee")
		return nil
	}

	// check the collection guarantee's validity
	err := e.validateOrigin(originID, guarantee) // retrieve and validate the sender of the collection guarantee
	if err != nil {
		return fmt.Errorf("origin validation error: %w", err)
	}
	err = e.validateExpiry(guarantee) // ensure that the collection has not expired
	if err != nil {
		return fmt.Errorf("expiry validation error: %w", err)
	}
	err = e.validateGuarantors(guarantee) // ensure the guarantors are allowed to produce this collection
	if err != nil {
		return fmt.Errorf("guarantor validation error: %w", err)
	}

	// at this point, we can add the guarantee to the memory pool
	added := e.pool.Add(guarantee)
	if !added {
		log.Debug().Msg("discarding guarantee already in pool")
		return nil
	}
	log.Info().Msg("collection guarantee added to pool")

	e.mempool.MempoolEntries(metrics.ResourceGuarantee, e.pool.Size())
	return nil
}
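
// Callers are expected to treat the documented sentinel errors as benign and
// to escalate everything else. A rough sketch of that classification, assuming
// the engine package's matching predicates (IsInvalidInputError etc.):
//
//	err := core.OnGuarantee(originID, guarantee)
//	switch {
//	case err == nil:
//		// guarantee accepted into the mempool (or skipped as a known duplicate)
//	case engine.IsInvalidInputError(err), engine.IsOutdatedInputError(err), engine.IsUnverifiableInputError(err):
//		// expected failure: log and discard the guarantee
//	default:
//		// unexpected failure: potential internal state corruption
//	}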

// validateExpiry validates that the collection has not expired w.r.t. the local
// latest finalized block.
// Expected errors during normal operation:
// * engine.UnverifiableInputError if the reference block of the collection is unknown
// * engine.OutdatedInputError if the collection is already expired
// All other errors are unexpected and potential symptoms of internal state corruption.
func (e *Core) validateExpiry(guarantee *flow.CollectionGuarantee) error {

	// get the last finalized header and the reference block header
	final, err := e.state.Final().Head()
	if err != nil {
		return fmt.Errorf("could not get finalized header: %w", err)
	}
	ref, err := e.headers.ByBlockID(guarantee.ReferenceBlockID)
	if errors.Is(err, storage.ErrNotFound) {
		return engine.NewUnverifiableInputError("collection guarantee refers to an unknown block (id=%x): %w", guarantee.ReferenceBlockID, err)
	}
	if err != nil {
		return fmt.Errorf("could not get reference block (id=%x): %w", guarantee.ReferenceBlockID, err)
	}

	// if the head has advanced beyond the block referenced by the collection guarantee
	// by more than 'expiry' number of blocks, then reject the collection
	if ref.Height > final.Height {
		return nil // the reference block is newer than the latest finalized one
	}
	if final.Height-ref.Height > flow.DefaultTransactionExpiry {
		return engine.NewOutdatedInputErrorf("collection guarantee expired ref_height=%d final_height=%d", ref.Height, final.Height)
	}
	return nil
}
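
// Worked example of the expiry window above: with an expiry of 600 blocks
// (the value of flow.DefaultTransactionExpiry at the time of writing), a
// guarantee whose reference block has height 1000 is accepted while the
// finalized height is at most 1600, and rejected as outdated from finalized
// height 1601 onwards.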

// validateGuarantors validates that the guarantors of a collection are valid,
// in that they are all from the same cluster and that cluster is allowed to
// produce the given collection w.r.t. the guarantee's reference block.
// Expected errors during normal operation:
// * engine.InvalidInputError if the origin violates any requirements
// * engine.UnverifiableInputError if the reference block of the collection is unknown
// All other errors are unexpected and potential symptoms of internal state corruption.
//
// TODO: Eventually we should check the signatures, ensure a quorum of the
// cluster, and ensure HotStuff finalization rules. Likely a cluster-specific
// version of the follower will be a good fit for this. For now, collection
// nodes independently decide when a collection is finalized and we only check
// that the guarantors are all from the same cluster. This implementation is NOT BFT.
func (e *Core) validateGuarantors(guarantee *flow.CollectionGuarantee) error {

	// get the cluster assigned to the guarantee and check that the guarantors are part of it
	snapshot := e.state.AtBlockID(guarantee.ReferenceBlockID)
	cluster, err := snapshot.Epochs().Current().ClusterByChainID(guarantee.ChainID)

	// reference block not found
	if errors.Is(err, storage.ErrNotFound) {
		return engine.NewUnverifiableInputError(
			"could not get clusters with chainID %v for unknown reference block (id=%x): %w", guarantee.ChainID, guarantee.ReferenceBlockID, err)
	}
	// cluster not found by the chain ID
	if errors.Is(err, protocol.ErrClusterNotFound) {
		return engine.NewInvalidInputErrorf("cluster not found by chain ID %v: %w", guarantee.ChainID, err)
	}
	if err != nil {
		return fmt.Errorf("internal error retrieving collector clusters for guarantee (ReferenceBlockID: %v, ChainID: %v): %w",
			guarantee.ReferenceBlockID, guarantee.ChainID, err)
	}

	// ensure the guarantors are from the same cluster
	clusterMembers := cluster.Members()

	// find guarantors by signer indices
	guarantors, err := signature.DecodeSignerIndicesToIdentities(clusterMembers, guarantee.SignerIndices)
	if err != nil {
		if signature.IsInvalidSignerIndicesError(err) {
			return engine.NewInvalidInputErrorf("could not decode guarantor indices: %w", err)
		}
		// unexpected error
		return fmt.Errorf("unexpected internal error decoding signer indices: %w", err)
	}

	// determine whether the signers reach the minimally required stake threshold
	threshold := hotstuff.ComputeWeightThresholdForBuildingQC(clusterMembers.TotalWeight()) // compute required stake threshold
	totalStake := flow.IdentityList(guarantors).TotalWeight()
	if totalStake < threshold {
		return engine.NewInvalidInputErrorf("collection guarantee qc signers have insufficient stake of %d (required=%d)", totalStake, threshold)
	}
	return nil
}
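
// Worked example of the threshold check above: assuming
// hotstuff.ComputeWeightThresholdForBuildingQC returns the minimum combined
// weight needed to build a valid QC, i.e. just over two thirds of the total,
// a cluster whose members have a total weight of 3000 yields a threshold of
// 2001: guarantors with a combined weight of 2001 pass, while 2000 fails.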

// validateOrigin validates that the message has a valid sender (origin). We
// only accept guarantees from an origin that is part of the identity table
// at the collection's reference block. Furthermore, the origin must be
// an authorized (i.e. positive weight), non-ejected collector node.
// Expected errors during normal operation:
// * engine.InvalidInputError if the origin violates any requirements
// * engine.UnverifiableInputError if the reference block of the collection is unknown
// All other errors are unexpected and potential symptoms of internal state corruption.
//
// TODO: ultimately, the origin broadcasting a collection is irrelevant, as long as the
// collection itself is valid. The origin is only needed in case the guarantee is found
// to be invalid, in which case we might want to slash the origin.
func (e *Core) validateOrigin(originID flow.Identifier, guarantee *flow.CollectionGuarantee) error {

	refState := e.state.AtBlockID(guarantee.ReferenceBlockID)
	valid, err := protocol.IsNodeAuthorizedWithRoleAt(refState, originID, flow.RoleCollection)
	if err != nil {
		// a collection with an unknown reference block is unverifiable
		if errors.Is(err, storage.ErrNotFound) {
			return engine.NewUnverifiableInputError("could not get origin (id=%x) for unknown reference block (id=%x): %w", originID, guarantee.ReferenceBlockID, err)
		}
		return fmt.Errorf("unexpected error checking collection origin %x at reference block %x: %w", originID, guarantee.ReferenceBlockID, err)
	}
	if !valid {
		return engine.NewInvalidInputErrorf("invalid collection origin (id=%x)", originID)
	}
	return nil
}
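
// Conceptually, the helper call above amounts to looking the origin up in the
// identity table at the guarantee's reference block and requiring a non-ejected
// collector with positive weight. A rough equivalent sketch (for illustration
// only; the real check is encapsulated by protocol.IsNodeAuthorizedWithRoleAt,
// and the field names here are assumptions about the identity model):
//
//	identity, err := refState.Identity(originID)
//	if err != nil {
//		// unknown identity, or unknown reference block
//	}
//	valid := identity.Role == flow.RoleCollection && identity.Weight > 0 && !identity.Ejected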