Skip to content

Commit

Permalink
satellite/metabase: drop pending objects table support
Browse files Browse the repository at this point in the history
We decided that we won't use seprate table for handling pending
objects. We need to remove related code.

#6421

Change-Id: I442b0f58da75409f725e08e2cd83d29ed4f91ec6
  • Loading branch information
mniewrzal authored and Storj Robot committed Nov 15, 2023
1 parent cd9518a commit 6834c04
Show file tree
Hide file tree
Showing 19 changed files with 175 additions and 4,873 deletions.
419 changes: 111 additions & 308 deletions satellite/metabase/commit.go

Large diffs are not rendered by default.

1,779 changes: 60 additions & 1,719 deletions satellite/metabase/commit_test.go

Large diffs are not rendered by default.

67 changes: 0 additions & 67 deletions satellite/metabase/delete.go
Expand Up @@ -205,47 +205,6 @@ func (db *DB) DeletePendingObject(ctx context.Context, opts DeletePendingObject)
return result, nil
}

// DeletePendingObjectNew deletes a pending object.
// TODO DeletePendingObjectNew will replace DeletePendingObject when objects table will be free from pending objects.
func (db *DB) DeletePendingObjectNew(ctx context.Context, opts DeletePendingObject) (result DeleteObjectResult, err error) {
defer mon.Task()(&ctx)(&err)

if err := opts.Verify(); err != nil {
return DeleteObjectResult{}, err
}

err = withRows(db.db.QueryContext(ctx, `
WITH deleted_objects AS (
DELETE FROM pending_objects
WHERE
(project_id, bucket_name, object_key, stream_id) = ($1, $2, $3, $4)
RETURNING
stream_id, created_at, expires_at,
encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key,
encryption
), deleted_segments AS (
DELETE FROM segments
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
RETURNING segments.stream_id
)
SELECT * FROM deleted_objects
`, opts.ProjectID, []byte(opts.BucketName), opts.ObjectKey, opts.StreamID))(func(rows tagsql.Rows) error {
result.Removed, err = db.scanPendingObjectDeletion(ctx, opts.Location(), rows)
return err
})
if err != nil {
return DeleteObjectResult{}, err
}

if len(result.Removed) == 0 {
return DeleteObjectResult{}, ErrObjectNotFound.Wrap(Error.New("no rows deleted"))
}

mon.Meter("object_delete").Mark(len(result.Removed))

return result, nil
}

// DeleteObjectsAllVersions deletes all versions of multiple objects from the same bucket.
func (db *DB) DeleteObjectsAllVersions(ctx context.Context, opts DeleteObjectsAllVersions) (result DeleteObjectResult, err error) {
defer mon.Task()(&ctx)(&err)
Expand Down Expand Up @@ -375,32 +334,6 @@ func (db *DB) scanMultipleObjectsDeletion(ctx context.Context, rows tagsql.Rows)
return objects, nil
}

func (db *DB) scanPendingObjectDeletion(ctx context.Context, location ObjectLocation, rows tagsql.Rows) (objects []Object, err error) {
defer mon.Task()(&ctx)(&err)

objects = make([]Object, 0, 10)

var object Object
for rows.Next() {
object.ProjectID = location.ProjectID
object.BucketName = location.BucketName
object.ObjectKey = location.ObjectKey

err = rows.Scan(&object.StreamID,
&object.CreatedAt, &object.ExpiresAt,
&object.EncryptedMetadataNonce, &object.EncryptedMetadata, &object.EncryptedMetadataEncryptedKey,
encryptionParameters{&object.Encryption},
)
if err != nil {
return nil, Error.New("unable to delete pending object: %w", err)
}

object.Status = Pending
objects = append(objects, object)
}
return objects, nil
}

// DeleteObjectLastCommitted contains arguments necessary for deleting last committed version of object.
type DeleteObjectLastCommitted struct {
ObjectLocation
Expand Down
67 changes: 0 additions & 67 deletions satellite/metabase/delete_bucket.go
Expand Up @@ -32,7 +32,6 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects)

deleteBatchSizeLimit.Ensure(&opts.BatchSize)

// TODO we may think about doing pending and committed objects in parallel
deletedBatchCount := int64(opts.BatchSize)
for deletedBatchCount > 0 {
if err := ctx.Err(); err != nil {
Expand All @@ -47,20 +46,6 @@ func (db *DB) DeleteBucketObjects(ctx context.Context, opts DeleteBucketObjects)
}
}

deletedBatchCount = int64(opts.BatchSize)
for deletedBatchCount > 0 {
if err := ctx.Err(); err != nil {
return deletedObjectCount, err
}

deletedBatchCount, err = db.deleteBucketPendingObjects(ctx, opts)
deletedObjectCount += deletedBatchCount

if err != nil {
return deletedObjectCount, err
}
}

return deletedObjectCount, nil
}

Expand Down Expand Up @@ -116,55 +101,3 @@ func (db *DB) deleteBucketObjects(ctx context.Context, opts DeleteBucketObjects)

return deletedObjectCount, nil
}

func (db *DB) deleteBucketPendingObjects(ctx context.Context, opts DeleteBucketObjects) (deletedObjectCount int64, err error) {
defer mon.Task()(&ctx)(&err)

var query string

// TODO handle number of deleted segments
switch db.impl {
case dbutil.Cockroach:
query = `
WITH deleted_objects AS (
DELETE FROM pending_objects
WHERE (project_id, bucket_name) = ($1, $2)
LIMIT $3
RETURNING pending_objects.stream_id
), deleted_segments AS (
DELETE FROM segments
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
RETURNING segments.stream_id
)
SELECT COUNT(1) FROM deleted_objects
`
case dbutil.Postgres:
query = `
WITH deleted_objects AS (
DELETE FROM pending_objects
WHERE stream_id IN (
SELECT stream_id FROM pending_objects
WHERE (project_id, bucket_name) = ($1, $2)
LIMIT $3
)
RETURNING pending_objects.stream_id
), deleted_segments AS (
DELETE FROM segments
WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects)
RETURNING segments.stream_id
)
SELECT COUNT(1) FROM deleted_objects
`
default:
return 0, Error.New("unhandled database: %v", db.impl)
}

err = db.db.QueryRowContext(ctx, query, opts.Bucket.ProjectID, []byte(opts.Bucket.BucketName), opts.BatchSize).Scan(&deletedObjectCount)
if err != nil {
return 0, Error.Wrap(err)
}

mon.Meter("object_delete").Mark64(deletedObjectCount)

return deletedObjectCount, nil
}
47 changes: 0 additions & 47 deletions satellite/metabase/delete_bucket_test.go
Expand Up @@ -238,53 +238,6 @@ func TestDeleteBucketObjects(t *testing.T) {

metabasetest.Verify{}.Check(ctx, t, db)
})

t.Run("pending and committed objects", func(t *testing.T) {
defer metabasetest.DeleteAll{}.Check(ctx, t, db)

metabasetest.CreateObject(ctx, t, db, obj1, 2)

obj1.ObjectKey = "some key"
obj1.Version = metabase.NextVersion
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: obj1,
Encryption: metabasetest.DefaultEncryption,
UsePendingObjectsTable: true,
},
Version: metabase.PendingVersion,
}.Check(ctx, t, db)

metabasetest.DeleteBucketObjects{
Opts: metabase.DeleteBucketObjects{
Bucket: obj1.Location().Bucket(),
BatchSize: 2,
},
Deleted: 2,
}.Check(ctx, t, db)

metabasetest.Verify{}.Check(ctx, t, db)

// object only in pending_objects table
metabasetest.BeginObjectNextVersion{
Opts: metabase.BeginObjectNextVersion{
ObjectStream: obj1,
Encryption: metabasetest.DefaultEncryption,
UsePendingObjectsTable: true,
},
Version: metabase.PendingVersion,
}.Check(ctx, t, db)

metabasetest.DeleteBucketObjects{
Opts: metabase.DeleteBucketObjects{
Bucket: obj1.Location().Bucket(),
BatchSize: 2,
},
Deleted: 1,
}.Check(ctx, t, db)

metabasetest.Verify{}.Check(ctx, t, db)
})
})
}

Expand Down
116 changes: 0 additions & 116 deletions satellite/metabase/delete_objects.go
Expand Up @@ -290,119 +290,3 @@ func (db *DB) deleteInactiveObjectsAndSegments(ctx context.Context, objects []Ob

return nil
}

// DeleteInactivePendingObjects deletes all pending objects that are inactive. Inactive means that zombie deletion deadline passed
// and no new segmets were uploaded after opts.InactiveDeadline.
func (db *DB) DeleteInactivePendingObjects(ctx context.Context, opts DeleteZombieObjects) (err error) {
defer mon.Task()(&ctx)(&err)

return db.deleteObjectsAndSegmentsBatch(ctx, opts.BatchSize, func(startAfter ObjectStream, batchsize int) (last ObjectStream, err error) {
defer mon.Task()(&ctx)(&err)

query := `
SELECT
project_id, bucket_name, object_key, stream_id
FROM pending_objects
` + db.impl.AsOfSystemInterval(opts.AsOfSystemInterval) + `
WHERE
(project_id, bucket_name, object_key, stream_id) > ($1, $2, $3, $4)
AND (zombie_deletion_deadline IS NULL OR zombie_deletion_deadline < $5)
ORDER BY project_id, bucket_name, object_key, stream_id
LIMIT $6;`

objects := make([]ObjectStream, 0, batchsize)

scanErrClass := errs.Class("DB rows scan has failed")
err = withRows(db.db.QueryContext(ctx, query,
startAfter.ProjectID, []byte(startAfter.BucketName), []byte(startAfter.ObjectKey), startAfter.StreamID,
opts.DeadlineBefore,
batchsize),
)(func(rows tagsql.Rows) error {
for rows.Next() {
err = rows.Scan(&last.ProjectID, &last.BucketName, &last.ObjectKey, &last.StreamID)
if err != nil {
return scanErrClass.Wrap(err)
}

db.log.Debug("selected zombie object for deleting it",
zap.Stringer("Project", last.ProjectID),
zap.String("Bucket", last.BucketName),
zap.String("Object Key", string(last.ObjectKey)),
zap.Stringer("StreamID", last.StreamID),
)
objects = append(objects, last)
}

return nil
})
if err != nil {
if scanErrClass.Has(err) {
return ObjectStream{}, Error.New("unable to select zombie objects for deletion: %w", err)
}

db.log.Warn("unable to select zombie objects for deletion", zap.Error(Error.Wrap(err)))
return ObjectStream{}, nil
}

err = db.deleteInactiveObjectsAndSegmentsNew(ctx, objects, opts)
if err != nil {
db.log.Warn("delete from DB zombie objects", zap.Error(err))
return ObjectStream{}, nil
}

return last, nil
})
}

func (db *DB) deleteInactiveObjectsAndSegmentsNew(ctx context.Context, objects []ObjectStream, opts DeleteZombieObjects) (err error) {
defer mon.Task()(&ctx)(&err)

if len(objects) == 0 {
return nil
}

err = pgxutil.Conn(ctx, db.db, func(conn *pgx.Conn) error {
var batch pgx.Batch
for _, obj := range objects {
batch.Queue(`
WITH check_segments AS (
SELECT 1 FROM segments
WHERE stream_id = $4::BYTEA AND created_at > $5
), deleted_objects AS (
DELETE FROM pending_objects
WHERE
(project_id, bucket_name, object_key, stream_id) = ($1::BYTEA, $2::BYTEA, $3::BYTEA, $4) AND
NOT EXISTS (SELECT 1 FROM check_segments)
RETURNING stream_id
)
DELETE FROM segments
WHERE segments.stream_id IN (SELECT stream_id FROM deleted_objects)
`, obj.ProjectID, []byte(obj.BucketName), []byte(obj.ObjectKey), obj.StreamID, opts.InactiveDeadline)
}

results := conn.SendBatch(ctx, &batch)
defer func() { err = errs.Combine(err, results.Close()) }()

var segmentsDeleted int64
var errlist errs.Group
for i := 0; i < batch.Len(); i++ {
result, err := results.Exec()
errlist.Add(err)

if err == nil {
segmentsDeleted += result.RowsAffected()
}
}

// TODO calculate deleted objects
mon.Meter("zombie_segment_delete").Mark64(segmentsDeleted)
mon.Meter("segment_delete").Mark64(segmentsDeleted)

return errlist.Err()
})
if err != nil {
return Error.New("unable to delete zombie objects: %w", err)
}

return nil
}

0 comments on commit 6834c04

Please sign in to comment.