Skip to content

Commit

Permalink
satellite/metainfo: Don't response errors when Redis down
Browse files Browse the repository at this point in the history
For being able to have resilient multi-region satellites we cannot stop
processing uploads/download client request when Redis isn't responding
properly.

These changes avoid to stop the processing of the client requests when
we cannot check if the client exceeds its storage or bandwidth limits
and we cannot update its used storage/bandwidth limits because Redis is
not responding successfully or the satellite database returns an error.

Change-Id: Ia7f12c07fc9ffdfad0e7ff052ff3fd81eca0f0e3
  • Loading branch information
ifraixedes committed Jan 13, 2021
1 parent a73c59b commit a4d06b9
Showing 1 changed file with 49 additions and 45 deletions.
94 changes: 49 additions & 45 deletions satellite/metainfo/metainfo.go
Expand Up @@ -1170,16 +1170,8 @@ func (endpoint *Endpoint) BeginSegment(ctx context.Context, req *pb.SegmentBegin
return nil, rpcstatus.Error(rpcstatus.InvalidArgument, "segment index must be greater then 0")
}

exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, keyInfo.ProjectID)
if err != nil {
endpoint.log.Error("Retrieving project storage totals failed.", zap.Error(err))
}
if exceeded {
endpoint.log.Error("Monthly storage limit exceeded.",
zap.Stringer("Limit", limit),
zap.Stringer("Project ID", keyInfo.ProjectID),
)
return nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
if err := endpoint.checkExceedsStorageUsage(ctx, keyInfo.ProjectID); err != nil {
return nil, err
}

redundancy, err := eestream.NewRedundancyStrategyFromProto(streamID.Redundancy)
Expand Down Expand Up @@ -1311,16 +1303,8 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm
return nil, nil, err
}

exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, keyInfo.ProjectID)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
if exceeded {
endpoint.log.Error("The project limit of storage and bandwidth has been exceeded",
zap.Int64("limit", limit.Int64()),
zap.Stringer("Project ID", keyInfo.ProjectID),
)
return nil, nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
if err := endpoint.checkExceedsStorageUsage(ctx, keyInfo.ProjectID); err != nil {
return nil, nil, err
}

// clear hashes so we don't store them
Expand All @@ -1346,12 +1330,13 @@ func (endpoint *Endpoint) commitSegment(ctx context.Context, req *pb.SegmentComm
}

if err := endpoint.projectUsage.AddProjectStorageUsage(ctx, keyInfo.ProjectID, segmentSize); err != nil {
endpoint.log.Error("Could not track new storage usage by project",
// log it and continue. it's most likely our own fault that we couldn't
// track it, and the only thing that will be affected is our per-project
// storage limits.
endpoint.log.Error("Could not track new project's storage usage",
zap.Stringer("Project ID", keyInfo.ProjectID),
zap.Error(err),
)
// but continue. it's most likely our own fault that we couldn't track it, and the only thing
// that will be affected is our per-project bandwidth and storage limits.
}

if savePointer {
Expand Down Expand Up @@ -1407,22 +1392,18 @@ func (endpoint *Endpoint) makeInlineSegment(ctx context.Context, req *pb.Segment
return nil, nil, rpcstatus.Error(rpcstatus.InvalidArgument, fmt.Sprintf("inline segment size cannot be larger than %s", endpoint.config.MaxInlineSegmentSize))
}

exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, keyInfo.ProjectID)
if err != nil {
return nil, nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
}
if exceeded {
endpoint.log.Error("Monthly storage limit exceeded.",
zap.Stringer("Limit", limit),
zap.Stringer("Project ID", keyInfo.ProjectID),
)
return nil, nil, rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
if err := endpoint.checkExceedsStorageUsage(ctx, keyInfo.ProjectID); err != nil {
return nil, nil, err
}

if err := endpoint.projectUsage.AddProjectStorageUsage(ctx, keyInfo.ProjectID, inlineUsed); err != nil {
endpoint.log.Error("Could not track new storage usage.", zap.Stringer("Project ID", keyInfo.ProjectID), zap.Error(err))
// but continue. it's most likely our own fault that we couldn't track it, and the only thing
// that will be affected is our per-project bandwidth and storage limits.
// log it and continue. it's most likely our own fault that we couldn't
// track it, and the only thing that will be affected is our per-project
// bandwidth and storage limits.
endpoint.log.Error("Could not track new project's storage usage",
zap.Stringer("Project ID", keyInfo.ProjectID),
zap.Error(err),
)
}

metadata, err := pb.Marshal(&pb.SegmentMeta{
Expand Down Expand Up @@ -1502,12 +1483,10 @@ func (endpoint *Endpoint) DownloadSegment(ctx context.Context, req *pb.SegmentDo

bucket := metabase.BucketLocation{ProjectID: keyInfo.ProjectID, BucketName: string(streamID.Bucket)}

exceeded, limit, err := endpoint.projectUsage.ExceedsBandwidthUsage(ctx, keyInfo.ProjectID)
if err != nil {
endpoint.log.Error("Retrieving project bandwidth total failed.", zap.Error(err))
}
if exceeded {
endpoint.log.Error("Monthly bandwidth limit exceeded.",
if exceeded, limit, err := endpoint.projectUsage.ExceedsBandwidthUsage(ctx, keyInfo.ProjectID); err != nil {
endpoint.log.Error("Retrieving project bandwidth total failed; bandwidth limit won't be enforced", zap.Error(err))
} else if exceeded {
endpoint.log.Error("Monthly bandwidth limit exceeded",
zap.Stringer("Limit", limit),
zap.Stringer("Project ID", keyInfo.ProjectID),
)
Expand All @@ -1520,9 +1499,14 @@ func (endpoint *Endpoint) DownloadSegment(ctx context.Context, req *pb.SegmentDo
}

// Update the current bandwidth cache value incrementing the SegmentSize.
err = endpoint.projectUsage.UpdateProjectBandwidthUsage(ctx, keyInfo.ProjectID, pointer.SegmentSize)
if err != nil {
return nil, rpcstatus.Error(rpcstatus.Internal, err.Error())
if err = endpoint.projectUsage.UpdateProjectBandwidthUsage(ctx, keyInfo.ProjectID, pointer.SegmentSize); err != nil {
// log it and continue. it's most likely our own fault that we couldn't
// track it, and the only thing that will be affected is our per-project
// bandwidth limits.
endpoint.log.Error("Could not track the new project's bandwidth usage",
zap.Stringer("Project ID", keyInfo.ProjectID),
zap.Error(err),
)
}

segmentID, err := endpoint.packSegmentID(ctx, &internalpb.SegmentID{})
Expand Down Expand Up @@ -1837,6 +1821,26 @@ func (endpoint *Endpoint) RevokeAPIKey(ctx context.Context, req *pb.RevokeAPIKey
return &pb.RevokeAPIKeyResponse{}, nil
}

func (endpoint *Endpoint) checkExceedsStorageUsage(ctx context.Context, projectID uuid.UUID) (err error) {
defer mon.Task()(&ctx)(&err)

exceeded, limit, err := endpoint.projectUsage.ExceedsStorageUsage(ctx, projectID)
if err != nil {
endpoint.log.Error(
"Retrieving project storage totals failed; storage usage limit won't be enforced",
zap.Error(err),
)
} else if exceeded {
endpoint.log.Error("Monthly storage limit exceeded",
zap.Stringer("Limit", limit),
zap.Stringer("Project ID", projectID),
)
return rpcstatus.Error(rpcstatus.ResourceExhausted, "Exceeded Usage Limit")
}

return nil
}

// CreatePath creates a segment key.
func CreatePath(ctx context.Context, projectID uuid.UUID, segmentIndex int64, bucket, path []byte) (_ metabase.SegmentLocation, err error) {
// TODO rename to CreateLocation
Expand Down

0 comments on commit a4d06b9

Please sign in to comment.