Skip to content

Commit

Permalink
satellite/metainfo: use VersionCursor for listing
Browse files Browse the repository at this point in the history
ListObjects request should be using VersionCursor for correct
results paging.

#6545

Change-Id: I3f310af183aa72ddf7605feb87dc6f360fc9a24e
  • Loading branch information
mniewrzal committed Dec 13, 2023
1 parent ffb0eb3 commit f0c0787
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 10 deletions.
34 changes: 24 additions & 10 deletions private/testplanet/uplink.go
Expand Up @@ -25,6 +25,7 @@ import (
"storj.io/storj/satellite/console"
"storj.io/uplink"
"storj.io/uplink/private/metaclient"
"storj.io/uplink/private/object"
"storj.io/uplink/private/piecestore"
"storj.io/uplink/private/testuplink"
)
Expand Down Expand Up @@ -253,11 +254,21 @@ func (client *Uplink) OpenProject(ctx context.Context, satellite *Satellite) (_
// Upload data to specific satellite.
func (client *Uplink) Upload(ctx context.Context, satellite *Satellite, bucket string, path storj.Path, data []byte) (err error) {
defer mon.Task()(&ctx)(&err)
return client.UploadWithExpiration(ctx, satellite, bucket, path, data, time.Time{})
return errs.Wrap(client.UploadWithExpiration(ctx, satellite, bucket, path, data, time.Time{}))
}

// UploadWithExpiration data to specific satellite and expiration time.
func (client *Uplink) UploadWithExpiration(ctx context.Context, satellite *Satellite, bucketName string, path storj.Path, data []byte, expiration time.Time) (err error) {
func (client *Uplink) UploadWithExpiration(ctx context.Context, satellite *Satellite, bucketName string, key string, data []byte, expiration time.Time) (err error) {
defer mon.Task()(&ctx)(&err)

_, err = client.UploadWithOptions(ctx, satellite, bucketName, key, data, &uplink.UploadOptions{
Expires: expiration,
})
return errs.Wrap(err)
}

// UploadWithOptions uploads data to specific satellite, with defined options.
func (client *Uplink) UploadWithOptions(ctx context.Context, satellite *Satellite, bucketName, key string, data []byte, options *uplink.UploadOptions) (obj *object.VersionedObject, err error) {
defer mon.Task()(&ctx)(&err)

_, found := testuplink.GetMaxSegmentSize(ctx)
Expand All @@ -267,30 +278,33 @@ func (client *Uplink) UploadWithExpiration(ctx context.Context, satellite *Satel

project, err := client.GetProject(ctx, satellite)
if err != nil {
return errs.Wrap(err)
return nil, errs.Wrap(err)
}
defer func() { err = errs.Combine(err, project.Close()) }()

_, err = project.EnsureBucket(ctx, bucketName)
if err != nil {
return errs.Wrap(err)
return nil, errs.Wrap(err)
}

upload, err := project.UploadObject(ctx, bucketName, path, &uplink.UploadOptions{
Expires: expiration,
})
upload, err := object.UploadObject(ctx, project, bucketName, key, options)
if err != nil {
return errs.Wrap(err)
return nil, errs.Wrap(err)
}

_, err = io.Copy(upload, bytes.NewReader(data))
if err != nil {
abortErr := upload.Abort()
err = errs.Combine(err, abortErr)
return errs.Wrap(err)
return nil, errs.Wrap(err)
}

err = upload.Commit()
if err != nil {
return nil, errs.Wrap(err)
}

return upload.Commit()
return upload.Info(), nil
}

// Download data from specific satellite.
Expand Down
8 changes: 8 additions & 0 deletions satellite/metainfo/endpoint_object.go
Expand Up @@ -959,6 +959,14 @@ func (endpoint *Endpoint) ListObjects(ctx context.Context, req *pb.ObjectListReq
cursorVersion = metabase.MaxVersion
}

if len(req.VersionCursor) != 0 {
version, err := metabase.VersionFromBytes(req.VersionCursor)
if err != nil {
return nil, endpoint.convertMetabaseErr(err)
}
cursorVersion = version
}

includeCustomMetadata := true
includeSystemMetadata := true
if req.UseObjectIncludes {
Expand Down
53 changes: 53 additions & 0 deletions satellite/metainfo/endpoint_object_test.go
Expand Up @@ -2939,5 +2939,58 @@ func TestEndpoint_Object_No_StorageNodes_Versioning(t *testing.T) {
require.Len(t, lposResponse.Items, 1)
require.Equal(t, response.StreamId.Bytes(), lposResponse.Items[0].StreamId.Bytes())
})

t.Run("listing objects, all versions, version cursor handling", func(t *testing.T) {
defer ctx.Check(deleteBucket(bucketName))

require.NoError(t, createBucket(bucketName))
require.NoError(t, planet.Satellites[0].API.Buckets.Service.EnableBucketVersioning(ctx, []byte(bucketName), projectID))

expectedVersions := [][]byte{}
for i := 0; i < 5; i++ {
object, err := planet.Uplinks[0].UploadWithOptions(ctx, satelliteSys, bucketName, "objectA", testrand.Bytes(100), nil)
require.NoError(t, err)
expectedVersions = append(expectedVersions, object.Version)
}

objects, err := planet.Satellites[0].Metabase.DB.TestingAllObjects(ctx)
require.NoError(t, err)
require.NotEmpty(t, objects)

listObjectVersions := func(version []byte, limit int) *pb.ListObjectsResponse {
response, err := satelliteSys.API.Metainfo.Endpoint.ListObjects(ctx, &pb.ListObjectsRequest{
Header: &pb.RequestHeader{ApiKey: apiKey},
Bucket: []byte(bucketName),
// all objects have the same key but different versions
EncryptedCursor: []byte(objects[0].ObjectKey),
VersionCursor: version,
IncludeAllVersions: true,
Limit: int32(limit),
})
require.NoError(t, err)
return response
}

for i, version := range expectedVersions {
response := listObjectVersions(version, 0)
require.Len(t, response.Items, len(expectedVersions)-i-1)

versions := [][]byte{}
for _, item := range response.Items {
versions = append(versions, item.ObjectVersion)
}

require.Equal(t, expectedVersions[i+1:], versions)
}

response := listObjectVersions(expectedVersions[0], 2)
require.NoError(t, err)
require.Len(t, response.Items, 2)
require.True(t, response.More)

response = listObjectVersions(expectedVersions[2], 2)
require.Len(t, response.Items, 2)
require.False(t, response.More)
})
})
}

0 comments on commit f0c0787

Please sign in to comment.