Skip to content

Commit

Permalink
satellite/metabase/metaloop: use database time
Browse files Browse the repository at this point in the history
The system and database time may drift. We should use database time for
absolute "as of system time" to ensure that it's not newer than the
current database time. When the "as of system time" is in the future,
then the query will fail.

Change-Id: I5423f6aaad966ca03a76b5ff805bfba932e44a51
  • Loading branch information
egonelbre committed May 7, 2021
1 parent 60ff87a commit 2af7e4e
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 1 deletion.
8 changes: 8 additions & 0 deletions satellite/metabase/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"sort"
"strconv"
"time"

_ "github.com/jackc/pgx/v4" // registers pgx as a tagsql driver.
_ "github.com/jackc/pgx/v4/stdlib" // registers pgx as a tagsql driver.
Expand Down Expand Up @@ -415,3 +416,10 @@ func (pq postgresRebind) Rebind(sql string) string {

return string(out)
}

// Now returns time on the database.
func (db *DB) Now(ctx context.Context) (time.Time, error) {
var t time.Time
err := db.db.QueryRowContext(ctx, `SELECT now()`).Scan(&t)
return t, Error.Wrap(err)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package metabase_test
import (
"sort"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
Expand Down Expand Up @@ -116,3 +117,12 @@ func TestMigrateToAliases(t *testing.T) {
})
}
}

func TestNow(t *testing.T) {
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
sysnow := time.Now()
now, err := db.Now(ctx)
require.NoError(t, err)
require.WithinDuration(t, sysnow, now, 5*time.Second)
})
}
7 changes: 6 additions & 1 deletion satellite/metabase/metaloop/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ type Config struct {

// MetabaseDB contains iterators for the metabase data.
type MetabaseDB interface {
// Now returns the time on the database.
Now(ctx context.Context) (time.Time, error)
// IterateLoopObjects iterates through all objects in metabase for metainfo loop purpose.
IterateLoopObjects(ctx context.Context, opts metabase.IterateLoopObjects, fn func(context.Context, metabase.LoopObjectsIterator) error) (err error)
// IterateLoopStreams iterates through all streams passed in as arguments.
Expand Down Expand Up @@ -372,7 +374,10 @@ func iterateObjects(ctx context.Context, metabaseDB MetabaseDB, observers []*obs
limit = batchsizeLimit
}

startingTime := time.Now()
startingTime, err := metabaseDB.Now(ctx)
if err != nil {
return observers, Error.Wrap(err)
}

noObserversErr := errs.New("no observers")

Expand Down
3 changes: 3 additions & 0 deletions satellite/metainfo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ type Config struct {
type MetabaseDB interface {
io.Closer

// Now returns time on the database.
Now(ctx context.Context) (time.Time, error)

// MigrateToLatest migrates to latest schema version.
MigrateToLatest(ctx context.Context) error
// CheckVersion checks the database is the correct version
Expand Down

0 comments on commit 2af7e4e

Please sign in to comment.