Skip to content

Commit

Permalink
satellite/metabase: introduce a new adapter interface for datasource …
Browse files Browse the repository at this point in the history
…specific SQL queries

Change-Id: Ice6e28e516cf188c0b76d6c937a0e5c0f5357906
  • Loading branch information
elek authored and mniewrzal committed Apr 3, 2024
1 parent 96317db commit 816a386
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 72 deletions.
14 changes: 14 additions & 0 deletions satellite/metabase/adapter.go
@@ -0,0 +1,14 @@
// Copyright (C) 2024 Storj Labs, Inc.
// See LICENSE for copying information.

package metabase

import "context"

// Adapter is a low level extension point to use datasource related queries.
// TODO: we may need separated adapter for segments/objects/etc.
type Adapter interface {
BeginObject(context.Context, BeginObjectNextVersion, *Object) error
GetObjectLastCommitted(ctx context.Context, opts GetObjectLastCommitted, object *Object) error
TestingBeginObjectExactVersion(ctx context.Context, opts BeginObjectExactVersion, object *Object) error
}
9 changes: 9 additions & 0 deletions satellite/metabase/adapter_cockroach.go
@@ -0,0 +1,9 @@
// Copyright (C) 2024 Storj Labs, Inc.
// See LICENSE for copying information.

package metabase

// CockroachAdapter uses Cockroach related SQL queries.
type CockroachAdapter struct {
PostgresAdapter
}
105 changes: 105 additions & 0 deletions satellite/metabase/adapter_postgres.go
@@ -0,0 +1,105 @@
// Copyright (C) 2024 Storj Labs, Inc.
// See LICENSE for copying information.

package metabase

import (
"context"
"database/sql"
"errors"

"storj.io/common/tagsql"
)

// PostgresAdapter uses Cockroach related SQL queries.
type PostgresAdapter struct {
db tagsql.DB
}

// TestingBeginObjectExactVersion implements Adapter.
func (p *PostgresAdapter) TestingBeginObjectExactVersion(ctx context.Context, opts BeginObjectExactVersion, object *Object) error {
return p.db.QueryRowContext(ctx, `
INSERT INTO objects (
project_id, bucket_name, object_key, version, stream_id,
expires_at, encryption,
zombie_deletion_deadline,
encrypted_metadata, encrypted_metadata_nonce, encrypted_metadata_encrypted_key
) VALUES (
$1, $2, $3, $4, $5,
$6, $7,
$8,
$9, $10, $11
)
RETURNING status, created_at
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID,
opts.ExpiresAt, encryptionParameters{&opts.Encryption},
opts.ZombieDeletionDeadline,
opts.EncryptedMetadata, opts.EncryptedMetadataNonce, opts.EncryptedMetadataEncryptedKey,
).Scan(
&object.Status, &object.CreatedAt,
)
}

// BeginObject implements Adapter.
func (p *PostgresAdapter) BeginObject(ctx context.Context, opts BeginObjectNextVersion, object *Object) error {
return p.db.QueryRowContext(ctx, `
INSERT INTO objects (
project_id, bucket_name, object_key, version, stream_id,
expires_at, encryption,
zombie_deletion_deadline,
encrypted_metadata, encrypted_metadata_nonce, encrypted_metadata_encrypted_key
) VALUES (
$1, $2, $3,
coalesce((
SELECT version + 1
FROM objects
WHERE (project_id, bucket_name, object_key) = ($1, $2, $3)
ORDER BY version DESC
LIMIT 1
), 1),
$4, $5, $6,
$7,
$8, $9, $10)
RETURNING status, version, created_at
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.StreamID,
opts.ExpiresAt, encryptionParameters{&opts.Encryption},
opts.ZombieDeletionDeadline,
opts.EncryptedMetadata, opts.EncryptedMetadataNonce, opts.EncryptedMetadataEncryptedKey,
).Scan(&object.Status, &object.Version, &object.CreatedAt)
}

// GetObjectLastCommitted implements Adapter.
func (p *PostgresAdapter) GetObjectLastCommitted(ctx context.Context, opts GetObjectLastCommitted, object *Object) error {
row := p.db.QueryRowContext(ctx, `
SELECT
stream_id, version, status,
created_at, expires_at,
segment_count,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
total_plain_size, total_encrypted_size, fixed_segment_size,
encryption
FROM objects
WHERE
(project_id, bucket_name, object_key) = ($1, $2, $3) AND
status <> `+statusPending+` AND
(expires_at IS NULL OR expires_at > now())
ORDER BY version DESC
LIMIT 1`,
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey)

err := row.Scan(
&object.StreamID, &object.Version, &object.Status,
&object.CreatedAt, &object.ExpiresAt,
&object.SegmentCount,
&object.EncryptedMetadataNonce, &object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey,
&object.TotalPlainSize, &object.TotalEncryptedSize, &object.FixedSegmentSize,
encryptionParameters{&object.Encryption},
)

if errors.Is(err, sql.ErrNoRows) || object.Status.IsDeleteMarker() {
return ErrObjectNotFound.Wrap(Error.Wrap(sql.ErrNoRows))
}
return nil
}

var _ Adapter = &PostgresAdapter{}
47 changes: 3 additions & 44 deletions satellite/metabase/commit.go
Expand Up @@ -93,30 +93,8 @@ func (db *DB) BeginObjectNextVersion(ctx context.Context, opts BeginObjectNextVe
ZombieDeletionDeadline: opts.ZombieDeletionDeadline,
}

if err := db.db.QueryRowContext(ctx, `
INSERT INTO objects (
project_id, bucket_name, object_key, version, stream_id,
expires_at, encryption,
zombie_deletion_deadline,
encrypted_metadata, encrypted_metadata_nonce, encrypted_metadata_encrypted_key
) VALUES (
$1, $2, $3,
coalesce((
SELECT version + 1
FROM objects
WHERE (project_id, bucket_name, object_key) = ($1, $2, $3)
ORDER BY version DESC
LIMIT 1
), 1),
$4, $5, $6,
$7,
$8, $9, $10)
RETURNING status, version, created_at
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.StreamID,
opts.ExpiresAt, encryptionParameters{&opts.Encryption},
opts.ZombieDeletionDeadline,
opts.EncryptedMetadata, opts.EncryptedMetadataNonce, opts.EncryptedMetadataEncryptedKey,
).Scan(&object.Status, &object.Version, &object.CreatedAt); err != nil {
err = db.ChooseAdapter(opts.ProjectID).BeginObject(ctx, opts, &object)
if err != nil {
return Object{}, Error.New("unable to insert object: %w", err)
}

Expand Down Expand Up @@ -183,26 +161,7 @@ func (db *DB) TestingBeginObjectExactVersion(ctx context.Context, opts BeginObje
ZombieDeletionDeadline: opts.ZombieDeletionDeadline,
}

err = db.db.QueryRowContext(ctx, `
INSERT INTO objects (
project_id, bucket_name, object_key, version, stream_id,
expires_at, encryption,
zombie_deletion_deadline,
encrypted_metadata, encrypted_metadata_nonce, encrypted_metadata_encrypted_key
) VALUES (
$1, $2, $3, $4, $5,
$6, $7,
$8,
$9, $10, $11
)
RETURNING status, created_at
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.Version, opts.StreamID,
opts.ExpiresAt, encryptionParameters{&opts.Encryption},
opts.ZombieDeletionDeadline,
opts.EncryptedMetadata, opts.EncryptedMetadataNonce, opts.EncryptedMetadataEncryptedKey,
).Scan(
&object.Status, &object.CreatedAt,
)
err = db.ChooseAdapter(opts.ProjectID).TestingBeginObjectExactVersion(ctx, opts, &object)
if err != nil {
if code := pgerrcode.FromError(err); code == pgxerrcode.UniqueViolation {
return Object{}, Error.Wrap(ErrObjectAlreadyExists.New(""))
Expand Down
23 changes: 23 additions & 0 deletions satellite/metabase/db.go
Expand Up @@ -20,6 +20,7 @@ import (
"storj.io/common/dbutil/pgutil"
"storj.io/common/memory"
"storj.io/common/tagsql"
"storj.io/common/uuid"
"storj.io/storj/private/logging"
"storj.io/storj/private/migrate"
)
Expand Down Expand Up @@ -54,6 +55,8 @@ type DB struct {
testCleanup func() error

config Config

adapters []Adapter
}

// Open opens a connection to metabase.
Expand Down Expand Up @@ -92,6 +95,20 @@ func Open(ctx context.Context, log *zap.Logger, connstr string, config Config) (
config: config,
}
db.aliasCache = NewNodeAliasCache(db)
switch impl {
case dbutil.Postgres:
db.adapters = append(db.adapters, &PostgresAdapter{
db: rawdb,
})
case dbutil.Cockroach:
db.adapters = append(db.adapters, &CockroachAdapter{
PostgresAdapter{
db: rawdb,
},
})
default:
return nil, Error.New("unsupported implementation: %s", connstr)
}

if log.Level() == zap.DebugLevel {
log.Debug("Connected", zap.String("db source", logging.Redacted(connstr)))
Expand All @@ -103,6 +120,12 @@ func Open(ctx context.Context, log *zap.Logger, connstr string, config Config) (
// Implementation rturns the database implementation.
func (db *DB) Implementation() dbutil.Implementation { return db.impl }

// ChooseAdapter selects the right adapter based on configuration.
func (db *DB) ChooseAdapter(projectID uuid.UUID) Adapter {
// TODO: choose based on configuration.
return db.adapters[0]
}

// UnderlyingTagSQL returns *tagsql.DB.
// TODO: remove.
func (db *DB) UnderlyingTagSQL() tagsql.DB { return db.db }
Expand Down
31 changes: 3 additions & 28 deletions satellite/metabase/get.go
Expand Up @@ -137,34 +137,9 @@ func (db *DB) GetObjectLastCommitted(ctx context.Context, opts GetObjectLastComm
object.BucketName = opts.BucketName
object.ObjectKey = opts.ObjectKey

row := db.db.QueryRowContext(ctx, `
SELECT
stream_id, version, status,
created_at, expires_at,
segment_count,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
total_plain_size, total_encrypted_size, fixed_segment_size,
encryption
FROM objects
WHERE
(project_id, bucket_name, object_key) = ($1, $2, $3) AND
status <> `+statusPending+` AND
(expires_at IS NULL OR expires_at > now())
ORDER BY version DESC
LIMIT 1`,
opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey)

err = row.Scan(
&object.StreamID, &object.Version, &object.Status,
&object.CreatedAt, &object.ExpiresAt,
&object.SegmentCount,
&object.EncryptedMetadataNonce, &object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey,
&object.TotalPlainSize, &object.TotalEncryptedSize, &object.FixedSegmentSize,
encryptionParameters{&object.Encryption},
)

if errors.Is(err, sql.ErrNoRows) || object.Status.IsDeleteMarker() {
return Object{}, ErrObjectNotFound.Wrap(Error.Wrap(sql.ErrNoRows))
err = db.ChooseAdapter(opts.ProjectID).GetObjectLastCommitted(ctx, opts, &object)
if ErrObjectNotFound.Has(err) {
return Object{}, err
}
if err != nil {
return Object{}, Error.New("unable to query object status: %w", err)
Expand Down

0 comments on commit 816a386

Please sign in to comment.