grpc_server.go (forked from thought-machine/please)
// +build !bootstrap

// Package follow implements remote connections to other plz processes.
// Specifically it implements a gRPC server and client that can stream
// build events.
package follow

import (
	"fmt"
	"net"
	"time"

	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/peer"
	"gopkg.in/op/go-logging.v1"

	"github.com/thought-machine/please/src/core"
	pb "github.com/thought-machine/please/src/follow/proto/build_event"
)

var log = logging.MustGetLogger("remote")

// disconnectTimeout is the grace period we give clients to disconnect before they are ditched.
var disconnectTimeout = 1 * time.Second

// buffering is the size of buffer we allocate in the server channels.
// Larger values consume more memory but protect better against slow clients.
const buffering = 1000

// InitialiseServer sets up the gRPC server on the given port.
// It dies on any errors.
// The returned function should be called to shut down once the server is no longer required.
func InitialiseServer(state *core.BuildState, port int) func() {
	_, f := initialiseServer(state, port)
	return f
}
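
// Example usage (a sketch, not part of this file; the BuildState constructor
// named here is hypothetical, see src/core for the real ones):
//
//	state := core.NewDefaultBuildState() // hypothetical constructor
//	shutdown := InitialiseServer(state, 9090)
//	defer shutdown()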

// initialiseServer sets up the gRPC server on the given port.
// It's split out from the above for testing purposes.
func initialiseServer(state *core.BuildState, port int) (string, func()) {
	// TODO(peterebden): TLS support
	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		log.Fatalf("%s", err)
	}
	addr := lis.Addr().String()
	s := grpc.NewServer()
	server := &eventServer{State: state}
	results, _ := state.RemoteResults()
	go server.MultiplexEvents(results)
	pb.RegisterPlzEventsServer(s, server)
	go s.Serve(lis)
	// addr already includes the port, so no extra colon is needed here.
	log.Notice("Serving events over gRPC on %s", addr)
	return addr, func() {
		stopServer(s)
	}
}
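
// A testing sketch (an assumption, not from this file): passing port 0 asks
// the OS for a free ephemeral port, and the returned address tells the test
// where to dial:
//
//	addr, shutdown := initialiseServer(state, 0)
//	defer shutdown()
//	// dial addr with a test client...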

// An eventServer handles the RPC requests to connected clients.
type eventServer struct {
	State   *core.BuildState
	Clients []chan *pb.BuildEventResponse
}

// ServerConfig implements the RPC interface.
func (e *eventServer) ServerConfig(ctx context.Context, r *pb.ServerConfigRequest) (*pb.ServerConfigResponse, error) {
	targets := make([]*pb.BuildLabel, len(e.State.OriginalTargets))
	for i, t := range e.State.OriginalTargets {
		targets[i] = toProtoBuildLabel(t)
	}
	_, results := e.State.RemoteResults()
	return &pb.ServerConfigResponse{
		NumThreads:      int32(e.State.Config.Please.NumThreads),
		OriginalTargets: targets,
		Tests:           e.State.NeedTests,
		Coverage:        e.State.NeedCoverage,
		LastEvents:      toProtos(results, e.State.NumActive(), e.State.NumDone()),
		StartTime:       e.State.StartTime.UnixNano(),
	}, nil
}
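
// A client-side sketch of this unary call (assumptions: the generated
// pb.NewPlzEventsClient and an established *grpc.ClientConn named conn):
//
//	client := pb.NewPlzEventsClient(conn)
//	cfg, err := client.ServerConfig(context.Background(), &pb.ServerConfigRequest{})
//	// cfg.LastEvents replays recent results so a late joiner can catch up.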

// BuildEvents implements the RPC interface.
func (e *eventServer) BuildEvents(r *pb.BuildEventRequest, s pb.PlzEvents_BuildEventsServer) error {
	if p, ok := peer.FromContext(s.Context()); ok {
		log.Notice("Remote client connected from %s to receive events", p.Addr)
	}
	c := make(chan *pb.BuildEventResponse, buffering)
	e.Clients = append(e.Clients, c)
	// Client is now connected to the stream and will receive all events from here on.
	for event := range c {
		if err := s.Send(event); err != nil {
			// Something's stuffed, disconnect the client from our event streams.
			log.Notice("Remote client disconnected (%s)", err)
			for i, client := range e.Clients {
				if client == c {
					// Delete this channel from the slice, zeroing the vacated
					// slot so the channel can be garbage collected.
					copy(e.Clients[i:], e.Clients[i+1:])
					last := len(e.Clients) - 1
					e.Clients[last] = nil
					e.Clients = e.Clients[:last]
					break
				}
			}
			return err
		}
	}
	log.Notice("Events finished, terminating remote session")
	return nil
}
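
// A minimal client sketch for consuming this stream (assumptions: an insecure
// local connection and the generated pb.NewPlzEventsClient):
//
//	conn, _ := grpc.Dial(addr, grpc.WithInsecure())
//	client := pb.NewPlzEventsClient(conn)
//	stream, _ := client.BuildEvents(context.Background(), &pb.BuildEventRequest{})
//	for {
//		event, err := stream.Recv()
//		if err != nil {
//			break // io.EOF once the server closes the stream
//		}
//		// handle event...
//	}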

// ResourceUsage implements the RPC interface.
func (e *eventServer) ResourceUsage(r *pb.ResourceUsageRequest, s pb.PlzEvents_ResourceUsageServer) error {
	// This doesn't necessarily have to match the update frequency, but it seems sensible to do so
	// since the clients won't get any benefit from anything more frequent.
	ticker := time.NewTicker(resourceUpdateFrequency)
	defer ticker.Stop()
	for range ticker.C {
		if err := s.Send(resourceToProto(e.State.Stats)); err != nil {
			return err
		}
	}
	return nil
}

// MultiplexEvents receives events from core and distributes them to receiving clients.
func (e *eventServer) MultiplexEvents(ch <-chan *core.BuildResult) {
	for r := range ch {
		p := toProto(r)
		// Target labels don't exist on the internal build events; retrieve them here.
		if t := e.State.Graph.Target(r.Label); t != nil {
			p.Labels = t.Labels
		}
		// Similarly these fields come off the state; they're not stored historically for each event.
		p.NumActive = int64(e.State.NumActive())
		p.NumDone = int64(e.State.NumDone())
		for _, c := range e.Clients {
			c <- p
		}
	}
	log.Info("Reached end of event stream, shutting down connected clients")
	for _, c := range e.Clients {
		close(c) // This terminates communication with whichever client is on the end of it.
	}
	log.Info("Closed channels to all connected clients")
}

// stopServer implements a graceful server stop with a timeout, followed by a non-graceful (ungainly?) shutdown.
// Essentially GracefulStop can block forever and we don't want to allow clients to do that to us.
func stopServer(s *grpc.Server) {
	ch := make(chan bool, 1)
	go func() {
		s.GracefulStop()
		ch <- true
	}()
	select {
	case <-ch:
	case <-time.After(disconnectTimeout):
		log.Warning("Remote client hasn't disconnected in allotted time, rapid shutdown initiated")
		s.Stop()
	}
}