From 89dad05c65395db1719735589b76253cf4f8f1c6 Mon Sep 17 00:00:00 2001 From: Egon Elbre Date: Wed, 22 Nov 2023 14:00:22 +0200 Subject: [PATCH] satellite/metabase: don't create delete markers for pending Fix a case where it was possible to create a delete marker when only pending object was present. The solution is not pretty, but we have a TODO note to cleanup the precommit code, so let's fix the bug first. Change-Id: I0ab66d745443c9dccbf29ef32389dd912b2d9caf --- satellite/metabase/delete.go | 7 +- satellite/metabase/delete_test.go | 32 +++++++ satellite/metabase/precommit.go | 144 ++++++++++++++++++++++++++++++ 3 files changed, 179 insertions(+), 4 deletions(-) diff --git a/satellite/metabase/delete.go b/satellite/metabase/delete.go index 004a730f8fe9..4d43bded130d 100644 --- a/satellite/metabase/delete.go +++ b/satellite/metabase/delete.go @@ -364,14 +364,13 @@ func (db *DB) DeleteObjectLastCommitted( return DeleteObjectResult{}, Error.Wrap(err) } - var precommit precommitConstraintResult + var precommit precommitConstraintWithNonPendingResult err = txutil.WithTx(ctx, db.db, nil, func(ctx context.Context, tx tagsql.Tx) (err error) { - precommit, err = db.precommitDeleteUnversioned(ctx, opts.ObjectLocation, tx) + precommit, err = db.precommitDeleteUnversionedWithNonPending(ctx, opts.ObjectLocation, tx) if err != nil { return Error.Wrap(err) } - // TODO(ver): currently it allows adding delete markers when pending objects are present. - if precommit.HighestVersion == 0 { + if precommit.HighestVersion == 0 || precommit.HighestNonPendingVersion == 0 { // an object didn't exist in the first place return ErrObjectNotFound.New("unable to delete object") } diff --git a/satellite/metabase/delete_test.go b/satellite/metabase/delete_test.go index 6de98d6375ef..3de8717ee1b9 100644 --- a/satellite/metabase/delete_test.go +++ b/satellite/metabase/delete_test.go @@ -704,6 +704,38 @@ func TestDeleteObjectVersioning(t *testing.T) { }, }.Check(ctx, t, db) }) + + t.Run("delete last pending with suspended", func(t *testing.T) { + defer metabasetest.DeleteAll{}.Check(ctx, t, db) + + obj := metabasetest.RandObjectStream() + pending := metabasetest.BeginObjectExactVersion{ + Opts: metabase.BeginObjectExactVersion{ + ObjectStream: obj, + Encryption: metabasetest.DefaultEncryption, + }, + }.Check(ctx, t, db) + + metabasetest.DeleteObjectLastCommitted{ + Opts: metabase.DeleteObjectLastCommitted{ + ObjectLocation: metabase.ObjectLocation{ + ProjectID: obj.ProjectID, + BucketName: obj.BucketName, + ObjectKey: obj.ObjectKey, + }, + Versioned: false, + Suspended: true, + }, + ErrClass: &metabase.ErrObjectNotFound, + ErrText: "unable to delete object", + }.Check(ctx, t, db) + + metabasetest.Verify{ + Objects: []metabase.RawObject{ + metabase.RawObject(pending), + }, + }.Check(ctx, t, db) + }) }) } diff --git a/satellite/metabase/precommit.go b/satellite/metabase/precommit.go index 01f7cc22e6a1..e3c0498f410a 100644 --- a/satellite/metabase/precommit.go +++ b/satellite/metabase/precommit.go @@ -238,3 +238,147 @@ func (db *DB) precommitDeleteUnversioned(ctx context.Context, loc ObjectLocation return result, nil } + +type precommitConstraintWithNonPendingResult struct { + Deleted []Object + + // DeletedObjectCount returns how many objects were deleted. + DeletedObjectCount int + // DeletedSegmentCount returns how many segments were deleted. + DeletedSegmentCount int + + // HighestVersion returns tha highest version that was present in the table. + // It returns 0 if there was none. + HighestVersion Version + + // HighestNonPendingVersion returns tha highest non-pending version that was present in the table. + // It returns 0 if there was none. + HighestNonPendingVersion Version +} + +func (r *precommitConstraintWithNonPendingResult) submitMetrics() { + mon.Meter("object_delete").Mark(r.DeletedObjectCount) + mon.Meter("segment_delete").Mark(r.DeletedSegmentCount) +} + +// precommitDeleteUnversionedWithNonPending deletes the unversioned object at loc and also returns the highest version and highest committed version. +func (db *DB) precommitDeleteUnversionedWithNonPending(ctx context.Context, loc ObjectLocation, tx stmtRow) (result precommitConstraintWithNonPendingResult, err error) { + defer mon.Task()(&ctx)(&err) + + if err := loc.Verify(); err != nil { + return precommitConstraintWithNonPendingResult{}, Error.Wrap(err) + } + + var deleted Object + + // TODO(ver): this scanning can probably simplified somehow. + + var version sql.NullInt64 + var streamID uuid.NullUUID + var createdAt sql.NullTime + var segmentCount, fixedSegmentSize sql.NullInt32 + var totalPlainSize, totalEncryptedSize sql.NullInt64 + var status sql.NullByte + var encryptionParams nullableValue[encryptionParameters] + encryptionParams.value.EncryptionParameters = &deleted.Encryption + + err = tx.QueryRowContext(ctx, ` + WITH highest_object AS ( + SELECT MAX(version) as version + FROM objects + WHERE (project_id, bucket_name, object_key) = ($1, $2, $3) + ), highest_non_pending_object AS ( + SELECT MAX(version) as version + FROM objects + WHERE (project_id, bucket_name, object_key) = ($1, $2, $3) + AND status <> `+statusPending+` + ), deleted_objects AS ( + DELETE FROM objects + WHERE + (project_id, bucket_name, object_key) = ($1, $2, $3) + AND status IN `+statusesUnversioned+` + RETURNING + version, stream_id, + created_at, expires_at, + status, segment_count, + encrypted_metadata_nonce, encrypted_metadata, encrypted_metadata_encrypted_key, + total_plain_size, total_encrypted_size, fixed_segment_size, + encryption + ), deleted_segments AS ( + DELETE FROM segments + WHERE segments.stream_id IN (SELECT deleted_objects.stream_id FROM deleted_objects) + RETURNING segments.stream_id + ) + SELECT + (SELECT version FROM deleted_objects), + (SELECT stream_id FROM deleted_objects), + (SELECT created_at FROM deleted_objects), + (SELECT expires_at FROM deleted_objects), + (SELECT status FROM deleted_objects), + (SELECT segment_count FROM deleted_objects), + (SELECT encrypted_metadata_nonce FROM deleted_objects), + (SELECT encrypted_metadata FROM deleted_objects), + (SELECT encrypted_metadata_encrypted_key FROM deleted_objects), + (SELECT total_plain_size FROM deleted_objects), + (SELECT total_encrypted_size FROM deleted_objects), + (SELECT fixed_segment_size FROM deleted_objects), + (SELECT encryption FROM deleted_objects), + (SELECT count(*) FROM deleted_objects), + (SELECT count(*) FROM deleted_segments), + coalesce((SELECT version FROM highest_object), 0), + coalesce((SELECT version FROM highest_non_pending_object), 0) + `, loc.ProjectID, []byte(loc.BucketName), loc.ObjectKey). + Scan( + &version, + &streamID, + &createdAt, + &deleted.ExpiresAt, + &status, + &segmentCount, + &deleted.EncryptedMetadataNonce, + &deleted.EncryptedMetadata, + &deleted.EncryptedMetadataEncryptedKey, + &totalPlainSize, + &totalEncryptedSize, + &fixedSegmentSize, + &encryptionParams, + &result.DeletedObjectCount, + &result.DeletedSegmentCount, + &result.HighestVersion, + &result.HighestNonPendingVersion, + ) + + if err != nil { + return precommitConstraintWithNonPendingResult{}, Error.Wrap(err) + } + + deleted.ProjectID = loc.ProjectID + deleted.BucketName = loc.BucketName + deleted.ObjectKey = loc.ObjectKey + deleted.Version = Version(version.Int64) + + deleted.Status = ObjectStatus(status.Byte) + deleted.StreamID = streamID.UUID + deleted.CreatedAt = createdAt.Time + deleted.SegmentCount = segmentCount.Int32 + + deleted.TotalPlainSize = totalPlainSize.Int64 + deleted.TotalEncryptedSize = totalEncryptedSize.Int64 + deleted.FixedSegmentSize = fixedSegmentSize.Int32 + + if result.DeletedObjectCount > 1 { + db.log.Error("object with multiple committed versions were found!", + zap.Stringer("Project ID", loc.ProjectID), zap.String("Bucket Name", loc.BucketName), + zap.ByteString("Object Key", []byte(loc.ObjectKey)), zap.Int("deleted", result.DeletedObjectCount)) + + mon.Meter("multiple_committed_versions").Mark(1) + + return result, Error.New("internal error: multiple committed unversioned objects") + } + + if result.DeletedObjectCount > 0 { + result.Deleted = append(result.Deleted, deleted) + } + + return result, nil +}