forked from hashicorp/nomad
-
Notifications
You must be signed in to change notification settings - Fork 0
/
qemu.go
324 lines (280 loc) · 8.87 KB
/
qemu.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
package driver
import (
"bytes"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
"syscall"
"time"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/nomad/structs"
)
var (
reQemuVersion = regexp.MustCompile("QEMU emulator version ([\\d\\.]+).+")
)
// QemuDriver is a driver for running images via Qemu
// We attempt to chose sane defaults for now, with more configuration available
// planned in the future
type QemuDriver struct {
DriverContext
}
// qemuHandle is returned from Start/Open as a handle to the PID
type qemuHandle struct {
proc *os.Process
vmID string
waitCh chan error
doneCh chan struct{}
}
// qemuPID is a struct to map the pid running the process to the vm image on
// disk
type qemuPID struct {
Pid int
VmID string
}
// NewQemuDriver is used to create a new exec driver
func NewQemuDriver(ctx *DriverContext) Driver {
return &QemuDriver{*ctx}
}
func (d *QemuDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool, error) {
// Only enable if we are root when running on non-windows systems.
if runtime.GOOS != "windows" && syscall.Geteuid() != 0 {
d.logger.Printf("[DEBUG] driver.qemu: must run as root user, disabling")
return false, nil
}
outBytes, err := exec.Command("qemu-system-x86_64", "-version").Output()
if err != nil {
return false, nil
}
out := strings.TrimSpace(string(outBytes))
matches := reQemuVersion.FindStringSubmatch(out)
if len(matches) != 2 {
return false, fmt.Errorf("Unable to parse Qemu version string: %#v", matches)
}
node.Attributes["driver.qemu"] = "1"
node.Attributes["driver.qemu.version"] = matches[1]
return true, nil
}
// Run an existing Qemu image. Start() will pull down an existing, valid Qemu
// image and save it to the Drivers Allocation Dir
func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
// Get the image source
source, ok := task.Config["image_source"]
if !ok || source == "" {
return nil, fmt.Errorf("Missing source image Qemu driver")
}
// Qemu defaults to 128M of RAM for a given VM. Instead, we force users to
// supply a memory size in the tasks resources
if task.Resources == nil || task.Resources.MemoryMB == 0 {
return nil, fmt.Errorf("Missing required Task Resource: Memory")
}
// Attempt to download the thing
// Should be extracted to some kind of Http Fetcher
// Right now, assume publicly accessible HTTP url
resp, err := http.Get(source)
if err != nil {
return nil, fmt.Errorf("Error downloading source for Qemu driver: %s", err)
}
// Get the tasks local directory.
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
if !ok {
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}
taskLocal := filepath.Join(taskDir, allocdir.TaskLocal)
// Create a location in the local directory to download and store the image.
// TODO: Caching
vmID := fmt.Sprintf("qemu-vm-%s-%s", structs.GenerateUUID(), filepath.Base(source))
fPath := filepath.Join(taskLocal, vmID)
vmPath, err := os.OpenFile(fPath, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return nil, fmt.Errorf("Error opening file to download to: %s", err)
}
defer vmPath.Close()
defer resp.Body.Close()
// Copy remote file to local AllocDir for execution
// TODO: a retry of sort if io.Copy fails, for large binaries
_, ioErr := io.Copy(vmPath, resp.Body)
if ioErr != nil {
return nil, fmt.Errorf("Error copying Qemu image from source: %s", ioErr)
}
// compute and check checksum
if check, ok := task.Config["checksum"]; ok {
d.logger.Printf("[DEBUG] Running checksum on (%s)", vmID)
hasher := sha256.New()
file, err := os.Open(vmPath.Name())
if err != nil {
return nil, fmt.Errorf("Failed to open file for checksum")
}
defer file.Close()
io.Copy(hasher, file)
sum := hex.EncodeToString(hasher.Sum(nil))
if sum != check {
return nil, fmt.Errorf(
"Error in Qemu: checksums did not match.\nExpected (%s), got (%s)",
check,
sum)
}
}
// Parse configuration arguments
// Create the base arguments
accelerator := "tcg"
if acc, ok := task.Config["accelerator"]; ok {
accelerator = acc
}
// TODO: Check a lower bounds, e.g. the default 128 of Qemu
mem := fmt.Sprintf("%dM", task.Resources.MemoryMB)
args := []string{
"qemu-system-x86_64",
"-machine", "type=pc,accel=" + accelerator,
"-name", vmID,
"-m", mem,
"-drive", "file=" + vmPath.Name(),
"-nodefconfig",
"-nodefaults",
"-nographic",
}
// Check the Resources required Networks to add port mappings. If no resources
// are required, we assume the VM is a purely compute job and does not require
// the outside world to be able to reach it. VMs ran without port mappings can
// still reach out to the world, but without port mappings it is effectively
// firewalled
if len(task.Resources.Networks) > 0 {
// TODO: Consolidate these into map of host/guest port when we have HCL
// Note: Host port must be open and available
// Get and split guest ports. The guest_ports configuration must match up with
// the Reserved ports in the Task Resources
// Users can supply guest_hosts as a list of posts to map on the guest vm.
// These map 1:1 with the requested Reserved Ports from the hostmachine.
ports := strings.Split(task.Config["guest_ports"], ",")
if len(ports) == 0 {
return nil, fmt.Errorf("[ERR] driver.qemu: Error parsing required Guest Ports")
}
// TODO: support more than a single, default Network
if len(ports) != len(task.Resources.Networks[0].ReservedPorts) {
return nil, fmt.Errorf("[ERR] driver.qemu: Error matching Guest Ports with Reserved ports")
}
// Loop through the reserved ports and construct the hostfwd string, to map
// reserved ports to the ports listenting in the VM
// Ex:
// hostfwd=tcp::22000-:22,hostfwd=tcp::80-:8080
reservedPorts := task.Resources.Networks[0].ReservedPorts
var forwarding string
for i, p := range ports {
forwarding = fmt.Sprintf("%s,hostfwd=tcp::%s-:%s", forwarding, strconv.Itoa(reservedPorts[i]), p)
}
if "" == forwarding {
return nil, fmt.Errorf("[ERR] driver.qemu: Error constructing port forwarding")
}
args = append(args,
"-netdev",
fmt.Sprintf("user,id=user.0%s", forwarding),
"-device", "virtio-net,netdev=user.0",
)
}
// If using KVM, add optimization args
if accelerator == "kvm" {
args = append(args,
"-enable-kvm",
"-cpu", "host",
// Do we have cores information available to the Driver?
// "-smp", fmt.Sprintf("%d", cores),
)
}
// Start Qemu
var outBuf, errBuf bytes.Buffer
cmd := exec.Command(args[0], args[1:]...)
cmd.Stdout = &outBuf
cmd.Stderr = &errBuf
d.logger.Printf("[DEBUG] Starting QemuVM command: %q", strings.Join(args, " "))
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf(
"Error running QEMU: %s\n\nOutput: %s\n\nError: %s",
err, outBuf.String(), errBuf.String())
}
d.logger.Printf("[INFO] Started new QemuVM: %s", vmID)
// Create and Return Handle
h := &qemuHandle{
proc: cmd.Process,
vmID: vmPath.Name(),
doneCh: make(chan struct{}),
waitCh: make(chan error, 1),
}
go h.run()
return h, nil
}
func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error) {
// Parse the handle
pidBytes := []byte(strings.TrimPrefix(handleID, "QEMU:"))
qpid := &qemuPID{}
if err := json.Unmarshal(pidBytes, qpid); err != nil {
return nil, fmt.Errorf("failed to parse Qemu handle '%s': %v", handleID, err)
}
// Find the process
proc, err := os.FindProcess(qpid.Pid)
if proc == nil || err != nil {
return nil, fmt.Errorf("failed to find Qemu PID %d: %v", qpid.Pid, err)
}
// Return a driver handle
h := &qemuHandle{
proc: proc,
vmID: qpid.VmID,
doneCh: make(chan struct{}),
waitCh: make(chan error, 1),
}
go h.run()
return h, nil
}
func (h *qemuHandle) ID() string {
// Return a handle to the PID
pid := &qemuPID{
Pid: h.proc.Pid,
VmID: h.vmID,
}
data, err := json.Marshal(pid)
if err != nil {
log.Printf("[ERR] failed to marshal Qemu PID to JSON: %s", err)
}
return fmt.Sprintf("QEMU:%s", string(data))
}
func (h *qemuHandle) WaitCh() chan error {
return h.waitCh
}
func (h *qemuHandle) Update(task *structs.Task) error {
// Update is not possible
return nil
}
// Kill is used to terminate the task. We send an Interrupt
// and then provide a 5 second grace period before doing a Kill.
//
// TODO: allow a 'shutdown_command' that can be executed over a ssh connection
// to the VM
func (h *qemuHandle) Kill() error {
h.proc.Signal(os.Interrupt)
select {
case <-h.doneCh:
return nil
case <-time.After(5 * time.Second):
return h.proc.Kill()
}
}
func (h *qemuHandle) run() {
ps, err := h.proc.Wait()
close(h.doneCh)
if err != nil {
h.waitCh <- err
} else if !ps.Success() {
h.waitCh <- fmt.Errorf("task exited with error")
}
close(h.waitCh)
}