-
Notifications
You must be signed in to change notification settings - Fork 176
/
server.go
99 lines (81 loc) · 2.95 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package grpcserver
import (
"net"
"sync"
"go.uber.org/atomic"
"github.com/rs/zerolog"
"google.golang.org/grpc"
_ "google.golang.org/grpc/encoding/gzip" //required for gRPC compression
_ "github.com/onflow/flow-go/engine/common/grpc/compressor/deflate" // required for gRPC compression
_ "github.com/onflow/flow-go/engine/common/grpc/compressor/snappy" // required for gRPC compression
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
)
// GrpcServer wraps `grpc.Server` and allows managing it through the `component.Component` interface. It can be injected
// into different engines, making it possible to use a single grpc server for multiple services which live in different modules.
type GrpcServer struct {
// Component is the embedded lifecycle implementation; NewGrpcServer assigns it from a
// component manager that runs serveGRPCWorker and shutdownWorker.
component.Component
// log is the logger for this server; serveGRPCWorker adds a "grpc_address" field to it.
log zerolog.Logger
// Server is the wrapped gRPC server passed to NewGrpcServer. It is served by serveGRPCWorker and
// stopped by shutdownWorker.
Server *grpc.Server
// grpcSignalerCtx is the caller-supplied atomic pointer into which serveGRPCWorker stores its
// signaler context.
grpcSignalerCtx *atomic.Pointer[irrecoverable.SignalerContext]
grpcListenAddr string // the GRPC server address as ip:port
// addrLock guards grpcAddress.
addrLock sync.RWMutex
// grpcAddress is the address of the bound listener; it is nil until serveGRPCWorker has
// successfully called net.Listen.
grpcAddress net.Addr
}
// Compile-time check that *GrpcServer satisfies component.Component.
var _ component.Component = (*GrpcServer)(nil)
// NewGrpcServer builds a GrpcServer around an existing grpc.Server.
//
// log is the base logger, grpcListenAddr is the address the server will listen on, grpcServer is
// the gRPC server to manage, and grpcSignalerCtx is the atomic pointer that will receive the
// serving worker's signaler context once the component is started.
//
// The returned server's Component runs two workers: serveGRPCWorker, which listens and serves, and
// shutdownWorker, which stops the server when the context is cancelled.
func NewGrpcServer(log zerolog.Logger,
	grpcListenAddr string,
	grpcServer *grpc.Server,
	grpcSignalerCtx *atomic.Pointer[irrecoverable.SignalerContext],
) *GrpcServer {
	g := new(GrpcServer)
	g.log = log
	g.Server = grpcServer
	g.grpcListenAddr = grpcListenAddr
	g.grpcSignalerCtx = grpcSignalerCtx

	// The workers are method values bound to g, so g must exist before the manager is built.
	workers := component.NewComponentManagerBuilder().
		AddWorker(g.serveGRPCWorker).
		AddWorker(g.shutdownWorker)
	g.Component = workers.Build()

	return g
}
// serveGRPCWorker is a worker routine which starts the gRPC server.
//
// It stores ctx in g.grpcSignalerCtx, binds a TCP listener on g.grpcListenAddr, records the bound
// address, calls ready, and then blocks in Serve until the server stops.
//
// A listen failure or an unexpected Serve error is logged and thrown on ctx. grpc.ErrServerStopped
// is not treated as a failure: Serve returns it when the server was already stopped before Serve
// was entered (for example when shutdownWorker observed a cancelled context first), which is a
// normal shutdown and must not be escalated as an irrecoverable error.
func (g *GrpcServer) serveGRPCWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
	g.log = g.log.With().Str("grpc_address", g.grpcListenAddr).Logger()
	g.log.Info().Msg("starting grpc server on address")

	// make this worker's signaler context available through the pointer supplied to NewGrpcServer
	g.grpcSignalerCtx.Store(&ctx)

	l, err := net.Listen("tcp", g.grpcListenAddr)
	if err != nil {
		g.log.Err(err).Msg("failed to start the grpc server")
		ctx.Throw(err)
		return
	}

	// save the actual address on which we are listening (may be different from g.grpcListenAddr if no
	// port was specified)
	g.addrLock.Lock()
	g.grpcAddress = l.Addr()
	g.addrLock.Unlock()

	g.log.Debug().Msg("listening on port")
	ready()

	err = g.Server.Serve(l) // blocking call
	if err == grpc.ErrServerStopped {
		// Serve returns this sentinel directly (unwrapped) when Stop/GracefulStop ran before Serve;
		// the listener has already been closed by Serve, so there is nothing left to do.
		return
	}
	if err != nil {
		g.log.Err(err).Msg("fatal error in grpc server")
		ctx.Throw(err)
	}
}
// GRPCAddress returns the address the gRPC listener is bound to.
// serveGRPCWorker records the address before it signals readiness, so the result is nil until the
// listener has been bound and non-nil once the component is ready.
func (g *GrpcServer) GRPCAddress() net.Addr {
	// Take a snapshot under the read lock; the serving worker writes the field under the write lock.
	g.addrLock.RLock()
	addr := g.grpcAddress
	g.addrLock.RUnlock()

	return addr
}
// shutdownWorker is a worker routine which waits for the context to be cancelled and then
// gracefully stops the gRPC server.
func (g *GrpcServer) shutdownWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
	// This worker has no setup of its own, so it reports ready immediately.
	ready()

	// Block until the context is cancelled.
	done := ctx.Done()
	<-done

	g.Server.GracefulStop()
}