-
Notifications
You must be signed in to change notification settings - Fork 0
/
grpc_server.go
104 lines (90 loc) · 2.28 KB
/
grpc_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package server
import (
"context"
"errors"
"net"
"sync/atomic"
"github.com/josexy/mini-ss/connection"
"github.com/josexy/mini-ss/connection/proto"
"github.com/josexy/mini-ss/transport"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
_ "google.golang.org/grpc/encoding/gzip"
)
var _ Server = (*GrpcServer)(nil)
type GrpcServer struct {
proto.UnimplementedStreamServiceServer
ln *tcpKeepAliveListener
server *grpc.Server
Addr string
Handler GrpcHandler
opts *transport.GrpcOptions
running atomic.Bool
}
func NewGrpcServer(addr string, handler GrpcHandler, opts transport.Options) *GrpcServer {
return &GrpcServer{
Addr: addr,
Handler: handler,
opts: opts.(*transport.GrpcOptions),
}
}
func (s *GrpcServer) Start(ctx context.Context) error {
if s.running.Load() {
return ErrServerStarted
}
laddr, err := net.ResolveTCPAddr("tcp", s.Addr)
if err != nil {
return err
}
ln, err := net.ListenTCP("tcp", laddr)
if err != nil {
return err
}
s.ln = &tcpKeepAliveListener{ln}
var opts []grpc.ServerOption
cred := insecure.NewCredentials()
tlsConfig, err := s.opts.TlsOptions.GetServerTlsConfig()
if err != nil {
return err
}
if tlsConfig != nil {
cred = credentials.NewTLS(tlsConfig)
}
opts = append(opts, grpc.Creds(cred))
if s.opts.SndBuffer > 0 {
opts = append(opts, grpc.WriteBufferSize(s.opts.SndBuffer))
}
if s.opts.RevBuffer > 0 {
opts = append(opts, grpc.ReadBufferSize(s.opts.RevBuffer))
}
s.server = grpc.NewServer(opts...)
proto.RegisterStreamServiceServer(s.server, s)
s.running.Store(true)
go closeWithContextDoneErr(ctx, s)
err = s.server.Serve(s.ln)
if err != nil && errors.Is(err, grpc.ErrServerStopped) {
err = nil
}
s.running.Store(false)
return err
}
func (s *GrpcServer) Transfer(ss proto.StreamService_TransferServer) error {
newConn(connection.NewGrpcServerStreamConn(ss), s).serve()
return nil
}
func (s *GrpcServer) LocalAddr() string { return s.Addr }
func (s *GrpcServer) Type() ServerType { return Grpc }
func (s *GrpcServer) Close() error {
if !s.running.Load() {
return ErrServerClosed
}
s.running.Store(false)
s.server.GracefulStop()
return nil
}
func (s *GrpcServer) Serve(conn *Conn) {
if s.Handler != nil {
s.Handler.ServeGRPC(conn)
}
}