-
Notifications
You must be signed in to change notification settings - Fork 566
/
router.go
94 lines (85 loc) · 2.02 KB
/
router.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package shard
import (
"fmt"
"github.com/pachyderm/pachyderm/src/client/pkg/grpcutil"
"google.golang.org/grpc"
)
// router answers shard-placement questions by consulting a Sharder and opens
// gRPC client connections to shard addresses through a grpcutil.Dialer.
type router struct {
// sharder supplies the shard-to-address mapping (and single-shard address
// lookups) for a given version.
sharder Sharder
// dialer turns an address into a *grpc.ClientConn.
dialer grpcutil.Dialer
// localAddress is compared against mapped addresses to pick out the shards
// that belong to this router.
localAddress string
}
// newRouter builds a router from its three collaborators: the sharder used to
// look up shard placement, the dialer used to open connections, and the
// address this router treats as its own.
func newRouter(
	sharder Sharder,
	dialer grpcutil.Dialer,
	localAddress string,
) *router {
	// Keyed fields keep the construction correct even if the struct's field
	// order changes later.
	rt := &router{
		sharder:      sharder,
		dialer:       dialer,
		localAddress: localAddress,
	}
	return rt
}
// GetShards returns the set of shards that the sharder maps to this router's
// local address at the given version. The result is keyed by shard number and
// every present key has the value true. If the sharder lookup fails, the error
// is returned unchanged together with a nil map.
func (r *router) GetShards(version int64) (map[uint64]bool, error) {
	assignments, err := r.sharder.GetShardToAddress(version)
	if err != nil {
		return nil, err
	}
	local := make(map[uint64]bool)
	for id, addr := range assignments {
		// Skip shards that are mapped to some other address.
		if addr != r.localAddress {
			continue
		}
		local[id] = true
	}
	return local, nil
}
// GetAllShards returns the set of every shard present in the sharder's
// shard-to-address mapping at the given version, regardless of which address
// the shard is mapped to. The result is keyed by shard number and every
// present key has the value true. If the sharder lookup fails, the error is
// returned unchanged together with a nil map.
//
// The previous body was a copy of GetShards and filtered on the local
// address, which made the two methods indistinguishable.
// NOTE(review): confirm no caller relied on the old local-only behavior.
func (r *router) GetAllShards(version int64) (map[uint64]bool, error) {
	shardToAddress, err := r.sharder.GetShardToAddress(version)
	if err != nil {
		return nil, err
	}
	result := make(map[uint64]bool, len(shardToAddress))
	// Only the keys matter here; the address is intentionally ignored.
	for shard := range shardToAddress {
		result[shard] = true
	}
	return result, nil
}
// GetClientConn looks up the address the sharder reports for the given shard
// and version and dials it. A sharder error is returned unchanged; if the
// sharder reports that no address exists for the shard, a "no master found"
// error naming the shard is returned instead.
func (r *router) GetClientConn(shard uint64, version int64) (*grpc.ClientConn, error) {
	addr, found, err := r.sharder.GetAddress(shard, version)
	switch {
	case err != nil:
		return nil, err
	case !found:
		return nil, fmt.Errorf("no master found for %d", shard)
	}
	return r.dialer.Dial(addr)
}
// GetAllClientConns dials every distinct address found in the shard mapping
// for the given version and returns the resulting connections. Addresses are
// visited in map-iteration order, so the order of the returned slice is
// unspecified. The first lookup or dial error aborts the call and is returned
// with a nil slice.
func (r *router) GetAllClientConns(version int64) ([]*grpc.ClientConn, error) {
	addressSet, err := r.getAllAddresses(version)
	if err != nil {
		return nil, err
	}
	var conns []*grpc.ClientConn
	for addr := range addressSet {
		// TODO: huge race, this whole thing is bad
		conn, dialErr := r.dialer.Dial(addr)
		if dialErr != nil {
			return nil, dialErr
		}
		conns = append(conns, conn)
	}
	return conns, nil
}
// getAllAddresses returns the distinct addresses that appear as values in the
// sharder's shard-to-address mapping at the given version, as a set keyed by
// address. If the sharder lookup fails, the error is returned unchanged
// together with a nil map.
func (r *router) getAllAddresses(version int64) (map[string]bool, error) {
	assignments, err := r.sharder.GetShardToAddress(version)
	if err != nil {
		return nil, err
	}
	unique := make(map[string]bool)
	for _, addr := range assignments {
		// Several shards may share one address; the map collapses duplicates.
		unique[addr] = true
	}
	return unique, nil
}