Skip to content

Commit

Permalink
azure: handle list objects properly (#4953)
Browse files Browse the repository at this point in the history
When removing `minio.sys.tmp` prefixed entries and objects/prefixes is
empty, populate till we get all valid entries.
  • Loading branch information
balamurugana authored and deekoder committed Sep 29, 2017
1 parent ce2d185 commit 60cc618
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 74 deletions.
2 changes: 1 addition & 1 deletion cmd/gateway-azure-anonymous.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (a *azureObjects) AnonListObjects(bucket, prefix, marker, delimiter string,
}

// AnonListObjectsV2 - List objects in V2 mode, anonymously
func (a *azureObjects) AnonListObjectsV2(bucket, prefix, continuationToken string, fetchOwner bool, delimiter string, maxKeys int) (result ListObjectsV2Info, err error) {
func (a *azureObjects) AnonListObjectsV2(bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) {
params := storage.ListBlobsParameters{
Prefix: prefix,
Marker: continuationToken,
Expand Down
119 changes: 54 additions & 65 deletions cmd/gateway-azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,84 +385,73 @@ func (a *azureObjects) DeleteBucket(bucket string) error {
// ListObjects - lists all blobs on azure with in a container filtered by prefix
// and marker, uses Azure equivalent ListBlobs.
func (a *azureObjects) ListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error) {
var objects []ObjectInfo
var prefixes []string
container := a.client.GetContainerReference(bucket)
resp, err := container.ListBlobs(storage.ListBlobsParameters{
Prefix: prefix,
Marker: marker,
Delimiter: delimiter,
MaxResults: uint(maxKeys),
})
if err != nil {
return result, azureToObjectError(traceError(err), bucket, prefix)
}
result.IsTruncated = resp.NextMarker != ""
result.NextMarker = resp.NextMarker
for _, object := range resp.Blobs {
if strings.HasPrefix(object.Name, globalMinioSysTmp) {
continue
}
result.Objects = append(result.Objects, ObjectInfo{
Bucket: bucket,
Name: object.Name,
ModTime: time.Time(object.Properties.LastModified),
Size: object.Properties.ContentLength,
ETag: azureToS3ETag(object.Properties.Etag),
ContentType: object.Properties.ContentType,
ContentEncoding: object.Properties.ContentEncoding,
for len(objects) == 0 && len(prefixes) == 0 {
resp, err := container.ListBlobs(storage.ListBlobsParameters{
Prefix: prefix,
Marker: marker,
Delimiter: delimiter,
MaxResults: uint(maxKeys),
})
}
if err != nil {
return result, azureToObjectError(traceError(err), bucket, prefix)
}

for _, object := range resp.Blobs {
if strings.HasPrefix(object.Name, globalMinioSysTmp) {
continue
}
objects = append(objects, ObjectInfo{
Bucket: bucket,
Name: object.Name,
ModTime: time.Time(object.Properties.LastModified),
Size: object.Properties.ContentLength,
ETag: azureToS3ETag(object.Properties.Etag),
ContentType: object.Properties.ContentType,
ContentEncoding: object.Properties.ContentEncoding,
})
}

// Remove minio.sys.tmp prefix.
for _, prefix := range resp.BlobPrefixes {
if prefix != globalMinioSysTmp {
prefixes = append(prefixes, prefix)
}
}

// Remove minio.sys.tmp prefix.
for i, prefix := range resp.BlobPrefixes {
if prefix == globalMinioSysTmp {
resp.BlobPrefixes = append(resp.BlobPrefixes[:i], resp.BlobPrefixes[i+1:]...)
marker = resp.NextMarker
if resp.NextMarker == "" {
break
}
}
result.Prefixes = resp.BlobPrefixes

result.Objects = objects
result.Prefixes = prefixes
result.NextMarker = marker
result.IsTruncated = (marker != "")
return result, nil
}

// ListObjectsV2 - list all blobs in Azure bucket filtered by prefix
func (a *azureObjects) ListObjectsV2(bucket, prefix, continuationToken string, fetchOwner bool, delimiter string, maxKeys int) (result ListObjectsV2Info, err error) {
container := a.client.GetContainerReference(bucket)
resp, err := container.ListBlobs(storage.ListBlobsParameters{
Prefix: prefix,
Marker: continuationToken,
Delimiter: delimiter,
MaxResults: uint(maxKeys),
})
if err != nil {
return result, azureToObjectError(traceError(err), bucket, prefix)
}
// If NextMarker is not empty, this means response is truncated and NextContinuationToken should be set
if resp.NextMarker != "" {
result.IsTruncated = true
result.NextContinuationToken = resp.NextMarker
}
for _, object := range resp.Blobs {
if strings.HasPrefix(object.Name, globalMinioSysTmp) {
continue
}
result.Objects = append(result.Objects, ObjectInfo{
Bucket: bucket,
Name: object.Name,
ModTime: time.Time(object.Properties.LastModified),
Size: object.Properties.ContentLength,
ETag: azureToS3ETag(object.Properties.Etag),
ContentType: object.Properties.ContentType,
ContentEncoding: object.Properties.ContentEncoding,
})
func (a *azureObjects) ListObjectsV2(bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error) {
marker := continuationToken
if startAfter != "" {
marker = startAfter
}

// Remove minio.sys.tmp prefix.
for i, prefix := range resp.BlobPrefixes {
if prefix == globalMinioSysTmp {
resp.BlobPrefixes = append(resp.BlobPrefixes[:i], resp.BlobPrefixes[i+1:]...)
break
}
var resultV1 ListObjectsInfo
resultV1, err = a.ListObjects(bucket, prefix, marker, delimiter, maxKeys)
if err != nil {
return result, err
}
result.Prefixes = resp.BlobPrefixes

result.Objects = resultV1.Objects
result.Prefixes = resultV1.Prefixes
result.ContinuationToken = continuationToken
result.NextContinuationToken = resultV1.NextMarker
result.IsTruncated = (resultV1.NextMarker != "")
return result, nil
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/gateway-gcs-anonymous.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (l *gcsGateway) AnonListObjects(bucket string, prefix string, marker string
}

// AnonListObjectsV2 - List objects in V2 mode, anonymously
func (l *gcsGateway) AnonListObjectsV2(bucket, prefix, continuationToken string, fetchOwner bool, delimiter string, maxKeys int) (ListObjectsV2Info, error) {
func (l *gcsGateway) AnonListObjectsV2(bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (ListObjectsV2Info, error) {
// Request V1 List Object to the backend
result, err := l.anonClient.ListObjects(bucket, prefix, continuationToken, delimiter, maxKeys)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions cmd/gateway-gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,8 +562,7 @@ func (l *gcsGateway) ListObjects(bucket string, prefix string, marker string, de
}

// ListObjectsV2 - lists all blobs in GCS bucket filtered by prefix
func (l *gcsGateway) ListObjectsV2(bucket, prefix, continuationToken string, fetchOwner bool,
delimiter string, maxKeys int) (ListObjectsV2Info, error) {
func (l *gcsGateway) ListObjectsV2(bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (ListObjectsV2Info, error) {

it := l.client.Bucket(bucket).Objects(l.ctx, &storage.Query{
Delimiter: delimiter,
Expand Down
2 changes: 1 addition & 1 deletion cmd/gateway-handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ func (api gatewayAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *htt
// Inititate a list objects operation based on the input params.
// On success would return back ListObjectsV2Info object to be
// serialized as XML and sent as S3 compatible response body.
listObjectsV2Info, err := listObjectsV2(bucket, prefix, token, fetchOwner, delimiter, maxKeys)
listObjectsV2Info, err := listObjectsV2(bucket, prefix, token, delimiter, maxKeys, fetchOwner, startAfter)
if err != nil {
errorIf(err, "Unable to list objects. Args to listObjectsV2 are bucket=%s, prefix=%s, token=%s, delimiter=%s", bucket, prefix, token, delimiter)
writeErrorResponse(w, toAPIErrorCode(err), r.URL)
Expand Down
4 changes: 2 additions & 2 deletions cmd/gateway-router.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ type GatewayLayer interface {
GetBucketPolicies(string) (policy.BucketAccessPolicy, error)
DeleteBucketPolicies(string) error
AnonListObjects(bucket, prefix, marker, delimiter string, maxKeys int) (result ListObjectsInfo, err error)
AnonListObjectsV2(bucket, prefix, continuationToken string, fetchOwner bool, delimiter string, maxKeys int) (result ListObjectsV2Info, err error)
ListObjectsV2(bucket, prefix, continuationToken string, fetchOwner bool, delimiter string, maxKeys int) (result ListObjectsV2Info, err error)
AnonListObjectsV2(bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error)
ListObjectsV2(bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (result ListObjectsV2Info, err error)
AnonGetBucketInfo(bucket string) (bucketInfo BucketInfo, err error)
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/gateway-s3-anonymous.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (l *s3Objects) AnonListObjects(bucket string, prefix string, marker string,
}

// AnonListObjectsV2 - List objects in V2 mode, anonymously
func (l *s3Objects) AnonListObjectsV2(bucket, prefix, continuationToken string, fetchOwner bool, delimiter string, maxKeys int) (loi ListObjectsV2Info, e error) {
func (l *s3Objects) AnonListObjectsV2(bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (loi ListObjectsV2Info, e error) {
result, err := l.anonClient.ListObjectsV2(bucket, prefix, continuationToken, fetchOwner, delimiter, maxKeys)
if err != nil {
return loi, s3ToObjectError(traceError(err), bucket)
Expand Down
2 changes: 1 addition & 1 deletion cmd/gateway-s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (l *s3Objects) ListObjects(bucket string, prefix string, marker string, del
}

// ListObjectsV2 lists all blobs in S3 bucket filtered by prefix
func (l *s3Objects) ListObjectsV2(bucket, prefix, continuationToken string, fetchOwner bool, delimiter string, maxKeys int) (loi ListObjectsV2Info, e error) {
func (l *s3Objects) ListObjectsV2(bucket, prefix, continuationToken, delimiter string, maxKeys int, fetchOwner bool, startAfter string) (loi ListObjectsV2Info, e error) {
result, err := l.Client.ListObjectsV2(bucket, prefix, continuationToken, fetchOwner, delimiter, maxKeys)
if err != nil {
return loi, s3ToObjectError(traceError(err), bucket)
Expand Down

0 comments on commit 60cc618

Please sign in to comment.