Skip to content

Commit

Permalink
satellite/metabase: use order by in precommit hook
Browse files Browse the repository at this point in the history
Looks like there's a slight improvement, but the results were initially
quite unstable.

                                             │ before.txt  │             after.txt              │
                                             │   sec/op    │   sec/op     vs base               │
PrecommitConstraint/Postgres/unversioned-32    314.9µ ± 4%   293.7µ ± 2%   -6.72% (p=0.000 n=8)
PrecommitConstraint/Postgres/versioned-32      224.7µ ± 3%   212.0µ ± 3%   -5.63% (p=0.000 n=8)
PrecommitConstraint/Cockroach/unversioned-32   3.227m ± 4%   3.236m ± 3%        ~ (p=0.798 n=8)
PrecommitConstraint/Cockroach/versioned-32     864.7µ ± 2%   768.9µ ± 3%  -11.08% (p=0.000 n=8)
geomean                                        666.6µ        627.4µ        -5.87%

Update #6692

Change-Id: I77b16beae235cca11aab3633b894dfc07f1af30e
  • Loading branch information
egonelbre committed Jan 17, 2024
1 parent 573e7c7 commit d10f8b8
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 6 deletions.
30 changes: 24 additions & 6 deletions satellite/metabase/precommit.go
Expand Up @@ -6,6 +6,7 @@ package metabase
import (
"context"
"database/sql"
"errors"

"go.uber.org/zap"

Expand Down Expand Up @@ -82,10 +83,15 @@ func (db *DB) precommitQueryHighest(ctx context.Context, loc ObjectLocation, tx
}

err = tx.QueryRowContext(ctx, `
SELECT COALESCE(MAX(version), 0) as version
SELECT version
FROM objects
WHERE (project_id, bucket_name, object_key) = ($1, $2, $3)
ORDER BY version DESC
LIMIT 1
`, loc.ProjectID, []byte(loc.BucketName), loc.ObjectKey).Scan(&highest)
if errors.Is(err, sql.ErrNoRows) {
return 0, nil
}
if err != nil {
return 0, Error.Wrap(err)
}
Expand All @@ -101,12 +107,15 @@ func (db *DB) precommitQueryHighestAndUnversioned(ctx context.Context, loc Objec
return 0, false, Error.Wrap(err)
}

var version sql.NullInt64
err = tx.QueryRowContext(ctx, `
SELECT
(
SELECT COALESCE(MAX(version), 0) as version
SELECT version
FROM objects
WHERE (project_id, bucket_name, object_key) = ($1, $2, $3)
ORDER BY version DESC
LIMIT 1
),
(
SELECT EXISTS (
Expand All @@ -116,10 +125,13 @@ func (db *DB) precommitQueryHighestAndUnversioned(ctx context.Context, loc Objec
status IN `+statusesUnversioned+`
)
)
`, loc.ProjectID, []byte(loc.BucketName), loc.ObjectKey).Scan(&highest, &unversionedExists)
`, loc.ProjectID, []byte(loc.BucketName), loc.ObjectKey).Scan(&version, &unversionedExists)
if err != nil {
return 0, false, Error.Wrap(err)
}
if version.Valid {
highest = Version(version.Int64)
}

return highest, unversionedExists, nil
}
Expand Down Expand Up @@ -147,9 +159,11 @@ func (db *DB) precommitDeleteUnversioned(ctx context.Context, loc ObjectLocation

err = tx.QueryRowContext(ctx, `
WITH highest_object AS (
SELECT MAX(version) as version
SELECT version
FROM objects
WHERE (project_id, bucket_name, object_key) = ($1, $2, $3)
ORDER BY version DESC
LIMIT 1
), deleted_objects AS (
DELETE FROM objects
WHERE
Expand Down Expand Up @@ -284,14 +298,18 @@ func (db *DB) precommitDeleteUnversionedWithNonPending(ctx context.Context, loc

err = tx.QueryRowContext(ctx, `
WITH highest_object AS (
SELECT MAX(version) as version
SELECT version
FROM objects
WHERE (project_id, bucket_name, object_key) = ($1, $2, $3)
ORDER BY version DESC
LIMIT 1
), highest_non_pending_object AS (
SELECT MAX(version) as version
SELECT version
FROM objects
WHERE (project_id, bucket_name, object_key) = ($1, $2, $3)
AND status <> `+statusPending+`
ORDER BY version DESC
LIMIT 1
), deleted_objects AS (
DELETE FROM objects
WHERE
Expand Down
9 changes: 9 additions & 0 deletions satellite/metabase/precommit_expose_test.go
Expand Up @@ -21,3 +21,12 @@ func (db *DB) PrecommitConstraint(ctx context.Context, opts PrecommitConstraint,
r, err := db.precommitConstraint(ctx, precommitConstraint(opts), tx)
return PrecommitConstraintResult(r), err
}

// PrecommitConstraintWithNonPendingResult exposes precommitConstraintWithNonPendingResult for testing.
type PrecommitConstraintWithNonPendingResult precommitConstraintWithNonPendingResult

// PrecommitDeleteUnversionedWithNonPending exposes precommitDeleteUnversionedWithNonPending for testing.
func (db *DB) PrecommitDeleteUnversionedWithNonPending(ctx context.Context, loc ObjectLocation, tx stmtRow) (result PrecommitConstraintWithNonPendingResult, err error) {
r, err := db.precommitDeleteUnversionedWithNonPending(ctx, loc, tx)
return PrecommitConstraintWithNonPendingResult(r), err
}
28 changes: 28 additions & 0 deletions satellite/metabase/precommit_test.go
Expand Up @@ -4,6 +4,7 @@
package metabase_test

import (
"fmt"
"strconv"
"testing"

Expand All @@ -14,6 +15,33 @@ import (
"storj.io/storj/satellite/metabase/metabasetest"
)

func TestPrecommitConstraint_Empty(t *testing.T) {
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
obj := metabasetest.RandObjectStream()

for _, versioned := range []bool{false, true} {
for _, disallowDelete := range []bool{false, true} {
name := fmt.Sprintf("Versioned:%v,DisallowDelete:%v", versioned, disallowDelete)
t.Run(name, func(t *testing.T) {
result, err := db.PrecommitConstraint(ctx, metabase.PrecommitConstraint{
Location: obj.Location(),
Versioned: versioned,
DisallowDelete: disallowDelete,
}, db.UnderlyingTagSQL())
require.NoError(t, err)
require.Equal(t, metabase.PrecommitConstraintResult{}, result)
})
}
}

t.Run("with-non-pending", func(t *testing.T) {
result, err := db.PrecommitDeleteUnversionedWithNonPending(ctx, obj.Location(), db.UnderlyingTagSQL())
require.NoError(t, err)
require.Equal(t, metabase.PrecommitConstraintWithNonPendingResult{}, result)
})
})
}

func BenchmarkPrecommitConstraint(b *testing.B) {
metabasetest.Bench(b, func(ctx *testcontext.Context, b *testing.B, db *metabase.DB) {
baseObj := metabasetest.RandObjectStream()
Expand Down

0 comments on commit d10f8b8

Please sign in to comment.