Skip to content

Commit

Permalink
satellite/metabase: GetSegmentByPosition for copied segments
Browse files Browse the repository at this point in the history
Part of server-side copy.

For getting a copied segment GetSegmentByPosition needs to retrieve inline_data or remote_alias_pieces and other information from the original segment.

Fixes #4477

Change-Id: I283cbc546df6e1e2fa34a803c7ef280ecd0f65d7
  • Loading branch information
Fadila82 authored and mniewrzal committed Feb 22, 2022
1 parent ec2bd50 commit b11d144
Show file tree
Hide file tree
Showing 2 changed files with 567 additions and 3 deletions.
42 changes: 39 additions & 3 deletions satellite/metabase/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ func (s Segment) Inline() bool {
return s.Redundancy.IsZero() && len(s.Pieces) == 0
}

// PiecesInAncestorSegment returns true if remote alias pieces are to be found in an ancestor segment.
func (s Segment) PiecesInAncestorSegment() bool {
return s.EncryptedSize != 0 && len(s.InlineData) == 0 && len(s.Pieces) == 0
}

// Expired checks if segment is expired relative to now.
func (s Segment) Expired(now time.Time) bool {
return s.ExpiresAt != nil && s.ExpiresAt.Before(now)
Expand Down Expand Up @@ -166,9 +171,40 @@ func (db *DB) GetSegmentByPosition(ctx context.Context, opts GetSegmentByPositio
return Segment{}, Error.New("unable to query segment: %w", err)
}

segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return Segment{}, Error.New("unable to convert aliases to pieces: %w", err)
if len(aliasPieces) > 0 {
segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return Segment{}, Error.New("unable to convert aliases to pieces: %w", err)
}
}

if segment.PiecesInAncestorSegment() {
// TODO check performance
err = db.db.QueryRowContext(ctx, `
SELECT
root_piece_id,
repaired_at,
remote_alias_pieces
FROM segments
WHERE
stream_id IN (SELECT ancestor_stream_id FROM segment_copies WHERE stream_id = $1)
AND position = $2
`, opts.StreamID, opts.Position.Encode()).Scan(
&segment.RootPieceID,
&segment.RepairedAt,
&aliasPieces,
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return Segment{}, ErrSegmentNotFound.New("segment missing")
}
return Segment{}, Error.New("unable to query segment: %w", err)
}

segment.Pieces, err = db.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
if err != nil {
return Segment{}, Error.New("unable to convert aliases to pieces: %w", err)
}
}

segment.StreamID = opts.StreamID
Expand Down

0 comments on commit b11d144

Please sign in to comment.