-
Notifications
You must be signed in to change notification settings - Fork 178
/
worker.go
51 lines (45 loc) · 1.67 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package blockconsumer
import (
"github.com/onflow/flow-go/engine/verification/assigner"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/jobqueue"
)
// worker is an internal type of this package.
// It receives block jobs from job consumer and converts it to Block and passes it to the
// finalized block processor (i.e., assigner engine) to process.
// In this sense, worker acts as a broker between the block consumer and block processor.
// The worker is stateless, and is solely responsible for converting block jobs to blocks, passing them
// to the processor and notifying consumer when the block is processed.
type worker struct {
processor assigner.FinalizedBlockProcessor
consumer *BlockConsumer
}
func newWorker(processor assigner.FinalizedBlockProcessor) *worker {
return &worker{
processor: processor,
}
}
func (w *worker) withBlockConsumer(consumer *BlockConsumer) {
w.consumer = consumer
}
// Run is a block worker that receives a job corresponding to a finalized block.
// It then converts the job to a block and passes it to the underlying engine
// for processing.
func (w *worker) Run(job module.Job) error {
block, err := jobqueue.JobToBlock(job)
if err != nil {
return err
}
// TODO: wire out the internal fatal error, and return.
w.processor.ProcessFinalizedBlock(block)
return nil
}
// Notify is a callback for engine to notify a block has been
// processed by the given blockID.
// The worker translates the block ID into job ID and notifies the consumer
// that the job is done.
func (w *worker) Notify(blockID flow.Identifier) {
jobID := jobqueue.JobID(blockID)
w.consumer.NotifyJobIsDone(jobID)
}