Skip to content

Commit

Permalink
satellite/metainfo: usage limits for copy
Browse files Browse the repository at this point in the history
Previously, storage and segment usage was not enforced in real time during
server-side copies. Now it is: copy requests are checked against the
project's usage limits before they are allowed to proceed.

Closes #4719

Change-Id: I0d536bf551d16208116c3aceac89ed590ec473bf
  • Loading branch information
Erikvv committed Jul 25, 2022
1 parent 6df867b commit b5fc04a
Show file tree
Hide file tree
Showing 11 changed files with 314 additions and 18 deletions.
14 changes: 14 additions & 0 deletions private/testplanet/uplink.go
Expand Up @@ -336,6 +336,20 @@ func (client *Uplink) DeleteObject(ctx context.Context, satellite *Satellite, bu
return err
}

// CopyObject copies an object.
//
// A fresh project handle is opened against the given satellite, the copy is
// issued through it, and the handle is always closed before returning; any
// close error is combined with the copy error.
func (client *Uplink) CopyObject(ctx context.Context, satellite *Satellite, oldBucket, oldKey, newBucket, newKey string) (err error) {
	defer mon.Task()(&ctx)(&err)

	proj, openErr := client.GetProject(ctx, satellite)
	if openErr != nil {
		return openErr
	}
	// Release the project handle regardless of the copy outcome.
	defer func() { err = errs.Combine(err, proj.Close()) }()

	_, err = proj.CopyObject(ctx, oldBucket, oldKey, newBucket, newKey, nil)
	return err
}

// CreateBucket creates a new bucket.
func (client *Uplink) CreateBucket(ctx context.Context, satellite *Satellite, bucketName string) (err error) {
defer mon.Task()(&ctx)(&err)
Expand Down
6 changes: 6 additions & 0 deletions satellite/accounting/db.go
Expand Up @@ -277,6 +277,9 @@ type Cache interface {
GetProjectBandwidthUsage(ctx context.Context, projectID uuid.UUID, now time.Time) (currentUsed int64, err error)
// GetProjectSegmentUsage returns the project's segment usage.
GetProjectSegmentUsage(ctx context.Context, projectID uuid.UUID) (currentUsed int64, err error)
// AddProjectSegmentUsageUpToLimit increases segment usage up to the limit.
// If the limit is exceeded, the usage is not increased and accounting.ErrProjectLimitExceeded is returned.
AddProjectSegmentUsageUpToLimit(ctx context.Context, projectID uuid.UUID, increment int64, segmentLimit int64) error
// InsertProjectBandwidthUsage inserts a project bandwidth usage if it
// doesn't exist. It returns true if it's inserted, otherwise false.
InsertProjectBandwidthUsage(ctx context.Context, projectID uuid.UUID, value int64, ttl time.Duration, now time.Time) (inserted bool, _ error)
Expand All @@ -292,6 +295,9 @@ type Cache interface {
// The projectID is inserted to the spaceUsed when it doesn't exists, hence
// this method will never return ErrKeyNotFound.
AddProjectStorageUsage(ctx context.Context, projectID uuid.UUID, spaceUsed int64) error
// AddProjectStorageUsageUpToLimit increases storage usage up to the limit.
// If the limit is exceeded, the usage is not increased and accounting.ErrProjectLimitExceeded is returned.
AddProjectStorageUsageUpToLimit(ctx context.Context, projectID uuid.UUID, increment int64, spaceLimit int64) error
// GetAllProjectTotals return the total projects' storage and segments used space.
GetAllProjectTotals(ctx context.Context) (map[uuid.UUID]Usage, error)
// Close the client, releasing any open resources. Once it's called any other
Expand Down
52 changes: 52 additions & 0 deletions satellite/accounting/live/redis.go
Expand Up @@ -165,6 +165,33 @@ func (cache *redisLiveAccounting) UpdateProjectSegmentUsage(ctx context.Context,
return nil
}

// AddProjectSegmentUsageUpToLimit increases segment usage up to the limit.
// If the limit is exceeded, the usage is not increased and accounting.ErrProjectLimitExceeded is returned.
func (cache *redisLiveAccounting) AddProjectSegmentUsageUpToLimit(ctx context.Context, projectID uuid.UUID, increment int64, segmentLimit int64) (err error) {
	defer mon.Task()(&ctx, projectID, increment)(&err)

	key := createSegmentProjectIDKey(projectID)

	// Optimistically increment first and validate afterwards, so the common
	// within-limit case costs only a single round-trip to Redis.
	updated, incrErr := cache.client.IncrBy(ctx, key, increment).Result()
	if incrErr != nil {
		return accounting.ErrSystemOrNetError.New("Redis incrby failed: %w", incrErr)
	}

	if updated <= segmentLimit {
		return nil
	}

	// Over the limit: undo the optimistic increment before reporting failure.
	if _, decrErr := cache.client.DecrBy(ctx, key, increment).Result(); decrErr != nil {
		return accounting.ErrSystemOrNetError.New("Redis decrby failed: %w", decrErr)
	}

	return accounting.ErrProjectLimitExceeded.New("Additional %d segments exceed project limit of %d", increment, segmentLimit)
}

// AddProjectStorageUsage lets the live accounting know that the given
// project has just added spaceUsed bytes of storage (from the user's
// perspective; i.e. segment size).
Expand All @@ -179,6 +206,31 @@ func (cache *redisLiveAccounting) AddProjectStorageUsage(ctx context.Context, pr
return nil
}

// AddProjectStorageUsageUpToLimit increases storage usage up to the limit.
// If the limit is exceeded, the usage is not increased and accounting.ErrProjectLimitExceeded is returned.
func (cache *redisLiveAccounting) AddProjectStorageUsageUpToLimit(ctx context.Context, projectID uuid.UUID, increment int64, spaceLimit int64) (err error) {
	defer mon.Task()(&ctx, projectID, increment)(&err)

	// The storage key is the raw project ID bytes.
	key := string(projectID[:])

	// Optimistically increment first and validate afterwards, so the common
	// within-limit case costs only a single round-trip to Redis.
	updated, incrErr := cache.client.IncrBy(ctx, key, increment).Result()
	if incrErr != nil {
		return accounting.ErrSystemOrNetError.New("Redis incrby failed: %w", incrErr)
	}

	if updated <= spaceLimit {
		return nil
	}

	// Over the limit: undo the optimistic increment before reporting failure.
	if _, decrErr := cache.client.DecrBy(ctx, key, increment).Result(); decrErr != nil {
		return accounting.ErrSystemOrNetError.New("Redis decrby failed: %w", decrErr)
	}

	return accounting.ErrProjectLimitExceeded.New("Additional storage of %d bytes exceeds project limit of %d", increment, spaceLimit)
}

// GetAllProjectTotals iterates through the live accounting DB and returns a map of project IDs and totals, amount of segments.
//
// TODO (https://storjlabs.atlassian.net/browse/IN-173): see if it possible to
Expand Down
42 changes: 35 additions & 7 deletions satellite/accounting/projectusage.go
Expand Up @@ -21,6 +21,9 @@ var mon = monkit.Package()
// ErrProjectUsage general error for project usage.
var ErrProjectUsage = errs.Class("project usage")

// ErrProjectLimitExceeded is used when the configured limits of a project are reached.
var ErrProjectLimitExceeded = errs.Class("project limit")

// Service is handling project usage related logic.
//
// architecture: Service
Expand Down Expand Up @@ -115,7 +118,10 @@ type UploadLimit struct {
}

// ExceedsUploadLimits returns combined checks for storage and segment limits.
func (usage *Service) ExceedsUploadLimits(ctx context.Context, projectID uuid.UUID, checkSegmentsLimit bool) (limit UploadLimit, err error) {
// Supply nonzero headroom parameters to check if there is room for a new object.
func (usage *Service) ExceedsUploadLimits(
ctx context.Context, projectID uuid.UUID, storageSizeHeadroom int64, segmentCountHeadroom int64, checkSegmentsLimit bool,
) (limit UploadLimit, err error) {
defer mon.Task()(&ctx)(&err)

var group errgroup.Group
Expand Down Expand Up @@ -154,15 +160,37 @@ func (usage *Service) ExceedsUploadLimits(ctx context.Context, projectID uuid.UU
return UploadLimit{}, ErrProjectUsage.Wrap(err)
}

if segmentUsage >= limit.SegmentsLimit {
limit.ExceedsSegments = true
limit.ExceedsSegments = (segmentUsage + segmentCountHeadroom) > limit.SegmentsLimit
limit.ExceedsStorage = (storageUsage + storageSizeHeadroom) > limit.StorageLimit.Int64()

return limit, nil
}

// AddProjectUsageUpToLimit increases segment and storage usage up to the projects limit.
// If the limit is exceeded, neither usage is increased and accounting.ErrProjectLimitExceeded is returned.
func (usage *Service) AddProjectUsageUpToLimit(ctx context.Context, projectID uuid.UUID, storage int64, segments int64) (err error) {
defer mon.Task()(&ctx, projectID)(&err)

limits, err := usage.projectLimitCache.GetProjectLimits(ctx, projectID)
if err != nil {
return err
}

if storageUsage >= limit.StorageLimit.Int64() {
limit.ExceedsStorage = true
err = usage.liveAccounting.AddProjectStorageUsageUpToLimit(ctx, projectID, storage, *limits.Usage)
if err != nil {
return err
}

return limit, nil
err = usage.liveAccounting.AddProjectSegmentUsageUpToLimit(ctx, projectID, segments, *limits.Segments)
if ErrProjectLimitExceeded.Has(err) {
// roll back storage increase
err = usage.liveAccounting.AddProjectStorageUsage(ctx, projectID, -1*storage)
if err != nil {
return err
}
}

return err
}

// GetProjectStorageTotals returns total amount of storage used by project.
Expand Down Expand Up @@ -238,7 +266,7 @@ func (usage *Service) GetProjectBandwidthUsage(ctx context.Context, projectID uu
// UpdateProjectBandwidthUsage increments the bandwidth cache key for a specific project.
//
// It can return one of the following errors returned by
// storj.io/storj/satellite/accounting.Cache.UpdatProjectBandwidthUsage, wrapped
// storj.io/storj/satellite/accounting.Cache.UpdateProjectBandwidthUsage, wrapped
// by ErrProjectUsage.
func (usage *Service) UpdateProjectBandwidthUsage(ctx context.Context, projectID uuid.UUID, increment int64) (err error) {
return usage.liveAccounting.UpdateProjectBandwidthUsage(ctx, projectID, increment, usage.bandwidthCacheTTL, usage.nowFn())
Expand Down
2 changes: 1 addition & 1 deletion satellite/accounting/projectusage_test.go
Expand Up @@ -515,7 +515,7 @@ func TestProjectUsageCustomLimit(t *testing.T) {
err = projectUsage.AddProjectStorageUsage(ctx, project.ID, expectedLimit.Int64())
require.NoError(t, err)

limit, err := projectUsage.ExceedsUploadLimits(ctx, project.ID, false)
limit, err := projectUsage.ExceedsUploadLimits(ctx, project.ID, 1, 1, false)
require.NoError(t, err)
require.True(t, limit.ExceedsStorage)
require.Equal(t, expectedLimit.Int64(), limit.StorageLimit.Int64())
Expand Down
27 changes: 26 additions & 1 deletion satellite/metabase/copy_object.go
Expand Up @@ -32,6 +32,10 @@ type BeginCopyObjectResult struct {
type BeginCopyObject struct {
Version Version
ObjectLocation

// VerifyLimits holds a callback by which the caller can interrupt the copy
// if it turns out the copy would exceed a limit.
VerifyLimits func(encryptedObjectSize int64, nSegments int64) error
}

// BeginCopyObject collects all data needed to begin object copy procedure.
Expand All @@ -47,10 +51,11 @@ func (db *DB) BeginCopyObject(ctx context.Context, opts BeginCopyObject) (result
}

var segmentCount int64
var encryptedObjectSize int64

err = db.db.QueryRowContext(ctx, `
SELECT
stream_id, encryption, segment_count,
stream_id, encryption, total_encrypted_size, segment_count,
encrypted_metadata_encrypted_key, encrypted_metadata_nonce, encrypted_metadata
FROM objects
WHERE
Expand All @@ -63,6 +68,7 @@ func (db *DB) BeginCopyObject(ctx context.Context, opts BeginCopyObject) (result
Scan(
&result.StreamID,
encryptionParameters{&result.EncryptionParameters},
&encryptedObjectSize,
&segmentCount,
&result.EncryptedMetadataKey, &result.EncryptedMetadataKeyNonce, &result.EncryptedMetadata,
)
Expand All @@ -77,6 +83,13 @@ func (db *DB) BeginCopyObject(ctx context.Context, opts BeginCopyObject) (result
return BeginCopyObjectResult{}, Error.New("object to copy has too many segments (%d). Limit is %d.", segmentCount, CopySegmentLimit)
}

if opts.VerifyLimits != nil {
err = opts.VerifyLimits(encryptedObjectSize, segmentCount)
if err != nil {
return BeginCopyObjectResult{}, err
}
}

err = withRows(db.db.QueryContext(ctx, `
SELECT
position, encrypted_key_nonce, encrypted_key
Expand Down Expand Up @@ -117,6 +130,11 @@ type FinishCopyObject struct {
NewEncryptedMetadataKey []byte

NewSegmentKeys []EncryptedKeyAndNonce

// VerifyLimits holds a callback by which the caller can interrupt the copy
// if it turns out completing the copy would exceed a limit.
// It will be called only once.
VerifyLimits func(encryptedObjectSize int64, nSegments int64) error
}

// Verify verifies metabase.FinishCopyObject data.
Expand Down Expand Up @@ -173,6 +191,13 @@ func (db *DB) FinishCopyObject(ctx context.Context, opts FinishCopyObject) (obje
return err
}

if opts.VerifyLimits != nil {
err := opts.VerifyLimits(sourceObject.TotalEncryptedSize, int64(sourceObject.SegmentCount))
if err != nil {
return err
}
}

if int(sourceObject.SegmentCount) != len(opts.NewSegmentKeys) {
return ErrInvalidRequest.New("wrong number of segments keys received (received %d, need %d)", len(opts.NewSegmentKeys), sourceObject.SegmentCount)
}
Expand Down
5 changes: 5 additions & 0 deletions satellite/metainfo/endpoint.go
Expand Up @@ -265,6 +265,11 @@ func (endpoint *Endpoint) unmarshalSatSegmentID(ctx context.Context, segmentID s

// convertMetabaseErr converts domain errors from metabase to appropriate rpc statuses errors.
func (endpoint *Endpoint) convertMetabaseErr(err error) error {
if rpcstatus.Code(err) != rpcstatus.Unknown {
// it's already RPC error
return err
}

switch {
case storj.ErrObjectNotFound.Has(err):
return rpcstatus.Error(rpcstatus.NotFound, err.Error())
Expand Down
6 changes: 6 additions & 0 deletions satellite/metainfo/endpoint_object.go
Expand Up @@ -1747,6 +1747,9 @@ func (endpoint *Endpoint) BeginCopyObject(ctx context.Context, req *pb.ObjectBeg
ObjectKey: metabase.ObjectKey(req.EncryptedObjectKey),
},
Version: metabase.DefaultVersion,
VerifyLimits: func(encryptedObjectSize int64, nSegments int64) error {
return endpoint.checkUploadLimitsForNewObject(ctx, keyInfo.ProjectID, encryptedObjectSize, nSegments)
},
})
if err != nil {
return nil, endpoint.convertMetabaseErr(err)
Expand Down Expand Up @@ -1896,6 +1899,9 @@ func (endpoint *Endpoint) FinishCopyObject(ctx context.Context, req *pb.ObjectFi
NewEncryptedMetadata: req.NewEncryptedMetadata,
NewEncryptedMetadataKeyNonce: req.NewEncryptedMetadataKeyNonce,
NewEncryptedMetadataKey: req.NewEncryptedMetadataKey,
VerifyLimits: func(encryptedObjectSize int64, nSegments int64) error {
return endpoint.addStorageUsageUpToLimit(ctx, keyInfo.ProjectID, encryptedObjectSize, nSegments)
},
})
if err != nil {
return nil, endpoint.convertMetabaseErr(err)
Expand Down

1 comment on commit b5fc04a

@storjrobot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit has been mentioned on Storj Community Forum (official). There might be relevant details there:

https://forum.storj.io/t/release-preparation-v1-61/19333/1

Please sign in to comment.