-
Notifications
You must be signed in to change notification settings - Fork 1
/
grpc.go
139 lines (126 loc) · 3.45 KB
/
grpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// Package grpc provides common helper constructs for running a gRPC server.
package grpc
import (
"net"
"os"
"os/signal"
"sync"
"syscall"
"github.com/rs/zerolog"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
// An Option is a function that overrides one configuration setting of a
// Server. New applies the given Options in order, after it has filled in
// the defaults, so a later Option wins over an earlier one.
type Option func(*Server)
// WithAddress returns an Option that replaces the server's default listen
// address with addr. The value is later handed to net.Listen("tcp", addr)
// by Server.Start.
func WithAddress(addr string) Option {
	setAddr := func(target *Server) {
		target.addr = addr
	}
	return setAddr
}
// WithLogger returns an Option that replaces the server's logger with log.
func WithLogger(log zerolog.Logger) Option {
	setLogger := func(target *Server) {
		target.log = log
	}
	return setLogger
}
// WithServer returns an Option that replaces the underlying *grpc.Server
// with srv, e.g. one constructed with custom grpc.ServerOption values.
func WithServer(srv *grpc.Server) Option {
	setServer := func(target *Server) {
		target.srv = srv
	}
	return setServer
}
// RegisterServiceFunc registers a service with the given gRPC server and
// returns the name of the service it registered. Server.Start passes the
// returned name to the health server to mark that service as SERVING.
//
// Example:
//
//	var contentManager = func(srv *grpc.Server) string {
//		pb.RegisterContentManagerServer(srv, &content.Manager{})
//		return "kit.content.v1.ContentManager"
//	}
type RegisterServiceFunc func(*grpc.Server) string
// A Server can create and stop a gRPC server.
//
// Start registers the standard gRPC health service on the underlying server
// itself, so the register functions passed to New should only register
// application services.
//
// Example:
//
//	registerSvc := func(s *grpc.Server) string {
//		pb.RegisterContentManagerServer(s, &content.Manager{})
//		return "kit.content.v1.ContentManager"
//	}
//	s := grpc.New([]grpc.RegisterServiceFunc{registerSvc})
//	if err := s.Start(); err != nil {
//		// handle server runtime err
//	}
//	if err := s.Stop(); err != nil {
//		// handle server shutdown err
//	}
type Server struct {
	// addr is the address to bind to; Start passes it to net.Listen("tcp", addr).
	addr string
	// services are the register functions Start invokes against srv.
	services []RegisterServiceFunc
	// running protects server running state; Start and Stop each hold it
	// for their entire duration.
	running sync.Mutex
	// srv is the underlying gRPC server.
	srv *grpc.Server
	// log is the base logger; Start and Stop derive a per-call logger from it.
	log zerolog.Logger
	// errC receives the result of srv.Serve from the goroutine started by Start.
	errC chan error
	// sigC receives the OS signals Start subscribes to.
	sigC chan os.Signal
}
// Start opens a TCP listener on the configured address, registers every
// configured service plus the gRPC health service, and serves in a
// background goroutine. It then blocks until either the server stops with
// an error or the process receives SIGTERM, SIGQUIT or SIGINT.
//
// It returns the listen error or the value returned by Serve when there is
// one. When woken by a signal it returns nil and leaves the gRPC server
// running; call Stop afterwards to shut it down.
//
// The running mutex is held for the whole call, so a Stop issued from
// another goroutine waits until Start has returned.
func (s *Server) Start() error {
	s.running.Lock()
	defer s.running.Unlock()

	logger := s.log.With().Str("func", "Server.Start").Logger()

	logger.Debug().Str("listen", s.addr).Msg("opening net listener")
	listener, err := net.Listen("tcp", s.addr)
	if err != nil {
		return err
	}

	// Health check server; every registered service is reported as SERVING
	// under the name its register function returns.
	hs := health.NewServer()
	for _, register := range s.services {
		serviceName := register(s.srv)
		hs.SetServingStatus(serviceName, healthpb.HealthCheckResponse_SERVING)
	}

	// Register healthcheck server with gRPC server.
	healthpb.RegisterHealthServer(s.srv, hs)

	// Serve takes ownership of the listener; its result is reported on errC.
	logger.Debug().Str("listen", s.addr).Msg("starting gRPC server")
	go func() { s.errC <- s.srv.Serve(listener) }()

	// Wait for OS signal or runtime error. The subscription is undone on
	// return so the process does not keep swallowing these signals once
	// nobody is receiving from sigC anymore.
	signal.Notify(s.sigC, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT)
	defer signal.Stop(s.sigC)

	select {
	case err := <-s.errC:
		return err
	case sig := <-s.sigC:
		logger.Debug().Str("signal", sig.String()).Msg("received OS signal")
		return nil
	}
}
// Stop gracefully stops the underlying gRPC server, if one is set, and
// always returns nil.
//
// Stop acquires the same mutex that Start holds while it is blocked, so it
// only proceeds once Start has returned.
func (s *Server) Stop() error {
	s.running.Lock()
	defer s.running.Unlock()

	// Nothing to shut down when no gRPC server is configured.
	if s.srv == nil {
		return nil
	}

	logger := s.log.With().Str("func", "Server.Stop").Logger()
	logger.Debug().Msg("gracefully stopping gRPC server")
	s.srv.GracefulStop()
	return nil
}
// New creates a new gRPC Server wrapper. services are the register
// functions Start will invoke; opts override the defaults.
//
// Defaults: a fresh grpc.NewServer(), listen address ":5000" and a zerolog
// logger writing to os.Stdout. Options are applied in order after the
// defaults are set.
func New(services []RegisterServiceFunc, opts ...Option) *Server {
	s := &Server{
		srv:  grpc.NewServer(),
		addr: ":5000",
		log:  zerolog.New(os.Stdout),
		// Buffered: package signal never blocks when delivering to a
		// channel, so an unbuffered channel can silently drop a signal
		// that arrives while nobody is receiving.
		sigC: make(chan os.Signal, 1),
		// Buffered: lets the Serve goroutine started by Start deliver its
		// result and exit even when Start has already returned because of
		// a signal, instead of blocking forever on the send.
		errC:     make(chan error, 1),
		services: services,
	}
	for _, opt := range opts {
		opt(s)
	}
	return s
}