Skip to content

Commit

Permalink
satellite/metabase: reorganize adapter structure
Browse files Browse the repository at this point in the history
By accident older version of initial adapter implementation was merged. This change is fixing that.

Change-Id: Idff76029b1d29c5d3189277a71e41153dd088f75
  • Loading branch information
elek authored and mniewrzal committed Apr 5, 2024
1 parent 33e4603 commit 8415273
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 121 deletions.
22 changes: 20 additions & 2 deletions satellite/metabase/adapter.go
Expand Up @@ -3,12 +3,30 @@

package metabase

import "context"
import (
"context"

"storj.io/common/tagsql"
)

// Adapter is a low level extension point to use datasource related queries.
// TODO: we may need separated adapter for segments/objects/etc.
type Adapter interface {
BeginObject(context.Context, BeginObjectNextVersion, *Object) error
BeginObjectNextVersion(context.Context, BeginObjectNextVersion, *Object) error
GetObjectLastCommitted(ctx context.Context, opts GetObjectLastCommitted, object *Object) error
TestingBeginObjectExactVersion(ctx context.Context, opts BeginObjectExactVersion, object *Object) error
}

// PostgresAdapter uses Cockroach related SQL queries.
type PostgresAdapter struct {
db tagsql.DB
}

var _ Adapter = &PostgresAdapter{}

// CockroachAdapter uses Cockroach related SQL queries.
type CockroachAdapter struct {
PostgresAdapter
}

var _ Adapter = &CockroachAdapter{}
9 changes: 0 additions & 9 deletions satellite/metabase/adapter_cockroach.go

This file was deleted.

105 changes: 0 additions & 105 deletions satellite/metabase/adapter_postgres.go

This file was deleted.

54 changes: 53 additions & 1 deletion satellite/metabase/commit.go
Expand Up @@ -93,7 +93,7 @@ func (db *DB) BeginObjectNextVersion(ctx context.Context, opts BeginObjectNextVe
ZombieDeletionDeadline: opts.ZombieDeletionDeadline,
}

err = db.ChooseAdapter(opts.ProjectID).BeginObject(ctx, opts, &object)
err = db.ChooseAdapter(opts.ProjectID).BeginObjectNextVersion(ctx, opts, &object)
if err != nil {
return Object{}, Error.New("unable to insert object: %w", err)
}
Expand All @@ -103,6 +103,34 @@ func (db *DB) BeginObjectNextVersion(ctx context.Context, opts BeginObjectNextVe
return object, nil
}

// BeginObjectNextVersion implements Adapter.
func (p *PostgresAdapter) BeginObjectNextVersion(ctx context.Context, opts BeginObjectNextVersion, object *Object) error {
return p.db.QueryRowContext(ctx, `
INSERT INTO objects (
project_id, bucket_name, object_key, version, stream_id,
expires_at, encryption,
zombie_deletion_deadline,
encrypted_metadata, encrypted_metadata_nonce, encrypted_metadata_encrypted_key
) VALUES (
$1, $2, $3,
coalesce((
SELECT version + 1
FROM objects
WHERE (project_id, bucket_name, object_key) = ($1, $2, $3)
ORDER BY version DESC
LIMIT 1
), 1),
$4, $5, $6,
$7,
$8, $9, $10)
RETURNING status, version, created_at
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.StreamID,
opts.ExpiresAt, encryptionParameters{&opts.Encryption},
opts.ZombieDeletionDeadline,
opts.EncryptedMetadata, opts.EncryptedMetadataNonce, opts.EncryptedMetadataEncryptedKey,
).Scan(&object.Status, &object.Version, &object.CreatedAt)
}

// BeginObjectExactVersion contains arguments necessary for starting an object upload.
type BeginObjectExactVersion struct {
ObjectStream
Expand Down Expand Up @@ -174,6 +202,30 @@ func (db *DB) TestingBeginObjectExactVersion(ctx context.Context, opts BeginObje
return object, nil
}

// TestingBeginObjectExactVersion implements Adapter.
func (p *PostgresAdapter) TestingBeginObjectExactVersion(ctx context.Context, opts BeginObjectExactVersion, object *Object) error {
return p.db.QueryRowContext(ctx, `
INSERT INTO objects (
project_id, bucket_name, object_key, version, stream_id,
expires_at, encryption,
zombie_deletion_deadline,
encrypted_metadata, encrypted_metadata_nonce, encrypted_metadata_encrypted_key
) VALUES (
$1, $2, $3, $4, $5,
$6, $7,
$8,
$9, $10, $11
)
RETURNING status, created_at
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID,
opts.ExpiresAt, encryptionParameters{&opts.Encryption},
opts.ZombieDeletionDeadline,
opts.EncryptedMetadata, opts.EncryptedMetadataNonce, opts.EncryptedMetadataEncryptedKey,
).Scan(
&object.Status, &object.CreatedAt,
)
}

// BeginSegment contains options to verify, whether a new segment upload can be started.
type BeginSegment struct {
ObjectStream
Expand Down
39 changes: 35 additions & 4 deletions satellite/metabase/get.go
Expand Up @@ -138,16 +138,47 @@ func (db *DB) GetObjectLastCommitted(ctx context.Context, opts GetObjectLastComm
object.ObjectKey = opts.ObjectKey

err = db.ChooseAdapter(opts.ProjectID).GetObjectLastCommitted(ctx, opts, &object)
if ErrObjectNotFound.Has(err) {
return Object{}, err
}
if err != nil {
return Object{}, Error.New("unable to query object status: %w", err)
return Object{}, err
}

return object, nil
}

// GetObjectLastCommitted implements Adapter.
func (p *PostgresAdapter) GetObjectLastCommitted(ctx context.Context, opts GetObjectLastCommitted, object *Object) error {
row := p.db.QueryRowContext(ctx, `
SELECT
stream_id, version, status,
created_at, expires_at,
segment_count,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
total_plain_size, total_encrypted_size, fixed_segment_size,
encryption
FROM objects
WHERE
(project_id, bucket_name, object_key) = ($1, $2, $3) AND
status <> `+statusPending+` AND
(expires_at IS NULL OR expires_at > now())
ORDER BY version DESC
LIMIT 1`,
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey)

err := row.Scan(
&object.StreamID, &object.Version, &object.Status,
&object.CreatedAt, &object.ExpiresAt,
&object.SegmentCount,
&object.EncryptedMetadataNonce, &object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey,
&object.TotalPlainSize, &object.TotalEncryptedSize, &object.FixedSegmentSize,
encryptionParameters{&object.Encryption},
)

if errors.Is(err, sql.ErrNoRows) || object.Status.IsDeleteMarker() {
return ErrObjectNotFound.Wrap(Error.Wrap(sql.ErrNoRows))
}
return Error.Wrap(err)
}

// GetSegmentByPosition contains arguments necessary for fetching a segment on specific position.
type GetSegmentByPosition struct {
StreamID uuid.UUID
Expand Down

0 comments on commit 8415273

Please sign in to comment.