-
Notifications
You must be signed in to change notification settings - Fork 568
/
reader.go
114 lines (101 loc) · 3.13 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package fileset
import (
"context"
"github.com/pachyderm/pachyderm/v2/src/internal/pctx"
"io"
"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/pachyderm/pachyderm/v2/src/internal/storage/chunk"
"github.com/pachyderm/pachyderm/v2/src/internal/storage/fileset/index"
)
// Reader is an abstraction for reading a file set.
//
// A Reader is bound to a single file set ID. The file set's metadata is
// fetched from store on every operation (via getPrimitive), and index and
// file data are read through chunks.
type Reader struct {
	// store is where the file set's metadata is looked up by id.
	store MetadataStore
	// chunks is the chunk storage handed to the index reader and to every
	// FileReader produced by this Reader.
	chunks *chunk.Storage
	// idxCache is passed to index.NewReader by Iterate and IterateDeletes
	// (Shards passes nil instead).
	idxCache *index.Cache
	// id identifies the file set this Reader reads.
	id ID
}
// newReader builds a Reader for the file set identified by id. The reader
// resolves metadata through store and reads index/file data through chunks,
// using idxCache for index lookups where applicable.
func newReader(store MetadataStore, chunks *chunk.Storage, idxCache *index.Cache, id ID) *Reader {
	return &Reader{id: id, store: store, idxCache: idxCache, chunks: chunks}
}
// Iterate calls cb once for each entry of the file set's additive index,
// wrapping each entry in a FileReader. opts are passed through to
// index.NewReader.
//
// It fails if the file set's metadata cannot be loaded or the file set is not
// primitive. Otherwise it returns whatever the index reader's Iterate returns,
// which presumably includes any error produced by cb.
func (r *Reader) Iterate(ctx context.Context, cb func(File) error, opts ...index.Option) error {
	ctx = pctx.Child(ctx, "reader.iterate")
	p, err := r.getPrimitive(ctx)
	if err != nil {
		return err
	}
	additive := p.Additive
	// Attach the starting index to the logging context for this iteration.
	ctx = pctx.Child(ctx, "", pctx.WithFields(LogIndex(additive, "startIdx")))
	toFile := func(entry *index.Index) error {
		return cb(newFileReader(r.chunks, entry))
	}
	return index.NewReader(r.chunks, r.idxCache, additive, opts...).Iterate(ctx, toFile)
}
// getPrimitive loads the metadata for r.id from the metadata store and
// returns its primitive part. Errors from the store are returned unchanged.
// If the metadata has no primitive part, it returns an error saying the file
// set is not primitive.
func (r *Reader) getPrimitive(ctx context.Context) (*Primitive, error) {
	meta, err := r.store.Get(ctx, r.id)
	if err != nil {
		return nil, err
	}
	if p := meta.GetPrimitive(); p != nil {
		return p, nil
	}
	return nil, errors.Errorf("file set %v is not primitive", r.id)
}
// IterateDeletes is the deletive counterpart of Iterate. It calls cb once for
// each entry of the file set's deletive index, wrapping each entry in a
// FileReader. opts are passed through to index.NewReader.
//
// It fails if the file set's metadata cannot be loaded or the file set is not
// primitive. Otherwise it returns whatever the index reader's Iterate returns.
func (r *Reader) IterateDeletes(ctx context.Context, cb func(File) error, opts ...index.Option) error {
	ctx = pctx.Child(ctx, "reader.iterateDeletes")
	p, err := r.getPrimitive(ctx)
	if err != nil {
		return err
	}
	deletive := p.Deletive
	ir := index.NewReader(r.chunks, r.idxCache, deletive, opts...)
	// Attach the starting index to the logging context for this iteration.
	ctx = pctx.Child(ctx, "", pctx.WithFields(LogIndex(deletive, "startIdx")))
	return ir.Iterate(ctx, func(entry *index.Index) error {
		fr := newFileReader(r.chunks, entry)
		return cb(fr)
	})
}
// Shards computes path ranges over the file set's additive index by
// delegating to the index reader's Shards method. opts are passed through to
// index.NewReader.
//
// It fails if the file set's metadata cannot be loaded or the file set is not
// primitive. Sharding errors are wrapped with "sharding fileset".
//
// Unlike Iterate, the index reader here is built with a nil index cache.
// NOTE(review): presumably intentional (a one-off scan should not populate the
// cache); confirm.
func (r *Reader) Shards(ctx context.Context, opts ...index.Option) ([]*index.PathRange, error) {
	ctx = pctx.Child(ctx, "reader.shard")
	prim, err := r.getPrimitive(ctx)
	if err != nil {
		return nil, err
	}
	ctx = pctx.Child(ctx, "", pctx.WithFields(LogIndex(prim.Additive, "startIdx")))
	ir := index.NewReader(r.chunks, nil, prim.Additive, opts...)
	pathRanges, err := ir.Shards(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "sharding fileset")
	}
	return pathRanges, nil
}
// FileReader is an abstraction for reading a file.
//
// It pairs a file's index entry with the chunk storage used to read the data
// that entry references.
type FileReader struct {
	// chunks is the chunk storage used to read the file's data refs.
	chunks *chunk.Storage
	// idx is the index entry describing the file; its File.DataRefs are what
	// Content reads and what Hash is computed from.
	idx *index.Index
}
// newFileReader wraps the index entry idx, together with the chunk storage
// needed to read its data, in a FileReader.
func newFileReader(chunks *chunk.Storage, idx *index.Index) *FileReader {
	fr := &FileReader{idx: idx, chunks: chunks}
	return fr
}
// Index returns the index for the file.
//
// The returned pointer is the FileReader's own index entry, not a copy.
// Mutating it changes what Content and Hash observe.
func (fr *FileReader) Index() *index.Index {
	return fr.idx
}
// Content writes the content of the file to w. It reads the data refs listed
// in the file's index entry through a chunk reader. opts are passed through to
// the chunk reader, and its error, if any, is returned.
func (fr *FileReader) Content(ctx context.Context, w io.Writer, opts ...chunk.ReaderOption) error {
	dataRefs := fr.idx.File.DataRefs
	return fr.chunks.NewReader(ctx, dataRefs, opts...).Get(w)
}
// Hash returns the hash of the file. It collects the hash of each of the
// file's data refs, in data-ref order, and combines them with
// computeFileHash. The context is unused.
func (fr *FileReader) Hash(_ context.Context) ([]byte, error) {
	dataRefs := fr.idx.File.DataRefs
	// One hash per data ref, so pre-size to avoid repeated slice growth.
	// NOTE(review): for a file with no data refs this passes an empty, non-nil
	// slice where the old code passed nil; assumes computeFileHash only ranges
	// over it. Confirm.
	hashes := make([][]byte, 0, len(dataRefs))
	for _, dataRef := range dataRefs {
		hashes = append(hashes, dataRef.Hash)
	}
	return computeFileHash(hashes)
}