This repository has been archived by the owner on Feb 26, 2024. It is now read-only.
/
server.go
99 lines (89 loc) · 2.46 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package raftkvpb
import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"

	"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
	"github.com/hashicorp/go-hclog"
	"github.com/kei6u/raftkv/kv"
	"google.golang.org/grpc"
)
// Server bundles a gRPC server with the gRPC-Gateway HTTP server that fronts
// it, together with the listener and client connection they use.
type Server struct {
// logger receives lifecycle and error messages from Start and Stop.
logger hclog.Logger
// gRPCAddr is the TCP address Start listens on for gRPC; the gateway's
// client connection is dialed against the same address.
gRPCAddr string
// gRPCServer serves the Raftkv service over gRPC.
gRPCServer *grpc.Server
// gRPCListener is the listener opened by Start; it is nil until Start
// succeeds in listening.
gRPCListener net.Listener
// gRPCClientConn is the connection the gateway handlers were registered
// with.
gRPCClientConn *grpc.ClientConn
// gRPCGWServer is the HTTP server whose handler is the gRPC-Gateway mux.
gRPCGWServer *http.Server
}
// NewServer assembles a Server: a gRPC server exposing kvserver, and a
// gRPC-Gateway HTTP server (bound to gRPCGWAddr) whose handlers are registered
// against a client connection dialed to gRPCAddr.
//
// Nothing is listening yet when NewServer returns; call Start for that. An
// error is returned when the gateway cannot be set up.
func NewServer(ctx context.Context, gRPCAddr, gRPCGWAddr string, l hclog.Logger, kvserver *kv.Server) (*Server, error) {
	srv := &Server{
		logger:     l,
		gRPCAddr:   gRPCAddr,
		gRPCServer: newgRPCServer(kvserver, l),
	}

	gateway, clientConn, err := newgRPCGWServer(ctx, gRPCAddr, gRPCGWAddr)
	if err != nil {
		return nil, err
	}
	srv.gRPCGWServer = gateway
	srv.gRPCClientConn = clientConn

	return srv, nil
}
// newgRPCServer returns a gRPC server that has the Raftkv service, backed by
// server, registered on it. Unary calls pass through the hclog interceptor
// built from l.
func newgRPCServer(server *kv.Server, l hclog.Logger) *grpc.Server {
	unaryChain := grpc.ChainUnaryInterceptor(hclogUnaryInterceptor(l))

	s := grpc.NewServer(unaryChain)
	RegisterRaftkvServiceServer(s, newRaftkvService(server))
	return s
}
// newgRPCGWServer builds the gRPC-Gateway side of the server.
//
// It dials gRPCAddr (without transport security and with client-side health
// checking disabled), registers the Raftkv service handlers on a new gateway
// mux using that connection, and returns an http.Server configured to serve
// the mux on gRPCGWAddr. The HTTP server is not started here.
//
// On success the caller owns the returned connection and is responsible for
// closing it. On failure both returned pointers are nil and any connection
// that was opened has already been closed.
func newgRPCGWServer(ctx context.Context, gRPCAddr, gRPCGWAddr string) (*http.Server, *grpc.ClientConn, error) {
	conn, err := grpc.DialContext(
		ctx,
		gRPCAddr,
		// NOTE(review): WithInsecure is deprecated in newer grpc-go releases;
		// its replacement lives in a package this file does not import.
		grpc.WithInsecure(),
		grpc.WithDisableHealthCheck(),
	)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to dial a gRPC server: %w", err)
	}

	mux := runtime.NewServeMux()
	if err := RegisterRaftkvServiceHandler(ctx, mux, conn); err != nil {
		// The connection is not handed back on this path, so release it here
		// to avoid leaking it. The close error is deliberately dropped: the
		// registration failure is the one the caller needs to see.
		_ = conn.Close()
		return nil, nil, fmt.Errorf("failed to register a service handler: %w", err)
	}

	return &http.Server{
		Addr:    gRPCGWAddr,
		Handler: mux,
	}, conn, nil
}
// Start opens a TCP listener on the configured gRPC address and then serves
// gRPC on it and the gRPC-Gateway HTTP server, each in its own goroutine.
//
// It returns an error only when the gRPC listener cannot be opened. Start does
// not wait for either server: failures that happen while serving (for example
// the gateway address already being in use) are reported through the logger
// rather than the return value. Errors that merely signal an orderly shutdown
// are not logged.
func (s *Server) Start() error {
	s.logger.Info("Starting gRPC and HTTP servers")

	lis, err := net.Listen("tcp", s.gRPCAddr)
	if err != nil {
		return fmt.Errorf("failed to listen to gRPC addr: %w", err)
	}
	s.gRPCListener = lis

	go func() {
		err := s.gRPCServer.Serve(lis)
		// Stop closes the listener before stopping the server, so a closed
		// listener is an expected way for Serve to return.
		if err != nil && !errors.Is(err, grpc.ErrServerStopped) && !errors.Is(err, net.ErrClosed) {
			s.logger.Error("gRPC server stopped serving", "error", err)
		}
	}()

	go func() {
		err := s.gRPCGWServer.ListenAndServe()
		// ErrServerClosed is what ListenAndServe returns after Shutdown.
		if err != nil && !errors.Is(err, http.ErrServerClosed) {
			s.logger.Error("gRPC-Gateway server stopped serving", "error", err)
		}
	}()

	return nil
}
// Stop shuts the server down and releases what it holds, in this order: the
// gRPC-Gateway HTTP server is shut down, the gateway's gRPC client connection
// is closed, the gRPC listener is closed, and finally the gRPC server is
// gracefully stopped.
//
// Every step is skipped when its resource is nil, so Stop is safe to call on a
// Server that was never started or whose Start failed. Failures are logged and
// do not prevent the remaining steps from running.
func (s *Server) Stop() {
	ctx := context.Background()

	if s.gRPCGWServer != nil {
		s.logger.Info("gRPC-Gateway server is shutting down")
		if err := s.gRPCGWServer.Shutdown(ctx); err != nil {
			s.logger.Error("failed to shutdown gRPC gateway server", "error", err)
		}
	}

	// Close the gateway's client connection once the gateway itself is down;
	// otherwise it is never released.
	if s.gRPCClientConn != nil {
		if err := s.gRPCClientConn.Close(); err != nil {
			s.logger.Error("failed to close gRPC client connection", "error", err)
		}
	}

	// The listener only exists after a successful Start.
	if s.gRPCListener != nil {
		if err := s.gRPCListener.Close(); err != nil {
			s.logger.Error("failed to close listener to gRPC server", "error", err)
		}
	}

	if s.gRPCServer != nil {
		s.logger.Info("gRPC server is gracefully stopping")
		s.gRPCServer.GracefulStop()
	}

	s.logger.Info("Bye~~")
}