// reader.go — forked from pachyderm/pachyderm
package chunk
import (
"bytes"
"compress/gzip"
"context"
"io"
"path"
"github.com/pachyderm/pachyderm/src/client/pkg/grpcutil"
"github.com/pachyderm/pachyderm/src/server/pkg/obj"
)
// Reader reads a set of DataRefs from chunk storage.
// It streams the concatenation of the byte ranges described by the
// DataRefs, decompressing each referenced chunk on demand.
type Reader struct {
	ctx      context.Context // context used for object storage reads
	objC     obj.Client      // backing object storage client
	dataRefs []*DataRef      // remaining refs to read, consumed front to back
	curr     *DataRef        // ref currently being served out of r
	buf      *bytes.Buffer   // decompressed bytes of the most recently read chunk
	r        *bytes.Reader   // reads curr's byte range out of buf
	len      int64           // total bytes remaining across r and dataRefs
}
// newReader creates a Reader over the given DataRefs, reading chunk
// data from objC under ctx.
func newReader(ctx context.Context, objC obj.Client, dataRefs ...*DataRef) *Reader {
	reader := &Reader{
		ctx:  ctx,
		objC: objC,
		buf:  bytes.NewBuffer(nil),
	}
	reader.NextRange(dataRefs)
	return reader
}
// NextRange sets the next range for the reader.
// It discards any in-progress range and resets the byte count to the
// total size of the new DataRefs.
func (r *Reader) NextRange(dataRefs []*DataRef) {
	r.dataRefs = dataRefs
	r.r = bytes.NewReader(nil)
	var total int64
	for _, ref := range dataRefs {
		total += ref.SizeBytes
	}
	r.len = total
}
// Len returns the number of bytes left to read in the current range of
// DataRefs. It decreases as Read consumes bytes.
func (r *Reader) Len() int64 {
	return r.len
}
// Read reads from the byte stream produced by the set of DataRefs.
// It drains the in-memory reader for the current DataRef, loading the
// next DataRef's bytes whenever the current one is exhausted, and
// returns io.EOF once all DataRefs have been consumed.
func (r *Reader) Read(data []byte) (int, error) {
	var totalRead int
	// Keep r.len in sync with however many bytes this call returned,
	// regardless of which return path is taken.
	defer func() {
		r.len -= int64(totalRead)
	}()
	for len(data) > 0 {
		n, err := r.r.Read(data)
		data = data[n:]
		totalRead += n
		if err != nil {
			// Only io.EOF means the current in-memory range is exhausted.
			// Propagate any other error rather than silently treating it
			// as end-of-range and skipping data.
			if err != io.EOF {
				return totalRead, err
			}
			// If all DataRefs have been read, then io.EOF.
			if len(r.dataRefs) == 0 {
				return totalRead, io.EOF
			}
			if err := r.nextDataRef(); err != nil {
				return totalRead, err
			}
		}
	}
	return totalRead, nil
}
// nextDataRef advances the reader to the next DataRef, fetching and
// decompressing its chunk if it is not the one already buffered.
func (r *Reader) nextDataRef() error {
	next := r.dataRefs[0]
	// Fetch the chunk only when it differs from the one currently in r.buf.
	if r.curr == nil || r.curr.Chunk.Hash != next.Chunk.Hash {
		if err := r.readChunk(next.Chunk); err != nil {
			return err
		}
	}
	r.curr = next
	r.dataRefs = r.dataRefs[1:]
	window := r.buf.Bytes()[next.OffsetBytes : next.OffsetBytes+next.SizeBytes]
	r.r = bytes.NewReader(window)
	return nil
}
// readChunk downloads the object for chunk, decompresses it, and leaves
// the plaintext bytes in r.buf.
func (r *Reader) readChunk(chunk *Chunk) error {
	objR, err := r.objC.Reader(r.ctx, path.Join(prefix, chunk.Hash), 0, 0)
	if err != nil {
		return err
	}
	defer objR.Close()
	gzipR, err := gzip.NewReader(objR)
	if err != nil {
		return err
	}
	defer gzipR.Close()
	r.buf.Reset()
	// Copy through a pooled scratch buffer so each chunk read does not
	// allocate a fresh copy buffer.
	scratch := grpcutil.GetBuffer()
	defer grpcutil.PutBuffer(scratch)
	_, err = io.CopyBuffer(r.buf, gzipR, scratch)
	return err
}
// Close closes the reader.
// Currently a no-op, but will be used when streaming is implemented.
// It always returns nil and is safe to call multiple times.
func (r *Reader) Close() error {
	return nil
}