/
object_manager.go
229 lines (184 loc) · 7.25 KB
/
object_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
// Package object implements repository support for content-addressable objects of arbitrary size.
package object
import (
"context"
"io"
"sync"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/compression"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/splitter"
)
// ErrObjectNotFound is returned when an object cannot be found.
// Callers should test for it using errors.Is.
var ErrObjectNotFound = errors.New("object not found")
// Reader allows reading, seeking, getting the length of and closing of a repository object.
type Reader interface {
	io.Reader
	io.Seeker
	io.Closer

	// Length returns the total length of the object in bytes.
	Length() int64
}
// contentReader is the read-only subset of content-manager operations needed
// to resolve objects into their backing contents.
type contentReader interface {
	// ContentInfo returns metadata about a single content.
	ContentInfo(ctx context.Context, contentID content.ID) (content.Info, error)
	// GetContent returns the bytes of a single content.
	GetContent(ctx context.Context, contentID content.ID) ([]byte, error)
	// PrefetchContents attempts to bring the given contents into the cache
	// (best-effort) and returns the IDs that were actually prefetched.
	PrefetchContents(ctx context.Context, contentIDs []content.ID, prefetchHint string) []content.ID
}
// contentManager extends contentReader with the write-side operations used
// when storing new objects.
type contentManager interface {
	contentReader

	// SupportsContentCompression reports whether the underlying content
	// manager can store compressed contents.
	SupportsContentCompression() bool
	// WriteContent stores the given bytes under the provided ID prefix using
	// the specified compression header and returns the resulting content ID.
	WriteContent(ctx context.Context, data gather.Bytes, prefix content.IDPrefix, comp compression.HeaderID) (content.ID, error)
}
// Format describes the format of objects in a repository.
type Format struct {
	// Splitter names the splitter used to break objects into pieces of content.
	// When empty, a fixed-size splitter ("FIXED") is used by default.
	Splitter string `json:"splitter,omitempty"` // splitter used to break objects into pieces of content
}
// Manager implements a content-addressable storage on top of blob storage.
type Manager struct {
	Format Format // object format settings for this repository

	contentMgr  contentManager   // underlying content manager used for all reads and writes
	newSplitter splitter.Factory // produces a splitter instance for each new writer
	writerPool  sync.Pool        // recycles *objectWriter between NewWriter and closedWriter
}
// NewWriter creates an ObjectWriter for writing to the repository.
func (om *Manager) NewWriter(ctx context.Context, opt WriterOptions) Writer {
	w, _ := om.writerPool.Get().(*objectWriter)

	w.ctx = ctx
	w.om = om

	w.splitter = om.newSplitter()
	w.description = opt.Description
	w.prefix = opt.Prefix
	w.compressor = compression.ByName[opt.Compressor]

	w.totalLength = 0
	w.currentPosition = 0

	// Point the slice at the embedded array so that the common case needs no
	// heap allocation for index entries.
	w.indirectIndex = w.indirectIndexBuf[:0]

	switch {
	case opt.AsyncWrites <= 0:
		w.asyncWritesSemaphore = nil
	case len(w.asyncWritesSemaphore) != 0 || cap(w.asyncWritesSemaphore) != opt.AsyncWrites:
		// The pooled semaphore cannot be reused (still busy or wrong size) -
		// replace it with a fresh one of the requested capacity.
		w.asyncWritesSemaphore = make(chan struct{}, opt.AsyncWrites)
	}

	w.buffer.Reset()
	w.contentWriteError = nil

	return w
}
// closedWriter returns an objectWriter to the pool so a later NewWriter call
// can reuse it.
func (om *Manager) closedWriter(ow *objectWriter) {
	om.writerPool.Put(ow)
}
// Concatenate creates an object that's a result of concatenation of other objects. This is more efficient than reading
// and rewriting the objects because Concatenate can efficiently merge index entries without reading the underlying
// contents.
//
// This function exists primarily to facilitate efficient parallel uploads of very large files (>1GB). Due to the bottleneck
// of splitting, which is inherently sequential, we can only use one CPU core for each Writer, which limits throughput.
//
// For example when uploading a 100 GB file it is beneficial to independently upload sections of [0..25GB),
// [25..50GB), [50GB..75GB) and [75GB..100GB) and concatenate them together as this allows us to run four splitters
// in parallel utilizing more CPU cores. Because some split points now start at fixed boundaries and not content-specific,
// this causes some slight loss of deduplication at concatenation points (typically 1-2 contents, usually <10MB),
// so this method should only be used for very large files where this overhead is relatively small.
func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID) (ID, error) {
	switch len(objectIDs) {
	case 0:
		return EmptyID, errors.Errorf("empty list of objects")
	case 1:
		// Nothing to merge - the result is the input itself.
		return objectIDs[0], nil
	}

	var (
		entries []IndirectObjectEntry
		length  int64
		err     error
	)

	// Accumulate index entries (with shifted offsets) for each input object.
	for _, oid := range objectIDs {
		entries, length, err = appendIndexEntriesForObject(ctx, om.contentMgr, entries, length, oid)
		if err != nil {
			return EmptyID, errors.Wrapf(err, "error appending %v", oid)
		}
	}

	log(ctx).Debugf("concatenated: %v total: %v", entries, length)

	w := om.NewWriter(ctx, WriterOptions{
		Prefix:      indirectContentPrefix,
		Description: "CONCATENATED INDEX",
	})
	defer w.Close() // nolint:errcheck

	if werr := writeIndirectObject(w, entries); werr != nil {
		return EmptyID, werr
	}

	concatID, err := w.Result()
	if err != nil {
		return EmptyID, errors.Wrap(err, "error writing concatenated index")
	}

	return IndirectObjectID(concatID), nil
}
// appendIndexEntriesForObject appends index entries describing objectID to
// indexEntries, shifting offsets by startingLength, and returns the grown
// slice plus the new running total length.
func appendIndexEntriesForObject(ctx context.Context, cr contentReader, indexEntries []IndirectObjectEntry, startingLength int64, objectID ID) (result []IndirectObjectEntry, totalLength int64, _ error) {
	indexObjectID, isIndex := objectID.IndexObjectID()
	if !isIndex {
		// Non-index object: compression and padding mean the precise length
		// cannot be determined from the stored content alone, so open the
		// object and ask it for its length.
		r, err := Open(ctx, cr, objectID)
		if err != nil {
			return nil, 0, errors.Wrapf(err, "error opening %v", objectID)
		}
		defer r.Close() //nolint:errcheck

		result, totalLength = appendIndexEntries(indexEntries, startingLength, IndirectObjectEntry{
			Start:  0,
			Length: r.Length(),
			Object: objectID,
		})

		return result, totalLength, nil
	}

	// Index object: merge its existing index entries directly.
	ndx, err := LoadIndexObject(ctx, cr, indexObjectID)
	if err != nil {
		return nil, 0, errors.Wrapf(err, "error reading index of %v", objectID)
	}

	result, totalLength = appendIndexEntries(indexEntries, startingLength, ndx...)

	return result, totalLength, nil
}
// appendIndexEntries appends the incoming entries to indexEntries with each
// entry's Start shifted by startingLength, and returns the grown slice along
// with the new running total length.
func appendIndexEntries(indexEntries []IndirectObjectEntry, startingLength int64, incoming ...IndirectObjectEntry) (result []IndirectObjectEntry, totalLength int64) {
	result = indexEntries
	totalLength = startingLength

	for i := range incoming {
		e := incoming[i] // copy so the caller's entry is not mutated
		e.Start += startingLength
		result = append(result, e)
		totalLength += e.Length
	}

	return result, totalLength
}
// noop is a callback that accepts any content ID and always succeeds.
func noop(_ content.ID) error { return nil }
// PrefetchBackingContents attempts to bring the contents backing the provided
// object IDs into the cache. This may succeed only partially due to cache size
// limits and other constraints. Returns the list of content IDs prefetched.
func PrefetchBackingContents(ctx context.Context, contentMgr contentManager, objectIDs []ID, hint string) ([]content.ID, error) {
	tracker := &contentIDTracker{}

	for _, objectID := range objectIDs {
		err := iterateBackingContents(ctx, contentMgr, objectID, tracker, noop)
		if err == nil || errors.Is(err, ErrObjectNotFound) || errors.Is(err, content.ErrContentNotFound) {
			// Missing objects or contents are skipped rather than failing the
			// whole prefetch.
			continue
		}

		return nil, err
	}

	return contentMgr.PrefetchContents(ctx, tracker.contentIDs(), hint), nil
}
// NewObjectManager creates an ObjectManager with the specified content manager and format.
//
// When f.Splitter is empty, the "FIXED" splitter is used by default. Returns an
// error if the (effective) splitter name has no registered factory.
func NewObjectManager(ctx context.Context, bm contentManager, f Format) (*Manager, error) {
	om := &Manager{
		contentMgr: bm,
		Format:     f,
	}

	om.writerPool = sync.Pool{
		New: func() interface{} {
			return new(objectWriter)
		},
	}

	splitterID := f.Splitter
	if splitterID == "" {
		splitterID = "FIXED"
	}

	os := splitter.GetFactory(splitterID)
	if os == nil {
		// Report the effective splitter name (after defaulting) rather than the
		// raw format value, so a missing "FIXED" factory is not reported as "".
		return nil, errors.Errorf("unsupported splitter %q", splitterID)
	}

	// Pooled wraps the factory so splitter instances can be reused across writers.
	om.newSplitter = splitter.Pooled(os)

	return om, nil
}