forked from Onyx-Protocol/Onyx
/
protocol.go
292 lines (243 loc) · 8 KB
/
protocol.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
/*
Package protocol provides the logic to tie together
storage and validation for a Chain Protocol blockchain.
This comprises all behavior that's common to every full
node, as well as other functions that need to operate on the
blockchain state.
Here are a few examples of typical full node types.
Generator
A generator has two basic jobs: collecting transactions from
other nodes and putting them into blocks.
To add a new block to the blockchain, call GenerateBlock,
sign the block (possibly collecting signatures from other
parties), and call CommitAppliedBlock.
Signer
A signer validates blocks generated by the Generator and signs
at most one block at each height.
Participant
A participant node in a network may select outputs for spending
and compose transactions.
To publish a new transaction, prepare your transaction
(select outputs, and compose and sign the tx) and send the
transaction to the network's generator. To wait for
confirmation, call BlockWaiter on successive block heights
and inspect the blockchain state until you find that the
transaction has been either confirmed or rejected. Note
that transactions may be malleable if there's no commitment
to TXSIGHASH.
New block sequence
Every new block must be validated against the existing
blockchain state. New blocks are validated by calling
ValidateBlock. Blocks produced by GenerateBlock are already
known to be valid.
A new block goes through the sequence:
- If not generated locally, the block is validated by
calling ValidateBlock.
- The new block is committed to the Chain's Store through
its SaveBlock method. This is the linearization point.
Once a block is saved to the Store, it's committed and
can be recovered after a crash.
- The Chain's in-memory representation of the blockchain
state is updated. If the block was remotely-generated,
the Chain must apply the new block to its current state
to retrieve the new state. If the block was generated
locally, the resulting state is already known and does
not need to be recalculated.
- Other cored processes are notified of the new block
through Store.FinalizeBlock.
Committing a block
As a consumer of the package, there are two ways to
commit a new block: CommitBlock and CommitAppliedBlock.
When generating new blocks, GenerateBlock will return
the resulting state snapshot with the new block. To
ingest a block with a known resulting state snapshot,
call CommitAppliedBlock.
When ingesting remotely-generated blocks, the state after
the block must be calculated by taking the Chain's
current state and applying the new block. To ingest a
block without a known resulting state snapshot, call
CommitBlock.
*/
package protocol
import (
"context"
"sync"
"time"
"github.com/golang/groupcache/lru"
"chain/errors"
"chain/log"
"chain/protocol/bc"
"chain/protocol/bc/legacy"
"chain/protocol/state"
)
// maxCachedValidatedTxs is the max number of validated txs to cache.
const maxCachedValidatedTxs = 1000
var (
// ErrTheDistantFuture is returned when waiting for a blockheight
// too far in excess of the tip of the blockchain.
ErrTheDistantFuture = errors.New("block height too far in future")
)
// Store provides storage for blockchain data: blocks and state tree
// snapshots.
//
// Note, this is different from a state snapshot. A state snapshot
// provides access to the state at a given point in time -- outputs
// and issuance memory. The Chain type uses Store to load state
// from storage and persist validated data.
type Store interface {
Height(context.Context) (uint64, error)
GetBlock(context.Context, uint64) (*legacy.Block, error)
LatestSnapshot(context.Context) (*state.Snapshot, uint64, error)
SaveBlock(context.Context, *legacy.Block) error
FinalizeBlock(context.Context, uint64) error
SaveSnapshot(context.Context, uint64, *state.Snapshot) error
}
// Chain provides a complete, minimal blockchain database. It
// delegates the underlying storage to other objects, and uses
// validation logic from package validation to decide what
// objects can be safely stored.
type Chain struct {
InitialBlockHash bc.Hash
MaxIssuanceWindow time.Duration // only used by generators
state struct {
cond sync.Cond // protects height, block, snapshot
height uint64
block *legacy.Block // current only if leader
snapshot *state.Snapshot // current only if leader
}
store Store
lastQueuedSnapshot time.Time
pendingSnapshots chan pendingSnapshot
prevalidated prevalidatedTxsCache
}
type pendingSnapshot struct {
height uint64
snapshot *state.Snapshot
}
// NewChain returns a new Chain using store as the underlying storage.
func NewChain(ctx context.Context, initialBlockHash bc.Hash, store Store, heights <-chan uint64) (*Chain, error) {
c := &Chain{
InitialBlockHash: initialBlockHash,
store: store,
pendingSnapshots: make(chan pendingSnapshot, 1),
prevalidated: prevalidatedTxsCache{
lru: lru.New(maxCachedValidatedTxs),
},
}
c.state.cond.L = new(sync.Mutex)
c.state.snapshot = state.Empty()
var err error
c.state.height, err = store.Height(ctx)
if err != nil {
return nil, errors.Wrap(err, "looking up blockchain height")
}
// Note that c.state.height may still be zero here.
if heights != nil {
go func() {
for h := range heights {
c.setHeight(h)
}
}()
}
go func() {
for {
select {
case <-ctx.Done():
return
case ps := <-c.pendingSnapshots:
err = store.SaveSnapshot(ctx, ps.height, ps.snapshot)
if err != nil {
log.Error(ctx, err, "at", "saving snapshot")
}
}
}
}()
return c, nil
}
// Height returns the current height of the blockchain.
func (c *Chain) Height() uint64 {
c.state.cond.L.Lock()
defer c.state.cond.L.Unlock()
return c.state.height
}
// TimestampMS returns the latest known block timestamp.
func (c *Chain) TimestampMS() uint64 {
c.state.cond.L.Lock()
defer c.state.cond.L.Unlock()
if c.state.block == nil {
return 0
}
return c.state.block.TimestampMS
}
// State returns the most recent state available. It will not be current
// unless the current process is the leader. Callers should examine the
// returned block header's height if they need to verify the current state.
func (c *Chain) State() (*legacy.Block, *state.Snapshot) {
c.state.cond.L.Lock()
defer c.state.cond.L.Unlock()
return c.state.block, c.state.snapshot
}
func (c *Chain) setState(b *legacy.Block, s *state.Snapshot) {
c.state.cond.L.Lock()
defer c.state.cond.L.Unlock()
// Multiple goroutines may attempt to set the state at the
// same time. If b is an older block than c.state, ignore it.
if b != nil && c.state.block != nil && b.Height <= c.state.block.Height {
return
}
c.state.block = b
c.state.snapshot = s
if b != nil && b.Height > c.state.height {
c.state.height = b.Height
c.state.cond.Broadcast()
}
}
func (c *Chain) setHeight(h uint64) {
// We update c.state.height from multiple places:
// setState and here, called by the Postgres LISTEN
// goroutine. setHeight must ignore heights less than
// the current height.
c.state.cond.L.Lock()
defer c.state.cond.L.Unlock()
if h <= c.state.height {
return
}
c.state.height = h
c.state.cond.Broadcast()
}
// BlockSoonWaiter returns a channel that
// waits for the block at the given height,
// but it is an error to wait for a block far in the future.
// WaitForBlockSoon will timeout if the context times out.
// To wait unconditionally, the caller should use WaitForBlock.
func (c *Chain) BlockSoonWaiter(ctx context.Context, height uint64) <-chan error {
ch := make(chan error, 1)
go func() {
const slop = 3
if height > c.Height()+slop {
ch <- ErrTheDistantFuture
return
}
select {
case <-c.BlockWaiter(height):
ch <- nil
case <-ctx.Done():
ch <- ctx.Err()
}
}()
return ch
}
// BlockWaiter returns a channel that
// waits for the block at the given height.
func (c *Chain) BlockWaiter(height uint64) <-chan struct{} {
ch := make(chan struct{}, 1)
go func() {
c.state.cond.L.Lock()
defer c.state.cond.L.Unlock()
for c.state.height < height {
c.state.cond.Wait()
}
ch <- struct{}{}
}()
return ch
}