Skip to content

Commit

Permalink
satellite/metainfo: fix panic when we batch BeginObjectDelete without
Browse files Browse the repository at this point in the history
all permissions

Without read and list permissions BeginObjectDelete won't return error
if occurs. This was breaking Batch processing because there was
assumption that without error response will be always not nil.

https://storjlabs.atlassian.net/browse/SM-590

Change-Id: I0fc9539e429110a660eb28725b266d5e4771d198
  • Loading branch information
mniewrzal authored and ethanadams committed Apr 3, 2020
1 parent 85af0d4 commit 793be43
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 8 deletions.
24 changes: 18 additions & 6 deletions satellite/metainfo/batch.go
Expand Up @@ -95,7 +95,9 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp
ObjectBegin: response,
},
})
lastStreamID = response.StreamId
if response != nil {
lastStreamID = response.StreamId
}
case *pb.BatchRequestItem_ObjectCommit:
singleRequest.ObjectCommit.Header = req.Header

Expand Down Expand Up @@ -154,7 +156,9 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp
ObjectGet: response,
},
})
lastStreamID = response.Object.StreamId
if response != nil && response.Object != nil {
lastStreamID = response.Object.StreamId
}
case *pb.BatchRequestItem_ObjectList:
singleRequest.ObjectList.Header = req.Header
response, err := endpoint.ListObjects(ctx, singleRequest.ObjectList)
Expand All @@ -177,7 +181,9 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp
ObjectBeginDelete: response,
},
})
lastStreamID = response.StreamId
if response != nil {
lastStreamID = response.StreamId
}
case *pb.BatchRequestItem_ObjectFinishDelete:
singleRequest.ObjectFinishDelete.Header = req.Header

Expand Down Expand Up @@ -211,7 +217,9 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp
SegmentBegin: response,
},
})
lastSegmentID = response.SegmentId
if response != nil {
lastSegmentID = response.SegmentId
}
case *pb.BatchRequestItem_SegmentCommit:
singleRequest.SegmentCommit.Header = req.Header

Expand Down Expand Up @@ -289,7 +297,9 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp
SegmentDownload: response,
},
})
lastSegmentID = response.SegmentId
if response != nil {
lastSegmentID = response.SegmentId
}
case *pb.BatchRequestItem_SegmentBeginDelete:
singleRequest.SegmentBeginDelete.Header = req.Header

Expand All @@ -306,7 +316,9 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp
SegmentBeginDelete: response,
},
})
lastSegmentID = response.SegmentId
if response != nil {
lastSegmentID = response.SegmentId
}
case *pb.BatchRequestItem_SegmentFinishDelete:
singleRequest.SegmentFinishDelete.Header = req.Header

Expand Down
4 changes: 2 additions & 2 deletions satellite/metainfo/metainfo.go
Expand Up @@ -758,7 +758,7 @@ func (endpoint *Endpoint) DeleteBucket(ctx context.Context, req *pb.BucketDelete
if err != nil {
if !canRead && !canList {
// No error info is returned if neither Read, nor List permission is granted
return nil, nil
return &pb.BucketDeleteResponse{}, nil
}
if ErrBucketNotEmpty.Has(err) {
return nil, rpcstatus.Error(rpcstatus.FailedPrecondition, err.Error())
Expand Down Expand Up @@ -1413,7 +1413,7 @@ func (endpoint *Endpoint) BeginDeleteObject(ctx context.Context, req *pb.ObjectB
if err != nil {
if !canRead && !canList {
// No error info is returned if neither Read, nor List permission is granted
return nil, nil
return &pb.ObjectBeginDeleteResponse{}, nil
}
return nil, err
}
Expand Down
46 changes: 46 additions & 0 deletions satellite/metainfo/metainfo_test.go
Expand Up @@ -1359,3 +1359,49 @@ func TestBucketEmptinessBeforeDelete(t *testing.T) {
require.NoError(t, err)
})
}

func TestDeleteBatchWithoutPermission(t *testing.T) {
testplanet.Run(t, testplanet.Config{
SatelliteCount: 1, StorageNodeCount: 0, UplinkCount: 1,
}, func(t *testing.T, ctx *testcontext.Context, planet *testplanet.Planet) {
apiKey := planet.Uplinks[0].APIKey[planet.Satellites[0].ID()]

err := planet.Uplinks[0].CreateBucket(ctx, planet.Satellites[0], "test-bucket")
require.NoError(t, err)

apiKey, err = apiKey.Restrict(macaroon.Caveat{
DisallowLists: true,
DisallowReads: true,
})
require.NoError(t, err)

metainfoClient, err := planet.Uplinks[0].DialMetainfo(ctx, planet.Satellites[0], apiKey)
require.NoError(t, err)
defer ctx.Check(metainfoClient.Close)

responses, err := metainfoClient.Batch(ctx,
// this request was causing panic becase for deleting object
// its possible to return no error and empty response for
// specific set of permissions, see `apiKey.Restrict` from above
&metainfo.BeginDeleteObjectParams{
Bucket: []byte("test-bucket"),
EncryptedPath: []byte("not-existing-object"),
},

// TODO this code should be enabled then issue with read permissions in
// DeleteBucket method currently user have always permission to read bucket
// https://storjlabs.atlassian.net/browse/USR-603
// when it will be fixed commented code from bellow should replace existing DeleteBucketParams
// the same situation like above
// &metainfo.DeleteBucketParams{
// Name: []byte("not-existing-bucket"),
// },

&metainfo.DeleteBucketParams{
Name: []byte("test-bucket"),
},
)
require.NoError(t, err)
require.Equal(t, 2, len(responses))
})
}

0 comments on commit 793be43

Please sign in to comment.