-
Notifications
You must be signed in to change notification settings - Fork 7
/
proof.go
308 lines (262 loc) · 10.4 KB
/
proof.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
package relayer
import (
"context"
"fmt"
"strings"
protoblocktypes "github.com/cometbft/cometbft/proto/tendermint/types"
"github.com/rollchains/tiablob/celestia-node/blob"
"github.com/rollchains/tiablob/celestia-node/share"
"github.com/rollchains/tiablob/lightclients/celestia"
cn "github.com/rollchains/tiablob/relayer/celestia-node"
)
// getCachedProof returns the cached proof keyed by rollchain height, or nil
// when no proof has been cached for that height. Safe for concurrent use.
func (r *Relayer) getCachedProof(height int64) *celestia.BlobProof {
	r.mu.Lock()
	defer r.mu.Unlock()
	cached := r.blockProofCache[height]
	return cached
}
// setCachedProof stores proof in the cache under the given rollchain height,
// overwriting any existing entry. Safe for concurrent use.
func (r *Relayer) setCachedProof(height int64, proof *celestia.BlobProof) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.blockProofCache[height] = proof
}
// GetCachedProofs returns the next set of proofs to verify on-chain: up to
// proofLimit cached proofs covering a contiguous run of rollchain heights
// starting immediately above latestProvenHeight. Scanning stops at the first
// gap in the cache.
//
// BUG FIX: the proof was previously appended before the limit check, so a
// proofLimit <= 0 still returned one proof; the limit is now enforced first.
func (r *Relayer) GetCachedProofs(proofLimit int, latestProvenHeight int64) []*celestia.BlobProof {
	var proofs []*celestia.BlobProof
	checkHeight := latestProvenHeight + 1
	for len(proofs) < proofLimit {
		proof := r.getCachedProof(checkHeight)
		if proof == nil {
			break
		}
		proofs = append(proofs, proof)
		// An aggregated proof covers len(RollchainHeights) consecutive blocks,
		// so jump past all heights this proof already covers.
		checkHeight += int64(len(proof.RollchainHeights))
	}
	return proofs
}
// HasCachedProof reports whether a proof for the given block height is cached.
func (r *Relayer) HasCachedProof(block int64) bool {
	return r.getCachedProof(block) != nil
}
// checkForNewBlockProofs will query Celestia for new block proofs and cache them to be included in the next proposal
// that this validator proposes. It scans every Celestia height between the
// last queried height (exclusive) and the current latest height (exclusive),
// advancing r.celestiaLastQueriedHeight as each height is processed.
func (r *Relayer) checkForNewBlockProofs(ctx context.Context, latestClientState *celestia.ClientState) {
	// Without a client state there is no way to verify fetched headers; skip.
	if latestClientState == nil {
		return
	}
	celestiaNodeClient, err := cn.NewClient(ctx, r.nodeRpcUrl, r.nodeAuthToken)
	if err != nil {
		r.logger.Error("creating celestia node client", "error", err)
		return
	}
	defer celestiaNodeClient.Close()
	celestiaLatestHeight, err := r.celestiaProvider.QueryLatestHeight(ctx)
	if err != nil {
		r.logger.Error("querying latest height from Celestia", "error", err)
		return
	}
	squareSize := uint64(0)
	// Note the strict '<': the latest Celestia height itself is not queried
	// this pass — presumably deferred until it is safely behind the tip;
	// TODO confirm the intent.
	for queryHeight := r.celestiaLastQueriedHeight + 1; queryHeight < celestiaLatestHeight; queryHeight++ {
		// get the namespace blobs from that height
		blobs, err := celestiaNodeClient.Blob.GetAll(ctx, uint64(queryHeight), []share.Namespace{r.celestiaNamespace.Bytes()})
		if err != nil {
			// this error just indicates we don't have a blob at this height
			if strings.Contains(err.Error(), "blob: not found") {
				r.celestiaLastQueriedHeight = queryHeight
				continue
			}
			// Any other error aborts the scan; the next invocation resumes
			// from the same height since the checkpoint was not advanced.
			r.logger.Error("Celestia node blob getall", "height", queryHeight, "error", err)
			return
		}
		if len(blobs) > 0 {
			block, err := r.celestiaProvider.GetBlockAtHeight(ctx, queryHeight)
			if err != nil {
				r.logger.Error("querying celestia block", "height", queryHeight, "error", err)
				return
			}
			// The square size is needed to translate blob EDS indexes into
			// share indexes (see getShareIndex).
			squareSize = block.Block.Data.SquareSize
			fmt.Println("Celestia height: ", queryHeight, "SquareSize:", squareSize) // TODO: remove, debug only
			// Proof failures are logged but do not abort the scan; the height
			// is still marked as queried below.
			if err = r.getBlobProofs(ctx, blobs, queryHeight, squareSize, latestClientState); err != nil {
				r.logger.Error("getting blob proofs", "error", err)
			}
		}
		r.celestiaLastQueriedHeight = queryHeight
	}
}
// getBlobProofs builds proofs for all blobs found at queryHeight. It first
// attempts a single aggregated proof over the whole set; if that fails (e.g.
// non-sequential blocks), it falls back to proving each blob individually.
// Returns an error only for failures that should not occur (see getBlobProof);
// foreign blobs in the same namespace are skipped silently.
func (r *Relayer) getBlobProofs(ctx context.Context, blobs []*blob.Blob, queryHeight int64, squareSize uint64, latestClientState *celestia.ClientState) error {
	if r.tryGetAggregatedProof(ctx, blobs, queryHeight, squareSize, latestClientState) {
		return nil
	}
	// Aggregation bailed; prove blobs one by one.
	for _, mBlob := range blobs {
		if err := r.getBlobProof(ctx, mBlob, queryHeight, squareSize, latestClientState); err != nil {
			// BUG FIX: previously logged AND returned the same error, which the
			// caller logs again — handle it at one layer only, with context.
			return fmt.Errorf("getting blob proof at celestia height %d: %w", queryHeight, err)
		}
	}
	return nil
}
// tryGetAggregatedProof will try and find a single proof for multiple
// blocks/blobs posted at queryHeight. Currently, it is strict and will bail
// (returning false) on any error or a non-sequential set of blocks, so the
// caller can fall back to individual block proofs.
func (r *Relayer) tryGetAggregatedProof(ctx context.Context, blobs []*blob.Blob, queryHeight int64, squareSize uint64, latestClientState *celestia.ClientState) bool {
	heights := make([]int64, len(blobs))
	for i, mBlob := range blobs {
		// Replace blob data with our data for proof verification
		heights[i], mBlob.Data = r.getBlobHeightAndData(ctx, mBlob, queryHeight, r.latestProvenHeight)
		if heights[i] == 0 || mBlob.Data == nil {
			// BUG FIX: this log previously printed a declared-but-never-assigned
			// err (always nil); log useful context instead.
			r.logger.Info("aggregate, getBlobHeightAndData bail", "celestia height", queryHeight, "blob index", i)
			// bail immediately and try individual block proofs
			return false
		}
		// Non-sequential set of blocks check
		if i > 0 && heights[i] != heights[i-1]+1 {
			r.logger.Info("aggregate, blobs not sequential", "current", heights[i], "previous", heights[i-1])
			return false
		}
	}
	shares, err := blob.BlobsToShares(blobs...)
	if err != nil {
		r.logger.Info("aggregate, blobs to shares", "error", err)
		return false
	}
	shareIndex := getShareIndex(uint64(blobs[0].Index()), squareSize)
	// Get all shares from celestia node, confirm our shares are present
	proverShareProof, err := r.celestiaProvider.ProveShares(ctx, uint64(queryHeight), shareIndex, shareIndex+uint64(len(shares)))
	if err != nil {
		r.logger.Info("aggregate, error calling ProveShares", "note", "may be a namespace collision", "error", err)
		return false
	}
	// Get the header to verify the proof
	header := r.getCachedHeader(queryHeight)
	if header == nil {
		header = r.fetchNewHeader(ctx, queryHeight, latestClientState)
		if header == nil {
			r.logger.Info("aggregate, fetch header is nil")
			return false
		}
	}
	proverShareProof.Data = shares // Replace with retrieved shares
	if err = proverShareProof.Validate(header.SignedHeader.Header.DataHash); err != nil {
		r.logger.Info("aggregate, failed verify membership", "error", err)
		return false
	}
	// We have a valid proof, if we haven't cached the header, cache it
	// This goroutine is the only way to delete headers, so it should be kept that way
	if r.getCachedHeader(queryHeight) == nil {
		r.setCachedHeader(queryHeight, header)
	}
	proverShareProof.Data = nil // Only include proof
	shareProofProto := celestia.TmShareProofToProto(proverShareProof)
	proof := &celestia.BlobProof{
		ShareProof:       shareProofProto,
		CelestiaHeight:   queryHeight,
		RollchainHeights: heights,
	}
	// Cache the aggregated proof under every rollchain height it covers so
	// GetCachedProofs can locate it starting from any of them.
	for _, height := range heights {
		r.setCachedProof(height, proof)
	}
	return true
}
// getBlobProof builds and caches a proof for a single blob:
// - unmarshals blob into block and extracts its rollchain height
// - fetches same block from this node, converts to proto type, and marshals
// - fetches proof from celestia app
// - verifies proof using block data from this node
// - stores proof in cache
//
// returns an error only when we shouldn't hit an error, otherwise nil if the blob could be someone elses (i.e. same namespace used)
func (r *Relayer) getBlobProof(ctx context.Context, mBlob *blob.Blob, queryHeight int64, squareSize uint64, latestClientState *celestia.ClientState) error {
	rollchainBlockHeight, data := r.getBlobHeightAndData(ctx, mBlob, queryHeight, r.latestProvenHeight)
	if data == nil {
		// Not our blob, already proven, or local fetch failed — skip silently.
		return nil
	}
	// Replace blob data with our data for proof verification
	mBlob.Data = data
	shares, err := blob.BlobsToShares(mBlob)
	if err != nil {
		r.logger.Error("error BlobsToShares")
		return nil
	}
	shareIndex := getShareIndex(uint64(mBlob.Index()), squareSize)
	// Get all shares from celestia node, confirm our shares are present
	proverShareProof, err := r.celestiaProvider.ProveShares(ctx, uint64(queryHeight), shareIndex, shareIndex+uint64(len(shares)))
	if err != nil {
		r.logger.Error("error calling ProveShares", "error", err)
		return nil
	}
	// Get the header to verify the proof
	header := r.getCachedHeader(queryHeight)
	if header == nil {
		header = r.fetchNewHeader(ctx, queryHeight, latestClientState)
		if header == nil {
			r.logger.Error("fetch new header is nil")
			return nil
		}
	}
	proverShareProof.Data = shares // Replace with retrieved shares
	err = proverShareProof.Validate(header.SignedHeader.Header.DataHash)
	if err != nil {
		r.logger.Info("failed verify membership", "note", "may be a namespace collision", "error", err)
		return nil
	}
	// We have a valid proof, if we haven't cached the header, cache it
	// This goroutine is the only way to delete headers, so it should be kept that way
	if r.getCachedHeader(queryHeight) == nil {
		r.setCachedHeader(queryHeight, header)
	}
	proverShareProof.Data = nil // Only include proof
	shareProofProto := celestia.TmShareProofToProto(proverShareProof)
	// Individual proofs cover exactly one rollchain height.
	r.setCachedProof(rollchainBlockHeight, &celestia.BlobProof{
		ShareProof:       shareProofProto,
		CelestiaHeight:   queryHeight,
		RollchainHeights: []int64{rollchainBlockHeight},
	})
	return nil
}
// getBlobHeightAndData unmarshals mBlob into a block proto, extracts its
// rollchain height, and fetches this node's marshaled proto block at that
// height for proof verification. Returns (0, nil) when the blob cannot be
// decoded (likely a namespace collision), the height is already proven, or
// the local block fetch fails.
func (r *Relayer) getBlobHeightAndData(ctx context.Context, mBlob *blob.Blob, queryHeight int64, latestProvenHeight int64) (int64, []byte) {
	var decoded protoblocktypes.Block
	if err := decoded.Unmarshal(mBlob.GetData()); err != nil {
		r.logger.Info("blob unmarshal", "note", "may be a namespace collision", "height", queryHeight, "error", err)
		return 0, nil
	}
	height := decoded.Header.Height
	// Ignore any blocks that are <= the latest proven height
	if height <= latestProvenHeight {
		return 0, nil
	}
	localBlockBz, err := r.GetLocalBlockAtHeight(ctx, height)
	if err != nil {
		// GetLocalBlockAtHeight logs the failure; treat as "not ours".
		return 0, nil
	}
	return height, localBlockBz
}
// GetLocalBlockAtHeight fetches the block at rollchainBlockHeight from this
// node's local provider and returns its proto-marshaled bytes.
//
// Cannot use txClient.GetBlockWithTxs since it tries to decode the txs. This API is broken when using the same tx
// injection method as vote extensions. https://docs.cosmos.network/v0.50/build/abci/vote-extensions#vote-extension-propagation
// "FinalizeBlock will ignore any byte slice that doesn't implement an sdk.Tx, so any injected vote extensions will safely be ignored in FinalizeBlock"
// "Some existing Cosmos SDK core APIs may need to be modified and thus broken."
func (r *Relayer) GetLocalBlockAtHeight(ctx context.Context, rollchainBlockHeight int64) ([]byte, error) {
	expectedBlock, err := r.localProvider.GetBlockAtHeight(ctx, rollchainBlockHeight)
	if err != nil {
		// TODO: add retries, bad if this errors
		// BUG FIX: dropped the copy-pasted "may be a namespace collision" log
		// note — a local provider failure is unrelated to Celestia namespaces.
		r.logger.Error("getting local block", "rollchain height", rollchainBlockHeight, "err", err)
		return nil, err
	}
	blockProto, err := expectedBlock.Block.ToProto()
	if err != nil {
		return nil, err
	}
	return blockProto.Marshal()
}
// getShareIndex calculates the share index given the EDS (extended data
// square) index of the blob and the square size of the respective block.
// Indexes at or below squareSize pass through unchanged; beyond that, the row
// component is halved while the column offset is preserved.
// NOTE(review): the boundary is '>' rather than '>=', so an edsIndex exactly
// equal to squareSize is returned unchanged — confirm this matches the
// intended EDS layout.
func getShareIndex(edsIndex uint64, squareSize uint64) uint64 {
	if edsIndex <= squareSize {
		return edsIndex
	}
	row := edsIndex / squareSize
	col := edsIndex % squareSize
	return (row*squareSize)/2 + col
}