-
Notifications
You must be signed in to change notification settings - Fork 36
/
grpc_server.go
148 lines (127 loc) · 3.57 KB
/
grpc_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package rpcx
import (
"log"
"net"
"time"
"github.com/sunmi-OS/gocore/rpcx/serverinterceptors"
"google.golang.org/grpc"
)
// RpcServer is the legacy server type returned by NewRpcServer.
//
// Deprecated: prefer GrpcServer, created with NewGrpcServer.
type RpcServer struct {
	// Embedded base server; declared elsewhere in this package.
	*baseRpcServer
	// register is called with the freshly built *grpc.Server in Start.
	register RegisterFn
}

// GrpcServer holds the configuration and the underlying *grpc.Server for a
// service created with NewGrpcServer.
type GrpcServer struct {
	// Name is the value of the name argument given to NewGrpcServer.
	Name string
	// addr is the TCP address Start listens on.
	addr string
	// isPre is set to true by RegisterService; Start aborts while it is false.
	isPre bool
	// cfg is the server configuration; NewGrpcServer substitutes
	// defaultServerConfig() when the caller passes nil.
	cfg *GrpcServerConfig
	// register is stored by RegisterService and invoked in Start.
	register RegisterFn
	// server is created in RegisterService.
	server *grpc.Server
	// options, streamInterceptors and unaryInterceptors are combined with the
	// built-in interceptors in RegisterService when the *grpc.Server is built.
	options            []grpc.ServerOption
	streamInterceptors []grpc.StreamServerInterceptor
	unaryInterceptors  []grpc.UnaryServerInterceptor
}
// NewRpcServer is the legacy entry point for initialising an RPC server.
// It builds an RpcServer on top of newBaseRpcServer(addr), stores register,
// and hands the timeout to setupInterceptors.
//
// timeout is interpreted in milliseconds; when it is 0 no timeout handling is
// installed. The name argument is accepted but not used by this function.
//
// @auth liuguoqiang 2020-06-11
//
// Deprecated: use NewGrpcServer instead.
func NewRpcServer(name, addr string, timeout int64, register RegisterFn) *RpcServer {
	srv := new(RpcServer)
	srv.baseRpcServer = newBaseRpcServer(addr)
	srv.register = register

	setupInterceptors(srv, int(timeout))
	return srv
}
// Start launches the RPC server: it listens on s.address over TCP, builds a
// grpc.Server from the configured options plus the interceptor lists, runs the
// register callback against it, and then serves.
//
// The function does not return normally: a listen failure, or the return of
// Serve, ends in log.Fatal.
//
// @auth liuguoqiang 2020-06-11
func (s *RpcServer) Start() {
	listener, err := net.Listen("tcp", s.address)
	if err != nil {
		log.Fatal(err)
	}

	// The built-in interceptors are placed ahead of the ones stored on s.
	unary := append(
		[]grpc.UnaryServerInterceptor{
			//serverinterceptors.UnaryCrashInterceptor(),
			serverinterceptors.UnaryStatInterceptor(),
		},
		s.unaryInterceptors...,
	)
	stream := append(
		[]grpc.StreamServerInterceptor{serverinterceptors.StreamCrashInterceptor},
		s.streamInterceptors...,
	)

	opts := append(
		s.options,
		WithUnaryServerInterceptors(unary...),
		WithStreamServerInterceptors(stream...),
	)
	grpcServer := grpc.NewServer(opts...)
	s.register(grpcServer)

	serveErr := grpcServer.Serve(listener)
	grpcServer.GracefulStop()
	log.Fatal(serveErr)
}
// setupInterceptors configures the timeout for server. When timeout is
// positive it is converted from milliseconds to a time.Duration and a unary
// timeout interceptor is added to server; otherwise server is left untouched.
//
// @auth liuguoqiang 2020-06-11
func setupInterceptors(server *RpcServer, timeout int) {
	if timeout <= 0 {
		return
	}
	limit := time.Duration(timeout) * time.Millisecond
	server.AddUnaryInterceptors(serverinterceptors.UnaryTimeoutInterceptor(limit))
}
// NewGrpcServer returns a GrpcServer carrying the given name, listen address
// and configuration. A nil cfg is replaced by defaultServerConfig().
//
// The returned server is not ready to Start until RegisterService has been
// called on it.
func NewGrpcServer(name, addr string, cfg *GrpcServerConfig) *GrpcServer {
	if cfg == nil {
		cfg = defaultServerConfig()
	}
	return &GrpcServer{
		Name: name,
		addr: addr,
		cfg:  cfg,
	}
}
// RegisterService must be called before Start. It stores the register
// callback, adds a unary timeout interceptor when s.cfg.Timeout is positive
// (the value is taken as milliseconds), builds the underlying grpc.Server from
// s.options plus the interceptor lists, and marks the server as prepared.
//
// It returns s so calls can be chained.
func (s *GrpcServer) RegisterService(register RegisterFn) *GrpcServer {
	s.register = register

	if ms := s.cfg.Timeout; ms > 0 {
		s.AddUnaryInterceptors(serverinterceptors.UnaryTimeoutInterceptor(time.Duration(ms) * time.Millisecond))
	}

	// The built-in interceptors are placed ahead of the ones stored on s.
	unary := append(
		[]grpc.UnaryServerInterceptor{serverinterceptors.UnaryStatInterceptor()},
		s.unaryInterceptors...,
	)
	stream := append(
		[]grpc.StreamServerInterceptor{serverinterceptors.StreamCrashInterceptor},
		s.streamInterceptors...,
	)

	opts := append(
		s.options,
		WithUnaryServerInterceptors(unary...),
		WithStreamServerInterceptors(stream...),
	)
	s.server = grpc.NewServer(opts...)
	s.isPre = true

	return s
}
// Start runs the gRPC server. It aborts via log.Fatal when RegisterService has
// not been called or when listening on s.addr fails. Otherwise it invokes the
// register callback on the underlying grpc.Server and serves on the listener.
//
// If Serve returns a non-nil error the server is gracefully stopped and the
// error is reported through log.Fatal; a nil error lets Start return.
func (s *GrpcServer) Start() {
	if !s.isPre {
		log.Fatal("before start, you must call server.RegisterService().")
	}

	listener, err := net.Listen("tcp", s.addr)
	if err != nil {
		log.Fatal(err)
	}

	if s.server != nil {
		s.register(s.server)
	}

	serveErr := s.server.Serve(listener)
	if serveErr == nil {
		return
	}
	s.server.GracefulStop()
	log.Fatal(serveErr)
}
// Close calls Stop on the underlying grpc.Server when one has been created;
// it does nothing if the server is still nil.
func (s *GrpcServer) Close() {
	if s.server == nil {
		return
	}
	s.server.Stop()
}