diff --git a/cmd/fs-v1.go b/cmd/fs-v1.go index c7368a358aa558..8530da2ca99534 100644 --- a/cmd/fs-v1.go +++ b/cmd/fs-v1.go @@ -386,7 +386,7 @@ func (fs fsObjects) CopyObject(srcBucket, srcObject, dstBucket, dstObject string // startOffset indicates the starting read location of the object. // length indicates the total length of the object. func (fs fsObjects) GetObject(bucket, object string, offset int64, length int64, writer io.Writer) (err error) { - if err = checkBucketAndObjectNamesFS(bucket, object); err != nil { + if err = checkGetObjArgs(bucket, object); err != nil { return err } @@ -498,25 +498,9 @@ func (fs fsObjects) getObjectInfo(bucket, object string) (oi ObjectInfo, e error return fsMeta.ToObjectInfo(bucket, object, fi), nil } -// Checks bucket and object name validity, returns nil if both are valid. -func checkBucketAndObjectNamesFS(bucket, object string) error { - // Verify if bucket is valid. - if !IsValidBucketName(bucket) { - return errors.Trace(BucketNameInvalid{Bucket: bucket}) - } - // Verify if object is valid. - if len(object) == 0 { - return errors.Trace(ObjectNameInvalid{Bucket: bucket, Object: object}) - } - if !IsValidObjectPrefix(object) { - return errors.Trace(ObjectNameInvalid{Bucket: bucket, Object: object}) - } - return nil -} - // GetObjectInfo - reads object metadata and replies back ObjectInfo. func (fs fsObjects) GetObjectInfo(bucket, object string) (oi ObjectInfo, e error) { - if err := checkBucketAndObjectNamesFS(bucket, object); err != nil { + if err := checkGetObjArgs(bucket, object); err != nil { return oi, err } @@ -678,7 +662,7 @@ func (fs fsObjects) PutObject(bucket string, object string, data *hash.Reader, m // DeleteObject - deletes an object from a bucket, this operation is destructive // and there are no rollbacks supported. func (fs fsObjects) DeleteObject(bucket, object string) error { - if err := checkBucketAndObjectNamesFS(bucket, object); err != nil { + if err := checkDelObjArgs(bucket, object); err != nil { return err } diff --git a/cmd/object-api-input-checks.go b/cmd/object-api-input-checks.go index 7bb14955d1ee60..c671f98cc2a54a 100644 --- a/cmd/object-api-input-checks.go +++ b/cmd/object-api-input-checks.go @@ -38,11 +38,10 @@ func checkBucketAndObjectNames(bucket, object string) error { return errors.Trace(BucketNameInvalid{Bucket: bucket}) } // Verify if object is valid. - if !IsValidObjectName(object) { - // Objects with "/" are invalid, verify to return a different error. - if hasSuffix(object, slashSeparator) || hasPrefix(object, slashSeparator) { - return errors.Trace(ObjectNotFound{Bucket: bucket, Object: object}) - } + if len(object) == 0 { + return errors.Trace(ObjectNameInvalid{Bucket: bucket, Object: object}) + } + if !IsValidObjectPrefix(object) { return errors.Trace(ObjectNameInvalid{Bucket: bucket, Object: object}) } return nil diff --git a/cmd/posix.go b/cmd/posix.go index 1184c8a5b4f353..4c1edc5730863c 100644 --- a/cmd/posix.go +++ b/cmd/posix.go @@ -280,18 +280,21 @@ func (s *posix) MakeVol(volume string) (err error) { if err != nil { return err } - // Make a volume entry, with mode 0777 mkdir honors system umask. - err = os.Mkdir((volumeDir), 0777) - if err != nil { - if os.IsExist(err) { - return errVolumeExists - } else if os.IsPermission(err) { + + if _, err := os.Stat(volumeDir); err != nil { + // Volume does not exist we proceed to create. + if os.IsNotExist(err) { + // Make a volume entry, with mode 0777 mkdir honors system umask. + err = os.MkdirAll(volumeDir, 0777) + } + if os.IsPermission(err) { return errDiskAccessDenied } return err } - // Success - return nil + + // Stat succeeds we return errVolumeExists. + return errVolumeExists } // ListVols - list volumes. @@ -381,7 +384,7 @@ func (s *posix) StatVol(volume string) (volInfo VolInfo, err error) { } // Stat a volume entry. var st os.FileInfo - st, err = os.Stat((volumeDir)) + st, err = os.Stat(volumeDir) if err != nil { if os.IsNotExist(err) { return VolInfo{}, errVolumeNotFound diff --git a/cmd/server_test.go b/cmd/server_test.go index 533429600e2a59..e6f9bfe229efdd 100644 --- a/cmd/server_test.go +++ b/cmd/server_test.go @@ -76,9 +76,7 @@ func verifyError(c *check, response *http.Response, code, description string, st func runAllTests(suite *TestSuiteCommon, c *check) { suite.SetUpSuite(c) suite.TestBucketSQSNotificationWebHook(c) - if suite.serverType == "XL" { - suite.TestObjectDir(c) - } + suite.TestObjectDir(c) suite.TestBucketSQSNotificationAMQP(c) suite.TestBucketPolicy(c) suite.TestDeleteBucket(c) @@ -260,7 +258,7 @@ func (s *TestSuiteCommon) TestObjectDir(c *check) { response, err = client.Do(request) c.Assert(err, nil) - c.Assert(response.StatusCode, http.StatusNotFound) + c.Assert(response.StatusCode, http.StatusOK) request, err = newTestSignedRequest("GET", getGetObjectURL(s.endPoint, bucketName, "my-object-directory/"), 0, nil, s.accessKey, s.secretKey, s.signer) @@ -271,7 +269,7 @@ func (s *TestSuiteCommon) TestObjectDir(c *check) { response, err = client.Do(request) c.Assert(err, nil) - c.Assert(response.StatusCode, http.StatusNotFound) + c.Assert(response.StatusCode, http.StatusOK) request, err = newTestSignedRequest("DELETE", getDeleteObjectURL(s.endPoint, bucketName, "my-object-directory/"), 0, nil, s.accessKey, s.secretKey, s.signer) diff --git a/cmd/xl-v1-object.go b/cmd/xl-v1-object.go index b4b999b1a13612..b8fe2602a8fe5d 100644 --- a/cmd/xl-v1-object.go +++ b/cmd/xl-v1-object.go @@ -33,6 +33,29 @@ import ( // list all errors which can be ignored in object operations. var objectOpIgnoredErrs = append(baseIgnoredErrs, errDiskAccessDenied) +// putObjectDir hints the bottom layer to create a new directory. +func (xl xlObjects) putObjectDir(bucket, object string) error { + var wg = &sync.WaitGroup{} + + errs := make([]error, len(xl.storageDisks)) + // Prepare object creation in a all disks + for index, disk := range xl.storageDisks { + if disk == nil { + continue + } + wg.Add(1) + go func(index int, disk StorageAPI) { + defer wg.Done() + if err := disk.MakeVol(pathJoin(bucket, object)); err != nil && err != errVolumeExists { + errs[index] = err + } + }(index, disk) + } + wg.Wait() + + return reduceWriteQuorumErrs(errs, objectOpIgnoredErrs, xl.writeQuorum) +} + // prepareFile hints the bottom layer to optimize the creation of a new object func (xl xlObjects) prepareFile(bucket, object string, size int64, onlineDisks []StorageAPI, blockSize int64, dataBlocks, writeQuorum int) error { pErrs := make([]error, len(onlineDisks)) @@ -159,6 +182,12 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i return errors.Trace(errUnexpected) } + // If its a directory request, we return an empty body. + if hasSuffix(object, slashSeparator) { + _, err := writer.Write([]byte("")) + return toObjectErr(errors.Trace(err), bucket, object) + } + // Read metadata associated with the object from all disks. metaArr, errs := readAllXLMetadata(xl.storageDisks, bucket, object) @@ -310,21 +339,61 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i return nil } +func (xl xlObjects) getObjectInfoDir(bucket, object string) (oi ObjectInfo, err error) { + var wg = &sync.WaitGroup{} + + errs := make([]error, len(xl.storageDisks)) + // Prepare object creation in a all disks + for index, disk := range xl.storageDisks { + if disk == nil { + continue + } + wg.Add(1) + go func(index int, disk StorageAPI) { + defer wg.Done() + if _, err := disk.StatVol(pathJoin(bucket, object)); err != nil { + // Since we are re-purposing StatVol, an object which + // is a directory if it doesn't exist should be + // returned as errFileNotFound instead, convert + // the error right here accordingly. + if err == errVolumeNotFound { + err = errFileNotFound + } else if err == errVolumeAccessDenied { + err = errFileAccessDenied + } + + // Save error to reduce it later + errs[index] = err + } + }(index, disk) + } + + wg.Wait() + + return dirObjectInfo(bucket, object, 0, map[string]string{}), reduceReadQuorumErrs(errs, objectOpIgnoredErrs, xl.readQuorum) +} + // GetObjectInfo - reads object metadata and replies back ObjectInfo. func (xl xlObjects) GetObjectInfo(bucket, object string) (oi ObjectInfo, e error) { if err := checkGetObjArgs(bucket, object); err != nil { return oi, err } + if hasSuffix(object, slashSeparator) { + return xl.getObjectInfoDir(bucket, object) + } + info, err := xl.getObjectInfo(bucket, object) if err != nil { return oi, toObjectErr(err, bucket, object) } + return info, nil } // getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo. func (xl xlObjects) getObjectInfo(bucket, object string) (objInfo ObjectInfo, err error) { + // Extracts xlStat and xlMetaMap. xlStat, xlMetaMap, err := xl.readXLMetaStat(bucket, object) if err != nil { @@ -446,6 +515,19 @@ func renameObject(disks []StorageAPI, srcBucket, srcObject, dstBucket, dstObject // writes `xl.json` which carries the necessary metadata for future // object operations. func (xl xlObjects) PutObject(bucket string, object string, data *hash.Reader, metadata map[string]string) (objInfo ObjectInfo, err error) { + uniqueID := mustGetUUID() + tempObj := uniqueID + + // No metadata is set, allocate a new one. + if metadata == nil { + metadata = make(map[string]string) + } + + // Delete temporary object in the event of failure. + // If PutObject succeeded there would be no temporary + // object to delete. + defer xl.deleteObject(minioMetaTmpBucket, tempObj) + // This is a special case with size as '0' and object ends with // a slash separator, we treat it like a valid operation and // return success. @@ -456,6 +538,16 @@ func (xl xlObjects) PutObject(bucket string, object string, data *hash.Reader, m if xl.parentDirIsObject(bucket, path.Dir(object)) { return ObjectInfo{}, toObjectErr(errors.Trace(errFileAccessDenied), bucket, object) } + + if err = xl.putObjectDir(minioMetaTmpBucket, tempObj); err != nil { + return ObjectInfo{}, toObjectErr(errors.Trace(err), bucket, object) + } + + // Rename the successfully written temporary object to final location. + if _, err = renameObject(xl.storageDisks, minioMetaTmpBucket, tempObj, bucket, object, xl.writeQuorum); err != nil { + return ObjectInfo{}, toObjectErr(err, bucket, object) + } + return dirObjectInfo(bucket, object, data.Size(), metadata), nil } @@ -476,14 +568,6 @@ func (xl xlObjects) PutObject(bucket string, object string, data *hash.Reader, m return ObjectInfo{}, toObjectErr(errors.Trace(errFileAccessDenied), bucket, object) } - // No metadata is set, allocate a new one. - if metadata == nil { - metadata = make(map[string]string) - } - - uniqueID := mustGetUUID() - tempObj := uniqueID - // Limit the reader to its provided size if specified. var reader io.Reader = data @@ -527,11 +611,6 @@ func (xl xlObjects) PutObject(bucket string, object string, data *hash.Reader, m // Order disks according to erasure distribution onlineDisks := shuffleDisks(xl.storageDisks, partsMetadata[0].Erasure.Distribution) - // Delete temporary object in the event of failure. - // If PutObject succeeded there would be no temporary - // object to delete. - defer xl.deleteObject(minioMetaTmpBucket, tempObj) - // Total size of the written object var sizeWritten int64 diff --git a/cmd/xl-v1-object_test.go b/cmd/xl-v1-object_test.go index b6d3ca4994f5a2..fd0efb819348b1 100644 --- a/cmd/xl-v1-object_test.go +++ b/cmd/xl-v1-object_test.go @@ -75,8 +75,6 @@ func TestXLDeleteObjectBasic(t *testing.T) { {"----", "obj", BucketNameInvalid{Bucket: "----"}}, {"bucket", "", ObjectNameInvalid{Bucket: "bucket", Object: ""}}, {"bucket", "doesnotexist", ObjectNotFound{Bucket: "bucket", Object: "doesnotexist"}}, - {"bucket", "obj/", ObjectNotFound{Bucket: "bucket", Object: "obj/"}}, - {"bucket", "/obj", ObjectNotFound{Bucket: "bucket", Object: "/obj"}}, {"bucket", "obj", nil}, }