package proof
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"os"
"path/filepath"
"regexp"
"strings"
"time"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/taproot-assets/asset"
"github.com/lightninglabs/taproot-assets/fn"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwire"
)
const (
// TaprootAssetsFileEnding is the main file suffix for the Taproot Asset
// proof files stored on disk, without the dot.
TaprootAssetsFileEnding = "assetproof"
// TaprootAssetsFileSuffix is the main file suffix for the Taproot Asset
// proof files stored on disk, including the dot.
TaprootAssetsFileSuffix = "." + TaprootAssetsFileEnding
// ProofDirName is the name of the directory we'll use to store our
// proofs.
ProofDirName = "proofs"
// outpointTruncateLength is the number of hex characters we use to
// represent the outpoint hash in the file name. This is to avoid
// problems with long file names on some operating systems.
outpointTruncateLength = 32
)
var (
// emptyKey is an empty public key that we use to check if a script key
// is valid.
emptyKey btcec.PublicKey
// ErrProofNotFound is returned when a user attempts to look up a proof
// based on a Locator, but we can't find it on disk.
ErrProofNotFound = fmt.Errorf("unable to find proof")
// ErrInvalidLocatorID is returned when a specified locator has an
// invalid asset ID.
ErrInvalidLocatorID = fmt.Errorf("invalid asset ID locator")
// ErrInvalidLocatorKey is returned when a specified locator script key
// is invalid.
ErrInvalidLocatorKey = fmt.Errorf("invalid script key locator")
// ErrOutPointMissing is returned when a specified locator does not
// contain an outpoint. The outpoint is required when storing a proof.
ErrOutPointMissing = fmt.Errorf("outpoint missing in key locator")
// ErrMultipleProofs is returned if looking up a proof with only the
// asset ID and script key results in multiple proofs being found.
ErrMultipleProofs = fmt.Errorf(
"multiple proofs found with asset ID and script key, specify " +
"outpoint to disambiguate",
)
// OutPointFileNamePattern is the regular expression we use to find out
// if a proof file on disk already has the new naming scheme. The first
// number (66) is the number of hex characters in the compressed script
// key, the second number (32) is the number of hex characters in the
// truncated outpoint txid (16 bytes of the txid). The last part is the
// variable length outpoint index (at least one digit).
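// As an illustration, a conforming file name has the following shape
// (schematic placeholders, not real values):
//
//	<script key, 66 hex chars>-<txid prefix, 32 hex chars>-<index>.assetproof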
OutPointFileNamePattern = regexp.MustCompile(
`^[0-9a-f]{66}-[0-9a-f]{32}-[0-9]+\.` +
TaprootAssetsFileEnding + "$",
)
)
// Locator is able to uniquely identify a proof in the extended Taproot Asset
// Universe by a combination of the top-level asset ID, the group key, and also
// the script key.
type Locator struct {
// AssetID is the asset ID of the proof to fetch. This is an optional
// field.
AssetID *asset.ID
// GroupKey is the group key of the asset to fetch. This is an optional
// field.
GroupKey *btcec.PublicKey
// ScriptKey specifies the script key of the asset to fetch/store. This
// field MUST be specified.
ScriptKey btcec.PublicKey
// OutPoint is the outpoint of the associated asset. This field is
// optional.
OutPoint *wire.OutPoint
}
// Hash returns a SHA256 hash of the bytes serialized locator.
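//
// A minimal usage sketch, assuming assetID (an asset.ID) and scriptKey (a
// btcec.PublicKey) are hypothetical values already in scope:
//
//	loc := Locator{AssetID: &assetID, ScriptKey: scriptKey}
//	locHash, err := loc.Hash()
//	if err != nil {
//		// Handle the error.
//	}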
func (l *Locator) Hash() ([32]byte, error) {
var buf bytes.Buffer
if l.AssetID != nil {
buf.Write(l.AssetID[:])
}
if l.GroupKey != nil {
buf.Write(l.GroupKey.SerializeCompressed())
}
buf.Write(l.ScriptKey.SerializeCompressed())
if l.OutPoint != nil {
err := lnwire.WriteOutPoint(&buf, *l.OutPoint)
if err != nil {
return [32]byte{}, fmt.Errorf("unable to write "+
"outpoint: %w", err)
}
}
// Hash the buffer.
return sha256.Sum256(buf.Bytes()), nil
}
// AnnotatedProof is an annotated proof that contains the raw proof blob along
// with a locator that may convey additional information related to the proof.
type AnnotatedProof struct {
Locator
Blob
*AssetSnapshot
}
// Archiver is the main storage backend the ProofArchiver uses to store and
// query for proof files.
type Archiver interface {
// FetchProof fetches a proof for an asset uniquely identified by the
// passed ProofIdentifier.
//
// If a proof cannot be found, then ErrProofNotFound should be
// returned. If multiple proofs exist for the given fields of the
// locator then ErrMultipleProofs should be returned to indicate more
// specific fields need to be set in the Locator (e.g. the OutPoint).
FetchProof(ctx context.Context, id Locator) (Blob, error)
// HasProof returns true if the proof for the given locator exists. This
// is intended to be a performance optimized lookup compared to fetching
// a proof and checking for ErrProofNotFound.
HasProof(ctx context.Context, id Locator) (bool, error)
// FetchProofs fetches all proofs for assets uniquely identified by the
// passed asset ID.
FetchProofs(ctx context.Context, id asset.ID) ([]*AnnotatedProof, error)
// ImportProofs attempts to store fully populated proofs on disk. The
// previous outpoint of the first state transition will be used as the
// Genesis point. The final resting place of the asset will be used as
// the script key itself. If replace is specified, we expect a proof to
// already be present, and we just update (replace) it with the new
// proof.
ImportProofs(ctx context.Context, headerVerifier HeaderVerifier,
groupVerifier GroupVerifier, replace bool,
proofs ...*AnnotatedProof) error
}
// NotifyArchiver is an Archiver that also allows callers to subscribe to
// notifications about new proofs being added to the archiver.
type NotifyArchiver interface {
// FetchProof fetches a proof for an asset uniquely identified by the
// passed Identifier. The returned blob is expected to be the encoded
// full proof file, containing the complete provenance of the asset.
//
// If a proof cannot be found, then ErrProofNotFound should be returned.
FetchProof(ctx context.Context, id Locator) (Blob, error)
fn.EventPublisher[Blob, []*Locator]
}
// MultiArchiveNotifier is a NotifyArchiver that wraps several other archives
// and notifies subscribers about new proofs that are added to any of the
// archives.
type MultiArchiveNotifier struct {
archives []NotifyArchiver
}
// NewMultiArchiveNotifier creates a new MultiArchiveNotifier based on the set
// of specified backends.
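//
// A minimal usage sketch, assuming assetStore and fileArchive are hypothetical
// values that both implement NotifyArchiver:
//
//	notifier := NewMultiArchiveNotifier(assetStore, fileArchive)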
func NewMultiArchiveNotifier(archives ...NotifyArchiver) *MultiArchiveNotifier {
return &MultiArchiveNotifier{
archives: archives,
}
}
// FetchProof fetches a proof for an asset uniquely identified by the passed
// Identifier. The returned proof can either be a full proof file or just a
// single proof.
//
// If a proof cannot be found, then ErrProofNotFound should be returned.
//
// NOTE: This is part of the NotifyArchiver interface.
func (m *MultiArchiveNotifier) FetchProof(ctx context.Context,
id Locator) (Blob, error) {
for idx := range m.archives {
a := m.archives[idx]
proofBlob, err := a.FetchProof(ctx, id)
if errors.Is(err, ErrProofNotFound) {
// Try the next archive.
continue
} else if err != nil {
return nil, fmt.Errorf("error fetching proof "+
"from archive: %w", err)
}
return proofBlob, nil
}
return nil, ErrProofNotFound
}
// RegisterSubscriber adds a new subscriber for receiving events. The
// registration request is forwarded to all registered archives.
func (m *MultiArchiveNotifier) RegisterSubscriber(
receiver *fn.EventReceiver[Blob], deliverExisting bool,
deliverFrom []*Locator) error {
for idx := range m.archives {
a := m.archives[idx]
err := a.RegisterSubscriber(
receiver, deliverExisting, deliverFrom,
)
if err != nil {
return fmt.Errorf("error registering subscriber: %w",
err)
}
}
return nil
}
// RemoveSubscriber removes the given subscriber and also stops it from
// processing events. The removal request is forwarded to all registered
// archives.
func (m *MultiArchiveNotifier) RemoveSubscriber(
subscriber *fn.EventReceiver[Blob]) error {
for idx := range m.archives {
a := m.archives[idx]
err := a.RemoveSubscriber(subscriber)
if err != nil {
return fmt.Errorf("error removing subscriber: "+
"%w", err)
}
}
return nil
}
// A compile-time assertion to ensure MultiArchiveNotifier meets the
// NotifyArchiver interface.
var _ NotifyArchiver = (*MultiArchiveNotifier)(nil)
// FileArchiver implements the proof Archiver interface, backed by an on-disk
// file system. The archiver takes a single root directory and then creates the
// following directory hierarchy:
//
// proofs/
// ├─ asset_id1/
// │ ├─ scriptKey1-outpointTxid[:32]-outpointIndex.assetproof
// │ ├─ scriptKey2-outpointTxid[:32]-outpointIndex.assetproof
type FileArchiver struct {
// proofPath is the directory name that we'll use as the root for all
// our files.
proofPath string
// eventDistributor is an event distributor that will be used to notify
// subscribers about new proofs that are added to the archiver.
eventDistributor *fn.EventDistributor[Blob]
}
// NewFileArchiver creates a new file archive rooted at the specified
// directory.
//
// TODO(roasbeef): use fs.FS instead?
//
// TODO(roasbeef): option to memory map these instead? then don't need to lug
// around large blobs in user space as much
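//
// A minimal usage sketch, assuming dataDir is a hypothetical base directory:
//
//	archive, err := NewFileArchiver(dataDir)
//	if err != nil {
//		// Handle the error.
//	}
//	// Proof files are then stored under dataDir/proofs.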
func NewFileArchiver(dirName string) (*FileArchiver, error) {
// First, we'll make sure our main proof directory has already been
// created.
proofPath := filepath.Join(dirName, ProofDirName)
if err := os.Mkdir(proofPath, 0750); err != nil && !os.IsExist(err) {
return nil, fmt.Errorf("unable to create proof dir: %w", err)
}
// We need to make sure that all our proof files have the new naming
// scheme. If they don't, we'll rename them now. This might take quite a
// while since we need to read each proof file and parse it to extract
// the outpoint and then rename it.
err := migrateOldFileNames(proofPath)
if err != nil {
return nil, fmt.Errorf("error migrating old proof file "+
"names: %w", err)
}
return &FileArchiver{
proofPath: proofPath,
eventDistributor: fn.NewEventDistributor[Blob](),
}, nil
}
// genProofFileStoragePath generates the full proof file path for storing a
// proof based on a rootPath and a valid locator.
// The final path is:
//
// root/assetID/scriptKey-outpointTxid[:32]-outpointIndex.assetproof
//
// NOTE: Because some operating systems have issues with paths longer than 256
// characters, we don't use the full outpoint in the file name, but only the
// first 16 bytes (32 hex characters) of the hash. That should be enough to
// avoid collisions while saving us a full 32 characters (we already use 130 for
// the hex encoded asset ID and script key).
func genProofFileStoragePath(rootPath string, loc Locator) (string, error) {
switch {
case loc.AssetID == nil:
return "", ErrInvalidLocatorID
case loc.ScriptKey.IsEqual(&emptyKey):
return "", ErrInvalidLocatorKey
case loc.OutPoint == nil:
return "", ErrOutPointMissing
}
assetID := hex.EncodeToString(loc.AssetID[:])
truncatedHash := loc.OutPoint.Hash.String()[:outpointTruncateLength]
fileName := fmt.Sprintf("%x-%s-%d.%s",
loc.ScriptKey.SerializeCompressed(), truncatedHash,
loc.OutPoint.Index, TaprootAssetsFileEnding)
return filepath.Join(rootPath, assetID, fileName), nil
}
// lookupProofFilePath returns the full path for reading a proof file, based on
// the given locator. If the locator does not contain an outpoint, we'll check
// if there is just a single proof available on disk. If there is, we return
// that. If there are multiple, then the user needs to also specify the outpoint
// and we return ErrMultipleProofs.
func lookupProofFilePath(rootPath string, loc Locator) (string, error) {
// If an outpoint is specified, we want to look up a very specific file
// on disk.
if loc.OutPoint != nil {
fullName, err := genProofFileStoragePath(rootPath, loc)
if err != nil {
return "", err
}
// If the file doesn't exist under the full name, we know there
// just isn't a proof file for that asset yet.
if !lnrpc.FileExists(fullName) {
return "", fmt.Errorf("proof file %s does not "+
"exist: %w", fullName, ErrProofNotFound)
}
return fullName, nil
}
// If the user didn't specify an outpoint, we look up all proof files
// that start with the script key given. If there is exactly one, we
// return it.
switch {
case loc.AssetID == nil:
return "", ErrInvalidLocatorID
case loc.ScriptKey.IsEqual(&emptyKey):
return "", ErrInvalidLocatorKey
}
assetID := hex.EncodeToString(loc.AssetID[:])
scriptKey := hex.EncodeToString(loc.ScriptKey.SerializeCompressed())
searchPattern := filepath.Join(rootPath, assetID, scriptKey+"*")
matches, err := filepath.Glob(searchPattern)
if err != nil {
return "", fmt.Errorf("error listing proof files: %w", err)
}
switch {
// We have no proof for this script key.
case len(matches) == 0:
return "", ErrProofNotFound
// Exactly one proof for this script key, we'll return it.
case len(matches) == 1:
return matches[0], nil
// User needs to specify the outpoint as well, since we have multiple
// proofs for this script key.
default:
return "", ErrMultipleProofs
}
}
// extractLastProof extracts the last proof from a proof file.
func extractLastProof(fileContent Blob) (*Proof, error) {
parsedFile := &File{}
err := parsedFile.Decode(bytes.NewReader(fileContent))
if err != nil {
return nil, fmt.Errorf("error parsing proof file: %w", err)
}
// To find out the new file name, we need to parse the proof
// file and extract the last proof in it.
lastProof, err := parsedFile.LastProof()
if err != nil {
return nil, fmt.Errorf("error extracting last proof from "+
"proof file: %w", err)
}
return lastProof, nil
}
// migrateOldFileNames looks for proof files in the root path that don't conform
// to the new naming scheme and renames them to the new scheme.
func migrateOldFileNames(rootPath string) error {
// List all files matching rootPath/*/*.assetproof.
searchPattern := filepath.Join(
rootPath, "*", "*"+TaprootAssetsFileSuffix,
)
oldProofs, err := filepath.Glob(searchPattern)
if err != nil {
return fmt.Errorf("error listing old proof files: %w", err)
}
// Skip files that already have the new naming pattern.
oldProofs = fn.Filter(oldProofs, func(path string) bool {
return !OutPointFileNamePattern.MatchString(filepath.Base(path))
})
// Nothing to migrate, let's not even log a message to avoid startup
// log spam.
if len(oldProofs) == 0 {
return nil
}
log.Infof("Found %d proof files in %s with old naming scheme, "+
"renaming now (will take a while)", len(oldProofs), rootPath)
var (
startTime = time.Now()
numFilesRenamed int
)
for _, oldPath := range oldProofs {
proofFile, err := os.ReadFile(oldPath)
if err != nil {
return fmt.Errorf("unable to read proof: %w", err)
}
// To find out the new file name, we need to parse the proof
// file and extract the last proof in it.
lastProof, err := extractLastProof(proofFile)
if err != nil {
return fmt.Errorf("unable to extract last proof from "+
"proof file: %w", err)
}
newFileName, err := genProofFileStoragePath(rootPath, Locator{
AssetID: fn.Ptr(lastProof.Asset.ID()),
ScriptKey: *lastProof.Asset.ScriptKey.PubKey,
OutPoint: fn.Ptr(lastProof.OutPoint()),
})
if err != nil {
return fmt.Errorf("error generating new file name: "+
"%w", err)
}
err = os.Rename(oldPath, newFileName)
if err != nil {
return fmt.Errorf("error renaming file %s to %s: %w",
oldPath, newFileName, err)
}
numFilesRenamed++
if numFilesRenamed%1000 == 0 {
log.Infof("Renamed %d of %d old files", numFilesRenamed,
len(oldProofs))
}
}
log.Infof("Done renaming %d proof files, took %v", len(oldProofs),
time.Since(startTime))
return nil
}
// FetchProof fetches a proof for an asset uniquely identified by the passed
// ProofIdentifier.
//
// If a proof cannot be found, then ErrProofNotFound should be returned. If
// multiple proofs exist for the given fields of the locator then
// ErrMultipleProofs is returned to indicate more specific fields need to be set
// in the Locator (e.g. the OutPoint).
//
// NOTE: This implements the Archiver interface.
func (f *FileArchiver) FetchProof(_ context.Context, id Locator) (Blob, error) {
// All our on-disk storage is based on asset IDs, so to look up a path,
// we just need to compute the full file path and see if it exists on
// disk.
proofPath, err := lookupProofFilePath(f.proofPath, id)
if err != nil {
return nil, fmt.Errorf("unable to make proof file path: %w",
err)
}
proofFile, err := os.ReadFile(proofPath)
switch {
case os.IsNotExist(err):
return nil, ErrProofNotFound
case err != nil:
return nil, fmt.Errorf("unable to find proof: %w", err)
}
return proofFile, nil
}
// HasProof returns true if the proof for the given locator exists. This is
// intended to be a performance optimized lookup compared to fetching a proof
// and checking for ErrProofNotFound.
func (f *FileArchiver) HasProof(_ context.Context, id Locator) (bool, error) {
// All our on-disk storage is based on asset IDs, so to look up a path,
// we just need to compute the full file path and see if it exists on
// disk.
proofPath, err := lookupProofFilePath(f.proofPath, id)
if err != nil {
return false, fmt.Errorf("unable to make proof file path: %w",
err)
}
return lnrpc.FileExists(proofPath), nil
}
// FetchProofs fetches all proofs for assets uniquely identified by the passed
// asset ID.
func (f *FileArchiver) FetchProofs(_ context.Context,
id asset.ID) ([]*AnnotatedProof, error) {
assetID := hex.EncodeToString(id[:])
assetPath := filepath.Join(f.proofPath, assetID)
entries, err := os.ReadDir(assetPath)
if err != nil {
return nil, fmt.Errorf("unable to read dir %s: %w", assetPath,
err)
}
proofs := make([]*AnnotatedProof, len(entries))
for idx := range entries {
// We'll skip any files that don't end with our suffix; this
// will include directories as well, so we don't need to check
// for those.
fileName := entries[idx].Name()
if !strings.HasSuffix(fileName, TaprootAssetsFileSuffix) {
continue
}
parts := strings.Split(strings.ReplaceAll(
fileName, TaprootAssetsFileSuffix, "",
), "-")
if len(parts) != 3 {
return nil, fmt.Errorf("malformed proof file name "+
"'%s', expected two parts, got %d", fileName,
len(parts))
}
scriptKeyBytes, err := hex.DecodeString(parts[0])
if err != nil {
return nil, fmt.Errorf("malformed proof file name, "+
"unable to decode script key: %w", err)
}
scriptKey, err := btcec.ParsePubKey(scriptKeyBytes)
if err != nil {
return nil, fmt.Errorf("malformed proof file name, "+
"unable to parse script key: %w", err)
}
fullPath := filepath.Join(assetPath, fileName)
proofFile, err := os.ReadFile(fullPath)
if err != nil {
return nil, fmt.Errorf("unable to read proof: %w", err)
}
// We only have part of the outpoint in the file name, so we
// need to read the file and parse the last proof to extract the
// outpoint.
lastProof, err := extractLastProof(proofFile)
if err != nil {
return nil, fmt.Errorf("unable to extract last proof "+
"from proof file: %w", err)
}
outPoint := lastProof.OutPoint()
proofs[idx] = &AnnotatedProof{
Locator: Locator{
AssetID: &id,
ScriptKey: *scriptKey,
OutPoint: &outPoint,
},
Blob: proofFile,
}
}
return proofs, nil
}
// ImportProofs attempts to store fully populated proofs on disk. The previous
// outpoint of the first state transition will be used as the Genesis point.
// The final resting place of the asset will be used as the script key itself.
// If replace is specified, we expect a proof to already be present, and we just
// update (replace) it with the new proof.
//
// NOTE: This implements the Archiver interface.
func (f *FileArchiver) ImportProofs(_ context.Context,
_ HeaderVerifier, _ GroupVerifier, replace bool,
proofs ...*AnnotatedProof) error {
for _, proof := range proofs {
proofPath, err := genProofFileStoragePath(
f.proofPath, proof.Locator,
)
if err != nil {
return err
}
if err := os.MkdirAll(filepath.Dir(proofPath), 0750); err != nil {
return err
}
// Can't replace a file that doesn't exist yet.
if replace && !lnrpc.FileExists(proofPath) {
return fmt.Errorf("cannot replace proof because file "+
"%s does not exist", proofPath)
}
log.Tracef("Importing proof file %s (replace=%v)", proofPath,
replace)
err = os.WriteFile(proofPath, proof.Blob, 0666)
if err != nil {
return fmt.Errorf("unable to store proof: %v", err)
}
f.eventDistributor.NotifySubscribers(proof.Blob)
}
return nil
}
// RegisterSubscriber adds a new subscriber for receiving events. The
// deliverExisting boolean indicates whether already existing items should be
// sent to the NewItemCreated channel when the subscription is started. An
// optional deliverFrom can be specified to indicate from which timestamp/index/
// marker onward existing items should be delivered on startup. If deliverFrom
// is nil/zero/empty then all existing items will be delivered.
func (f *FileArchiver) RegisterSubscriber(
receiver *fn.EventReceiver[Blob],
deliverExisting bool, deliverFrom []*Locator) error {
f.eventDistributor.RegisterSubscriber(receiver)
// No delivery of existing items requested, we're done here.
if !deliverExisting {
return nil
}
for _, loc := range deliverFrom {
blob, err := f.FetchProof(nil, *loc)
if err != nil {
return err
}
// Deliver the found proof to the new item queue of the
// subscriber.
receiver.NewItemCreated.ChanIn() <- blob
}
return nil
}
// RemoveSubscriber removes the given subscriber and also stops it from
// processing events.
func (f *FileArchiver) RemoveSubscriber(
subscriber *fn.EventReceiver[Blob]) error {
return f.eventDistributor.RemoveSubscriber(subscriber)
}
// A compile-time assertion to ensure FileArchiver meets the NotifyArchiver
// interface.
var _ NotifyArchiver = (*FileArchiver)(nil)
// MultiArchiver is an archive of archives. It contains several archives and
// attempts to use them either as a look-aside cache or a write-through cache
// for all incoming requests.
type MultiArchiver struct {
proofVerifier Verifier
backends []Archiver
// archiveTimeout is the default timeout to use for any archive
// interaction.
archiveTimeout time.Duration
// eventDistributor is an event distributor that will be used to notify
// subscribers about new proofs that are added to the archiver.
eventDistributor *fn.EventDistributor[Blob]
}
// NewMultiArchiver creates a new MultiArchiver based on the set of specified
// backends.
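//
// A minimal usage sketch, assuming verifier, fileArchive and dbArchive are
// hypothetical values (a Verifier and two Archiver backends):
//
//	multi := NewMultiArchiver(
//		verifier, 30*time.Second, fileArchive, dbArchive,
//	)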
func NewMultiArchiver(verifier Verifier, archiveTimeout time.Duration,
backends ...Archiver) *MultiArchiver {
return &MultiArchiver{
proofVerifier: verifier,
backends: backends,
archiveTimeout: archiveTimeout,
eventDistributor: fn.NewEventDistributor[Blob](),
}
}
// FetchProof fetches a proof for an asset uniquely identified by the passed
// ProofIdentifier.
func (m *MultiArchiver) FetchProof(ctx context.Context,
loc Locator) (Blob, error) {
// Iterate through all our active backends and try to see if at least
// one of them contains the proof. Either one of them will have the
// proof, or we'll return an error back to the user.
//
// TODO(roasbeef): fire all requests off and take the one that responds
// first?
for _, archive := range m.backends {
proof, err := archive.FetchProof(ctx, loc)
switch {
case errors.Is(err, ErrProofNotFound):
continue
case err != nil:
return nil, err
}
return proof, nil
}
return nil, ErrProofNotFound
}
// HasProof returns true if the proof for the given locator exists. This is
// intended to be a performance optimized lookup compared to fetching a proof
// and checking for ErrProofNotFound. The multi archiver only considers a proof
// to be present if all backends have it.
func (m *MultiArchiver) HasProof(ctx context.Context, id Locator) (bool, error) {
for _, archive := range m.backends {
ok, err := archive.HasProof(ctx, id)
if err != nil {
return false, err
}
// We are expecting all backends to have the proof, otherwise we
// consider the proof not to be found.
if !ok {
return false, nil
}
}
return true, nil
}
// FetchProofs fetches all proofs for assets uniquely identified by the passed
// asset ID.
func (m *MultiArchiver) FetchProofs(ctx context.Context,
id asset.ID) ([]*AnnotatedProof, error) {
// We are listing proofs, so it shouldn't matter which backend we use.
return m.backends[0].FetchProofs(ctx, id)
}
// ImportProofs attempts to store fully populated proofs on disk. The previous
// outpoint of the first state transition will be used as the Genesis point.
// The final resting place of the asset will be used as the script key itself.
func (m *MultiArchiver) ImportProofs(ctx context.Context,
headerVerifier HeaderVerifier, groupVerifier GroupVerifier,
replace bool, proofs ...*AnnotatedProof) error {
// Before we import the proofs into the archive, we want to make sure
// that they're all valid. Along the way, we may augment the locator
// for each proof accordingly.
f := func(c context.Context, proof *AnnotatedProof) error {
// First, we'll decode and then also verify the proof.
finalStateTransition, err := m.proofVerifier.Verify(
c, bytes.NewReader(proof.Blob), headerVerifier,
groupVerifier,
)
if err != nil {
return fmt.Errorf("unable to verify proof: %w", err)
}
proof.AssetSnapshot = finalStateTransition
// TODO(roasbeef): actually want the split commit info here?
// * or need to pass in alongside the proof?
finalAsset := finalStateTransition.Asset
// Now that the proof has been fully verified, we'll use the
// final resting place of the asset (result of the last state
// transition) to create a proper annotated proof. We only need
// to do this if it wasn't specified though.
if proof.AssetID == nil {
assetID := finalAsset.ID()
proof.AssetID = &assetID
if finalAsset.GroupKey != nil {
proof.GroupKey = &finalAsset.GroupKey.GroupPubKey
}
proof.ScriptKey = *finalAsset.ScriptKey.PubKey
}
return nil
}
if err := fn.ParSlice(ctx, proofs, f); err != nil {
return err
}
// Now that we know all the proofs are valid, and have tacked on some
// additional supplementary information into the locator, we'll attempt
// to import each proof into our archive backends.
for _, archive := range m.backends {
err := archive.ImportProofs(
ctx, headerVerifier, groupVerifier, replace, proofs...,
)
if err != nil {
return err
}
}
// Deliver each new proof to the new item queue of the subscribers.
blobs := fn.Map(proofs, func(p *AnnotatedProof) Blob {
return p.Blob
})
m.eventDistributor.NotifySubscribers(blobs...)
return nil
}
// RegisterSubscriber adds a new subscriber for receiving events. The
// deliverExisting boolean indicates whether already existing items should be
// sent to the NewItemCreated channel when the subscription is started. An
// optional deliverFrom can be specified to indicate from which timestamp/index/
// marker onward existing items should be delivered on startup. If deliverFrom
// is nil/zero/empty then all existing items will be delivered.
func (m *MultiArchiver) RegisterSubscriber(receiver *fn.EventReceiver[Blob],
deliverExisting bool, deliverFrom []*Locator) error {
m.eventDistributor.RegisterSubscriber(receiver)
// No delivery of existing items requested, we're done here.
if !deliverExisting {
return nil
}
ctxt, cancel := context.WithTimeout(
context.Background(), m.archiveTimeout,
)
defer cancel()
for _, loc := range deliverFrom {
blob, err := m.FetchProof(ctxt, *loc)
if err != nil {
return err
}
// Deliver the found proof to the new item queue of the
// subscriber.
receiver.NewItemCreated.ChanIn() <- blob
}
return nil
}
// RemoveSubscriber removes the given subscriber and also stops it from
// processing events.
func (m *MultiArchiver) RemoveSubscriber(
subscriber *fn.EventReceiver[Blob]) error {
return m.eventDistributor.RemoveSubscriber(subscriber)
}
// A compile-time assertion to make sure MultiArchiver satisfies the
// NotifyArchiver interface.
var _ NotifyArchiver = (*MultiArchiver)(nil)
// ReplaceProofInBlob attempts to replace a proof in all proof files we have for
// assets of the same ID. This is useful when we want to update the proof with a
// new one after a re-org.
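//
// A minimal usage sketch, assuming updatedProof is the hypothetical re-proven
// *Proof and the archive and verifiers are already in scope:
//
//	err := ReplaceProofInBlob(
//		ctx, updatedProof, archive, headerVerifier, groupVerifier,
//	)
//	if err != nil {
//		// Handle the error.
//	}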
func ReplaceProofInBlob(ctx context.Context, p *Proof, archive Archiver,
headerVerifier HeaderVerifier, groupVerifier GroupVerifier) error {
// This is a bit of a hacky part. If we have a chain of transactions
// that were re-organized, we can't verify the whole chain until all of
// the transactions were confirmed and all proofs were updated with the
// new blocks and merkle roots. So we'll skip the verification here
// since we don't know if the whole chain has been updated yet (the
// confirmations might come in out of order).
// TODO(guggero): Find a better way to do this.
headerVerifier = func(wire.BlockHeader, uint32) error {
return nil
}
assetID := p.Asset.ID()
scriptPubKeyOfUpdate := p.Asset.ScriptKey.PubKey
// We now fetch all proofs of that same asset ID and filter out those
// that need updating.
proofs, err := archive.FetchProofs(ctx, assetID)
if err != nil {
return fmt.Errorf("unable to fetch all proofs for asset ID "+
"%x: %w", assetID[:], err)
}
for idx := range proofs {
existingProof := proofs[idx]
f := &File{}
err := f.Decode(bytes.NewReader(existingProof.Blob))
if err != nil {
return fmt.Errorf("unable to decode current proof: %w",
err)
}
// We only need to update proofs that contain this asset in the
// chain and haven't been updated yet (i.e. the block hash of
// the proof is different from the block hash of the proof we
// want to update).
_, indexToUpdate, err := f.LocateProof(func(fp *Proof) bool {
fileScriptKey := fp.Asset.ScriptKey.PubKey
fileTxHash := fp.AnchorTx.TxHash()
fileBlockHash := fp.BlockHeader.BlockHash()
return fileScriptKey.IsEqual(scriptPubKeyOfUpdate) &&
fileTxHash == p.AnchorTx.TxHash() &&
fileBlockHash != p.BlockHeader.BlockHash()
})
if err != nil {
// Either we failed to decode the proof for some reason,
// or we didn't find a proof that needs updating. In
// either case, we can skip this file.
continue
}
log.Debugf("Updating descendant proof at index %d "+
"(script_key=%x) in file with %d proofs", indexToUpdate,
scriptPubKeyOfUpdate.SerializeCompressed(),
f.NumProofs())
// All good, we can now replace the proof in the file with the
// new one.
err = f.ReplaceProofAt(indexToUpdate, *p)
if err != nil {
return fmt.Errorf("unable to replace proof at index "+
"%d with updated one: %w", indexToUpdate, err)
}
var buf bytes.Buffer
if err := f.Encode(&buf); err != nil {
return fmt.Errorf("unable to encode updated proof: %w",
err)
}
// We now update this direct proof in the archive.
directProof := &AnnotatedProof{
Locator: existingProof.Locator,
Blob: buf.Bytes(),
}
err = archive.ImportProofs(
ctx, headerVerifier, groupVerifier, true, directProof,
)
if err != nil {
return fmt.Errorf("unable to import updated proof: %w",
err)
}
}
return nil
}