daemon.go
package agent

import (
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/spf13/pflag"
"github.com/tofutf/tofutf/internal"
otfapi "github.com/tofutf/tofutf/internal/api"
"github.com/tofutf/tofutf/internal/configversion"
"github.com/tofutf/tofutf/internal/logr"
"github.com/tofutf/tofutf/internal/logs"
"github.com/tofutf/tofutf/internal/releases"
"github.com/tofutf/tofutf/internal/run"
"github.com/tofutf/tofutf/internal/state"
"github.com/tofutf/tofutf/internal/variable"
"github.com/tofutf/tofutf/internal/workspace"
"golang.org/x/sync/errgroup"
)

const DefaultConcurrency = 5

var (
PluginCacheDir = filepath.Join(os.TempDir(), "plugin-cache")
DefaultEnvs = []string{
"TF_IN_AUTOMATION=true",
"CHECKPOINT_DISABLE=true",
}
)

type (
// Config is configuration for an agent daemon
Config struct {
Name string // descriptive name for agent
Concurrency int // number of jobs the agent can execute at any one time
Sandbox bool // isolate privileged ops within sandbox
Debug bool // toggle debug mode
PluginCache bool // toggle use of terraform's shared plugin cache
TerraformBinDir string // destination directory for terraform binaries
}
)

func NewConfigFromFlags(flags *pflag.FlagSet) *Config {
cfg := Config{}
flags.IntVar(&cfg.Concurrency, "concurrency", DefaultConcurrency, "Number of runs that can be processed concurrently")
flags.BoolVar(&cfg.Sandbox, "sandbox", false, "Isolate terraform apply within sandbox for additional security")
flags.BoolVar(&cfg.Debug, "debug", false, "Enable agent debug mode which dumps additional info to terraform runs.")
flags.BoolVar(&cfg.PluginCache, "plugin-cache", false, "Enable shared plugin cache for terraform providers.")
flags.StringVar(&cfg.Name, "name", "", "Give agent a descriptive name. Optional.")
return &cfg
}
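
// A minimal usage sketch, not part of the original file and shown only for
// illustration: the caller owns the pflag.FlagSet, registers the agent flags
// via NewConfigFromFlags, and parses the command line before reading the
// resulting Config. The flag set name "agent" and the error handling mode are
// arbitrary choices.
//
//	flags := pflag.NewFlagSet("agent", pflag.ContinueOnError)
//	cfg := NewConfigFromFlags(flags)
//	if err := flags.Parse(os.Args[1:]); err != nil {
//		// handle the parse error
//	}
//	// cfg now reflects --concurrency, --sandbox, --debug, --plugin-cache and --name
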
// daemon implements the agent itself.
type daemon struct {
*daemonClient
envs []string // terraform environment variables
config Config
downloader downloader
registered chan *Agent
logger logr.Logger // logger that logs messages regardless of whether agent is a pool agent or not.
poolLogger logr.Logger // logger that only logs messages if the agent is a pool agent.
isPoolAgent bool
}

type DaemonOptions struct {
Logger logr.Logger
Config Config
client *daemonClient
// whether daemon is for a pool agent (true) or for a server agent (false).
isPoolAgent bool
}

// downloader downloads terraform versions
type downloader interface {
Download(ctx context.Context, version string, w io.Writer) (string, error)
}

// newDaemon constructs an agent daemon.
func newDaemon(opts DaemonOptions) (*daemon, error) {
poolLogger := opts.Logger
if !opts.isPoolAgent {
// Disable logging for server agents; otherwise the server logs would
// likely contain duplicate messages from both the agent daemon and the
// agent service. The logger is still made available to the server agent
// for when it does need to log something.
poolLogger = logr.NewNoopLogger()
}
if opts.Config.Concurrency == 0 {
opts.Config.Concurrency = DefaultConcurrency
}
if opts.Config.Debug {
opts.Logger.V(0).Info("enabled debug mode")
}
if opts.Config.Sandbox {
if _, err := exec.LookPath("bwrap"); errors.Is(err, exec.ErrNotFound) {
return nil, fmt.Errorf("sandbox mode requires bubblewrap: %w", err)
}
opts.Logger.V(0).Info("enabled sandbox mode")
}
d := &daemon{
daemonClient: opts.client,
envs: DefaultEnvs,
downloader: releases.NewDownloader(opts.Config.TerraformBinDir),
registered: make(chan *Agent),
config: opts.Config,
poolLogger: poolLogger,
logger: opts.Logger,
isPoolAgent: opts.isPoolAgent,
}
if opts.Config.PluginCache {
if err := os.MkdirAll(PluginCacheDir, 0o755); err != nil {
return nil, fmt.Errorf("creating plugin cache directory: %w", err)
}
d.envs = append(d.envs, "TF_PLUGIN_CACHE_DIR="+PluginCacheDir)
opts.Logger.V(0).Info("enabled plugin cache", "path", PluginCacheDir)
}
return d, nil
}

type ServerDaemonOptions struct {
Logger logr.Logger
Config Config
RunService *run.Service
WorkspaceService *workspace.Service
VariableService *variable.Service
ConfigurationVersionService *configversion.Service
StateService *state.Service
LogsService *logs.Service
AgentService *Service
HostnameService *internal.HostnameService
}

// NewServerDaemon constructs a server agent daemon that is part of the otfd
// server.
func NewServerDaemon(logger logr.Logger, cfg Config, opts ServerDaemonOptions) (*daemon, error) {
return newDaemon(DaemonOptions{
Logger: logger,
Config: cfg,
client: &daemonClient{
runs: opts.RunService,
workspaces: opts.WorkspaceService,
state: opts.StateService,
variables: opts.VariableService,
configs: opts.ConfigurationVersionService,
logs: opts.LogsService,
agents: opts.AgentService,
server: opts.HostnameService,
},
})
}

// NewPoolDaemon constructs a pool agent daemon that communicates with the otfd server via RPC.
func NewPoolDaemon(logger logr.Logger, cfg Config, apiConfig otfapi.Config) (*daemon, error) {
rpcClient, err := newRPCDaemonClient(apiConfig, nil)
if err != nil {
return nil, err
}
return newDaemon(DaemonOptions{
Logger: logger,
Config: cfg,
client: rpcClient,
isPoolAgent: true,
})
}
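
// A rough usage sketch, not part of the original file. The otfapi.Config
// field names shown here (Address, Token) are assumptions made for
// illustration only; consult the otfapi package for the actual fields.
//
//	d, err := NewPoolDaemon(logger, Config{Name: "pool-1"},
//		otfapi.Config{Address: "https://otf.example.com", Token: token})
//	if err != nil {
//		return err
//	}
//	return d.Start(ctx)
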
// Start the agent daemon.
func (d *daemon) Start(ctx context.Context) error {
d.poolLogger.Info("starting agent", "version", internal.Version)
// initialize terminator
terminator := &terminator{mapping: make(map[JobSpec]cancelable)}
if !d.isPoolAgent {
// prior to registration, the server agent identifies itself as an
// unregisteredServerAgent (the pool agent identifies itself as an
// unregisteredPoolAgent but the server-side token middleware handles
// that).
ctx = internal.AddSubjectToContext(ctx, &unregisteredServerAgent{})
}
// register agent with server
agent, err := d.agents.registerAgent(ctx, registerAgentOptions{
Name: d.config.Name,
Version: internal.Version,
Concurrency: d.config.Concurrency,
})
if err != nil {
return err
}
d.poolLogger.Info("registered successfully", "agent", agent)
// send registered agent to channel, letting caller know agent has
// registered.
go func() {
d.registered <- agent
}()
if !d.isPoolAgent {
// server agents should identify themselves as a serverAgent
// (pool agents identify themselves as a poolAgent, but the
// bearer token middleware takes care of that server-side).
ctx = internal.AddSubjectToContext(ctx, &serverAgent{Agent: agent})
}
g, ctx := errgroup.WithContext(ctx)
g.Go(func() (err error) {
defer func() {
// send a final status update using a context that remains valid
// for a further 10 seconds unless the daemon is forcefully shut down.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if updateErr := d.agents.updateAgentStatus(ctx, agent.ID, AgentExited); updateErr != nil {
err = fmt.Errorf("sending final status update: %w", updateErr)
} else {
d.logger.Info("sent final status update", "status", "exited")
}
}()
// every 10 seconds update the agent status
ticker := time.NewTicker(10 * time.Second)
for {
select {
case <-ticker.C:
// send agent status update
status := AgentIdle
if terminator.totalJobs() > 0 {
status = AgentBusy
}
if err := d.agents.updateAgentStatus(ctx, agent.ID, status); err != nil {
if ctx.Err() != nil {
// context canceled
return nil
}
if errors.Is(err, internal.ErrConflict) {
// exit, compelling agent to re-register - this may
// happen when the server has de-registered the agent,
// which it may do when it hasn't heard from the agent
// in a while and the agent only belatedly succeeds in
// sending an update.
return errors.New("agent status update failed due to conflict; agent needs to re-register")
} else {
d.poolLogger.Error(err, "sending agent status update", "status", status)
}
} else {
d.poolLogger.V(9).Info("sent agent status update", "status", status)
}
case <-ctx.Done():
// context canceled
return nil
}
}
})
g.Go(func() (err error) {
defer func() {
if terminator.totalJobs() > 0 {
d.logger.Info("gracefully canceling in-progress jobs", "total", terminator.totalJobs())
// The interrupt sent to the main process is also delivered to the
// forked terraform processes, so there is no need to send them
// another interrupt; it is enough to set the cancel semaphore on
// each operation.
terminator.stopAll()
}
}()
// fetch jobs allocated to this agent and launch workers to execute them;
// also handle cancelation signals for jobs
for {
processJobs := func() (err error) {
d.poolLogger.Info("waiting for next job")
// block on waiting for jobs
jobs, err := d.agents.getAgentJobs(ctx, agent.ID)
if err != nil {
return err
}
for _, j := range jobs {
if j.Status == JobAllocated {
d.poolLogger.Info("received job", "job", j)
// start job and receive job token in return
token, err := d.agents.startJob(ctx, j.Spec)
if err != nil {
if ctx.Err() != nil {
return nil
}
d.poolLogger.Error(err, "starting job")
continue
}
d.poolLogger.V(0).Info("started job")
op := newOperation(newOperationOptions{
logger: d.poolLogger.WithValues("job", j),
client: d.daemonClient,
config: d.config,
agentID: agent.ID,
job: j,
downloader: d.downloader,
envs: d.envs,
token: token,
isPoolAgent: d.isPoolAgent,
})
// check operation in with the terminator, so that if a cancelation signal
// arrives it can be handled accordingly for the duration of the operation.
terminator.checkIn(j.Spec, op)
op.V(0).Info("started job")
g.Go(func() error {
op.doAndFinish()
terminator.checkOut(op.job.Spec)
return nil
})
} else if j.Signaled != nil {
d.poolLogger.Info("received cancelation signal", "force", *j.Signaled, "job", j)
terminator.cancel(j.Spec, *j.Signaled, true)
}
}
return nil
}
policy := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
_ = backoff.RetryNotify(processJobs, policy, func(err error, next time.Duration) {
d.poolLogger.Error(err, "waiting for next job", "backoff", next)
})
// only stop retrying if context is canceled
if ctx.Err() != nil {
return nil
}
}
})
return g.Wait()
}

// Registered returns the daemon's corresponding agent on a channel once it has
// successfully registered.
func (d *daemon) Registered() <-chan *Agent {
return d.registered
}
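
// A minimal sketch, not part of the original file, of how a caller might
// observe registration while the daemon runs. Start blocks until its context
// is canceled or an error occurs, so the registered agent is read from the
// channel in a separate goroutine.
//
//	go func() {
//		agent := <-d.Registered()
//		logger.Info("agent registered", "id", agent.ID)
//	}()
//	if err := d.Start(ctx); err != nil {
//		logger.Error(err, "agent daemon exited")
//	}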