/
viceroyd.go
178 lines (152 loc) · 4.8 KB
/
viceroyd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
// Package viceroyd implements the viceroy daemon. viceroyd manages workers and
// proxies build commands to running workers.
package viceroyd
import (
"context"
"fmt"
"net"
"net/url"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/mitchellh/go-homedir"
"github.com/rfratto/viceroy/internal/fine/grpcfine"
"github.com/rfratto/viceroy/internal/fine/server"
"github.com/rfratto/viceroy/internal/viceroypb"
"google.golang.org/grpc"
)
// DefaultOptions is the set of defaults for viceroyd.
var DefaultOptions = Options{
	// Listen on all interfaces; 12194 is the viceroyd client port.
	ListenAddr: "tcp://0.0.0.0:12194",
	// Image used when provisioning worker containers.
	ProvisionerContainer: "rfratto/viceroy:latest",
	// Base name assigned to provisioned worker containers.
	ProvisionerContainerName: "viceroy-worker",
}
// Options configures a viceroyd Daemon. See DefaultOptions for defaults.
type Options struct {
	ListenAddr               string // Address to listen for client connections.
	ProvisionerContainer     string // Container to use for provisioning
	ProvisionerContainerName string // Container name to use for provisioning
}
// Daemon is the viceroy daemon. Daemon exposes a gRPC API.
type Daemon struct {
	log  log.Logger   // Base logger for the daemon and its components.
	lis  net.Listener // Listener accepted by the gRPC server in Start.
	srv  *grpc.Server // gRPC server hosting the Runner service.
	opts Options      // Options the daemon was created with.

	// ctx is canceled by Stop (via cancel) to tear down background work
	// such as the remote-filesystem goroutines started by mountHost.
	ctx    context.Context
	cancel context.CancelFunc
}
// New creates a new Daemon.
//
// The returned Daemon does not serve until Start is called. o.ListenAddr is
// parsed as a URL: the scheme selects the network (e.g. "tcp", "unix") and
// host+path form the address, with "~" expanded to the user's home directory
// (useful for unix sockets). On any error, resources acquired so far (the
// listener, the cancelable context) are released before returning.
func New(l log.Logger, o Options) (d *Daemon, err error) {
	u, err := url.Parse(o.ListenAddr)
	if err != nil {
		return nil, fmt.Errorf("cannot parse listen addr %q as url: %w", o.ListenAddr, err)
	}
	// Host+Path covers both "tcp://host:port" and "unix:///~/path" forms.
	address, err := homedir.Expand(u.Host + u.Path)
	if err != nil {
		return nil, fmt.Errorf("invalid listen addr: %w", err)
	}
	lis, err := net.Listen(u.Scheme, address)
	if err != nil {
		return nil, fmt.Errorf("cannot open %s listener %s: %w", u.Scheme, address, err)
	}
	// Close the listener if any later step fails; otherwise it leaks until
	// process exit (and keeps the port/socket bound). err is the named
	// return, so this observes the final error value.
	defer func() {
		if err != nil {
			_ = lis.Close()
		}
	}()
	srv := grpc.NewServer(
		grpc.ChainUnaryInterceptor(loggingUnaryInterceptor(l)),
		grpc.ChainStreamInterceptor(loggingStreamingInterceptor(l)),
	)
	ctx, cancel := context.WithCancel(context.Background())
	defer func() {
		// Release the context if we're returning with an error.
		if err != nil {
			cancel()
		}
	}()
	d = &Daemon{
		log:    l,
		lis:    lis,
		srv:    srv,
		opts:   o,
		ctx:    ctx,
		cancel: cancel,
	}
	// The provisioner mounts the host filesystem into each new container via
	// d.mountHost.
	provisioner, err := newProvisioner(l, o, d.mountHost)
	if err != nil {
		return nil, fmt.Errorf("failed to create provisioner: %w", err)
	}
	commandRunner, err := newProvisionerRunner(l, provisioner)
	if err != nil {
		return nil, fmt.Errorf("failed to create compiler server: %w", err)
	}
	viceroypb.RegisterRunnerServer(srv, commandRunner)
	return d, nil
}
// mountHost connects to the FINE transport exposed by container c and starts
// serving a passthrough filesystem rooted at "/" to it in the background.
// The background server is bound to d.ctx, so it shuts down when the daemon
// stops. On error, the transport stream is closed before returning.
//
// NOTE(review): the trailing sleep is a readiness heuristic, not a guarantee
// that the mount completed — see the TODO below.
func (d *Daemon) mountHost(c *Container) (err error) {
	// TODO(rfratto): since we expect the container to have a grpcfine server, we
	// can move this to the provisioner instead.
	l := log.With(d.log, "container", c.ID)
	transportCli := grpcfine.NewTransportClient(c.FS)
	transportStream, err := transportCli.Stream(d.ctx)
	if err != nil {
		return fmt.Errorf("connecting filesystem: %w", err)
	}
	defer func() {
		// Close our transport if we're returning with an error.
		if err != nil {
			level.Debug(l).Log("msg", "closing transport on return")
			_ = transportStream.CloseSend()
		}
	}()
	// Passthrough(nil, "/") serves the host's root to the container.
	// (presumably nil means "use the default logger/options" — verify against
	// server.Passthrough.)
	transport := grpcfine.NewClientTransport(transportStream, grpcfine.MsgpackCodec())
	fsys, err := server.New(l, server.Options{
		ConcurrencyLimit: server.DefaultOptions.ConcurrencyLimit,
		RequestTimeout:   15 * time.Second,
		Transport:        transport,
		Handler:          server.Passthrough(nil, "/"),
	})
	if err != nil {
		return fmt.Errorf("creating remote filesystem: %w", err)
	}
	// Serve until d.ctx is canceled (i.e., until Daemon.Stop). If Serve exits
	// early, the container keeps running without its filesystem; see TODO.
	go func() {
		level.Debug(l).Log("msg", "serving remote filesystem")
		err := fsys.Serve(d.ctx)
		level.Info(l).Log("msg", "remote filesystem exited", "err", err)
		// TODO(rfratto): what's the right thing to do here? Shut down the
		// container? Re-establish the filesystem?
		//
		// Shutting down the container seems the easiest to recover from, while
		// re-establishing might fail for any number of reasons.
	}()
	// TODO(rfratto): this should use a backoff to check for a specific mounted file in the overlay
	time.Sleep(1500 * time.Millisecond)
	return nil
}
// Start starts d and doesn't return until it stops or there's an error.
func (d *Daemon) Start() error {
	addr := d.lis.Addr().String()
	level.Info(d.log).Log("msg", "starting viceroyd", "listen_addr", addr)
	// Serve blocks until the listener is closed or a fatal error occurs.
	return d.srv.Serve(d.lis)
}
// Stop shuts the daemon down: background work tied to the daemon context is
// canceled first, then the gRPC server drains in-flight RPCs and stops.
func (d *Daemon) Stop() error {
	// GracefulStop runs after cancel via defer, preserving shutdown order.
	defer d.srv.GracefulStop()
	d.cancel()
	return nil
}
// loggingUnaryInterceptor returns a gRPC unary interceptor that logs each
// incoming request's full method name at debug level before invoking the
// handler. It does not modify requests, responses, or errors.
func loggingUnaryInterceptor(l log.Logger) grpc.UnaryServerInterceptor {
	return func(
		ctx context.Context,
		req interface{},
		info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler,
	) (resp interface{}, err error) {
		// Fixed typo in log message: "receieved" -> "received".
		level.Debug(l).Log("msg", "received gRPC request", "method", info.FullMethod)
		return handler(ctx, req)
	}
}
// loggingStreamingInterceptor returns a gRPC stream interceptor that logs
// each incoming stream's full method name at debug level before invoking the
// handler. It does not wrap or modify the stream.
func loggingStreamingInterceptor(l log.Logger) grpc.StreamServerInterceptor {
	return func(
		srv interface{},
		ss grpc.ServerStream,
		info *grpc.StreamServerInfo,
		handler grpc.StreamHandler,
	) error {
		// Fixed typo in log message: "receieved" -> "received".
		level.Debug(l).Log("msg", "received gRPC request", "method", info.FullMethod)
		return handler(srv, ss)
	}
}