forked from firedancer-io/radiance
-
Notifications
You must be signed in to change notification settings - Fork 0
/
shreds.go
126 lines (113 loc) · 3.09 KB
/
shreds.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package blockstore
import (
"encoding/binary"
"fmt"
"math"
bin "github.com/gagliardetto/binary"
"go.firedancer.io/radiance/pkg/shred"
)
// MakeShredKey creates the RocksDB key for CfDataShred or CfCodeShred.
func MakeShredKey(slot, index uint64) (key [16]byte) {
binary.BigEndian.PutUint64(key[0:8], slot)
binary.BigEndian.PutUint64(key[8:16], index)
return
}
// ParseShredKey decodes the RocksDB keys in CfDataShred or CfCodeShred.
func ParseShredKey(key []byte) (slot uint64, index uint64, ok bool) {
ok = len(key) == 16
if !ok {
return
}
slot = binary.BigEndian.Uint64(key[0:8])
index = binary.BigEndian.Uint64(key[8:16])
return
}
type entryRange struct {
startIdx, endIdx uint32
}
// entryRanges returns the shred ranges of each entry
func (s *SlotMeta) entryRanges() []entryRange {
if !s.IsFull() {
return nil
}
indexes := sliceSortedByRange[uint32](s.EntryEndIndexes, 0, uint32(s.Consumed))
ranges := make([]entryRange, len(indexes))
begin := uint32(0)
for i, index := range s.EntryEndIndexes {
ranges[i] = entryRange{begin, index}
begin = index + 1
}
return ranges
}
type ordered interface {
int | int8 | int16 | int32 | int64 |
uint | uint8 | uint16 | uint32 | uint64 | uintptr |
float32 | float64 |
string
}
func sliceSortedByRange[T ordered](list []T, start T, stop T) []T {
for len(list) > 0 && list[0] < start {
list = list[1:]
}
for len(list) > 0 && list[len(list)-1] >= stop {
list = list[:len(list)-1]
}
return list
}
type Entries struct {
Entries []shred.Entry
Raw []byte
Shreds []shred.Shred
}
func (e *Entries) Slot() uint64 {
if len(e.Shreds) == 0 {
return math.MaxUint64
}
return e.Shreds[0].Slot
}
// DataShredsToEntries reassembles shreds to entries containing transactions.
func DataShredsToEntries(meta *SlotMeta, shreds []shred.Shred) (entries []Entries, err error) {
ranges := meta.entryRanges()
for _, r := range ranges {
parts := shreds[r.startIdx : r.endIdx+1]
entryBytes := shred.Concat(parts)
if len(entryBytes) == 0 {
continue
}
var dec bin.Decoder
dec.SetEncoding(bin.EncodingBin)
dec.Reset(entryBytes)
var subEntries SubEntries
if err := subEntries.UnmarshalWithDecoder(&dec); err != nil {
return nil, fmt.Errorf("cannot decode entry at %d:[%d-%d]: %w",
meta.Slot, r.startIdx, r.endIdx, err)
}
entries = append(entries, Entries{
Entries: subEntries.Entries,
Raw: entryBytes[:dec.Position()],
Shreds: parts,
})
}
return entries, nil
}
type SubEntries struct {
Entries []shred.Entry
}
func (se *SubEntries) UnmarshalWithDecoder(decoder *bin.Decoder) (err error) {
// read the number of entries:
numEntries, err := decoder.ReadUint64(bin.LE)
if err != nil {
return fmt.Errorf("failed to read number of entries: %w", err)
}
if numEntries > uint64(decoder.Remaining()) {
return fmt.Errorf("not enough bytes to read %d entries", numEntries)
}
// read the entries:
se.Entries = make([]shred.Entry, numEntries)
for i := uint64(0); i < numEntries; i++ {
if err = se.Entries[i].UnmarshalWithDecoder(decoder); err != nil {
return fmt.Errorf("failed to read entry %d: %w", i, err)
}
}
return
}