-
Notifications
You must be signed in to change notification settings - Fork 387
/
alias.go
197 lines (165 loc) · 5.24 KB
/
alias.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
// Copyright (C) 2021 Storj Labs, Inc.
// See LICENSE for copying information.
package metabase
import (
"context"
"errors"
"math/rand"
"sort"
"strings"
"time"
"cloud.google.com/go/spanner"
"github.com/zeebo/errs"
"google.golang.org/api/iterator"
"google.golang.org/grpc/codes"
"storj.io/common/dbutil/pgutil"
"storj.io/common/storj"
"storj.io/common/uuid"
)
// NodeAlias is a metabase local alias for NodeID-s to reduce segment table size.
// It is a compact int32 value that is stored in place of a full storj.NodeID.
type NodeAlias int32
// NodeAliasEntry is a mapping between NodeID and NodeAlias.
type NodeAliasEntry struct {
// ID is the node identifier that the alias refers to.
ID storj.NodeID
// Alias is the alias paired with ID.
Alias NodeAlias
}
// EnsureNodeAliases contains arguments necessary for creating NodeAlias-es.
type EnsureNodeAliases struct {
// Nodes lists the node IDs that should have an alias. The adapter
// implementations in this file drop duplicates and reject a zero node ID.
Nodes []storj.NodeID
}
// EnsureNodeAliases makes sure that every supplied node ID has an alias.
// It is safe to concurrently try to create aliases for the same NodeID.
func (db *DB) EnsureNodeAliases(ctx context.Context, opts EnsureNodeAliases) (err error) {
	defer mon.Task()(&ctx)(&err)

	// TODO(spanner) long term this needs to be a coordinated insert across all adapters,
	// i.e. one of them needs to be the source of truth, otherwise there will be issues
	// with different db having different NodeAlias for the same node id.
	adapter := db.ChooseAdapter(uuid.UUID{})
	return adapter.EnsureNodeAliases(ctx, opts)
}
// EnsureNodeAliases implements Adapter.
//
// The node IDs are validated and deduplicated first, then inserted with a
// single statement; rows that conflict with existing ones are left untouched.
func (p *PostgresAdapter) EnsureNodeAliases(ctx context.Context, opts EnsureNodeAliases) (err error) {
	defer mon.Task()(&ctx)(&err)

	nodeIDs, err := ensureNodesUniqueness(opts.Nodes)
	if err != nil {
		return err
	}

	const insertAliases = `
INSERT INTO node_aliases(node_id)
SELECT unnest($1::BYTEA[])
ON CONFLICT DO NOTHING
`
	_, err = p.db.ExecContext(ctx, insertAliases, pgutil.NodeIDArray(nodeIDs))
	return Error.Wrap(err)
}
// EnsureNodeAliases implements Adapter.
//
// Every unique node ID is inserted together with a randomly chosen alias.
// An insert that collides on the alias index is retried with a fresh random
// alias; any other AlreadyExists error is ignored and the loop moves on to
// the next node. All remaining errors abort the call.
func (s *SpannerAdapter) EnsureNodeAliases(ctx context.Context, opts EnsureNodeAliases) (err error) {
	defer mon.Task()(&ctx)(&err)

	nodeIDs, err := ensureNodesUniqueness(opts.Nodes)
	if err != nil {
		return err
	}

	// TODO(spanner) this is not prod ready implementation
	// TODO(spanner) limited alias value to avoid out of memory
	const aliasLimit = int64(10000)
	random := rand.New(rand.NewSource(time.Now().UnixNano()))

	// TODO(spanner) figure out how to do something like ON CONFLICT DO NOTHING
	for next := 0; next < len(nodeIDs); {
		nodeID := nodeIDs[next]
		candidate := random.Int63n(aliasLimit) + 1

		insert := spanner.Insert(
			"node_aliases",
			[]string{"node_id", "node_alias"},
			[]interface{}{nodeID.Bytes(), candidate},
		)
		_, err = s.client.Apply(ctx, []*spanner.Mutation{insert})

		switch {
		case err == nil:
			// Inserted; advance to the next node below.
		case spanner.ErrCode(err) != codes.AlreadyExists:
			return Error.Wrap(err)
		case strings.Contains(spanner.ErrDesc(err), "UNIQUE violation on index node_aliases_node_alias_key"):
			// TODO(spanner) figure out how to detect UNIQUE violation
			// The alias is taken: stay on the same node and draw a new alias.
			continue
		}
		next++
	}
	return nil
}
// ensureNodesUniqueness returns the distinct node IDs from nodes, sorted via
// storj.NodeIDList. It returns an error as soon as it meets a zero node ID.
func ensureNodesUniqueness(nodes []storj.NodeID) ([]storj.NodeID, error) {
	known := make(map[storj.NodeID]struct{}, len(nodes))
	result := make([]storj.NodeID, 0, len(nodes))

	for i := range nodes {
		id := nodes[i]
		if id.IsZero() {
			return nil, Error.New("tried to add alias to zero node")
		}
		if _, duplicate := known[id]; duplicate {
			continue
		}
		known[id] = struct{}{}
		result = append(result, id)
	}

	sort.Sort(storj.NodeIDList(result))
	return result, nil
}
// ListNodeAliases returns all node alias mappings, as reported by the adapter
// chosen for the zero UUID.
func (db *DB) ListNodeAliases(ctx context.Context) (_ []NodeAliasEntry, err error) {
	defer mon.Task()(&ctx)(&err)

	adapter := db.ChooseAdapter(uuid.UUID{})
	return adapter.ListNodeAliases(ctx)
}
// ListNodeAliases implements Adapter.
//
// It selects every (node_id, node_alias) row from node_aliases and returns
// them as NodeAliasEntry values. An error from closing the rows is combined
// into the returned error.
func (p *PostgresAdapter) ListNodeAliases(ctx context.Context) (_ []NodeAliasEntry, err error) {
	defer mon.Task()(&ctx)(&err)

	rows, err := p.db.Query(ctx, `
SELECT node_id, node_alias
FROM node_aliases
`)
	if err != nil {
		return nil, Error.New("ListNodeAliases query: %w", err)
	}
	// Fold a Close failure into whatever error is being returned.
	defer func() { err = errs.Combine(err, rows.Close()) }()

	var entries []NodeAliasEntry
	for rows.Next() {
		entry := NodeAliasEntry{}
		if scanErr := rows.Scan(&entry.ID, &entry.Alias); scanErr != nil {
			return nil, Error.New("ListNodeAliases scan failed: %w", scanErr)
		}
		entries = append(entries, entry)
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, Error.New("ListNodeAliases scan failed: %w", iterErr)
	}
	return entries, nil
}
// ListNodeAliases implements Adapter.
//
// It queries node_aliases through the client's Single() transaction and
// converts each row into a NodeAliasEntry. Iteration ends when the iterator
// reports iterator.Done.
func (s *SpannerAdapter) ListNodeAliases(ctx context.Context) (aliases []NodeAliasEntry, err error) {
	defer mon.Task()(&ctx)(&err)

	rowIter := s.client.Single().Query(ctx, spanner.Statement{SQL: `
SELECT node_id, node_alias FROM node_aliases
`})
	defer rowIter.Stop()

	for {
		row, nextErr := rowIter.Next()
		switch {
		case errors.Is(nextErr, iterator.Done):
			return aliases, nil
		case nextErr != nil:
			return nil, Error.Wrap(nextErr)
		}

		// The alias column is read as int64 and narrowed to NodeAlias afterwards.
		var (
			id    storj.NodeID
			alias int64
		)
		if colErr := row.Columns(&id, &alias); colErr != nil {
			return nil, Error.New("ListNodeAliases scan failed: %w", colErr)
		}
		aliases = append(aliases, NodeAliasEntry{ID: id, Alias: NodeAlias(alias)})
	}
}
// LatestNodesAliasMap returns the latest mapping between storj.NodeID and
// NodeAlias, as provided by the DB's alias cache.
func (db *DB) LatestNodesAliasMap(ctx context.Context) (_ *NodeAliasMap, err error) {
	defer mon.Task()(&ctx)(&err)

	latest, err := db.aliasCache.Latest(ctx)
	return latest, err
}