-
Notifications
You must be signed in to change notification settings - Fork 7
/
porter.go
129 lines (116 loc) · 3.26 KB
/
porter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package client
import (
"context"
"fmt"
"github.com/tuihub/librarian/internal/conf"
"github.com/tuihub/librarian/internal/lib/libapp"
"github.com/tuihub/librarian/internal/lib/libtime"
porter "github.com/tuihub/protos/pkg/librarian/porter/v1"
"github.com/go-kratos/kratos/v2/middleware/recovery"
"github.com/go-kratos/kratos/v2/selector"
"github.com/go-kratos/kratos/v2/transport/grpc"
)
// Porter bundles the generated porter gRPC client with a health checker for
// the "porter" service.
type Porter struct {
	// Embedded so that every RPC of the generated client is callable
	// directly on *Porter.
	porter.LibrarianPorterServiceClient

	// checker is queried by GetServiceAddresses for the alive instances.
	checker *libapp.HealthChecker
}
// NewPorter builds a Porter around an existing porter gRPC client.
//
// A health checker for the service named "porter" is created from the given
// Consul configuration; if that fails, the error is returned unchanged and no
// Porter is constructed.
func NewPorter(
	client porter.LibrarianPorterServiceClient,
	consul *conf.Consul,
) (*Porter, error) {
	hc, err := libapp.NewHealthChecker("porter", consul)
	if err != nil {
		return nil, err
	}

	p := &Porter{
		LibrarianPorterServiceClient: client,
		checker:                      hc,
	}
	return p, nil
}
// GetServiceAddresses returns one "address:port" string for every instance
// reported by the health checker's GetAliveInstances.
//
// The ctx parameter is currently unused. Any error from the health checker is
// returned unchanged together with a nil slice. When there are no instances
// the result is an empty, non-nil slice.
func (p *Porter) GetServiceAddresses(ctx context.Context) ([]string, error) {
	alive, err := p.checker.GetAliveInstances()
	if err != nil {
		return nil, err
	}

	addrs := make([]string, 0, len(alive))
	for _, inst := range alive {
		svc := inst.Service
		addrs = append(addrs, fmt.Sprintf("%s:%d", svc.Address, svc.Port))
	}
	return addrs, nil
}
// NewPorterClient dials the porter service through service discovery and
// returns a generated gRPC client bound to that connection.
//
// The connection is insecure (no TLS), targets the "discovery:///porter"
// endpoint, and is configured with:
//   - the name, address and fast-fail node filters defined in this file, so
//     that WithPorterName / WithPorterAddress / WithPorterFastFail on the
//     request context influence node selection;
//   - the recovery middleware;
//   - a timeout of libtime.Minute.
//
// On any failure (creating the discovery client or dialing) the returned
// client is nil and the error is returned unchanged.
func NewPorterClient(c *conf.Consul) (porter.LibrarianPorterServiceClient, error) {
	r, err := libapp.NewDiscovery(c)
	if err != nil {
		return nil, err
	}

	conn, err := grpc.DialInsecure(
		context.Background(),
		grpc.WithEndpoint("discovery:///porter"),
		grpc.WithDiscovery(r),
		grpc.WithNodeFilter(
			newPorterNameFilter(),
			newPorterAddressFilter(),
			newPorterFastFailFilter(),
		),
		grpc.WithMiddleware(
			recovery.Recovery(),
		),
		grpc.WithTimeout(libtime.Minute),
	)
	if err != nil {
		// Do not wrap a failed connection in a client: a non-nil client
		// returned next to an error invites callers to use it.
		return nil, err
	}

	return porter.NewLibrarianPorterServiceClient(conn), nil
}
// Context key types for per-request porter routing hints. Each is a distinct
// unexported empty struct, so the keys cannot collide with context values set
// by other packages.
type (
	// requestPorterName keys the porter name set by WithPorterName.
	requestPorterName struct{}
	// requestPorterAddress keys the node address set by WithPorterAddress.
	requestPorterAddress struct{}
	// requestPorterFastFail keys the flag set by WithPorterFastFail.
	requestPorterFastFail struct{}
)

// porterNameKey is the node metadata key whose value is compared against the
// requested porter name by the name filter.
const porterNameKey = "PorterName"
// WithPorterName returns a copy of ctx carrying name under the
// requestPorterName key. The name filter reads this value and keeps only
// nodes whose porterNameKey metadata equals it.
func WithPorterName(ctx context.Context, name string) context.Context {
	key := requestPorterName{}
	return context.WithValue(ctx, key, name)
}
// WithPorterAddress returns a copy of ctx carrying address under the
// requestPorterAddress key. The address filter reads this value and keeps
// only nodes whose Address() equals it exactly.
func WithPorterAddress(ctx context.Context, address string) context.Context {
	key := requestPorterAddress{}
	return context.WithValue(ctx, key, address)
}
// WithPorterFastFail returns a copy of ctx with the fast-fail flag set to
// true under the requestPorterFastFail key. The fast-fail filter returns an
// empty node list for requests carrying this flag.
func WithPorterFastFail(ctx context.Context) context.Context {
	const enabled = true
	return context.WithValue(ctx, requestPorterFastFail{}, enabled)
}
// newPorterNameFilter returns a node filter that honours the porter name
// placed on the request context by WithPorterName.
//
// If the context carries no name, the candidate list is returned untouched.
// Otherwise only nodes whose metadata has porterNameKey set to exactly that
// name are kept; the result is an empty, non-nil slice when nothing matches.
func newPorterNameFilter() selector.NodeFilter {
	return func(ctx context.Context, nodes []selector.Node) []selector.Node {
		name, ok := ctx.Value(requestPorterName{}).(string)
		if !ok {
			return nodes
		}

		// A fresh slice is used so the caller's candidate list is not mutated.
		matched := make([]selector.Node, 0, len(nodes))
		for _, n := range nodes {
			// Indexing a nil metadata map is safe and reports exist == false.
			if v, exist := n.Metadata()[porterNameKey]; exist && v == name {
				matched = append(matched, n)
			}
		}
		return matched
	}
}
// newPorterAddressFilter returns a node filter that honours the address
// placed on the request context by WithPorterAddress.
//
// If the context carries no address, the candidate list is returned
// untouched. Otherwise only nodes whose Address() is exactly equal to the
// requested string are kept; the result is an empty, non-nil slice when
// nothing matches.
func newPorterAddressFilter() selector.NodeFilter {
	return func(ctx context.Context, nodes []selector.Node) []selector.Node {
		addr, ok := ctx.Value(requestPorterAddress{}).(string)
		if !ok {
			return nodes
		}

		// A fresh slice is used so the caller's candidate list is not mutated.
		matched := make([]selector.Node, 0, len(nodes))
		for _, n := range nodes {
			if n.Address() == addr {
				matched = append(matched, n)
			}
		}
		return matched
	}
}
// newPorterFastFailFilter returns a node filter driven by the flag that
// WithPorterFastFail places on the request context.
//
// When the flag is absent or false, the candidate list passes through
// unchanged. When it is true, an empty, non-nil slice is returned regardless
// of the input.
//
// NOTE(review): dropping every candidate presumably makes the selector report
// "no available node" immediately instead of attempting the call — confirm
// that this is the intended meaning of "fast fail".
func newPorterFastFailFilter() selector.NodeFilter {
	return func(ctx context.Context, nodes []selector.Node) []selector.Node {
		// A failed type assertion leaves fastFail false, which covers the
		// "flag not set" case as well.
		if fastFail, _ := ctx.Value(requestPorterFastFail{}).(bool); fastFail {
			return []selector.Node{}
		}
		return nodes
	}
}