-
Notifications
You must be signed in to change notification settings - Fork 568
/
fileset.go
202 lines (174 loc) · 5.78 KB
/
fileset.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
/*
Package fileset provides access to files through file sets.
A file set is the basic unit of storage for files. File sets come in two types:
primitive and composite. A primitive file set stores a list of files added and
a list of files deleted. A composite file set composes multiple composite and/or
primitive file sets into layers which can be merged to produce a new file
set. Composite file sets allow file operations to be applied lazily, which can
be very beneficial for improving write / storage costs.
File sets are ephemeral by default, so unreferenced file sets will eventually
be garbage collected. Processing that involves creating and using file sets
should be done inside a renewer to keep the file sets alive while the
processing is occurring.
File sets can be created with a file set writer and files can be retrieved by
opening a file set with optional indexing and filtering, then iterating through
the files. Files must be added to a file set in lexicographical order and files
are emitted by a file set in lexicographical order. Various file set wrappers
are available that can perform computations and provide filtering based on the
files emitted by a file set.
Backwards compatibility:
The algorithms for file and index chunking have changed throughout 2.x, and we
must support previously written data. Here are some examples of conditions in
past data which current code will not generate:
- a file that is split across multiple chunks may share some of them with
other files (in the current code, a file split across chunks will be the only
file in those chunks).
- related, even small files may be split across multiple chunks
- an index range data reference may start part way through a chunk
*/
package fileset
import (
"bytes"
"context"
"crypto/rand"
"database/sql/driver"
"encoding/hex"
"io"
"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/pachyderm/pachyderm/v2/src/internal/storage/chunk"
"github.com/pachyderm/pachyderm/v2/src/internal/storage/fileset/index"
)
// ID is the unique identifier for a fileset
type ID [16]byte

// newID returns a fresh fileset ID filled from the system's
// cryptographically secure random source. It panics if that source
// cannot be read, since no fileset can be created without an ID.
func newID() ID {
	var id ID
	_, err := rand.Read(id[:])
	if err != nil {
		panic(err)
	}
	return id
}
// ParseID parses a string into an ID or returns an error
func ParseID(x string) (*ID, error) {
	parsed, err := parseID([]byte(x))
	if err != nil {
		return nil, err
	}
	return &parsed, nil
}
// HexString returns the ID encoded with the hex alphabet.
func (id ID) HexString() string {
	buf := make([]byte, hex.EncodedLen(len(id)))
	hex.Encode(buf, id[:])
	return string(buf)
}
// TrackerID returns the tracker ID for the fileset.
func (id ID) TrackerID() string {
	hexID := id.HexString()
	return TrackerPrefix + hexID
}
// Scan implements sql.Scanner
func (id *ID) Scan(src interface{}) error {
	switch v := src.(type) {
	case []byte:
		// parseID returns the zero ID on failure, so *id is always
		// overwritten, matching the combined-assignment behavior.
		parsed, err := parseID(v)
		*id = parsed
		return err
	case string:
		parsed, err := parseID([]byte(v))
		*id = parsed
		return err
	default:
		return errors.Errorf("scanning fileset.ID: can't turn %T into fileset.ID", src)
	}
}
// Value implements sql.Valuer
func (id ID) Value() (driver.Value, error) {
	hexID := id.HexString()
	return hexID, nil
}
// parseID decodes x, a 32-character hex string, into an ID. Dashes are
// stripped before decoding so UUID-formatted input is also accepted.
func parseID(x []byte) (ID, error) {
	// Strip dashes so "xxxxxxxx-xxxx-..." (UUID style) parses too.
	x = bytes.ReplaceAll(x, []byte{'-'}, nil)
	id := ID{}
	if len(x) < 32 {
		return ID{}, errors.Errorf("parsing fileset.ID: too short to be ID len=%d", len(x))
	}
	if len(x) > 32 {
		return ID{}, errors.Errorf("parsing fileset.ID: too long to be ID len=%d", len(x))
	}
	if _, err := hex.Decode(id[:], x); err != nil {
		return ID{}, errors.EnsureStack(err)
	}
	return id, nil
}
// HexStringsToIDs parses each hex string in xs into an ID, returning an
// error on the first element that is not a valid fileset ID.
func HexStringsToIDs(xs []string) ([]ID, error) {
	// Pre-size to the known result length; stays non-nil for empty input
	// like the original `[]ID{}` literal.
	ids := make([]ID, 0, len(xs))
	for _, x := range xs {
		id, err := ParseID(x)
		if err != nil {
			return nil, err
		}
		ids = append(ids, *id)
	}
	return ids, nil
}
// IDsToHexStrings returns the hex encoding of each ID in ids, in order.
func IDsToHexStrings(ids []ID) []string {
	if len(ids) == 0 {
		// Preserve the historical nil result for empty input.
		return nil
	}
	xs := make([]string, len(ids))
	for i, id := range ids {
		xs[i] = id.HexString()
	}
	return xs
}
// PointsTo returns a slice of the chunk.IDs which this fileset immediately points to.
// Transitively reachable chunks are not included in the slice.
func (p *Primitive) PointsTo() []chunk.ID {
	// Appending to a nil literal builds a fresh slice, exactly as the
	// two-step append-to-declared-nil form does.
	ids := append([]chunk.ID(nil), index.PointsTo(p.Additive)...)
	ids = append(ids, index.PointsTo(p.Deletive)...)
	return ids
}
// PointsTo returns the IDs of the filesets which this composite fileset points to
func (c *Composite) PointsTo() ([]ID, error) {
	ids := make([]ID, 0, len(c.Layers))
	for _, layer := range c.Layers {
		id, err := ParseID(layer)
		if err != nil {
			return nil, err
		}
		ids = append(ids, *id)
	}
	return ids, nil
}
// File represents a single file stored in a file set.
type File interface {
	// Index returns the index entry for the file.
	Index() *index.Index
	// Content writes the content of the file to w.
	Content(ctx context.Context, w io.Writer, opts ...chunk.ReaderOption) error
	// Hash returns the hash of the file.
	Hash(ctx context.Context) ([]byte, error)
}

// Compile-time checks that both reader types implement File.
var _ File = &MergeFileReader{}
var _ File = &FileReader{}
// FileSet represents a set of files.
type FileSet interface {
	// Iterate iterates over the files in the file set,
	// calling cb once per file.
	Iterate(ctx context.Context, cb func(File) error, opts ...index.Option) error
	// IterateDeletes iterates over the deleted files in the file set,
	// calling cb once per deleted file.
	IterateDeletes(ctx context.Context, cb func(File) error, opts ...index.Option) error
	// Shards returns a list of shards for the file set.
	Shards(ctx context.Context, opts ...index.Option) ([]*index.PathRange, error)
}

// Compile-time checks that both reader types implement FileSet.
var _ FileSet = &MergeReader{}
var _ FileSet = &Reader{}
// emptyFileSet is a FileSet containing no files; iteration is a no-op.
type emptyFileSet struct{}

// Iterate is a no-op: there are no files to visit.
func (efs emptyFileSet) Iterate(_ context.Context, _ func(File) error, _ ...index.Option) error {
	return nil
}

// IterateDeletes is a no-op: there are no deleted files to visit.
func (efs emptyFileSet) IterateDeletes(_ context.Context, _ func(File) error, _ ...index.Option) error {
	return nil
}

// Shards returns a single zero-value PathRange.
// NOTE(review): presumably a zero PathRange denotes the unbounded range, so
// this yields one shard covering everything — confirm against index.PathRange.
func (efs emptyFileSet) Shards(_ context.Context, _ ...index.Option) ([]*index.PathRange, error) {
	return []*index.PathRange{{}}, nil
}
// idsToHex converts a slice of IDs to their hex string representations.
func idsToHex(xs []ID) []string {
	out := make([]string, 0, len(xs))
	for _, x := range xs {
		out = append(out, x.HexString())
	}
	return out
}