Skip to content

Commit

Permalink
satellite/metainfo: stop using project limit cache for uploads/downloads
Browse files Browse the repository at this point in the history
To reduce the number of requests to the DB, we now retrieve all project
limit values with the API key lookup. All limits are retrieved only once
and cached together. Later we can remove the project limit cache completely.

Change-Id: Ib2fd1c290e63949885182f24d125229ad4a7537c
  • Loading branch information
mniewrzal committed Jan 17, 2024
1 parent 17af0f0 commit 4ea8316
Show file tree
Hide file tree
Showing 14 changed files with 195 additions and 159 deletions.
1 change: 1 addition & 0 deletions satellite/accounting/projectlimitcache.go
Expand Up @@ -39,6 +39,7 @@ type ProjectLimitConfig struct {

// ProjectLimitCache stores the values for both storage usage limit and bandwidth limit for
// each project ID if they differ from the default limits.
// TODO: remove this cache as it's not used anymore.
type ProjectLimitCache struct {
projectLimitDB ProjectLimitDB

Expand Down
149 changes: 80 additions & 69 deletions satellite/accounting/projectusage.go
Expand Up @@ -30,23 +30,31 @@ var ErrProjectLimitExceeded = errs.Class("project limit")
type Service struct {
projectAccountingDB ProjectAccounting
liveAccounting Cache
projectLimitCache *ProjectLimitCache
metabaseDB metabase.DB
bandwidthCacheTTL time.Duration
nowFn func() time.Time

defaultMaxStorage memory.Size
defaultMaxBandwidth memory.Size
defaultMaxSegments int64
asOfSystemInterval time.Duration
}

// NewService creates a new instance of the project usage service.
func NewService(projectAccountingDB ProjectAccounting, liveAccounting Cache, limitCache *ProjectLimitCache, metabaseDB metabase.DB, bandwidthCacheTTL, asOfSystemInterval time.Duration) *Service {
func NewService(projectAccountingDB ProjectAccounting, liveAccounting Cache, metabaseDB metabase.DB, bandwidthCacheTTL time.Duration,
defaultMaxStorage, defaultMaxBandwidth memory.Size, defaultMaxSegments int64, asOfSystemInterval time.Duration) *Service {
return &Service{
projectAccountingDB: projectAccountingDB,
liveAccounting: liveAccounting,
projectLimitCache: limitCache,
metabaseDB: metabaseDB,
bandwidthCacheTTL: bandwidthCacheTTL,
nowFn: time.Now,
asOfSystemInterval: asOfSystemInterval,

defaultMaxStorage: defaultMaxStorage,
defaultMaxBandwidth: defaultMaxBandwidth,
defaultMaxSegments: defaultMaxSegments,

asOfSystemInterval: asOfSystemInterval,
nowFn: time.Now,
}
}

Expand All @@ -57,48 +65,33 @@ func NewService(projectAccountingDB ProjectAccounting, liveAccounting Cache, lim
// Among others, it can return one of the following errors returned by
// storj.io/storj/satellite/accounting.Cache except the ErrKeyNotFound, wrapped
// by ErrProjectUsage.
func (usage *Service) ExceedsBandwidthUsage(ctx context.Context, projectID uuid.UUID) (_ bool, limit memory.Size, err error) {
func (usage *Service) ExceedsBandwidthUsage(ctx context.Context, projectID uuid.UUID, limits ProjectLimits) (_ bool, limit memory.Size, err error) {
defer mon.Task()(&ctx)(&err)

var (
group errgroup.Group
bandwidthUsage int64
)
limit = usage.defaultMaxBandwidth
if limits.Bandwidth != nil {
limit = memory.Size(*limits.Bandwidth)
}

group.Go(func() error {
var err error
limit, err = usage.projectLimitCache.GetBandwidthLimit(ctx, projectID)
return err
})
group.Go(func() error {
var err error
// Get the current bandwidth usage from cache.
bandwidthUsage, err := usage.liveAccounting.GetProjectBandwidthUsage(ctx, projectID, usage.nowFn())
if err != nil {
// Verify If the cache key was not found
if ErrKeyNotFound.Has(err) {

// Get the current bandwidth usage from cache.
bandwidthUsage, err = usage.liveAccounting.GetProjectBandwidthUsage(ctx, projectID, usage.nowFn())
if err != nil {
// Verify If the cache key was not found
if ErrKeyNotFound.Has(err) {

// Get current bandwidth value from database.
now := usage.nowFn()
bandwidthUsage, err = usage.GetProjectBandwidth(ctx, projectID, now.Year(), now.Month(), now.Day())
if err != nil {
return err
}

// Create cache key with database value.
_, err = usage.liveAccounting.InsertProjectBandwidthUsage(ctx, projectID, bandwidthUsage, usage.bandwidthCacheTTL, usage.nowFn())
if err != nil {
return err
}
// Get current bandwidth value from database.
now := usage.nowFn()
bandwidthUsage, err = usage.GetProjectBandwidth(ctx, projectID, now.Year(), now.Month(), now.Day())
if err != nil {
return false, 0, ErrProjectUsage.Wrap(err)
}
}
return err
})

err = group.Wait()
if err != nil {
return false, 0, ErrProjectUsage.Wrap(err)
// Create cache key with database value.
_, err = usage.liveAccounting.InsertProjectBandwidthUsage(ctx, projectID, bandwidthUsage, usage.bandwidthCacheTTL, usage.nowFn())
if err != nil {
return false, 0, ErrProjectUsage.Wrap(err)
}
}
}

// Verify the bandwidth usage cache.
Expand All @@ -120,22 +113,21 @@ type UploadLimit struct {
// ExceedsUploadLimits returns combined checks for storage and segment limits.
// Supply nonzero headroom parameters to check if there is room for a new object.
func (usage *Service) ExceedsUploadLimits(
ctx context.Context, projectID uuid.UUID, storageSizeHeadroom int64, segmentCountHeadroom int64) (limit UploadLimit, err error) {
ctx context.Context, projectID uuid.UUID, storageSizeHeadroom int64, segmentCountHeadroom int64, limits ProjectLimits) (limit UploadLimit, err error) {
defer mon.Task()(&ctx)(&err)

var group errgroup.Group
var segmentUsage, storageUsage int64

group.Go(func() error {
var err error
limits, err := usage.projectLimitCache.GetLimits(ctx, projectID)
if err != nil {
return err
}
limit.SegmentsLimit = usage.defaultMaxSegments
if limits.Segments != nil {
limit.SegmentsLimit = *limits.Segments
}

limit.StorageLimit = usage.defaultMaxStorage
if limits.Usage != nil {
limit.StorageLimit = memory.Size(*limits.Usage)
return nil
})
}

var group errgroup.Group
var segmentUsage, storageUsage int64

group.Go(func() error {
var err error
Expand Down Expand Up @@ -166,20 +158,25 @@ func (usage *Service) ExceedsUploadLimits(

// AddProjectUsageUpToLimit increases segment and storage usage up to the projects limit.
// If the limit is exceeded, neither usage is increased and accounting.ErrProjectLimitExceeded is returned.
func (usage *Service) AddProjectUsageUpToLimit(ctx context.Context, projectID uuid.UUID, storage int64, segments int64) (err error) {
func (usage *Service) AddProjectUsageUpToLimit(ctx context.Context, projectID uuid.UUID, storage int64, segments int64, limits ProjectLimits) (err error) {
defer mon.Task()(&ctx, projectID)(&err)

limits, err := usage.projectLimitCache.GetLimits(ctx, projectID)
if err != nil {
return err
segmentsLimit := usage.defaultMaxSegments
if limits.Segments != nil {
segmentsLimit = *limits.Segments
}

err = usage.liveAccounting.AddProjectStorageUsageUpToLimit(ctx, projectID, storage, *limits.Usage)
storageLimit := usage.defaultMaxStorage
if limits.Usage != nil {
storageLimit = memory.Size(*limits.Usage)
}

err = usage.liveAccounting.AddProjectStorageUsageUpToLimit(ctx, projectID, storage, storageLimit.Int64())
if err != nil {
return err
}

err = usage.liveAccounting.AddProjectSegmentUsageUpToLimit(ctx, projectID, segments, *limits.Segments)
err = usage.liveAccounting.AddProjectSegmentUsageUpToLimit(ctx, projectID, segments, segmentsLimit)
if ErrProjectLimitExceeded.Has(err) {
// roll back storage increase
err = usage.liveAccounting.AddProjectStorageUsage(ctx, projectID, -1*storage)
Expand Down Expand Up @@ -252,32 +249,46 @@ func (usage *Service) GetProjectBandwidth(ctx context.Context, projectID uuid.UU
// GetProjectStorageLimit returns current project storage limit.
func (usage *Service) GetProjectStorageLimit(ctx context.Context, projectID uuid.UUID) (_ memory.Size, err error) {
defer mon.Task()(&ctx, projectID)(&err)
limits, err := usage.projectLimitCache.GetLimits(ctx, projectID)
storageLimit, err := usage.projectAccountingDB.GetProjectStorageLimit(ctx, projectID)
if err != nil {
return 0, ErrProjectUsage.Wrap(err)
}

return memory.Size(*limits.Usage), nil
if storageLimit == nil {
return usage.defaultMaxStorage, nil
}

return memory.Size(*storageLimit), nil
}

// GetProjectBandwidthLimit returns current project bandwidth limit.
func (usage *Service) GetProjectBandwidthLimit(ctx context.Context, projectID uuid.UUID) (_ memory.Size, err error) {
defer mon.Task()(&ctx, projectID)(&err)
return usage.projectLimitCache.GetBandwidthLimit(ctx, projectID)
bandwidthLimit, err := usage.projectAccountingDB.GetProjectBandwidthLimit(ctx, projectID)
if err != nil {
return 0, ErrProjectUsage.Wrap(err)
}

if bandwidthLimit == nil {
return usage.defaultMaxBandwidth, nil
}

return memory.Size(*bandwidthLimit), nil
}

// GetProjectSegmentLimit returns current project segment limit.
func (usage *Service) GetProjectSegmentLimit(ctx context.Context, projectID uuid.UUID) (_ memory.Size, err error) {
defer mon.Task()(&ctx, projectID)(&err)
return usage.projectLimitCache.GetSegmentLimit(ctx, projectID)
}
segmentLimit, err := usage.projectAccountingDB.GetProjectSegmentLimit(ctx, projectID)
if err != nil {
return 0, ErrProjectUsage.Wrap(err)
}

// UpdateProjectLimits sets new value for project's bandwidth and storage limit.
// TODO remove because it's not used.
func (usage *Service) UpdateProjectLimits(ctx context.Context, projectID uuid.UUID, limit memory.Size) (err error) {
defer mon.Task()(&ctx, projectID)(&err)
if segmentLimit == nil {
return memory.Size(usage.defaultMaxSegments), nil
}

return ErrProjectUsage.Wrap(usage.projectAccountingDB.UpdateProjectUsageLimit(ctx, projectID, limit))
return memory.Size(*segmentLimit), nil
}

// GetProjectBandwidthUsage get the current bandwidth usage from cache.
Expand Down
16 changes: 9 additions & 7 deletions satellite/accounting/projectusage_test.go
Expand Up @@ -157,7 +157,7 @@ func TestProjectUsageBandwidth(t *testing.T) {
projectUsage.SetNow(func() time.Time {
return now
})
actualExceeded, _, err := projectUsage.ExceedsBandwidthUsage(ctx, bucket.ProjectID)
actualExceeded, _, err := projectUsage.ExceedsBandwidthUsage(ctx, bucket.ProjectID, accounting.ProjectLimits{})
require.NoError(t, err)
require.Equal(t, testCase.expectedExceeded, actualExceeded)

Expand Down Expand Up @@ -486,20 +486,22 @@ func TestProjectUsageCustomLimit(t *testing.T) {

project := projects[0]
// set custom usage limit for project
expectedLimit := memory.Size(memory.GiB.Int64() * 10)
err = acctDB.UpdateProjectUsageLimit(ctx, project.ID, expectedLimit)
expectedLimit := memory.GiB.Int64() * 10
err = acctDB.UpdateProjectUsageLimit(ctx, project.ID, memory.Size(expectedLimit))
require.NoError(t, err)

projectUsage := planet.Satellites[0].Accounting.ProjectUsage

// Setup: add data to live accounting to exceed new limit
err = projectUsage.AddProjectStorageUsage(ctx, project.ID, expectedLimit.Int64())
err = projectUsage.AddProjectStorageUsage(ctx, project.ID, expectedLimit)
require.NoError(t, err)

limit, err := projectUsage.ExceedsUploadLimits(ctx, project.ID, 1, 1)
limit, err := projectUsage.ExceedsUploadLimits(ctx, project.ID, 1, 1, accounting.ProjectLimits{
Usage: &expectedLimit,
})
require.NoError(t, err)
require.True(t, limit.ExceedsStorage)
require.Equal(t, expectedLimit.Int64(), limit.StorageLimit.Int64())
require.Equal(t, expectedLimit, limit.StorageLimit.Int64())

// Setup: create some bytes for the uplink to upload
expectedData := testrand.Bytes(50 * memory.KiB)
Expand Down Expand Up @@ -890,7 +892,7 @@ func TestProjectUsageBandwidthResetAfter3days(t *testing.T) {
return tt.now
})

actualExceeded, _, err := projectUsage.ExceedsBandwidthUsage(ctx, bucket.ProjectID)
actualExceeded, _, err := projectUsage.ExceedsBandwidthUsage(ctx, bucket.ProjectID, accounting.ProjectLimits{})
require.NoError(t, err)
require.Equal(t, tt.expectedExceeds, actualExceeded, tt.description)
}
Expand Down
4 changes: 3 additions & 1 deletion satellite/admin.go
Expand Up @@ -240,9 +240,11 @@ func NewAdmin(log *zap.Logger, full *identity.FullIdentity, db DB, metabaseDB *m
peer.Accounting.Service = accounting.NewService(
peer.DB.ProjectAccounting(),
peer.LiveAccounting.Cache,
peer.ProjectLimits.Cache,
*metabaseDB,
config.LiveAccounting.BandwidthCacheTTL,
config.Console.Config.UsageLimits.Storage.Free,
config.Console.Config.UsageLimits.Bandwidth.Free,
config.Console.Config.UsageLimits.Segment.Free,
config.LiveAccounting.AsOfSystemInterval,
)
}
Expand Down
4 changes: 3 additions & 1 deletion satellite/api.go
Expand Up @@ -352,9 +352,11 @@ func NewAPI(log *zap.Logger, full *identity.FullIdentity, db DB,
peer.Accounting.ProjectUsage = accounting.NewService(
peer.DB.ProjectAccounting(),
peer.LiveAccounting.Cache,
peer.ProjectLimits.Cache,
*metabaseDB,
config.LiveAccounting.BandwidthCacheTTL,
config.Console.Config.UsageLimits.Storage.Free,
config.Console.Config.UsageLimits.Bandwidth.Free,
config.Console.Config.UsageLimits.Segment.Free,
config.LiveAccounting.AsOfSystemInterval,
)
}
Expand Down
9 changes: 7 additions & 2 deletions satellite/console/apikeys.go
Expand Up @@ -62,8 +62,13 @@ type APIKeyInfo struct {
Secret []byte `json:"-"`
CreatedAt time.Time `json:"createdAt"`

ProjectRateLimit *int `json:"-"`
ProjectBurstLimit *int `json:"-"`
// TODO move this closer to metainfo
ProjectRateLimit *int
ProjectBurstLimit *int

ProjectStorageLimit *int64
ProjectSegmentsLimit *int64
ProjectBandwidthLimit *int64
}

// APIKeyCursor holds info for api keys cursor pagination.
Expand Down
24 changes: 5 additions & 19 deletions satellite/metainfo/endpoint_object.go
Expand Up @@ -96,7 +96,7 @@ func (endpoint *Endpoint) BeginObject(ctx context.Context, req *pb.ObjectBeginRe
return nil, rpcstatus.Errorf(rpcstatus.InvalidArgument, "key length is too big, got %v, maximum allowed is %v", objectKeyLength, endpoint.config.MaxEncryptedObjectKeyLength)
}

err = endpoint.checkUploadLimits(ctx, keyInfo.ProjectID)
err = endpoint.checkUploadLimits(ctx, keyInfo)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -488,22 +488,8 @@ func (endpoint *Endpoint) DownloadObject(ctx context.Context, req *pb.ObjectDown
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, err.Error())
}

if exceeded, limit, err := endpoint.projectUsage.ExceedsBandwidthUsage(ctx, keyInfo.ProjectID); err != nil {
if errs2.IsCanceled(err) {
return nil, rpcstatus.Wrap(rpcstatus.Canceled, err)
}

endpoint.log.Error(
"Retrieving project bandwidth total failed; bandwidth limit won't be enforced",
zap.Stringer("Project ID", keyInfo.ProjectID),
zap.Error(err),
)
} else if exceeded {
endpoint.log.Warn("Monthly bandwidth limit exceeded",
zap.Stringer("Limit", limit),
zap.Stringer("Project ID", keyInfo.ProjectID),
)
return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
if err := endpoint.checkDownloadLimits(ctx, keyInfo); err != nil {
return nil, err
}

var object metabase.Object
Expand Down Expand Up @@ -2043,7 +2029,7 @@ func (endpoint *Endpoint) BeginCopyObject(ctx context.Context, req *pb.ObjectBeg
ObjectKey: metabase.ObjectKey(req.EncryptedObjectKey),
},
VerifyLimits: func(encryptedObjectSize int64, nSegments int64) error {
return endpoint.checkUploadLimitsForNewObject(ctx, keyInfo.ProjectID, encryptedObjectSize, nSegments)
return endpoint.checkUploadLimitsForNewObject(ctx, keyInfo, encryptedObjectSize, nSegments)
},
})
if err != nil {
Expand Down Expand Up @@ -2166,7 +2152,7 @@ func (endpoint *Endpoint) FinishCopyObject(ctx context.Context, req *pb.ObjectFi
NewDisallowDelete: false,

VerifyLimits: func(encryptedObjectSize int64, nSegments int64) error {
return endpoint.addStorageUsageUpToLimit(ctx, keyInfo.ProjectID, encryptedObjectSize, nSegments)
return endpoint.addStorageUsageUpToLimit(ctx, keyInfo, encryptedObjectSize, nSegments)
},
})
if err != nil {
Expand Down

0 comments on commit 4ea8316

Please sign in to comment.