Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement putMetacacheObject() optimizing List operations #12903

Merged
merged 1 commit into from
Aug 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
161 changes: 161 additions & 0 deletions cmd/erasure-object.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,167 @@ func rename(ctx context.Context, disks []StorageAPI, srcBucket, srcEntry, dstBuc
return evalDisks(disks, errs), err
}

func (er erasureObjects) putMetacacheObject(ctx context.Context, key string, r *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error) {
data := r.Reader

// No metadata is set, allocate a new one.
if opts.UserDefined == nil {
opts.UserDefined = make(map[string]string)
}

storageDisks := er.getDisks()
// Get parity and data drive count based on storage class metadata
parityDrives := globalStorageClass.GetParityForSC(opts.UserDefined[xhttp.AmzStorageClass])
if parityDrives <= 0 {
parityDrives = er.defaultParityCount
}
dataDrives := len(storageDisks) - parityDrives

// we now know the number of blocks this object needs for data and parity.
// writeQuorum is dataBlocks + 1
writeQuorum := dataDrives
if dataDrives == parityDrives {
writeQuorum++
}

// Validate input data size and it can never be less than zero.
if data.Size() < -1 {
logger.LogIf(ctx, errInvalidArgument, logger.Application)
return ObjectInfo{}, toObjectErr(errInvalidArgument)
}

// Initialize parts metadata
partsMetadata := make([]FileInfo, len(storageDisks))

fi := newFileInfo(pathJoin(minioMetaBucket, key), dataDrives, parityDrives)
fi.DataDir = mustGetUUID()

// Initialize erasure metadata.
for index := range partsMetadata {
partsMetadata[index] = fi
}

// Order disks according to erasure distribution
var onlineDisks []StorageAPI
onlineDisks, partsMetadata = shuffleDisksAndPartsMetadata(storageDisks, partsMetadata, fi)

erasure, err := NewErasure(ctx, fi.Erasure.DataBlocks, fi.Erasure.ParityBlocks, fi.Erasure.BlockSize)
if err != nil {
return ObjectInfo{}, toObjectErr(err, minioMetaBucket, key)
}

// Fetch buffer for I/O, returns from the pool if not allocates a new one and returns.
var buffer []byte
switch size := data.Size(); {
case size == 0:
buffer = make([]byte, 1) // Allocate atleast a byte to reach EOF
case size >= fi.Erasure.BlockSize:
buffer = er.bp.Get()
defer er.bp.Put(buffer)
case size < fi.Erasure.BlockSize:
// No need to allocate fully blockSizeV1 buffer if the incoming data is smaller.
buffer = make([]byte, size, 2*size+int64(fi.Erasure.ParityBlocks+fi.Erasure.DataBlocks-1))
}

if len(buffer) > int(fi.Erasure.BlockSize) {
buffer = buffer[:fi.Erasure.BlockSize]
}

shardFileSize := erasure.ShardFileSize(data.Size())
writers := make([]io.Writer, len(onlineDisks))
inlineBuffers := make([]*bytes.Buffer, len(onlineDisks))
for i, disk := range onlineDisks {
if disk == nil {
continue
}

inlineBuffers[i] = bytes.NewBuffer(make([]byte, 0, shardFileSize))
writers[i] = newStreamingBitrotWriterBuffer(inlineBuffers[i], DefaultBitrotAlgorithm, erasure.ShardSize())
}

n, erasureErr := erasure.Encode(ctx, data, writers, buffer, writeQuorum)
closeBitrotWriters(writers)
if erasureErr != nil {
return ObjectInfo{}, toObjectErr(erasureErr, minioMetaBucket, key)
}

// Should return IncompleteBody{} error when reader has fewer bytes
// than specified in request header.
if n < data.Size() {
return ObjectInfo{}, IncompleteBody{Bucket: minioMetaBucket, Object: key}
}

for i, w := range writers {
if w == nil {
onlineDisks[i] = nil
continue
}
if len(inlineBuffers) > 0 && inlineBuffers[i] != nil {
partsMetadata[i].Data = inlineBuffers[i].Bytes()
} else {
partsMetadata[i].Data = nil
}
partsMetadata[i].AddObjectPart(1, "", n, data.ActualSize())
partsMetadata[i].Erasure.AddChecksumInfo(ChecksumInfo{
PartNumber: 1,
Algorithm: DefaultBitrotAlgorithm,
Hash: bitrotWriterSum(w),
})
}

modTime := UTCNow()

// Fill all the necessary metadata.
// Update `xl.meta` content on each disks.
for index := range partsMetadata {
partsMetadata[index].Metadata = opts.UserDefined
partsMetadata[index].Size = n
partsMetadata[index].ModTime = modTime
}

// Set an additional header when data is inlined.
for index := range partsMetadata {
partsMetadata[index].SetInlineData()
}

for i := 0; i < len(onlineDisks); i++ {
if onlineDisks[i] != nil && onlineDisks[i].IsOnline() {
// Object info is the same in all disks, so we can pick
// the first meta from online disk
fi = partsMetadata[i]
break
}
}

g := errgroup.WithNErrs(len(onlineDisks))

// Rename file on all underlying storage disks.
for index := range onlineDisks {
index := index
g.Go(func() error {
if onlineDisks[index] == nil {
return errDiskNotFound
}
// Pick one FileInfo for a disk at index.
fi := partsMetadata[index]
// Assign index when index is initialized
if fi.Erasure.Index == 0 {
fi.Erasure.Index = index + 1
}

if fi.IsValid() {
return onlineDisks[index].WriteMetadata(ctx, minioMetaBucket, key, fi)
}
return errFileCorrupt
}, index)
}

// Wait for all renames to finish.
errs := g.Wait()

return fi.ToObjectInfo(minioMetaBucket, key), reduceWriteQuorumErrs(ctx, errs, objectOpIgnoredErrs, writeQuorum)
}

// PutObject - creates an object upon reading from the input stream
// until EOF, erasure codes the data across all disk and additionally
// writes `xl.meta` which carries the necessary metadata for future
Expand Down
3 changes: 1 addition & 2 deletions cmd/metacache-set.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,9 +648,8 @@ func (er *erasureObjects) saveMetaCacheStream(ctx context.Context, mc *metaCache
r, err := hash.NewReader(bytes.NewReader(b.data), int64(len(b.data)), "", "", int64(len(b.data)))
logger.LogIf(ctx, err)
custom := b.headerKV()
_, err = er.putObject(ctx, minioMetaBucket, o.objectPath(b.n), NewPutObjReader(r), ObjectOptions{
_, err = er.putMetacacheObject(ctx, o.objectPath(b.n), NewPutObjReader(r), ObjectOptions{
UserDefined: custom,
NoLock: true, // No need to hold namespace lock, each prefix caches uniquely.
})
if err != nil {
mc.setErr(err.Error())
Expand Down