/
server.go
118 lines (99 loc) · 2.77 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package grpc
import (
"net"
"sync"
"time"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
"github.com/mesg-foundation/core/api"
"github.com/mesg-foundation/core/interface/grpc/core"
"github.com/mesg-foundation/core/interface/grpc/service"
"github.com/mesg-foundation/core/protobuf/coreapi"
"github.com/mesg-foundation/core/protobuf/serviceapi"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/reflection"
)
// Server holds the configuration and runtime state of the gRPC server.
type Server struct {
// api is handed to the core and service servers built in register.
api *api.API
// instance is the underlying gRPC server; it is created by listen and is nil before that.
instance *grpc.Server
// closed is set by Close; once true, listen refuses to start the server.
closed bool
mi sync.Mutex // held by listen and Close while they read or write closed and instance.
// network and address are the arguments given to net.Listen.
network string
address string
}
// New builds a gRPC Server that will listen on address over TCP and expose
// the given api. The server does not start until Serve is called.
func New(address string, api *api.API) *Server {
	srv := new(Server)
	srv.network = "tcp"
	srv.address = address
	srv.api = api
	return srv
}
// listen opens the network listener, creates the gRPC server instance and
// registers the API services on it.
//
// It returns an *alreadyClosedError if Close has already been called, the
// net.Listen error if the address cannot be bound, or the error reported by
// register. On any error no listener is returned and none is left open.
func (s *Server) listen() (net.Listener, error) {
	s.mi.Lock()
	defer s.mi.Unlock()

	if s.closed {
		return nil, &alreadyClosedError{}
	}

	ln, err := net.Listen(s.network, s.address)
	if err != nil {
		return nil, err
	}

	// Keepalive stops the Docker network from dropping idle TCP connections after 15 minutes.
	// See: https://forum.mesg.com/t/solution-summary-for-docker-dropping-connections-after-15-min/246
	keepaliveOpt := grpc.KeepaliveParams(keepalive.ServerParameters{
		Time: 1 * time.Minute,
	})

	// Both interceptor chains log through the standard logrus logger.
	s.instance = grpc.NewServer(
		keepaliveOpt,
		grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
			grpc_logrus.StreamServerInterceptor(logrus.NewEntry(logrus.StandardLogger())),
		)),
		grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
			grpc_logrus.UnaryServerInterceptor(logrus.NewEntry(logrus.StandardLogger())),
		)),
	)

	if err := s.register(); err != nil {
		// The listener is not handed to the caller on failure, so release it
		// here; otherwise the bound address would leak. The close error is
		// ignored on purpose: the registration error is the one to report.
		_ = ln.Close()
		return nil, err
	}

	logrus.Info("server listens on ", ln.Addr())
	return ln, nil
}
// Serve opens the listener and serves client connections on it.
// It returns the error from listen when the server cannot be started,
// otherwise the result of the gRPC server's Serve call.
func (s *Server) Serve() error {
	listener, listenErr := s.listen()
	if listenErr != nil {
		return listenErr
	}

	// TODO: check whether the server stays up after a connection throws an
	// error; if it does not, wrap the Serve call in a loop.
	return s.instance.Serve(listener)
}
// Close gracefully stops the gRPC server, if one was created, and marks the
// Server as closed. Calling Close again after that is a no-op.
func (s *Server) Close() {
	s.mi.Lock()
	defer s.mi.Unlock()

	if s.closed {
		return
	}

	// instance is nil when Close is called before the server was ever started.
	if running := s.instance; running != nil {
		running.GracefulStop()
	}
	s.closed = true
}
// register attaches the service API, the core API and gRPC reflection to the
// server instance. It currently always returns nil.
func (s *Server) register() error {
	var (
		coreHandler    = core.NewServer(s.api)
		serviceHandler = service.NewServer(s.api)
	)

	serviceapi.RegisterServiceServer(s.instance, serviceHandler)
	coreapi.RegisterCoreServer(s.instance, coreHandler)
	reflection.Register(s.instance)

	return nil
}
// alreadyClosedError reports that the server was closed before it was started.
type alreadyClosedError struct{}

// Error implements the error interface.
func (*alreadyClosedError) Error() string {
	const msg = "already closed"
	return msg
}