forked from Consensys/quorum
/
speculative_chain.go
183 lines (154 loc) · 6.34 KB
/
speculative_chain.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package raft
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"gopkg.in/fatih/set.v0"
lane "gopkg.in/oleiade/lane.v1"
)
// The speculative chain represents blocks that we have minted which haven't been accepted into the chain yet, building
// on each other in a chain. It has three basic operations:
// * add new block to end
// * accept / remove oldest block
// * unwind / remove invalid blocks to the end
//
// Additionally:
// * clear state when we stop minting
// * set the parent when we're not minting (so it's always current)
type speculativeChain struct {
head *types.Block
unappliedBlocks *lane.Deque
expectedInvalidBlockHashes *set.Set // This is thread-safe. This set is referred to as our "guard" below.
proposedTxes *set.Set // This is thread-safe.
}
func newSpeculativeChain() *speculativeChain {
return &speculativeChain{
head: nil,
unappliedBlocks: lane.NewDeque(),
expectedInvalidBlockHashes: set.New(),
proposedTxes: set.New(),
}
}
func (chain *speculativeChain) clear(block *types.Block) {
chain.head = block
chain.unappliedBlocks = lane.NewDeque()
chain.expectedInvalidBlockHashes.Clear()
chain.proposedTxes.Clear()
}
// Append a new speculative block
func (chain *speculativeChain) extend(block *types.Block) {
chain.head = block
chain.recordProposedTransactions(block.Transactions())
chain.unappliedBlocks.Append(block)
}
// Set the parent of the speculative chain
//
// Note: This is only called when not minter
func (chain *speculativeChain) setHead(block *types.Block) {
chain.head = block
}
// Accept this block, removing it from the speculative chain
func (chain *speculativeChain) accept(acceptedBlock *types.Block) {
earliestProposedI := chain.unappliedBlocks.Shift()
var earliestProposed *types.Block
if nil != earliestProposedI {
earliestProposed = earliestProposedI.(*types.Block)
}
// There are three possible scenarios:
// 1. We don't have a record of this block (or any proposed blocks), meaning someone else minted it and we should
// add it as the new head of our speculative chain. New blocks from the old leader are still coming in.
// 2. This block was the first outstanding one we proposed.
// 3. This block is different from the block we proposed, (also) meaning new blocks are still coming in from the old
// leader, but unlike the first scenario, we need to clear all of the speculative chain state because the
// `acceptedBlock` takes precedence over our speculative state.
if earliestProposed == nil {
chain.head = acceptedBlock
} else if expectedBlock := earliestProposed.Hash() == acceptedBlock.Hash(); expectedBlock {
// Remove the txes in this accepted block from our blacklist.
chain.removeProposedTxes(acceptedBlock)
} else {
log.Info("Another node minted; Clearing speculative state", "block", acceptedBlock.Hash())
chain.clear(acceptedBlock)
}
}
// Remove all blocks in the chain from the specified one until the end
func (chain *speculativeChain) unwindFrom(invalidHash common.Hash, headBlock *types.Block) {
// check our "guard" to see if this is a (descendant) block we're
// expected to be ruled invalid. if we find it, remove from the guard
if chain.expectedInvalidBlockHashes.Has(invalidHash) {
log.Info("Removing expected-invalid block from guard.", "block", invalidHash)
chain.expectedInvalidBlockHashes.Remove(invalidHash)
return
}
// pop from the RHS repeatedly, updating minter.parent each time. if not
// our block, add to guard. in all cases, call removeProposedTxes
for {
currBlockI := chain.unappliedBlocks.Pop()
if nil == currBlockI {
log.Info("(Popped all blocks from queue.)")
break
}
currBlock := currBlockI.(*types.Block)
log.Info("Popped block from queue RHS.", "block", currBlock.Hash())
// Maintain invariant: the parent always points the last speculative block or the head of the blockchain
// if there are not speculative blocks.
if speculativeParentI := chain.unappliedBlocks.Last(); nil != speculativeParentI {
chain.head = speculativeParentI.(*types.Block)
} else {
chain.head = headBlock
}
chain.removeProposedTxes(currBlock)
if currBlock.Hash() != invalidHash {
log.Info("Haven't yet found block; adding descendent to guard.\n", "invalid block", invalidHash, "descendant", currBlock.Hash())
chain.expectedInvalidBlockHashes.Add(currBlock.Hash())
} else {
break
}
}
}
// We keep track of txes we've put in all newly-mined blocks since the last
// ChainHeadEvent, and filter them out so that we don't try to create blocks
// with the same transactions. This is necessary because the TX pool will keep
// supplying us these transactions until they are in the chain (after having
// flown through raft).
func (chain *speculativeChain) recordProposedTransactions(txes types.Transactions) {
txHashIs := make([]interface{}, len(txes))
for i, tx := range txes {
txHashIs[i] = tx.Hash()
}
chain.proposedTxes.Add(txHashIs...)
}
// Removes txes in block from our "blacklist" of "proposed tx" hashes. When we
// create a new block and use txes from the tx pool, we ignore those that we
// have already used ("proposed"), but that haven't yet officially made it into
// the chain yet.
//
// It's important to remove hashes from this blacklist (once we know we don't
// need them in there anymore) so that it doesn't grow endlessly.
func (chain *speculativeChain) removeProposedTxes(block *types.Block) {
minedTxes := block.Transactions()
minedTxInterfaces := make([]interface{}, len(minedTxes))
for i, tx := range minedTxes {
minedTxInterfaces[i] = tx.Hash()
}
// NOTE: we are using a thread-safe Set here, so it's fine if we access this
// here and in mintNewBlock concurrently. using a finer-grained set-specific
// lock here is preferable, because mintNewBlock holds its locks for a
// nontrivial amount of time.
chain.proposedTxes.Remove(minedTxInterfaces...)
}
func (chain *speculativeChain) withoutProposedTxes(addrTxes AddressTxes) AddressTxes {
newMap := make(AddressTxes)
for addr, txes := range addrTxes {
filteredTxes := make(types.Transactions, 0)
for _, tx := range txes {
if !chain.proposedTxes.Has(tx.Hash()) {
filteredTxes = append(filteredTxes, tx)
}
}
if len(filteredTxes) > 0 {
newMap[addr] = filteredTxes
}
}
return newMap
}