Skip to content

Commit

Permalink
[proposal] Support creating directories on erasure coded backend
Browse files Browse the repository at this point in the history
This PR continues from #5049 where we started supporting
directories for erasure coded backend
  • Loading branch information
harshavardhana committed Dec 31, 2017
1 parent 659f724 commit 117161c
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 53 deletions.
22 changes: 3 additions & 19 deletions cmd/fs-v1.go
Expand Up @@ -386,7 +386,7 @@ func (fs fsObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string
// startOffset indicates the starting read location of the object.
// length indicates the total length of the object.
func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, writer io.Writer) (err error) {
if err = checkBucketAndObjectNamesFS(bucket, object); err != nil {
if err = checkGetObjArgs(bucket, object); err != nil {
return err
}

Expand Down Expand Up @@ -498,25 +498,9 @@ func (fs fsObjects) getObjectInfo(bucket, object string) (oi ObjectInfo, e error
return fsMeta.ToObjectInfo(bucket, object, fi), nil
}

// Checks bucket and object name validity, returns nil if both are valid.
func checkBucketAndObjectNamesFS(bucket, object string) error {
// Verify if bucket is valid.
if !IsValidBucketName(bucket) {
return errors.Trace(BucketNameInvalid{Bucket: bucket})
}
// Verify if object is valid.
if len(object) == 0 {
return errors.Trace(ObjectNameInvalid{Bucket: bucket, Object: object})
}
if !IsValidObjectPrefix(object) {
return errors.Trace(ObjectNameInvalid{Bucket: bucket, Object: object})
}
return nil
}

// GetObjectInfo - reads object metadata and replies back ObjectInfo.
func (fs fsObjects) GetObjectInfo(bucket, object string) (oi ObjectInfo, e error) {
if err := checkBucketAndObjectNamesFS(bucket, object); err != nil {
if err := checkGetObjArgs(bucket, object); err != nil {
return oi, err
}

Expand Down Expand Up @@ -678,7 +662,7 @@ func (fs fsObjects) PutObject(bucket string, object string, data *hash.Reader, m
// DeleteObject - deletes an object from a bucket, this operation is destructive
// and there are no rollbacks supported.
func (fs fsObjects) DeleteObject(bucket, object string) error {
if err := checkBucketAndObjectNamesFS(bucket, object); err != nil {
if err := checkDelObjArgs(bucket, object); err != nil {
return err
}

Expand Down
9 changes: 4 additions & 5 deletions cmd/object-api-input-checks.go
Expand Up @@ -38,11 +38,10 @@ func checkBucketAndObjectNames(bucket, object string) error {
return errors.Trace(BucketNameInvalid{Bucket: bucket})
}
// Verify if object is valid.
if !IsValidObjectName(object) {
// Objects with "/" are invalid, verify to return a different error.
if hasSuffix(object, slashSeparator) || hasPrefix(object, slashSeparator) {
return errors.Trace(ObjectNotFound{Bucket: bucket, Object: object})
}
if len(object) == 0 {
return errors.Trace(ObjectNameInvalid{Bucket: bucket, Object: object})
}
if !IsValidObjectPrefix(object) {
return errors.Trace(ObjectNameInvalid{Bucket: bucket, Object: object})
}
return nil
Expand Down
21 changes: 12 additions & 9 deletions cmd/posix.go
Expand Up @@ -280,18 +280,21 @@ func (s *posix) MakeVol(volume string) (err error) {
if err != nil {
return err
}
// Make a volume entry, with mode 0777 mkdir honors system umask.
err = os.Mkdir((volumeDir), 0777)
if err != nil {
if os.IsExist(err) {
return errVolumeExists
} else if os.IsPermission(err) {

if _, err := os.Stat(volumeDir); err != nil {
// Volume does not exist we proceed to create.
if os.IsNotExist(err) {
// Make a volume entry, with mode 0777 mkdir honors system umask.
err = os.MkdirAll(volumeDir, 0777)
}
if os.IsPermission(err) {
return errDiskAccessDenied
}
return err
}
// Success
return nil

// Stat succeeds we return errVolumeExists.
return errVolumeExists
}

// ListVols - list volumes.
Expand Down Expand Up @@ -381,7 +384,7 @@ func (s *posix) StatVol(volume string) (volInfo VolInfo, err error) {
}
// Stat a volume entry.
var st os.FileInfo
st, err = os.Stat((volumeDir))
st, err = os.Stat(volumeDir)
if err != nil {
if os.IsNotExist(err) {
return VolInfo{}, errVolumeNotFound
Expand Down
8 changes: 3 additions & 5 deletions cmd/server_test.go
Expand Up @@ -76,9 +76,7 @@ func verifyError(c *check, response *http.Response, code, description string, st
func runAllTests(suite *TestSuiteCommon, c *check) {
suite.SetUpSuite(c)
suite.TestBucketSQSNotificationWebHook(c)
if suite.serverType == "XL" {
suite.TestObjectDir(c)
}
suite.TestObjectDir(c)
suite.TestBucketSQSNotificationAMQP(c)
suite.TestBucketPolicy(c)
suite.TestDeleteBucket(c)
Expand Down Expand Up @@ -260,7 +258,7 @@ func (s *TestSuiteCommon) TestObjectDir(c *check) {
response, err = client.Do(request)

c.Assert(err, nil)
c.Assert(response.StatusCode, http.StatusNotFound)
c.Assert(response.StatusCode, http.StatusOK)

request, err = newTestSignedRequest("GET", getGetObjectURL(s.endPoint, bucketName, "my-object-directory/"),
0, nil, s.accessKey, s.secretKey, s.signer)
Expand All @@ -271,7 +269,7 @@ func (s *TestSuiteCommon) TestObjectDir(c *check) {
response, err = client.Do(request)

c.Assert(err, nil)
c.Assert(response.StatusCode, http.StatusNotFound)
c.Assert(response.StatusCode, http.StatusOK)

request, err = newTestSignedRequest("DELETE", getDeleteObjectURL(s.endPoint, bucketName, "my-object-directory/"),
0, nil, s.accessKey, s.secretKey, s.signer)
Expand Down
105 changes: 92 additions & 13 deletions cmd/xl-v1-object.go
Expand Up @@ -33,6 +33,29 @@ import (
// list all errors which can be ignored in object operations.
var objectOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied)

// putObjectDir hints the bottom layer to create a new directory.
func (xl xlObjects) putObjectDir(bucket, object string) error {
var wg = &sync.WaitGroup{}

errs := make([]error, len(xl.storageDisks))
// Prepare object creation in a all disks
for index, disk := range xl.storageDisks {
if disk == nil {
continue
}
wg.Add(1)
go func(index int, disk StorageAPI) {
defer wg.Done()
if err := disk.MakeVol(pathJoin(bucket, object)); err != nil && err != errVolumeExists {
errs[index] = err
}
}(index, disk)
}
wg.Wait()

return reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum)
}

// prepareFile hints the bottom layer to optimize the creation of a new object
func (xl xlObjects) prepareFile(bucket, object string, size int64, onlineDisks []StorageAPI, blockSize int64, dataBlocks, writeQuorum int) error {
pErrs := make([]error, len(onlineDisks))
Expand Down Expand Up @@ -159,6 +182,12 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
return errors.Trace(errUnexpected)
}

// If its a directory request, we return an empty body.
if hasSuffix(object, slashSeparator) {
_, err := writer.Write([]byte(""))
return toObjectErr(errors.Trace(err), bucket, object)
}

// Read metadata associated with the object from all disks.
metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object)

Expand Down Expand Up @@ -310,21 +339,61 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
return nil
}

func (xl xlObjects) getObjectInfoDir(bucket, object string) (oi ObjectInfo, err error) {
var wg = &sync.WaitGroup{}

errs := make([]error, len(xl.storageDisks))
// Prepare object creation in a all disks
for index, disk := range xl.storageDisks {
if disk == nil {
continue
}
wg.Add(1)
go func(index int, disk StorageAPI) {
defer wg.Done()
if _, err := disk.StatVol(pathJoin(bucket, object)); err != nil {
// Since we are re-purposing StatVol, an object which
// is a directory if it doesn't exist should be
// returned as errFileNotFound instead, convert
// the error right here accordingly.
if err == errVolumeNotFound {
err = errFileNotFound
} else if err == errVolumeAccessDenied {
err = errFileAccessDenied
}

// Save error to reduce it later
errs[index] = err
}
}(index, disk)
}

wg.Wait()

return dirObjectInfo(bucket, object, 0, map[string]string{}), reduceReadQuorumErrs(errs, objectOpIgnoredErrs, xl.readQuorum)
}

// GetObjectInfo - reads object metadata and replies back ObjectInfo.
func (xl xlObjects) GetObjectInfo(bucket, object string) (oi ObjectInfo, e error) {
if err := checkGetObjArgs(bucket, object); err != nil {
return oi, err
}

if hasSuffix(object, slashSeparator) {
return xl.getObjectInfoDir(bucket, object)
}

info, err := xl.getObjectInfo(bucket, object)
if err != nil {
return oi, toObjectErr(err, bucket, object)
}

return info, nil
}

// getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo.
func (xl xlObjects) getObjectInfo(bucket, object string) (objInfo ObjectInfo, err error) {

// Extracts xlStat and xlMetaMap.
xlStat, xlMetaMap, err := xl.readXLMetaStat(bucket, object)
if err != nil {
Expand Down Expand Up @@ -446,6 +515,19 @@ func renameObject(disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject
// writes `xl.json` which carries the necessary metadata for future
// object operations.
func (xl xlObjects) PutObject(bucket string, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, err error) {
uniqueID := mustGetUUID()
tempObj := uniqueID

// No metadata is set, allocate a new one.
if metadata == nil {
metadata = make(map[string]string)
}

// Delete temporary object in the event of failure.
// If PutObject succeeded there would be no temporary
// object to delete.
defer xl.deleteObject(minioMetaTmpBucket, tempObj)

// This is a special case with size as '0' and object ends with
// a slash separator, we treat it like a valid operation and
// return success.
Expand All @@ -456,6 +538,16 @@ func (xl xlObjects) PutObject(bucket string, object string, data *hash.Reader, m
if xl.parentDirIsObject(bucket, path.Dir(object)) {
return ObjectInfo{}, toObjectErr(errors.Trace(errFileAccessDenied), bucket, object)
}

if err = xl.putObjectDir(minioMetaTmpBucket, tempObj); err != nil {
return ObjectInfo{}, toObjectErr(errors.Trace(err), bucket, object)
}

// Rename the successfully written temporary object to final location.
if _, err = renameObject(xl.storageDisks, minioMetaTmpBucket, tempObj, bucket, object, xl.writeQuorum); err != nil {
return ObjectInfo{}, toObjectErr(err, bucket, object)
}

return dirObjectInfo(bucket, object, data.Size(), metadata), nil
}

Expand All @@ -476,14 +568,6 @@ func (xl xlObjects) PutObject(bucket string, object string, data *hash.Reader, m
return ObjectInfo{}, toObjectErr(errors.Trace(errFileAccessDenied), bucket, object)
}

// No metadata is set, allocate a new one.
if metadata == nil {
metadata = make(map[string]string)
}

uniqueID := mustGetUUID()
tempObj := uniqueID

// Limit the reader to its provided size if specified.
var reader io.Reader = data

Expand Down Expand Up @@ -527,11 +611,6 @@ func (xl xlObjects) PutObject(bucket string, object string, data *hash.Reader, m
// Order disks according to erasure distribution
onlineDisks := shuffleDisks(xl.storageDisks, partsMetadata[0].Erasure.Distribution)

// Delete temporary object in the event of failure.
// If PutObject succeeded there would be no temporary
// object to delete.
defer xl.deleteObject(minioMetaTmpBucket, tempObj)

// Total size of the written object
var sizeWritten int64

Expand Down
2 changes: 0 additions & 2 deletions cmd/xl-v1-object_test.go
Expand Up @@ -75,8 +75,6 @@ func TestXLDeleteObjectBasic(t *testing.T) {
{"----", "obj", BucketNameInvalid{Bucket: "----"}},
{"bucket", "", ObjectNameInvalid{Bucket: "bucket", Object: ""}},
{"bucket", "doesnotexist", ObjectNotFound{Bucket: "bucket", Object: "doesnotexist"}},
{"bucket", "obj/", ObjectNotFound{Bucket: "bucket", Object: "obj/"}},
{"bucket", "/obj", ObjectNotFound{Bucket: "bucket", Object: "/obj"}},
{"bucket", "obj", nil},
}

Expand Down

0 comments on commit 117161c

Please sign in to comment.