Skip to content

Commit

Permalink
miniogw: make CompleteMultipartUpload verify uploaded parts
Browse files Browse the repository at this point in the history
This change will make CompleteMultipartUpload verify uploaded parts and
report every mismatch encountered in a better-safe-than-sorry manner
(see storj/edge#43 for why we are paranoid).

Closes storj/edge#43
Closes storj/edge#105

Change-Id: I250330d11059ef11910fee520137ed4ba42a67aa
  • Loading branch information
amwolff committed Feb 3, 2022
1 parent 0067a0b commit a85a953
Show file tree
Hide file tree
Showing 3 changed files with 262 additions and 31 deletions.
12 changes: 6 additions & 6 deletions miniogw/config.go
Expand Up @@ -18,10 +18,10 @@ type ServerConfig struct {
// S3CompatibilityConfig is a configuration struct that determines details about
// how strict the gateway should be S3-compatible.
type S3CompatibilityConfig struct {
IncludeCustomMetadataListing bool `help:"include custom metadata in S3's ListObjects, ListObjectsV2 and ListMultipartUploads responses" default:"true"`
MaxKeysLimit int `help:"MaxKeys parameter limit for S3's ListObjects and ListObjectsV2 responses" default:"1000"`
MaxKeysExhaustiveLimit int `help:"maximum number of items to list for gateway-side filtering using arbitrary delimiter/prefix" default:"100000"`
FullyCompatibleListing bool `help:"make ListObjects(V2) fully S3-compatible (specifically: always return lexicographically ordered results) but slow" default:"false"`
DisableCopyObject bool `help:"return 501 (Not Implemented) for CopyObject calls" devDefault:"false" releaseDefault:"true"`
MinPartSize int `help:"minimum part size for multipart uploads" default:"5242880"` // 5 MiB
IncludeCustomMetadataListing bool `help:"include custom metadata in S3's ListObjects, ListObjectsV2 and ListMultipartUploads responses" default:"true"`
MaxKeysLimit int `help:"MaxKeys parameter limit for S3's ListObjects and ListObjectsV2 responses" default:"1000"`
MaxKeysExhaustiveLimit int `help:"maximum number of items to list for gateway-side filtering using arbitrary delimiter/prefix" default:"100000"`
FullyCompatibleListing bool `help:"make ListObjects(V2) fully S3-compatible (specifically: always return lexicographically ordered results) but slow" default:"false"`
DisableCopyObject bool `help:"return 501 (Not Implemented) for CopyObject calls" devDefault:"false" releaseDefault:"true"`
MinPartSize int64 `help:"minimum part size for multipart uploads" default:"5242880"` // 5 MiB
}
74 changes: 50 additions & 24 deletions miniogw/multipart.go
Expand Up @@ -292,30 +292,56 @@ func (layer *gatewayLayer) CompleteMultipartUpload(ctx context.Context, bucket,
return minio.ObjectInfo{}, err
}

// todo: part size validation has been disabled, as we have concerns that
// it's causing problems in production since minio expects CompleteMultipartUpload
// to finish quickly.

// sort.Slice(uploadedParts, func(i, k int) bool {
// return uploadedParts[i].PartNumber < uploadedParts[k].PartNumber
// })

// list := project.ListUploadParts(ctx, bucket, object, uploadID, &uplink.ListUploadPartsOptions{})
// for list.Next() {
// part := list.Item()
// uploadedPart := uploadedParts[int(part.PartNumber)]
// if uploadedPart.ETag != string(part.ETag) {
// return minio.ObjectInfo{}, minio.InvalidPart{PartNumber: int(part.PartNumber), GotETag: uploadedPart.ETag}
// }
// if int(part.PartNumber) != len(uploadedParts)-1 {
// if part.Size < int64(layer.compatibilityConfig.MinPartSize) {
// return minio.ObjectInfo{}, minio.PartTooSmall{PartNumber: int(part.PartNumber), PartSize: part.Size, PartETag: string(part.ETag)}
// }
// }
// }
// if list.Err() != nil {
// return minio.ObjectInfo{}, convertMultipartError(list.Err(), bucket, object, uploadID)
// }
var idx int
list := project.ListUploadParts(ctx, bucket, object, uploadID, nil)
for ; list.Next(); idx++ {
part := list.Item()
// Are we listing past what we received?
if idx >= len(uploadedParts) {
return minio.ObjectInfo{}, minio.InvalidPart{
PartNumber: int(part.PartNumber),
ExpETag: string(part.ETag),
GotETag: "",
}
}
// Is size okay for everything except the last part?
if idx != len(uploadedParts)-1 {
if part.Size < layer.compatibilityConfig.MinPartSize {
return minio.ObjectInfo{}, minio.PartTooSmall{
PartSize: part.Size,
PartNumber: int(part.PartNumber),
PartETag: string(part.ETag),
}
}
}
// Do we agree on the part number?
if uploadedParts[idx].PartNumber != int(part.PartNumber) {
return minio.ObjectInfo{}, minio.InvalidPart{
PartNumber: uploadedParts[idx].PartNumber,
ExpETag: "",
GotETag: uploadedParts[idx].ETag,
}
}
// Do we agree on ETag?
if uploadedParts[idx].ETag != string(part.ETag) {
return minio.ObjectInfo{}, minio.InvalidPart{
PartNumber: int(part.PartNumber),
ExpETag: string(part.ETag),
GotETag: uploadedParts[idx].ETag,
}
}
}
if list.Err() != nil {
return minio.ObjectInfo{}, convertMultipartError(list.Err(), bucket, object, uploadID)
}

if len(uploadedParts) > idx { // We didn't list enough
return minio.ObjectInfo{}, minio.InvalidPart{
PartNumber: uploadedParts[idx].PartNumber, // Condition guarantees safe access
ExpETag: "", // We expected nothing
GotETag: uploadedParts[idx].ETag,
}
}

etag := minio.ComputeCompleteMultipartMD5(uploadedParts)

Expand Down
207 changes: 206 additions & 1 deletion testsuite/miniogw/gateway_test.go
Expand Up @@ -1954,7 +1954,7 @@ func listBucketObjects(ctx context.Context, listObjects listObjectsFunc, layer m
}

func testListObjectsLimits(t *testing.T, listObjects listObjectsFunc) {
runTest(t, func(t *testing.T, ctx context.Context, layer minio.ObjectLayer, project *uplink.Project) {
runTest(t, func(t *testing.T, ctx context.Context, _ minio.ObjectLayer, project *uplink.Project) {
testBucketInfo, err := project.CreateBucket(ctx, testBucket)
require.NoError(t, err)

Expand Down Expand Up @@ -2857,6 +2857,211 @@ func TestCompleteMultipartUpload(t *testing.T) {
})
}

func TestCompleteMultipartUploadPartNumberETagChecks(t *testing.T) {
t.Parallel()

runTest(t, func(t *testing.T, ctx context.Context, layer minio.ObjectLayer, project *uplink.Project) {
for _, tt := range [...]struct {
name string
expectedPartNumbers []int
receivedPartNumbers []int
receivedETags []string
err error
}{
{
name: "ok",
expectedPartNumbers: []int{1, 2},
receivedPartNumbers: []int{1, 2},
receivedETags: []string{"098f6bcd4621d373cade4e832627b4f6", "098f6bcd4621d373cade4e832627b4f6"},
err: nil,
},
{
name: "more expected than received parts I",
expectedPartNumbers: []int{1, 2},
receivedPartNumbers: []int{1},
receivedETags: []string{"098f6bcd4621d373cade4e832627b4f6"},
err: minio.InvalidPart{PartNumber: 2, ExpETag: "098f6bcd4621d373cade4e832627b4f6", GotETag: ""},
},
{
name: "more expected than received parts II",
expectedPartNumbers: []int{1},
receivedPartNumbers: nil,
receivedETags: nil,
err: minio.InvalidPart{PartNumber: 1, ExpETag: "098f6bcd4621d373cade4e832627b4f6", GotETag: ""},
},
{
name: "invalid part number",
expectedPartNumbers: []int{1, 2},
receivedPartNumbers: []int{8, 9},
receivedETags: []string{"098f6bcd4621d373cade4e832627b4f6", "098f6bcd4621d373cade4e832627b4f6"},
err: minio.InvalidPart{PartNumber: 8, ExpETag: "", GotETag: "098f6bcd4621d373cade4e832627b4f6"},
},
{
name: "invalid etag",
expectedPartNumbers: []int{1, 2, 3, 4, 5},
receivedPartNumbers: []int{1, 2, 3, 4, 5},
receivedETags: []string{"shine", "on", "you", "crazy", "etag"},
err: minio.InvalidPart{PartNumber: 1, ExpETag: "098f6bcd4621d373cade4e832627b4f6", GotETag: "shine"},
},
{
name: "more received than expected parts I",
expectedPartNumbers: []int{1},
receivedPartNumbers: []int{1, 2},
receivedETags: []string{"098f6bcd4621d373cade4e832627b4f6", "098f6bcd4621d373cade4e832627b4f6"},
err: minio.InvalidPart{PartNumber: 2, ExpETag: "", GotETag: "098f6bcd4621d373cade4e832627b4f6"},
},
{
name: "more received than expected parts II",
expectedPartNumbers: []int{},
receivedPartNumbers: []int{1},
receivedETags: []string{"098f6bcd4621d373cade4e832627b4f6"},
err: minio.InvalidPart{PartNumber: 1, ExpETag: "", GotETag: "098f6bcd4621d373cade4e832627b4f6"},
},
{
name: "all over the place I",
expectedPartNumbers: []int{1, 2},
receivedPartNumbers: []int{3, 4, 99},
receivedETags: []string{"acerola", "apple", "apricots", "avocado"},
err: minio.InvalidPart{PartNumber: 3, ExpETag: "", GotETag: "acerola"},
},
{
name: "all over the place II",
expectedPartNumbers: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
receivedPartNumbers: []int{1, 2, 9000},
receivedETags: []string{"", "", "3"},
err: minio.InvalidPart{PartNumber: 1, ExpETag: "098f6bcd4621d373cade4e832627b4f6", GotETag: ""},
},
{
name: "all over the place III",
expectedPartNumbers: []int{9000, 2, 1},
receivedPartNumbers: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
receivedETags: []string{"098f6bcd4621d373cade4e832627b4f6", "2", ""},
err: minio.InvalidPart{PartNumber: 2, ExpETag: "098f6bcd4621d373cade4e832627b4f6", GotETag: "2"},
},
} {
bucket, err := project.CreateBucket(ctx, testrand.BucketName())
require.NoError(t, err, tt.name)
object := testrand.Path()
uploadID, err := layer.NewMultipartUpload(ctx, bucket.Name, object, minio.ObjectOptions{})
require.NoError(t, err, tt.name)
defer func(name string) {
if err = layer.AbortMultipartUpload(ctx, bucket.Name, object, uploadID, minio.ObjectOptions{}); err != nil {
assert.ErrorIs(t, err, minio.InvalidUploadID{Bucket: bucket.Name, Object: object, UploadID: uploadID}, name)
}
}(tt.name)

for _, n := range tt.expectedPartNumbers {
_, err = layer.PutObjectPart(ctx, bucket.Name, object, uploadID, n, newMinioPutObjReader(t), minio.ObjectOptions{})
require.NoError(t, err, tt.name)
}

var uploadedParts []minio.CompletePart
for i, n := range tt.receivedPartNumbers {
p := minio.CompletePart{
PartNumber: n,
}
if len(tt.receivedETags) > i {
p.ETag = tt.receivedETags[i]
}
uploadedParts = append(uploadedParts, p)
}

_, err = layer.CompleteMultipartUpload(ctx, bucket.Name, object, uploadID, uploadedParts, minio.ObjectOptions{})
if tt.err != nil {
assert.ErrorIs(t, err, tt.err, tt.name)
} else {
require.NoError(t, err, tt.name)
}
}
})
}

func TestCompleteMultipartUploadSizeCheck(t *testing.T) {
t.Parallel()

runTest(t, func(t *testing.T, ctx context.Context, _ minio.ObjectLayer, project *uplink.Project) {
// Reconfigure the layer so we can hit MinPartSize limit.
c := miniogw.S3CompatibilityConfig{MinPartSize: 50 * memory.KiB.Int64()}
l, err := miniogw.NewStorjGateway(c).NewGatewayLayer(auth.Credentials{})
require.NoError(t, err)

defer func() { require.NoError(t, l.Shutdown(ctx)) }()

for _, tt := range [...]struct {
name string
sizes []memory.Size
err error
}{
{
name: "all parts meet or are above the size threshold",
sizes: []memory.Size{50 * memory.KiB, 51 * memory.KiB, 52 * memory.KiB},
err: nil,
},
{
name: "the last part is below the size threshold",
sizes: []memory.Size{51 * memory.KiB, 52 * memory.KiB, 1 * memory.KiB},
err: nil,
},
{
name: "the middle part is below the size threshold",
sizes: []memory.Size{51 * memory.KiB, 0 * memory.KiB, 53 * memory.KiB},
// In tests, we rewrite this 0 KiB part with a part of size 7
// (still under the threshold) and the following ETag.
err: minio.PartTooSmall{PartNumber: 2, PartSize: 7, PartETag: "d69b751558e2033dd8e63fa124676d5a"},
},
{
name: "only one part and meets the size threshold",
sizes: []memory.Size{50 * memory.KiB},
err: nil,
},
{
name: "only one part and is above the size threshold",
sizes: []memory.Size{1123 * memory.KiB},
err: nil,
},
{
name: "only one part and is below the size threshold",
sizes: []memory.Size{25 * memory.KiB},
err: nil,
},
} {
bucket, err := project.CreateBucket(ctx, testrand.BucketName())
require.NoError(t, err, tt.name)
object := testrand.Path()
uploadID, err := l.NewMultipartUpload(ctx, bucket.Name, object, minio.ObjectOptions{})
require.NoError(t, err, tt.name)
defer func(name string) {
if err = l.AbortMultipartUpload(ctx, bucket.Name, object, uploadID, minio.ObjectOptions{}); err != nil {
assert.ErrorIs(t, err, minio.InvalidUploadID{Bucket: bucket.Name, Object: object, UploadID: uploadID}, name)
}
}(tt.name)

var uploadedParts []minio.CompletePart
for i, n := range tt.sizes {
var b []byte
if n == 0 {
b = []byte("3.14159")
} else {
b = testrand.Bytes(n)
}
h, err := hash.NewReader(bytes.NewReader(b), int64(len(b)), md5Hex(b), sha256Hex(b), int64(len(b)), true)
require.NoError(t, err, tt.name)
r := minio.NewPutObjReader(h, nil, nil)
p, err := l.PutObjectPart(ctx, bucket.Name, object, uploadID, i+1, r, minio.ObjectOptions{})
require.NoError(t, err, tt.name)
uploadedParts = append(uploadedParts, minio.CompletePart{PartNumber: p.PartNumber, ETag: p.ETag})
}

_, err = l.CompleteMultipartUpload(ctx, bucket.Name, object, uploadID, uploadedParts, minio.ObjectOptions{})
if tt.err != nil {
assert.ErrorIs(t, err, tt.err, tt.name)
} else {
require.NoError(t, err, tt.name)
}
}
})
}

func TestDeleteObjectWithNoReadOrListPermission(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit a85a953

Please sign in to comment.