forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 0
/
discover.go
99 lines (85 loc) · 2.7 KB
/
discover.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package srvtopo
import (
"sync"
"golang.org/x/net/context"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)
// FindAllTargets goes through all serving shards in the topology
// for the provided tablet types. It returns one Target object per
// keyspace / shard / matching TabletType.
func FindAllTargets(ctx context.Context, ts Server, cell string, tabletTypes []topodatapb.TabletType) ([]*querypb.Target, error) {
ksNames, err := ts.GetSrvKeyspaceNames(ctx, cell)
if err != nil {
return nil, err
}
var targets []*querypb.Target
var wg sync.WaitGroup
var mu sync.Mutex
var errRecorder concurrency.AllErrorRecorder
for _, ksName := range ksNames {
wg.Add(1)
go func(keyspace string) {
defer wg.Done()
// Get SrvKeyspace for cell/keyspace.
ks, err := ts.GetSrvKeyspace(ctx, cell, keyspace)
if err != nil {
if topo.IsErrType(err, topo.NoNode) {
// Possibly a race condition, or leftover
// crud in the topology service. Just log it.
log.Warningf("GetSrvKeyspace(%v, %v) returned ErrNoNode, skipping that SrvKeyspace", cell, keyspace)
} else {
// More serious error, abort.
errRecorder.RecordError(err)
}
return
}
// Get all shard names that are used for serving.
for _, ksPartition := range ks.Partitions {
// Check we're waiting for tablets of that type.
waitForIt := false
for _, tt := range tabletTypes {
if tt == ksPartition.ServedType {
waitForIt = true
}
}
if !waitForIt {
continue
}
// Add all the shards. Note we can't have
// duplicates, as there is only one entry per
// TabletType in the Partitions list.
mu.Lock()
for _, shard := range ksPartition.ShardReferences {
targets = append(targets, &querypb.Target{
Cell: cell,
Keyspace: keyspace,
Shard: shard.Name,
TabletType: ksPartition.ServedType,
})
}
mu.Unlock()
}
}(ksName)
}
wg.Wait()
if errRecorder.HasErrors() {
return nil, errRecorder.Error()
}
return targets, nil
}