-
Notifications
You must be signed in to change notification settings - Fork 178
/
consumer.go
127 lines (106 loc) · 4.18 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package blockconsumer
import (
"fmt"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/consensus/hotstuff/model"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/verification/assigner"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/jobqueue"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
)
// DefaultBlockWorkers is the default number of blocks that are processed
// in parallel.
const DefaultBlockWorkers uint64 = 2
// BlockConsumer listens to the OnFinalizedBlock event
// and notifies the consumer to check in the job queue
// (i.e., its block reader) for new block jobs.
type BlockConsumer struct {
// consumer is the underlying job consumer: Ready starts it, Done stops it,
// and OnFinalizedBlock asks it to check for new jobs.
consumer module.JobConsumer
// defaultIndex is the height returned by defaultProcessedIndex at construction
// time; Ready passes it to consumer.Start.
defaultIndex uint64
// unit launches consumer.Check on every finalized block; Done waits on unit.Done().
unit *engine.Unit
// metrics is notified through OnBlockConsumerJobDone with the index returned
// by the consumer whenever a job is reported as done.
metrics module.VerificationMetrics
}
// defaultProcessedIndex returns the height of the last sealed block in the
// protocol state.
//
// The BlockConsumer uses the returned height to fetch and consume block jobs
// from the jobs queue the first time it initializes.
//
// A non-nil error is returned, wrapping the underlying cause, when the sealed
// head cannot be read from the protocol state; the returned height is then 0
// and must not be used.
func defaultProcessedIndex(state protocol.State) (uint64, error) {
	sealed, err := state.Sealed().Head()
	if err != nil {
		// The snapshot queried here is the sealed one, so the message names it
		// as such (it previously claimed a "finalized" height).
		return 0, fmt.Errorf("could not get sealed height: %w", err)
	}
	return sealed.Height, nil
}
// NewBlockConsumer builds a BlockConsumer on top of a job queue consumer and
// returns it together with the default processed index, which callers use to
// initialize the processed index in storage.
//
// The given blockProcessor is wrapped in a worker; the job consumer invokes
// that worker concurrently (bounded by maxProcessing) to process block jobs,
// which it reads from a finalized block reader built from state and blocks.
//
// An error is returned if the default processed index cannot be read from the
// protocol state.
func NewBlockConsumer(log zerolog.Logger,
	metrics module.VerificationMetrics,
	processedHeight storage.ConsumerProgress,
	blocks storage.Blocks,
	state protocol.State,
	blockProcessor assigner.FinalizedBlockProcessor,
	maxProcessing uint64) (*BlockConsumer, uint64, error) {

	moduleLog := log.With().Str("module", "block_consumer").Logger()

	// The processor does the actual work; the worker adapts it to the job
	// consumer and is registered with the processor as its notifier.
	w := newWorker(blockProcessor)
	blockProcessor.WithBlockConsumerNotifier(w)

	// Jobs are the finalized blocks exposed by the block reader.
	jobConsumer := jobqueue.NewConsumer(
		moduleLog,
		NewFinalizedBlockReader(state, blocks),
		processedHeight,
		w,
		maxProcessing,
	)

	startIndex, err := defaultProcessedIndex(state)
	if err != nil {
		return nil, 0, fmt.Errorf("could not read default processed index: %w", err)
	}

	bc := &BlockConsumer{
		consumer:     jobConsumer,
		defaultIndex: startIndex,
		unit:         engine.NewUnit(),
		metrics:      metrics,
	}

	// Close the loop: the worker reports finished jobs back to the block consumer.
	w.withBlockConsumer(bc)

	return bc, startIndex, nil
}
// NotifyJobIsDone is called by the worker once it has finished processing the
// (block) job identified by jobID. The notification is forwarded to the
// underlying job consumer, and the index it returns is reported to the metrics
// collector.
func (c *BlockConsumer) NotifyJobIsDone(jobID module.JobID) {
	lastProcessed := c.consumer.NotifyJobIsDone(jobID)
	c.metrics.OnBlockConsumerJobDone(lastProcessed)
}
// Size reports the number of in-memory block jobs that the block consumer is
// currently processing, as reported by the underlying job consumer.
func (c *BlockConsumer) Size() uint {
	inProgress := c.consumer.Size()
	return inProgress
}
// OnFinalizedBlock implements FinalizationConsumer and is invoked by the
// follower engine whenever a new block is finalized.
//
// The block passed in is ignored: the consumer pulls new blocks from its own
// block reader, so all that is needed here is to prompt the consumer to check
// its internal queue and advance its processing index to the next height if
// workers are available. The check is handed to the engine unit rather than
// run inline.
func (c *BlockConsumer) OnFinalizedBlock(_ *model.Block) {
	check := c.consumer.Check
	c.unit.Launch(check)
}
// OnBlockIncorporated is a no-op; it is present only so that BlockConsumer
// implements FinalizationConsumer.
func (c *BlockConsumer) OnBlockIncorporated(_ *model.Block) {
}
// OnDoubleProposeDetected is a no-op; it is present only so that BlockConsumer
// implements FinalizationConsumer.
func (c *BlockConsumer) OnDoubleProposeDetected(_ *model.Block, _ *model.Block) {
}
// Ready starts the underlying job consumer at the default processed index and
// returns a channel that is already closed, i.e. the block consumer is ready
// as soon as this method returns.
//
// It panics if the job consumer fails to start.
func (c *BlockConsumer) Ready() <-chan struct{} {
	if err := c.consumer.Start(c.defaultIndex); err != nil {
		panic(fmt.Errorf("could not start block consumer for finder engine: %w", err))
	}

	// Startup is synchronous, so the readiness signal can be closed up front.
	started := make(chan struct{})
	close(started)
	return started
}
// Done shuts the block consumer down in the background and returns a channel
// that is closed once shutdown has finished: the engine unit is asked to
// finish, the job consumer is stopped, and the unit's completion is awaited.
func (c *BlockConsumer) Done() <-chan struct{} {
	stopped := make(chan struct{})

	go func() {
		// Request unit termination first, then stop the consumer, and only
		// then block on the unit so both wind down concurrently.
		unitDone := c.unit.Done()
		c.consumer.Stop()
		<-unitDone
		close(stopped)
	}()

	return stopped
}