Skip to content

Commit

Permalink
satellite/metabase: spanner node aliases implementation
Browse files Browse the repository at this point in the history
This is spanner implementation for managing node aliases. This is not
final implementation and it's main purpose is unblock implementation
of different metabase methods.

Change-Id: I8f3a25426ac7a8931868e12054bb0597fd9c19fd
  • Loading branch information
mniewrzal authored and Storj Robot committed Apr 19, 2024
1 parent 1cfb8d2 commit ed6bf03
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 18 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ require (
golang.org/x/text v0.14.0
golang.org/x/time v0.5.0
google.golang.org/api v0.168.0
google.golang.org/grpc v1.62.0
gopkg.in/segmentio/analytics-go.v3 v3.1.0
gopkg.in/yaml.v3 v3.0.1
storj.io/common v0.0.0-20240417231550-f34d2f5f48d5
Expand Down Expand Up @@ -171,7 +172,6 @@ require (
google.golang.org/genproto v0.0.0-20240205150955-31a09d347014 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240205150955-31a09d347014 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240304161311-37d4d3c04a78 // indirect
google.golang.org/grpc v1.62.0 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions satellite/metabase/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ type Adapter interface {
GetObjectLastCommitted(ctx context.Context, opts GetObjectLastCommitted, object *Object) error
TestingBeginObjectExactVersion(ctx context.Context, opts BeginObjectExactVersion, object *Object) error

EnsureNodeAliases(ctx context.Context, opts EnsureNodeAliases) error
ListNodeAliases(ctx context.Context) (_ []NodeAliasEntry, err error)

TestingBatchInsertSegments(ctx context.Context, aliasCache *NodeAliasCache, segments []RawSegment) (err error)
TestingGetAllSegments(ctx context.Context, aliasCache *NodeAliasCache) (_ []RawSegment, err error)
TestingDeleteAll(ctx context.Context) (err error)
}

// PostgresAdapter uses Cockroach related SQL queries.
Expand Down
134 changes: 122 additions & 12 deletions satellite/metabase/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,20 @@ package metabase

import (
"context"
"errors"
"math/rand"
"sort"
"strings"
"time"

"cloud.google.com/go/spanner"
"github.com/zeebo/errs"
"google.golang.org/api/iterator"
"google.golang.org/grpc/codes"

"storj.io/common/dbutil/pgutil"
"storj.io/common/storj"
"storj.io/common/uuid"
)

// NodeAlias is a metabase local alias for NodeID-s to reduce segment table size.
Expand All @@ -32,12 +40,77 @@ type EnsureNodeAliases struct {
func (db *DB) EnsureNodeAliases(ctx context.Context, opts EnsureNodeAliases) (err error) {
defer mon.Task()(&ctx)(&err)

unique := make([]storj.NodeID, 0, len(opts.Nodes))
seen := make(map[storj.NodeID]bool, len(opts.Nodes))
// TODO(spanner) long term this needs to be a coordinated insert across all adapters,
// i.e. one of them needs to be the source of truth, otherwise there will be issues
// with different db having different NodeAlias for the same node id.
return db.ChooseAdapter(uuid.UUID{}).EnsureNodeAliases(ctx, opts)
}

// EnsureNodeAliases implements Adapter.
func (p *PostgresAdapter) EnsureNodeAliases(ctx context.Context, opts EnsureNodeAliases) (err error) {
defer mon.Task()(&ctx)(&err)

unique, err := ensureNodesUniqueness(opts.Nodes)
if err != nil {
return err
}

_, err = p.db.ExecContext(ctx, `
INSERT INTO node_aliases(node_id)
SELECT unnest($1::BYTEA[])
ON CONFLICT DO NOTHING
`, pgutil.NodeIDArray(unique))
return Error.Wrap(err)
}

// EnsureNodeAliases implements Adapter.
func (s *SpannerAdapter) EnsureNodeAliases(ctx context.Context, opts EnsureNodeAliases) (err error) {
defer mon.Task()(&ctx)(&err)

unique, err := ensureNodesUniqueness(opts.Nodes)
if err != nil {
return err
}

// TODO(spanner) this is not prod ready implementation
// TODO(spanner) limited alias value to avoid out of memory
maxAliasValue := int64(10000)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))

// TODO(spanner) figure out how to do something like ON CONFLICT DO NOTHING
index := 0
for index < len(unique) {
entry := unique[index]
alias := rng.Int63n(maxAliasValue) + 1
_, err = s.client.Apply(ctx, []*spanner.Mutation{
spanner.Insert("node_aliases", []string{"node_id", "node_alias"}, []interface{}{
entry.Bytes(), alias,
}),
})
if err != nil {
if spanner.ErrCode(err) == codes.AlreadyExists {
// TODO(spanner) figure out how to detect UNIQUE violation
if strings.Contains(spanner.ErrDesc(err), "UNIQUE violation on index node_aliases_node_alias_key") {
// go back and find unique alias
continue
}
} else {
return Error.Wrap(err)
}
}
index++
}
return nil

}

func ensureNodesUniqueness(nodes []storj.NodeID) ([]storj.NodeID, error) {
unique := make([]storj.NodeID, 0, len(nodes))
seen := make(map[storj.NodeID]bool, len(nodes))

for _, node := range opts.Nodes {
for _, node := range nodes {
if node.IsZero() {
return Error.New("tried to add alias to zero node")
return nil, Error.New("tried to add alias to zero node")
}
if !seen[node] {
seen[node] = true
Expand All @@ -46,21 +119,22 @@ func (db *DB) EnsureNodeAliases(ctx context.Context, opts EnsureNodeAliases) (er
}

sort.Sort(storj.NodeIDList(unique))

_, err = db.db.ExecContext(ctx, `
INSERT INTO node_aliases(node_id)
SELECT unnest($1::BYTEA[])
ON CONFLICT DO NOTHING
`, pgutil.NodeIDArray(unique))
return Error.Wrap(err)
return unique, nil
}

// ListNodeAliases lists all node alias mappings.
func (db *DB) ListNodeAliases(ctx context.Context) (_ []NodeAliasEntry, err error) {
defer mon.Task()(&ctx)(&err)

return db.ChooseAdapter(uuid.UUID{}).ListNodeAliases(ctx)
}

// ListNodeAliases implements Adapter.
func (p *PostgresAdapter) ListNodeAliases(ctx context.Context) (_ []NodeAliasEntry, err error) {
defer mon.Task()(&ctx)(&err)

var aliases []NodeAliasEntry
rows, err := db.db.Query(ctx, `
rows, err := p.db.Query(ctx, `
SELECT node_id, node_alias
FROM node_aliases
`)
Expand All @@ -84,6 +158,42 @@ func (db *DB) ListNodeAliases(ctx context.Context) (_ []NodeAliasEntry, err erro
return aliases, nil
}

// ListNodeAliases implements Adapter.
func (s *SpannerAdapter) ListNodeAliases(ctx context.Context) (aliases []NodeAliasEntry, err error) {
defer mon.Task()(&ctx)(&err)

stmt := spanner.Statement{SQL: `
SELECT node_id, node_alias FROM node_aliases
`}
iter := s.client.Single().Query(ctx, stmt)
defer iter.Stop()

for {
row, err := iter.Next()
if errors.Is(err, iterator.Done) {
return aliases, nil
}
if err != nil {
return nil, Error.Wrap(err)
}

var nodeID []byte
var nodeAlias int64
if err := row.Columns(&nodeID, &nodeAlias); err != nil {
return nil, Error.New("ListNodeAliases scan failed: %w", err)
}

id, err := storj.NodeIDFromBytes(nodeID)
if err != nil {
return nil, Error.Wrap(err)
}
aliases = append(aliases, NodeAliasEntry{
ID: id,
Alias: NodeAlias(nodeAlias),
})
}
}

// LatestNodesAliasMap returns the latest mapping between storj.NodeID and NodeAlias.
func (db *DB) LatestNodesAliasMap(ctx context.Context) (_ *NodeAliasMap, err error) {
defer mon.Task()(&ctx)(&err)
Expand Down
18 changes: 15 additions & 3 deletions satellite/metabase/alias_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"storj.io/common/storj"
"storj.io/common/testcontext"
"storj.io/common/testrand"
"storj.io/common/uuid"
"storj.io/storj/satellite/metabase"
"storj.io/storj/satellite/metabase/metabasetest"
)
Expand Down Expand Up @@ -72,7 +73,9 @@ func TestNodeAliases(t *testing.T) {

for _, entry := range aliases {
require.True(t, nodesContains(nodes, entry.ID))
require.LessOrEqual(t, int(entry.Alias), 3)
nonSpannerCheck(db, func() {
require.LessOrEqual(t, int(entry.Alias), len(nodes))
})
}

metabasetest.EnsureNodeAliases{
Expand Down Expand Up @@ -109,7 +112,9 @@ func TestNodeAliases(t *testing.T) {
require.Len(t, aliases, len(nodes))
for _, entry := range aliases {
require.True(t, nodesContains(nodes, entry.ID))
require.LessOrEqual(t, int(entry.Alias), len(nodes))
nonSpannerCheck(db, func() {
require.LessOrEqual(t, int(entry.Alias), len(nodes))
})

require.False(t, seen[entry.Alias])
seen[entry.Alias] = true
Expand Down Expand Up @@ -226,7 +231,14 @@ func TestNodeAliases(t *testing.T) {
}
require.NoError(t, group.Wait())
})
})
}, metabasetest.WithSpanner())
}

func nonSpannerCheck(db *metabase.DB, check func()) {
adapter := db.ChooseAdapter(uuid.UUID{})
if _, ok := adapter.(*metabase.SpannerAdapter); !ok {
check()
}
}

func nodesContains(nodes []storj.NodeID, v storj.NodeID) bool {
Expand Down
1 change: 1 addition & 0 deletions satellite/metabase/aliascache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func TestNodeAliasCache(t *testing.T) {
})
}

// TODO(spanner) enable this test for spanner.
func TestNodeAliasCache_DB(t *testing.T) {
metabasetest.Run(t, func(ctx *testcontext.Context, t *testing.T, db *metabase.DB) {
t.Run("missing aliases", func(t *testing.T) {
Expand Down
24 changes: 22 additions & 2 deletions satellite/metabase/raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,33 @@ func (db *DB) TestingGetState(ctx context.Context) (_ *RawState, err error) {

// TestingDeleteAll deletes all objects and segments from the database.
func (db *DB) TestingDeleteAll(ctx context.Context) (err error) {
_, err = db.db.ExecContext(ctx, `
db.aliasCache = NewNodeAliasCache(db)
for _, a := range db.adapters {
if err := a.TestingDeleteAll(ctx); err != nil {
return err
}
}
return nil
}

// TestingDeleteAll implements Adapter.
func (p *PostgresAdapter) TestingDeleteAll(ctx context.Context) (err error) {
_, err = p.db.ExecContext(ctx, `
WITH ignore_full_scan_for_test AS (SELECT 1) DELETE FROM objects;
WITH ignore_full_scan_for_test AS (SELECT 1) DELETE FROM segments;
WITH ignore_full_scan_for_test AS (SELECT 1) DELETE FROM node_aliases;
WITH ignore_full_scan_for_test AS (SELECT 1) SELECT setval('node_alias_seq', 1, false);
`)
db.aliasCache = NewNodeAliasCache(db)
return Error.Wrap(err)
}

// TestingDeleteAll implements Adapter.
func (s *SpannerAdapter) TestingDeleteAll(ctx context.Context) (err error) {
_, err = s.client.Apply(ctx, []*spanner.Mutation{
spanner.Delete("objects", spanner.AllKeys()),
spanner.Delete("segments", spanner.AllKeys()),
spanner.Delete("node_aliases", spanner.AllKeys()),
})
return Error.Wrap(err)
}

Expand Down

0 comments on commit ed6bf03

Please sign in to comment.