Skip to content

Commit

Permalink
fix: all levels deep flat key match (minio#12996)
Browse files Browse the repository at this point in the history
this addresses a regression from minio#12984
which only addresses flat key from single
level deep at bucket level.

added extra tests as well to cover all
these scenarios.

---

Additionally had to update prefix handling in fs-v1-storage.go in order to make the new test cases run in nas mode.
Prefix-check in fsStorage.WalkDir is now identical to xlStorage.WalkDir.
  • Loading branch information
harshavardhana authored and tristanessquare committed Oct 11, 2021
1 parent 5c6389e commit 1fc3d63
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 10 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -310,3 +310,4 @@ Please follow MinIO [Contributor's Guide](https://github.com/minio/minio/blob/ma

# License
Use of MinIO is governed by the GNU AGPLv3 license that can be found in the [LICENSE](https://github.com/minio/minio/blob/master/LICENSE) file.

5 changes: 3 additions & 2 deletions cmd/fs-v1-storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -1372,10 +1372,10 @@ func (s *fsv1Storage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Wr
sort.Strings(entries)

for _, entry := range entries {
if len(prefix) > 0 && !strings.HasPrefix(pathJoin(current, entry), prefix) {
if len(prefix) > 0 && !strings.HasPrefix(entry, prefix) {
continue
}
if len(forward) > 0 && pathJoin(current, entry) < forward {
if len(forward) > 0 && entry < forward {
continue
}

Expand All @@ -1394,6 +1394,7 @@ func (s *fsv1Storage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Wr
if opts.Recursive {
// Scan folder we found. Should be in correct sort order where we are.
forward = ""
prefix = "" // Remove prefix after first level.
entry = entry[:len(entry)-1]
if len(opts.ForwardTo) > 0 && strings.HasPrefix(opts.ForwardTo, pathJoin(current, entry)) {
forward = strings.TrimPrefix(opts.ForwardTo, pathJoin(current, entry))
Expand Down
23 changes: 15 additions & 8 deletions cmd/metacache-walk.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"io"
"net/http"
"net/url"
"os"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -58,16 +57,15 @@ type WalkDirOptions struct {
// WalkDir will traverse a directory and return all entries found.
// On success a sorted meta cache stream will be returned.
// Metadata has data stripped, if any.
func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writer) error {
func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writer) (err error) {
// Verify if volume is valid and it exists.
volumeDir, err := s.getVolDir(opts.Bucket)
if err != nil {
return err
}

// Stat a volume entry.
_, err = os.Lstat(volumeDir)
if err != nil {
if err = Access(volumeDir); err != nil {
if osIsNotExist(err) {
return errVolumeNotFound
} else if isSysErrIO(err) {
Expand Down Expand Up @@ -100,7 +98,8 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
metadata: xlMetaV2TrimData(metadata),
}
} else {
if st, err := os.Lstat(pathJoin(volumeDir, opts.BaseDir, xlStorageFormatFile)); err == nil && st.Mode().IsRegular() {
st, sterr := Lstat(pathJoin(volumeDir, opts.BaseDir, xlStorageFormatFile))
if sterr == nil && st.Mode().IsRegular() {
return errFileNotFound
}
}
Expand All @@ -125,12 +124,21 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
// Forward some errors?
return nil
}
if len(entries) == 0 {
return nil
}
dirObjects := make(map[string]struct{})
for i, entry := range entries {
if len(prefix) > 0 && !strings.HasPrefix(entry, prefix) {
// Do do not retain the file, since it doesn't
// match the prefix.
entries[i] = ""
continue
}
if len(forward) > 0 && entry < forward {
// Do do not retain the file, since its
// lexially smaller than 'forward'
entries[i] = ""
continue
}
if strings.HasSuffix(entry, slashSeparator) {
Expand Down Expand Up @@ -228,11 +236,9 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
meta.name = strings.TrimSuffix(meta.name, globalDirSuffixWithSlash) + slashSeparator
}
out <- meta
case osIsNotExist(err):
case osIsNotExist(err), isSysErrIsDir(err):
meta.metadata, err = xioutil.ReadFile(pathJoin(volumeDir, meta.name, xlStorageFormatFileV1))
if err == nil {
// Maybe rename? Would make it inconsistent across disks though.
// os.Rename(pathJoin(volumeDir, meta.name, xlStorageFormatFileV1), pathJoin(volumeDir, meta.name, xlStorageFormatFile))
// It was an object
out <- meta
continue
Expand All @@ -251,6 +257,7 @@ func (s *xlStorage) WalkDir(ctx context.Context, opts WalkDirOptions, wr io.Writ
logger.LogIf(ctx, err)
}
}

// If directory entry left on stack, pop it now.
for len(dirStack) > 0 {
pop := dirStack[len(dirStack)-1]
Expand Down
25 changes: 25 additions & 0 deletions cmd/object-api-listobjects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
{testBuckets[4], "file1/guidSplunk-aaaa/file", "content", nil},
{testBuckets[5], "dir/day_id=2017-10-10/issue", "content", nil},
{testBuckets[5], "dir/day_id=2017-10-11/issue", "content", nil},
{testBuckets[5], "foo/201910/1122", "content", nil},
{testBuckets[5], "foo/201910/1112", "content", nil},
{testBuckets[5], "foo/201910/2112", "content", nil},
{testBuckets[5], "foo/201910_txt", "content", nil},
}
for _, object := range testObjects {
md5Bytes := md5.Sum([]byte(object.content))
Expand Down Expand Up @@ -477,6 +481,24 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
IsTruncated: true,
Prefixes: []string{"dir/day_id=2017-10-10/"},
},
// ListObjectsResult-37 list with prefix match 2 levels deep
{
IsTruncated: false,
Objects: []ObjectInfo{
{Name: "foo/201910/1112"},
{Name: "foo/201910/1122"},
},
},
// ListObjectsResult-38 list with prefix match 1 level deep
{
IsTruncated: false,
Objects: []ObjectInfo{
{Name: "foo/201910/1112"},
{Name: "foo/201910/1122"},
{Name: "foo/201910/2112"},
{Name: "foo/201910_txt"},
},
},
}

testCases := []struct {
Expand Down Expand Up @@ -602,6 +624,9 @@ func testListObjects(obj ObjectLayer, instanceType string, t1 TestErrHandler) {
{testBuckets[4], "file1/", "", "guidSplunk", 1000, resultCases[35], nil, true},
// Test listing at prefix with expected prefix markers
{testBuckets[5], "dir/", "", SlashSeparator, 1, resultCases[36], nil, true},
// Test listing with prefix match
{testBuckets[5], "foo/201910/11", "", "", 1000, resultCases[37], nil, true},
{testBuckets[5], "foo/201910", "", "", 1000, resultCases[38], nil, true},
}

for i, testCase := range testCases {
Expand Down

0 comments on commit 1fc3d63

Please sign in to comment.