Skip to content

Commit

Permalink
satellite/metainfo: reduce database hits for segment creation
Browse files Browse the repository at this point in the history
Change-Id: I48a748b48daefa95f1dfbf6a0d75e65a568ee36a
  • Loading branch information
jtolio authored and Storj Robot committed Dec 1, 2023
1 parent 5d492a9 commit 591971b
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 14 deletions.
27 changes: 16 additions & 11 deletions satellite/metabase/commit.go
Expand Up @@ -224,6 +224,8 @@ type BeginSegment struct {
RootPieceID storj.PieceID

Pieces Pieces

ObjectExistsChecked bool
}

// BeginSegment verifies, whether a new segment upload can be started.
Expand All @@ -242,24 +244,27 @@ func (db *DB) BeginSegment(ctx context.Context, opts BeginSegment) (err error) {
return ErrInvalidRequest.New("RootPieceID missing")
}

// NOTE: this isn't strictly necessary, since we can also fail this in CommitSegment.
// however, we should prevent creating segements for non-partial objects.
if !opts.ObjectExistsChecked {
// NOTE: Find a way to safely remove this. This isn't strictly necessary,
// since we can also fail this in CommitSegment.
// We should prevent creating segements for non-partial objects.

// Verify that object exists and is partial.
var exists bool
err = db.db.QueryRowContext(ctx, `
// Verify that object exists and is partial.
var exists bool
err = db.db.QueryRowContext(ctx, `
SELECT EXISTS (
SELECT 1
FROM objects
WHERE (project_id, bucket_name, object_key, version, stream_id) = ($1, $2, $3, $4, $5) AND
status = `+statusPending+`
)`,
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID).Scan(&exists)
if err != nil {
return Error.New("unable to query object status: %w", err)
}
if !exists {
return ErrPendingObjectMissing.New("")
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID).Scan(&exists)
if err != nil {
return Error.New("unable to query object status: %w", err)
}
if !exists {
return ErrPendingObjectMissing.New("")
}
}

mon.Meter("segment_begin").Mark(1)
Expand Down
4 changes: 3 additions & 1 deletion satellite/metainfo/batch.go
Expand Up @@ -226,11 +226,13 @@ func (endpoint *Endpoint) Batch(ctx context.Context, req *pb.BatchRequest) (resp
case *pb.BatchRequestItem_SegmentBegin:
singleRequest.SegmentBegin.Header = req.Header

justCreatedObject := false
if singleRequest.SegmentBegin.StreamId.IsZero() && !lastStreamID.IsZero() {
singleRequest.SegmentBegin.StreamId = lastStreamID
justCreatedObject = true
}

response, err := endpoint.BeginSegment(ctx, singleRequest.SegmentBegin)
response, err := endpoint.beginSegment(ctx, singleRequest.SegmentBegin, justCreatedObject)
if err != nil {
return resp, err
}
Expand Down
10 changes: 8 additions & 2 deletions satellite/metainfo/endpoint_segment.go
Expand Up @@ -31,6 +31,11 @@ func calculateSpaceUsed(segmentSize int64, numberOfPieces int, rs storj.Redundan
// BeginSegment begins segment uploading.
func (endpoint *Endpoint) BeginSegment(ctx context.Context, req *pb.SegmentBeginRequest) (resp *pb.SegmentBeginResponse, err error) {
defer mon.Task()(&ctx)(&err)
return endpoint.beginSegment(ctx, req, false)
}

func (endpoint *Endpoint) beginSegment(ctx context.Context, req *pb.SegmentBeginRequest, objectJustCreated bool) (resp *pb.SegmentBeginResponse, err error) {
defer mon.Task()(&ctx)(&err)

endpoint.versionCollector.collect(req.Header.UserAgent, mon.Func().ShortName())

Expand Down Expand Up @@ -121,8 +126,9 @@ func (endpoint *Endpoint) BeginSegment(ctx context.Context, req *pb.SegmentBegin
Part: uint32(req.Position.PartNumber),
Index: uint32(req.Position.Index),
},
RootPieceID: rootPieceID,
Pieces: pieces,
RootPieceID: rootPieceID,
Pieces: pieces,
ObjectExistsChecked: objectJustCreated,
})
if err != nil {
return nil, endpoint.convertMetabaseErr(err)
Expand Down

0 comments on commit 591971b

Please sign in to comment.