-
Notifications
You must be signed in to change notification settings - Fork 251
/
dockerworker.go
189 lines (160 loc) · 5.14 KB
/
dockerworker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package dockerworker
import (
"encoding/json"
"fmt"
"log"
"os"
"os/exec"
"path/filepath"
"strings"
"github.com/taskcluster/taskcluster/v55/tools/worker-runner/cfg"
"github.com/taskcluster/taskcluster/v55/tools/worker-runner/run"
"github.com/taskcluster/taskcluster/v55/tools/worker-runner/util"
"github.com/taskcluster/taskcluster/v55/tools/worker-runner/worker/worker"
"github.com/taskcluster/taskcluster/v55/tools/workerproto"
)
// dockerworkerConfig holds the `worker` section of the runner configuration
// for the docker-worker implementation (unpacked in New).
type dockerworkerConfig struct {
	// Path is the path to the docker-worker directory, either from the
	// release tarball (containing bin/docker-worker) or a repo checkout.
	Path string
	// ConfigPath is the file where worker-runner writes the generated
	// docker-worker configuration (see StartWorker).
	ConfigPath string
}
// dockerworker is the worker.Worker implementation that runs docker-worker
// as a child process.
type dockerworker struct {
	runnercfg *cfg.RunnerConfig
	wicfg     dockerworkerConfig
	cmd       *exec.Cmd // the docker-worker process; set by StartWorker, awaited by Wait
}
// ConfigureRun populates state.WorkerConfig with the values docker-worker
// needs for this run: provider metadata (where available), the worker
// location, Taskcluster credentials, and the worker's identity.  It mutates
// state.WorkerConfig in place and returns an error if any value cannot be
// set or if the worker pool ID is malformed.
func (d *dockerworker) ConfigureRun(state *run.State) error {
	state.Lock()
	defer state.Unlock()

	var err error

	// copy some values from the provisioner metadata, if they are set; if not,
	// docker-worker will fall back to defaults
	for cfg, md := range map[string]string{
		// docker-worker config : providerMetadata
		"host":           "public-hostname",
		"publicIp":       "public-ipv4",
		"privateIp":      "local-ipv4",
		"workerNodeType": "instance-type",
		"instanceType":   "instance-type",
		"instanceId":     "instance-id",
		"region":         "region",
	} {
		v, ok := state.ProviderMetadata[md]
		if !ok {
			log.Printf("provider metadata %s not available; not setting config %s", md, cfg)
			continue
		}
		state.WorkerConfig, err = state.WorkerConfig.Set(cfg, v)
		if err != nil {
			return err
		}
	}

	workerLocationJson, err := json.Marshal(state.WorkerLocation)
	if err != nil {
		return fmt.Errorf("error encoding worker location: %w", err)
	}
	state.WorkerConfig, err = state.WorkerConfig.Set("workerLocation", string(workerLocationJson))
	if err != nil {
		return fmt.Errorf("could not set worker location in the worker config: %w", err)
	}

	// set stores a string value into the worker config, panicking on
	// failure; only programming errors can cause Set to fail here.
	set := func(key, value string) {
		var err error
		state.WorkerConfig, err = state.WorkerConfig.Set(key, value)
		if err != nil {
			panic(err)
		}
	}

	set("rootUrl", state.RootURL)
	set("taskcluster.clientId", state.Credentials.ClientID)
	set("taskcluster.accessToken", state.Credentials.AccessToken)
	if state.Credentials.Certificate != "" {
		set("taskcluster.certificate", state.Credentials.Certificate)
	}
	set("workerId", state.WorkerID)
	set("workerGroup", state.WorkerGroup)

	// A worker pool ID has the form <provisionerId>/<workerType>.  Check for
	// the separator explicitly rather than indexing unconditionally, which
	// would panic with an index-out-of-range on a malformed value.
	parts := strings.SplitN(state.WorkerPoolID, "/", 2)
	if len(parts) != 2 {
		return fmt.Errorf("workerPoolID %q is not of the form <provisionerId>/<workerType>", state.WorkerPoolID)
	}
	set("provisionerId", parts[0])
	set("workerType", parts[1])

	return nil
}
// UseCachedRun is a no-op: this implementation performs no extra work when
// resuming from a cached run state.
func (d *dockerworker) UseCachedRun(state *run.State) error {
	return nil
}
// StartWorker writes the generated worker configuration to disk, starts the
// docker-worker child process, and returns a transport speaking the
// worker-runner protocol over the child's stdin/stdout.  The started process
// is recorded in d.cmd for Wait.
func (d *dockerworker) StartWorker(state *run.State) (workerproto.Transport, error) {
	state.Lock()
	defer state.Unlock()

	// write out the config file (0600 — restrictive permissions, since the
	// config carries the values set in ConfigureRun)
	content, err := json.MarshalIndent(state.WorkerConfig, "", " ")
	if err != nil {
		return nil, fmt.Errorf("error constructing worker config: %w", err)
	}
	err = os.WriteFile(d.wicfg.ConfigPath, content, 0600)
	if err != nil {
		return nil, fmt.Errorf("error writing worker config to %s: %w", d.wicfg.ConfigPath, err)
	}

	// the --host worker-runner instructs docker-worker to merge
	// config from $DOCKER_WORKER_CONFIG.
	args := []string{"--host", "worker-runner", "production"}

	// Support the `bin/docker-worker` script in the docker-worker tarball format,
	// falling back to the old repo format if that is not found
	var cmd *exec.Cmd
	dwScript := filepath.Join(d.wicfg.Path, "bin", "docker-worker")
	if _, err := os.Stat(dwScript); os.IsNotExist(err) {
		// fall back to running the old path-to-repo format under node;
		// build the path with filepath.Join for consistency with dwScript
		mainJs := filepath.Join(d.wicfg.Path, "src", "bin", "worker.js")
		args = append([]string{mainJs}, args...)
		cmd = exec.Command("node", args...)
	} else {
		cmd = exec.Command(dwScript, args...)
	}
	cmd.Env = append(os.Environ(), "DOCKER_WORKER_CONFIG="+d.wicfg.ConfigPath)
	// stderr passes through to ours; stdout/stdin carry the protocol
	cmd.Stderr = os.Stderr
	d.cmd = cmd

	cmdStdout, err := cmd.StdoutPipe()
	if err != nil {
		return nil, err
	}
	cmdStdin, err := cmd.StdinPipe()
	if err != nil {
		return nil, err
	}
	transp := workerproto.NewPipeTransport(cmdStdout, cmdStdin)

	err = cmd.Start()
	if err != nil {
		return nil, err
	}

	// best-effort: a failure to exempt the process from the OOM killer is
	// logged but not fatal
	if err = util.DisableOOM(cmd.Process.Pid); err != nil {
		log.Printf("Error disabling OOM killer for the docker-worker process: %v", err)
	}
	return transp, nil
}
// SetProtocol is a no-op: this implementation does not retain or use the
// protocol object directly.
func (d *dockerworker) SetProtocol(proto *workerproto.Protocol) {
}
// Wait blocks until the docker-worker process started by StartWorker exits,
// returning the error from exec.Cmd.Wait (nil on a zero exit status).
func (d *dockerworker) Wait() error {
	return d.cmd.Wait()
}
// New constructs a docker-worker implementation from the runner
// configuration, unpacking the `worker` section into its config struct.
func New(runnercfg *cfg.RunnerConfig) (worker.Worker, error) {
	w := &dockerworker{runnercfg: runnercfg}
	if err := runnercfg.WorkerImplementation.Unpack(&w.wicfg); err != nil {
		return nil, err
	}
	return w, nil
}
// Usage returns a markdown-formatted description of the configuration
// expected in the `worker` section of the runner configuration for this
// implementation.
func Usage() string {
	return `
The "docker-worker" worker implementation starts docker-worker
(https://github.com/taskcluster/taskcluster/tree/main/workers/docker-worker). It takes the following
values in the 'worker' section of the runner configuration:
` + "```yaml" + `
worker:
implementation: docker-worker
# path to the 'docker-worker' directory from the docker-worker release tarball
path: /path/to/docker-worker
# path where worker-runner should write the generated
# docker-worker configuration.
configPath: ..
` + "```" + `
`
}