-
Notifications
You must be signed in to change notification settings - Fork 26
/
reader.go
109 lines (93 loc) · 2.31 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package blob
import (
"io"
"redwood.dev/errors"
"redwood.dev/state"
"redwood.dev/types"
)
// Reader streams the bytes of a blob (or a sub-range of it) by walking a list
// of spans, each of which selects a byte range inside one of the manifest's
// chunks. Chunk data is loaded lazily, one span at a time, as Read is called.
type Reader struct {
	// db is the tree that chunk data is read from (see readNextChunk).
	db *state.DBTree
	// manifest describes the blob; spans refer to its Chunks by index.
	manifest Manifest
	// spans lists, in read order, the portion of each chunk to emit.
	spans []readerSpan
	// spanIdx is the index into spans of the span currently being read.
	spanIdx uint64
	// chunkBytes holds the bytes of the current span, or nil when no span
	// data is loaded.
	chunkBytes []byte
	// chunkByteIdx is the offset into chunkBytes of the next byte to return.
	chunkByteIdx uint64
	// eof is set by Read once every span has been consumed.
	eof bool
}

// Compile-time check that *Reader satisfies io.ReadCloser.
var _ io.ReadCloser = (*Reader)(nil)
// readerSpan identifies a contiguous run of bytes inside a single chunk.
type readerSpan struct {
	// ChunkIdx is the index of the chunk within the manifest's Chunks.
	ChunkIdx int
	// Rng is the byte range to read, expressed relative to the start of the
	// chunk (NewReader subtracts the chunk's own Range.Start).
	Rng types.Range
}
// NewReader builds a Reader over the blob described by manifest. The returned
// Reader keeps db and manifest and a precomputed list of spans to read.
//
// When rng is nil, one span is created per chunk covering the chunk's whole
// length. Otherwise rng is normalized against manifest.TotalSize and a span is
// created for every chunk whose Range intersects it; each span's range is
// rebased so that it is relative to the start of its chunk.
func NewReader(db *state.DBTree, manifest Manifest, rng *types.Range) *Reader {
	var spans []readerSpan
	if rng != nil {
		normalizedRange := rng.NormalizedForLength(manifest.TotalSize)
		for i, chunk := range manifest.Chunks {
			intersection, exists := chunk.Range.Intersection(normalizedRange)
			if !exists {
				// Chunks before the requested range are skipped. Once at least
				// one span has been collected, the first chunk that misses the
				// range ends the scan (presumably chunks are stored in
				// ascending offset order — confirm against Manifest).
				if len(spans) > 0 {
					break
				}
				continue
			}

			// Rebase from blob-relative to chunk-relative offsets.
			intersection.Start -= chunk.Range.Start
			intersection.End -= chunk.Range.Start
			spans = append(spans, readerSpan{ChunkIdx: i, Rng: intersection})
		}
	} else {
		// Exactly one span per chunk, so the final length is known up front.
		spans = make([]readerSpan, 0, len(manifest.Chunks))
		for i, chunk := range manifest.Chunks {
			whole := types.Range{Start: 0, End: chunk.Range.End - chunk.Range.Start}
			spans = append(spans, readerSpan{ChunkIdx: i, Rng: whole})
		}
	}
	return &Reader{db: db, manifest: manifest, spans: spans}
}
// Read implements io.Reader. It copies bytes from the reader's spans, in
// order, into buf, calling readNextChunk whenever no span data is buffered.
//
// It returns the number of bytes copied into buf. When every span has been
// consumed and nothing was copied during the call, it returns 0 and io.EOF;
// if some bytes were copied, io.EOF is reported by the next call instead. If
// readNextChunk fails, the bytes already copied during this call are returned
// together with that error so the caller does not lose them.
func (r *Reader) Read(buf []byte) (int, error) {
	if r.eof {
		return 0, io.EOF
	}

	var copiedIntoBuf int
	for copiedIntoBuf < len(buf) {
		if r.spanIdx >= uint64(len(r.spans)) {
			r.eof = true
			break
		}

		if r.chunkBytes == nil {
			if err := r.readNextChunk(); err != nil {
				// The reader's position has already advanced past the bytes
				// copied so far, so they must be reported alongside the error.
				return copiedIntoBuf, err
			}
		}

		n := copy(buf[copiedIntoBuf:], r.chunkBytes[r.chunkByteIdx:])
		r.chunkByteIdx += uint64(n)
		copiedIntoBuf += n

		// The current span is exhausted; move to the next one and drop the
		// buffered data so it is reloaded on the next iteration.
		if r.chunkByteIdx >= uint64(len(r.chunkBytes)) {
			r.spanIdx++
			r.chunkBytes = nil
			r.chunkByteIdx = 0
		}
	}

	// Avoid returning (0, nil) when the data has simply run out.
	if copiedIntoBuf == 0 && r.eof {
		return 0, io.EOF
	}
	return copiedIntoBuf, nil
}
// readNextChunk loads the data for the span at r.spanIdx into r.chunkBytes.
//
// It opens a state node on r.db (closed again before returning), looks up the
// chunk the span refers to, and reads the value stored at the keypath formed
// from the chunk's SHA3 hex string with "chunk" pushed onto it. The span's
// range is normalized against the length of the stored value and the matching
// sub-slice becomes r.chunkBytes.
//
// It returns the lookup error if the read fails, or an error if the stored
// value is not a byte slice.
func (r *Reader) readNextChunk() error {
	node := r.db.State(false)
	defer node.Close()

	current := r.spans[r.spanIdx]
	chunk := r.manifest.Chunks[current.ChunkIdx]

	keypath := state.Keypath(chunk.SHA3.Hex()).Pushs("chunk")
	data, isBytes, err := node.BytesValue(keypath)
	if err != nil {
		return err
	}
	if !isBytes {
		return errors.New("value is not bytes")
	}

	window := current.Rng.NormalizedForLength(uint64(len(data)))
	r.chunkBytes = data[window.Start:window.End]
	return nil
}
// Close implements io.Closer. It performs no work and always returns nil.
func (r *Reader) Close() error {
	return nil
}