/
grpc.go
154 lines (123 loc) · 3.43 KB
/
grpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package grpc_transport
import (
"context"
"net"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
"github.com/tkcrm/mx/logger"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
)
// Defaults that NewServer applies before any option is evaluated.
const (
	// defaultGRPCNetwork is the initial Config.Network value.
	defaultGRPCNetwork = "tcp"
	// defaultGRPCAddress is the initial Config.Addr value.
	defaultGRPCAddress = ":9000"
	// defaultGRPCName is the initial server name, also attached to its logger.
	defaultGRPCName = "grpc-server"
)
// gRPCServer bundles a *grpc.Server with the configuration, logger and
// services needed to run it; Name, Enabled, Start and Stop operate on it.
type gRPCServer struct {
	// Config is embedded, so its fields (Addr, Network, the *Enabled
	// switches, ...) are accessed directly on the server.
	Config

	// name is returned by Name and attached to the logger as "service".
	name string
	// server is the underlying gRPC server; NewServer builds a default one
	// when it is still nil after the options have run.
	server *grpc.Server
	// logger receives the server's own log lines and backs the logging and
	// recovery interceptors.
	logger logger.Logger
	// services are registered on server by NewServer.
	services []GRPCService
}
// NewServer creates a new gRPC server that implements service.IService interface.
//
// The server starts from the package defaults (enabled, name "grpc-server",
// address ":9000", network "tcp") and then applies opts in order. Nil options
// are skipped instead of panicking.
//
// If no *grpc.Server is set once the options have run, a default one is built
// with the OpenTelemetry interceptors, followed by the logging interceptors
// when LoggerEnabled is set and the recovery interceptors when RecoveryEnabled
// is set. Afterwards the reflection service (ReflectEnabled), the health
// service (HealthCheckEnabled) and every configured service are registered on
// the server; nil services are logged and skipped.
func NewServer(opts ...Option) *gRPCServer {
	srv := &gRPCServer{
		name:   defaultGRPCName,
		logger: logger.Default(),
		Config: Config{
			Enabled: true,
			Addr:    defaultGRPCAddress,
			Network: defaultGRPCNetwork,
		},
	}

	for _, opt := range opts {
		// Calling a nil option would panic; tolerate it the same way nil
		// services are tolerated below.
		if opt == nil {
			continue
		}
		opt(srv)
	}

	srv.logger = logger.With(srv.logger, "service", srv.name)

	if srv.server == nil {
		// define unary interceptors
		unaryInterceptors := []grpc.UnaryServerInterceptor{
			otelgrpc.UnaryServerInterceptor(),
		}

		// define stream interceptors
		streamInterceptors := []grpc.StreamServerInterceptor{
			otelgrpc.StreamServerInterceptor(),
		}

		// add logger
		if srv.LoggerEnabled {
			unaryInterceptors = append(unaryInterceptors,
				logging.UnaryServerInterceptor(InterceptorLogger(srv.logger)),
			)
			streamInterceptors = append(streamInterceptors,
				logging.StreamServerInterceptor(InterceptorLogger(srv.logger)),
			)
		}

		// add recovery last so it sits closest to the handler in the chain
		if srv.RecoveryEnabled {
			// Named recoveryOpts so it does not shadow the opts parameter.
			recoveryOpts := []recovery.Option{
				recovery.WithRecoveryHandler(RecoveryFunc(srv.logger)),
			}
			unaryInterceptors = append(unaryInterceptors, recovery.UnaryServerInterceptor(recoveryOpts...))
			streamInterceptors = append(streamInterceptors, recovery.StreamServerInterceptor(recoveryOpts...))
		}

		// define grpc server options
		srvOpts := []grpc.ServerOption{
			grpc.ChainUnaryInterceptor(unaryInterceptors...),
			grpc.ChainStreamInterceptor(streamInterceptors...),
		}

		// init default grpc server
		srv.server = grpc.NewServer(srvOpts...)
	}

	if srv.ReflectEnabled {
		srv.services = append(srv.services, new(reflectionService))
	}

	if srv.HealthCheckEnabled {
		grpc_health_v1.RegisterHealthServer(srv.server, health.NewServer())
	}

	for i := range srv.services {
		if srv.services[i] == nil {
			srv.logger.Errorf("empty grpc service #%d", i)
			continue
		}

		srv.logger.Infof("register grpc service: %s", srv.services[i].Name())
		srv.services[i].Register(srv.server)
	}

	return srv
}
// Name reports the name this gRPC server was configured with.
func (s *gRPCServer) Name() string {
	return s.name
}
// Enabled reports whether the service is switched on in its Config.
func (s *gRPCServer) Enabled() bool {
	// The field must be reached through Config: s.Enabled is this method.
	return s.Config.Enabled
}
// Start opens a listener on the configured network and address and serves
// gRPC on it from a background goroutine.
//
// It blocks until either Serve returns a non-nil error, which is returned, or
// ctx is done, in which case it returns nil. Start does not stop the server
// itself when ctx is done. A failure to open the listener is returned as is.
func (s *gRPCServer) Start(ctx context.Context) error {
	s.logger.Infof("prepare listener %s on %s / %s", s.name, s.Addr, s.Network)

	var lc net.ListenConfig
	listener, err := lc.Listen(ctx, s.Network, s.Addr)
	if err != nil {
		return err
	}

	// Buffered so the serving goroutine can always deliver its error and
	// exit, even after Start has already returned.
	serveErr := make(chan error, 1)
	go func() {
		if e := s.server.Serve(listener); e != nil {
			serveErr <- e
		}
	}()

	select {
	case <-ctx.Done():
		return nil
	case e := <-serveErr:
		return e
	}
}
// Stop shuts the gRPC server down, preferring a graceful drain.
//
// It runs GracefulStop and waits for it to finish. If ctx is done before the
// drain completes, Stop falls back to a hard server Stop so that shutdown
// cannot hang on long-lived RPCs, and returns ctx.Err(). A nil ctx is treated
// as context.Background(), i.e. wait for the drain without a deadline. Stop
// returns nil immediately when no server has been created.
func (s *gRPCServer) Stop(ctx context.Context) error {
	if s.server == nil {
		return nil
	}
	// Stop used to ignore its context, so keep tolerating nil from callers.
	if ctx == nil {
		ctx = context.Background()
	}

	drained := make(chan struct{})
	go func() {
		defer close(drained)
		s.server.GracefulStop()
	}()

	select {
	case <-drained:
		return nil
	case <-ctx.Done():
		// The shutdown budget is spent while RPCs are still in flight:
		// force-close connections, which also unblocks GracefulStop, and wait
		// for it so the goroutine above does not outlive this call.
		s.server.Stop()
		<-drained
		return ctx.Err()
	}
}