Skip to content

Commit

Permalink
satellite/metabase: add some spanner automatic columns encoding/decoding
Browse files Browse the repository at this point in the history
Change-Id: I26b2b13addd994ec774291bb9bd1b94806f59003
  • Loading branch information
mniewrzal authored and Storj Robot committed Apr 22, 2024
1 parent 3d0ddd4 commit a817f8c
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 49 deletions.
20 changes: 19 additions & 1 deletion satellite/metabase/aliaspiece.go
Expand Up @@ -5,6 +5,7 @@ package metabase

import (
"database/sql/driver"
"encoding/base64"
"encoding/binary"
)

Expand Down Expand Up @@ -157,7 +158,7 @@ func (aliases *AliasPieces) SetBytes(data []byte) error {
}

// Scan implements the database/sql Scanner interface.
func (aliases *AliasPieces) Scan(src interface{}) error {
func (aliases *AliasPieces) Scan(src any) error {
if src == nil {
*aliases = nil
return nil
Expand All @@ -176,6 +177,23 @@ func (aliases AliasPieces) Value() (driver.Value, error) {
return aliases.Bytes()
}

// DecodeSpanner implements spanner.Decoder.
func (aliases *AliasPieces) DecodeSpanner(val any) (err error) {
// TODO(spanner) why spanner returns BYTES as base64
if v, ok := val.(string); ok {
val, err = base64.StdEncoding.DecodeString(v)
if err != nil {
return err
}
}
return aliases.Scan(val)
}

// EncodeSpanner implements spanner.Encoder.
func (aliases AliasPieces) EncodeSpanner() (any, error) {
return aliases.Value()
}

// EqualAliasPieces compares whether xs and ys are equal.
func EqualAliasPieces(xs, ys AliasPieces) bool {
if len(xs) != len(ys) {
Expand Down
20 changes: 19 additions & 1 deletion satellite/metabase/encoding.go
Expand Up @@ -7,6 +7,7 @@ import (
"database/sql"
"database/sql/driver"
"encoding/binary"
"strconv"

"github.com/jackc/pgtype"

Expand Down Expand Up @@ -119,7 +120,7 @@ func (params redundancyScheme) Value() (driver.Value, error) {
return int64(binary.LittleEndian.Uint64(bytes[:])), nil
}

func (params redundancyScheme) Scan(value interface{}) error {
func (params redundancyScheme) Scan(value any) error {
switch value := value.(type) {
case int64:
var bytes [8]byte
Expand All @@ -141,6 +142,23 @@ func (params redundancyScheme) Scan(value interface{}) error {
}
}

// DecodeSpanner implements spanner.Decoder.
func (params redundancyScheme) DecodeSpanner(val any) (err error) {
// TODO(spanner) why spanner provide sometimes string
if v, ok := val.(string); ok {
val, err = strconv.ParseInt(v, 10, 64)
if err != nil {
return Error.New("unable to scan %T into RedundancyScheme: %v", val, err)
}
}
return params.Scan(val)
}

// EncodeSpanner implements spanner.Encoder.
func (params redundancyScheme) EncodeSpanner() (any, error) {
return params.Value()
}

// Value implements sql/driver.Valuer interface.
func (pieces Pieces) Value() (driver.Value, error) {
if len(pieces) == 0 {
Expand Down
22 changes: 6 additions & 16 deletions satellite/metabase/loop.go
Expand Up @@ -554,14 +554,15 @@ func (it *spannerLoopSegmentIterator) scanItem(ctx context.Context, item *LoopSe
var position int64
var createdAt time.Time
var repairedAt, expiresAt spanner.NullTime
var encryptedSize, plainOffset, plainSize, redundancy, placement int64
var streamID, rootPieceID, remoteAliasPieces []byte
var encryptedSize, plainOffset, plainSize, placement int64
var streamID, rootPieceID []byte
var aliasPieces AliasPieces
if err := it.curRow.Columns(&streamID, &position,
&createdAt, &repairedAt, &expiresAt,
&rootPieceID,
&encryptedSize, &plainOffset, &plainSize,
&redundancy,
&remoteAliasPieces,
redundancyScheme{&item.Redundancy},
&aliasPieces,
&placement,
); err != nil {
return Error.New("failed to scan segment: %w", err)
Expand All @@ -586,21 +587,10 @@ func (it *spannerLoopSegmentIterator) scanItem(ctx context.Context, item *LoopSe
item.EncryptedSize = int32(encryptedSize)
item.PlainOffset = plainOffset
item.PlainSize = int32(plainSize)
rs := redundancyScheme{RedundancyScheme: &storj.RedundancyScheme{}}
if err := rs.Scan(redundancy); err != nil {
return Error.New("failed to scan segment: %w", err)
}
item.Redundancy = *rs.RedundancyScheme

aliasPieces := AliasPieces{}
err = aliasPieces.SetBytes(remoteAliasPieces)
if err != nil {
return Error.New("failed to scan segment: %w", err)
}
item.AliasPieces = aliasPieces

item.Placement = storj.PlacementConstraint(placement)
item.Pieces, err = it.aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
item.Pieces, err = it.aliasCache.ConvertAliasesToPieces(ctx, item.AliasPieces)
if err != nil {
return Error.New("failed to scan segment: %w", err)
}
Expand Down
42 changes: 12 additions & 30 deletions satellite/metabase/raw.go
Expand Up @@ -399,23 +399,27 @@ func (s *SpannerAdapter) TestingGetAllSegments(ctx context.Context, aliasCache *
return nil, Error.Wrap(err)
}

var segment RawSegment

var position int64
var createdAt time.Time
var repairedAt, expiresAt spanner.NullTime
var encryptedSize, plainOffset, plainSize, redundancy, placement int64
var streamID, rootPieceID, encryptedKeyNonce, encryptedKey, encryptedETag, inlineData, remoteAliasPieces []byte
if err := row.Columns(&streamID, &position,
var encryptedSize, plainOffset, plainSize, placement int64
var streamID, rootPieceID, encryptedKeyNonce, encryptedKey, encryptedETag, inlineData []byte
var aliasPieces AliasPieces
if err := row.Columns(
&streamID, &position,
&createdAt, &repairedAt, &expiresAt,
&rootPieceID, &encryptedKeyNonce, &encryptedKey,
&encryptedSize, &plainOffset, &plainSize,
&encryptedETag,
&redundancy,
&inlineData, &remoteAliasPieces,
redundancyScheme{&segment.Redundancy},
&inlineData, &aliasPieces,
&placement,
); err != nil {
return nil, Error.Wrap(err)
}
var segment RawSegment

segment.StreamID, err = uuid.FromBytes(streamID)
if err != nil {
return nil, Error.Wrap(err)
Expand All @@ -438,19 +442,7 @@ func (s *SpannerAdapter) TestingGetAllSegments(ctx context.Context, aliasCache *
segment.PlainOffset = plainOffset
segment.PlainSize = int32(plainSize)
segment.EncryptedETag = encryptedETag
rs := redundancyScheme{RedundancyScheme: &storj.RedundancyScheme{}}
err = rs.Scan(redundancy)
if err != nil {
return nil, Error.Wrap(err)
}
segment.Redundancy = *rs.RedundancyScheme
segment.InlineData = inlineData

aliasPieces := AliasPieces{}
err = aliasPieces.SetBytes(remoteAliasPieces)
if err != nil {
return nil, Error.Wrap(err)
}
segment.Placement = storj.PlacementConstraint(placement)

segment.Pieces, err = aliasCache.ConvertAliasesToPieces(ctx, aliasPieces)
Expand Down Expand Up @@ -599,16 +591,6 @@ func (s *SpannerAdapter) TestingBatchInsertSegments(ctx context.Context, aliasCa
if err != nil {
return Error.Wrap(err)
}
aliasPiecesBytes, err := aliasPieces.Bytes()
if err != nil {
return Error.Wrap(err)
}
redundancyValue, err := redundancyScheme{&segment.Redundancy}.Value()
if err != nil {
return Error.Wrap(err)
}

redundancy := redundancyValue.(int64)

// TODO(spanner) verify if casting is good
vals := append([]interface{}{},
Expand All @@ -628,9 +610,9 @@ func (s *SpannerAdapter) TestingBatchInsertSegments(ctx context.Context, aliasCa
int64(segment.PlainSize),
segment.PlainOffset,

redundancy,
redundancyScheme{&segment.Redundancy},
segment.InlineData,
aliasPiecesBytes,
aliasPieces,
int64(segment.Placement),
)

Expand Down
2 changes: 1 addition & 1 deletion satellite/metabase/raw_test.go
Expand Up @@ -68,5 +68,5 @@ func TestTestingBatchInsertSegments(t *testing.T) {

require.Zero(t, cmp.Diff(rawSegments, metabasetest.SegmentsToRaw(insertedSegments),
cmpopts.EquateApproxTime(1*time.Second)))
})
}, metabasetest.WithSpanner())
}

0 comments on commit a817f8c

Please sign in to comment.