-
Notifications
You must be signed in to change notification settings - Fork 203
/
server.go
172 lines (137 loc) · 4.02 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package rpc
import (
"errors"
"net"
"github.com/sirupsen/logrus"
"golang.org/x/net/netutil"
"github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/nebulasio/go-nebulas/core"
"github.com/nebulasio/go-nebulas/neblet/pb"
nebnet "github.com/nebulasio/go-nebulas/net"
"github.com/nebulasio/go-nebulas/rpc/pb"
"github.com/nebulasio/go-nebulas/util/logging"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
// ErrEmptyRPCListenList is the sentinel error returned by Server.Start when
// the rpc configuration contains no listen address.
var ErrEmptyRPCListenList = errors.New("empty rpc listen list")
// Limits applied to the rpc server.
const (
	// DefaultConnectionLimits is the connection cap used by Server.start when
	// the configured ConnectionLimits value is zero.
	DefaultConnectionLimits = 128

	// MaxRecvMsgSize is the value handed to grpc.MaxRecvMsgSize when the
	// server is built: 64 MiB.
	MaxRecvMsgSize = 64 << 20
)
// Neblet interface breaks cycle import dependency and hides unused services.
//
// It lists the node-level accessors the rpc package relies on.
// NOTE(review): within the visible part of this file nothing refers to this
// interface; Server and GRPCServer are written against core.Neblet instead.
// Confirm whether this declaration is still used elsewhere in the package.
type Neblet interface {
// Config returns the node configuration; NewServer reads its Rpc section
// from the equivalent method on core.Neblet.
Config() *nebletpb.Config
// StartPprof takes a single string argument and reports failure as an error.
// NOTE(review): presumably the argument is the pprof listen address - confirm.
StartPprof(string) error
// BlockChain returns the node's *core.BlockChain.
BlockChain() *core.BlockChain
// AccountManager returns the node's core.AccountManager.
AccountManager() core.AccountManager
// NetService returns the node's network service (nebnet.Service).
NetService() nebnet.Service
// EventEmitter returns the node's *core.EventEmitter.
EventEmitter() *core.EventEmitter
// Consensus returns the node's core.Consensus.
Consensus() core.Consensus
}
// GRPCServer is the server interface for api & management etc.
//
// *Server in this file provides all four methods with matching signatures.
type GRPCServer interface {
// Start starts the server and reports a startup failure as an error.
Start() error
// Stop stops the server.
Stop()
// Neblet returns the core.Neblet the server is attached to.
Neblet() core.Neblet
// RunGateway starts the grpc-to-http gateway and reports failure as an error.
RunGateway() error
}
// Server is the RPC server type.
//
// Instances are built by NewServer, which fills in every field.
type Server struct {
// neblet is the node handed to NewServer; it is returned as-is by Neblet().
neblet core.Neblet
// rpcServer is the grpc server the api and admin services are registered on.
rpcServer *grpc.Server
// rpcConfig is the Rpc section of the node configuration (listen addresses,
// connection limit, http gateway settings).
rpcConfig *nebletpb.RPCConfig
}
// NewServer builds the RPC server for the given neblet and registers the api
// and admin services (plus grpc reflection) on it.
//
// The rpc section of the neblet configuration is mandatory: when it is missing
// a Fatal log entry is emitted. The returned server is not listening yet; call
// Start for that.
func NewServer(neblet core.Neblet) *Server {
	rpcConfig := neblet.Config().Rpc
	if rpcConfig == nil {
		logging.CLog().Fatal("Failed to find rpc config in config file.")
	}

	// Both interceptor chains only carry the logging interceptor; incoming
	// messages are capped at MaxRecvMsgSize.
	grpcServer := grpc.NewServer(
		grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(loggingStream)),
		grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(loggingUnary)),
		grpc.MaxRecvMsgSize(MaxRecvMsgSize),
	)

	server := &Server{
		neblet:    neblet,
		rpcServer: grpcServer,
		rpcConfig: rpcConfig,
	}

	// Each service keeps a back-reference to the server it is served from.
	rpcpb.RegisterApiServiceServer(grpcServer, &APIService{server: server})
	rpcpb.RegisterAdminServiceServer(grpcServer, &AdminService{server: server})

	// Register reflection service on gRPC server.
	// TODO: Enable reflection only for testing mode.
	reflection.Register(grpcServer)

	return server
}
// Start begins serving on every address listed in the rpc configuration.
//
// It returns ErrEmptyRPCListenList when no address is configured, or the first
// error reported while opening an address; addresses after the failing one are
// not attempted.
func (s *Server) Start() error {
	logging.CLog().Info("Starting RPC GRPCServer...")

	addrs := s.rpcConfig.RpcListen
	if len(addrs) == 0 {
		return ErrEmptyRPCListenList
	}

	for i := range addrs {
		err := s.start(addrs[i])
		if err != nil {
			return err
		}
	}
	return nil
}
// start opens a tcp listener on addr, caps its number of simultaneous
// connections and serves the grpc server on it from a new goroutine.
//
// A listen failure is logged and returned. Once the goroutine is launched the
// method returns nil; an error from Serve is only logged.
func (s *Server) start(addr string) error {
	tcpListener, err := net.Listen("tcp", addr)
	if err != nil {
		logging.CLog().WithFields(logrus.Fields{
			"err": err,
		}).Error("Failed to listen to RPC GRPCServer")
		return err
	}

	logging.CLog().WithFields(logrus.Fields{
		"address": addr,
	}).Info("Started RPC GRPCServer.")

	// Limit the total number of grpc connections; a zero setting selects the
	// package default.
	limit := s.rpcConfig.ConnectionLimits
	if limit == 0 {
		limit = DefaultConnectionLimits
	}
	limited := netutil.LimitListener(tcpListener, int(limit))

	go func(l net.Listener) {
		serveErr := s.rpcServer.Serve(l)
		if serveErr == nil {
			return
		}
		logging.CLog().WithFields(logrus.Fields{
			"err": serveErr,
		}).Info("RPC server exited.")
	}(limited)

	return nil
}
// RunGateway runs the grpc-to-http gateway; call it after the rpc server has
// been started.
//
// It returns ErrEmptyRPCListenList when no rpc listen address is configured,
// because the first address is read below. Otherwise it logs the gateway
// settings, launches Run(s.rpcConfig) on its own goroutine and returns nil
// immediately. A failure of Run is reported through a Fatal log entry, not
// through the return value.
func (s *Server) RunGateway() error {
	// Guard the RpcListen[0] access below: indexing an empty list would panic.
	// Start rejects the same configuration with the same sentinel error.
	if len(s.rpcConfig.RpcListen) == 0 {
		return ErrEmptyRPCListenList
	}

	//time.Sleep(3 * time.Second)
	logging.CLog().WithFields(logrus.Fields{
		"rpc-server":  s.rpcConfig.RpcListen[0],
		"http-server": s.rpcConfig.HttpListen,
		"http-cors":   s.rpcConfig.HttpCors,
	}).Info("Starting RPC Gateway GRPCServer...")

	go func() {
		if err := Run(s.rpcConfig); err != nil {
			logging.CLog().WithFields(logrus.Fields{
				"error": err,
			}).Fatal("Failed to start RPC Gateway.")
		}
	}()
	return nil
}
// Stop shuts the rpc server down, logging the configured listen addresses
// before and a completion message after.
//
// The only shutdown call made here is Stop on the underlying grpc server.
// NOTE(review): the log messages also mention the gateway, but nothing in this
// method stops it explicitly - confirm how the gateway is torn down.
func (s *Server) Stop() {
	fields := logrus.Fields{
		"listen": s.rpcConfig.RpcListen,
	}
	logging.CLog().WithFields(fields).Info("Stopping RPC GRPCServer and Gateway...")

	s.rpcServer.Stop()

	logging.CLog().Info("Stopped RPC GRPCServer and Gateway.")
}
// Neblet returns the core.Neblet this server was created with (the value
// passed to NewServer). No copy is made: the stored interface value is
// returned as-is.
func (s *Server) Neblet() core.Neblet {
return s.neblet
}