/
grpc_server.go
152 lines (130 loc) · 3.63 KB
/
grpc_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package rpc
import (
"context"
"fmt"
"net"
"github.com/joyparty/nodehub/cluster"
"github.com/joyparty/nodehub/logger"
"github.com/samber/lo"
"google.golang.org/grpc"
)
const (
// MDSessID grpc metadata中的session id key
MDSessID = "x-sess"
// MDTransactionID grpc metadata中的transaction id key,用于跟踪请求
MDTransactionID = "x-trans"
// MDGateway grpc metadata中的gateway key,用于标识请求来自哪个网关
MDGateway = "x-gw"
)
// GRPCServer grpc服务
type GRPCServer struct {
listenAddr string
server *grpc.Server
services map[int32]cluster.GRPCServiceDesc
}
// NewGRPCServer 构造函数
func NewGRPCServer(listenAddr string, opts ...grpc.ServerOption) *GRPCServer {
return &GRPCServer{
listenAddr: listenAddr,
server: grpc.NewServer(opts...),
services: make(map[int32]cluster.GRPCServiceDesc),
}
}
// RegisterService 注册服务
func (gs *GRPCServer) RegisterService(code int32, desc grpc.ServiceDesc, impl any, options ...Option) error {
sd := cluster.GRPCServiceDesc{
Code: code,
Path: fmt.Sprintf("/%s", desc.ServiceName),
Balancer: cluster.BalancerRandom,
}
for _, opt := range options {
sd = opt(sd)
}
if err := sd.Validate(); err != nil {
return err
} else if _, ok := gs.services[sd.Code]; ok {
return fmt.Errorf("service code %d already registered", sd.Code)
}
gs.services[sd.Code] = sd
gs.server.RegisterService(&desc, impl)
return nil
}
// Name 服务名称
func (gs *GRPCServer) Name() string {
return "grpc"
}
// Start 启动服务
func (gs *GRPCServer) Start(ctx context.Context) error {
l, err := net.Listen("tcp", gs.listenAddr)
if err != nil {
return fmt.Errorf("listen tcp, %w", err)
}
go func() {
if err := gs.server.Serve(l); err != nil {
logger.Error("start grpc", "error", err)
panic(fmt.Errorf("start grpc, %w", err))
}
}()
return nil
}
// Stop 停止服务
func (gs *GRPCServer) Stop(ctx context.Context) error {
gs.server.GracefulStop()
return nil
}
// CompleteNodeEntry 设置条目中的grpc信息
func (gs *GRPCServer) CompleteNodeEntry(entry *cluster.NodeEntry) {
entry.GRPC = cluster.GRPCEntry{
Endpoint: gs.listenAddr,
Services: lo.Values(gs.services),
}
}
// Option 配置
type Option func(desc cluster.GRPCServiceDesc) cluster.GRPCServiceDesc
// WithPublic 设置服务为公开
func WithPublic() Option {
return func(desc cluster.GRPCServiceDesc) cluster.GRPCServiceDesc {
desc.Public = true
return desc
}
}
// WithStateful 设置服务为有状态
func WithStateful() Option {
return func(desc cluster.GRPCServiceDesc) cluster.GRPCServiceDesc {
desc.Stateful = true
desc.Allocation = cluster.ServerAllocate
return desc
}
}
// WithAllocation 设置节点分配方式
func WithAllocation(allocation string) Option {
return func(desc cluster.GRPCServiceDesc) cluster.GRPCServiceDesc {
desc.Allocation = allocation
return desc
}
}
// WithPipeline 设置管道名称
//
// 设置了管道名称的请求会按照时序性顺序处理,没有设置管道的请求会并发处理
//
// 多个服务可以声明同一个管道名称,这样请求会被分配到同一个管道中
func WithPipeline(pipeline string) Option {
return func(desc cluster.GRPCServiceDesc) cluster.GRPCServiceDesc {
desc.Pipeline = pipeline
return desc
}
}
// WithBalancer 设置负载均衡策略
func WithBalancer(balancer string) Option {
return func(desc cluster.GRPCServiceDesc) cluster.GRPCServiceDesc {
desc.Balancer = balancer
return desc
}
}
// WithWeight 设置节点权重
func WithWeight(weight int) Option {
return func(desc cluster.GRPCServiceDesc) cluster.GRPCServiceDesc {
desc.Weight = weight
return desc
}
}