// Copyright (2012) Sandia Corporation.
// Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
// the U.S. Government retains certain rights in this software.
package main
import (
"errors"
"fmt"
"meshage"
"minicli"
log "minilog"
"ranges"
"ron"
"runtime"
"strconv"
"strings"
"sync"
"time"
)
// VMs contains all the VMs running on this host; the map key is the VM's ID
type VMs struct {
m map[int]VM
mu sync.Mutex
}
// vmApplyFunc is passed into VMs.apply
type vmApplyFunc func(VM, bool) (bool, error)
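// Tag associates a Key/Value pair with the VM identified by ID.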
type Tag struct {
ID int
Key, Value string
}
// QueuedVMs stores all the info needed to launch a batch of VMs
type QueuedVMs struct {
Names []string
VMType // embed
VMConfig // embed
// bookkeeping for the scheduler
// counts for colocated VMs, indexed by name
colocatedCounts map[string]int
// sum of colocatedCounts, used for sorting
colocatedCount int
}
// GetFiles looks through the VMConfig for files in the IOMESHAGE directory and
// fetches them if they do not already exist. Currently, we enumerate all the
// fields that take a file.
func (q QueuedVMs) GetFiles() error {
files := []string{
q.ContainerConfig.Preinit,
q.KVMConfig.CdromPath,
q.KVMConfig.InitrdPath,
q.KVMConfig.KernelPath,
q.KVMConfig.MigratePath,
}
for _, f := range q.KVMConfig.Disks {
files = append(files, f.Path)
}
for _, f := range files {
if strings.HasPrefix(f, *f_iomBase) {
if _, err := iomHelper(f); err != nil {
return err
}
}
}
return nil
}
// Count returns the number of launched VMs.
func (vms *VMs) Count() int {
vms.mu.Lock()
defer vms.mu.Unlock()
return len(vms.m)
}
// Limit is the lowest coschedule value for VMs (-1 is no limit)
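// For example, if the running VMs have coschedule values of -1, 4, and 2,
// Limit returns 2.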
func (vms *VMs) Limit() int {
vms.mu.Lock()
defer vms.mu.Unlock()
// assume unlimited
limit := -1
for _, vm := range vms.m {
// update the limit if it is currently unlimited, or if this VM's coschedule
// value is set and lower than the current limit
v := vm.GetCoschedule()
if limit == -1 || (v != -1 && v < limit) {
limit = v
}
}
return limit
}
// Commit returns the total CPU, memory, and network commit across all VMs.
func (vms *VMs) Commit() (uint64, uint64, int) {
vms.mu.Lock()
defer vms.mu.Unlock()
cpu := uint64(0)
mem := uint64(0)
net := 0
for _, vm := range vms.m {
cpu += vm.GetCPUs()
mem += vm.GetMem()
net += len(vm.GetNetworks())
}
return cpu, mem, net
}
// Info populates resp with info about launched VMs.
func (vms *VMs) Info(masks []string, resp *minicli.Response) {
vms.mu.Lock()
defer vms.mu.Unlock()
resp.Header = masks
// for resp.Data
res := []VM{}
for _, vm := range vms.m {
// Update dynamic fields before querying info
vm.UpdateNetworks()
// Copy the VM and use the copy from here on. This ensures that the
// Tabular info matches the Data field.
vm := vm.Copy()
res = append(res, vm)
row := []string{}
for _, mask := range masks {
if v, err := vm.Info(mask); err != nil {
// Field most likely not set for VM type
row = append(row, "N/A")
} else {
row = append(row, v)
}
}
resp.Tabular = append(resp.Tabular, row)
}
resp.Data = res
}
// FindVM finds a VM based on its ID, name, or UUID.
func (vms *VMs) FindVM(s string) VM {
vms.mu.Lock()
defer vms.mu.Unlock()
return vms.findVM(s)
}
// findVM assumes vms.mu is held.
func (vms *VMs) findVM(s string) VM {
if id, err := strconv.Atoi(s); err == nil {
if vm, ok := vms.m[id]; ok {
return vm
}
return nil
}
// Search for VM by name or UUID
for _, vm := range vms.m {
if vm.GetName() == s || vm.GetUUID() == s {
return vm
}
}
return nil
}
// FindContainerVM finds a VM based on its ID, name, or UUID.
func (vms *VMs) FindContainerVM(s string) (*ContainerVM, error) {
vms.mu.Lock()
defer vms.mu.Unlock()
return vms.findContainerVM(s)
}
// findContainerVM assumes vms.mu is held.
func (vms *VMs) findContainerVM(s string) (*ContainerVM, error) {
vm := vms.findVM(s)
if vm == nil {
return nil, vmNotFound(s)
}
if vm, ok := vm.(*ContainerVM); ok {
return vm, nil
}
return nil, vmNotContainer(s)
}
// FindKvmVM finds a VM based on its ID, name, or UUID.
func (vms *VMs) FindKvmVM(s string) (*KvmVM, error) {
vms.mu.Lock()
defer vms.mu.Unlock()
return vms.findKvmVM(s)
}
// findKvmVM assumes vms.mu is held.
func (vms *VMs) findKvmVM(s string) (*KvmVM, error) {
vm := vms.findVM(s)
if vm == nil {
return nil, vmNotFound(s)
}
if vm, ok := vm.(*KvmVM); ok {
return vm, nil
}
return nil, vmNotKVM(s)
}
// FindKvmVMs finds all KvmVMs.
func (vms *VMs) FindKvmVMs() []*KvmVM {
vms.mu.Lock()
defer vms.mu.Unlock()
res := []*KvmVM{}
for _, vm := range vms.m {
if vm, ok := vm.(*KvmVM); ok {
res = append(res, vm)
}
}
return res
}
// Launch takes QueuedVMs and launches them after performing a few sanity
// checks. Launch returns any errors that occur via a channel since it launches
// VMs asynchronously.
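//
// Callers should drain the returned channel; if a launch fails and nothing
// reads the error, the sending goroutine will block. For example:
//
//	for err := range vms.Launch(namespace, q) {
//		log.Errorln(err)
//	}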
func (vms *VMs) Launch(namespace string, q *QueuedVMs) <-chan error {
errs := make(chan error)
go func() {
defer close(errs)
// prefetch any files associated with VMs
if err := q.GetFiles(); err != nil {
errs <- err
return
}
vms.mu.Lock()
defer vms.mu.Unlock()
var wg sync.WaitGroup
log.Info("launching %v %v vms", len(q.Names), q.VMType)
start := time.Now()
for _, name := range q.Names {
// Create new VM and test it for conflicts against other VMs.
vm, err := NewVM(name, namespace, q.VMType, q.VMConfig)
if err == nil {
for _, vm2 := range vms.m {
if err = vm2.Conflicts(vm); err != nil {
break
}
}
}
if err != nil {
errs <- err
continue
}
// Add the newly created VM to the map so that it gets included in
// future conflict tests.
vms.m[vm.GetID()] = vm
// The actual launching can happen in parallel while we keep checking
// for conflicts.
wg.Add(1)
go func(name string) {
defer wg.Done()
// Note: the VM is already in the VMs map
if err := vm.Launch(); err != nil {
errs <- err
return
}
if err := writeVMConfig(vm); err != nil {
errs <- err
return
}
}(name)
}
wg.Wait()
stop := time.Now()
log.Info("launched %v %v vms in %v", len(q.Names), q.VMType, stop.Sub(start))
}()
return errs
}
// Stop stops running VMs matching target.
func (vms *VMs) Stop(target string) error {
return vms.Apply(target, func(vm VM, _ bool) (bool, error) {
if vm.GetState()&VM_RUNNING != 0 {
return true, vm.Stop()
}
return false, nil
})
}
// Kill kills VMs matching target.
func (vms *VMs) Kill(target string) error {
return vms.Apply(target, func(vm VM, _ bool) (bool, error) {
if vm.GetState()&VM_KILLABLE == 0 {
return false, nil
}
if err := vm.Kill(); err != nil {
log.Error("unleash the zombie VM: %v", err)
return true, err
}
return true, nil
})
}
// Flush deletes VMs that are in the QUIT or ERROR state, disconnecting them
// from the provided ron.Server first.
func (vms *VMs) Flush(cc *ron.Server) error {
vms.mu.Lock()
defer vms.mu.Unlock()
var wg sync.WaitGroup
var mapLock sync.Mutex
for i, vm := range vms.m {
if vm.GetState()&(VM_QUIT|VM_ERROR) != 0 {
wg.Add(1)
go func(i int, vm VM) {
log.Info("deleting VM: %v", i)
if err := vm.Disconnect(cc); err != nil {
log.Error("unable to disconnect from cc for vm %v: %v", vm.GetID(), err)
}
if err := vm.Flush(); err != nil {
log.Error("clogged vm %v: %v", vm.GetID(), err)
}
mapLock.Lock()
defer mapLock.Unlock()
delete(vms.m, i)
wg.Done()
}(i, vm)
}
}
wg.Wait()
return nil
}
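// ProcStats collects process statistics for every VM. Each VM is sampled
// twice, d apart, so that rates can be computed from the two snapshots; the
// VM's network Rx/Tx rates are summed across its interfaces.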
func (vms *VMs) ProcStats(d time.Duration) []*VMProcStats {
vms.mu.Lock()
defer vms.mu.Unlock()
var res []*VMProcStats
var wg sync.WaitGroup
var mu sync.Mutex
for _, vm := range vms.m {
wg.Add(1)
go func(vm VM) {
defer wg.Done()
var err error
p := &VMProcStats{
Name: vm.GetName(),
}
p.A, err = vm.ProcStats()
if err != nil {
log.Error("failed to get process stats for %v: %v", vm.GetID(), err)
return
}
time.Sleep(d)
p.B, err = vm.ProcStats()
if err != nil {
log.Error("failed to get process stats for %v: %v", vm.GetID(), err)
return
}
// Update dynamic fields before querying info
vm.UpdateNetworks()
for _, nic := range vm.GetNetworks() {
p.RxRate += nic.RxRate
p.TxRate += nic.TxRate
}
mu.Lock()
defer mu.Unlock()
res = append(res, p)
}(vm)
}
wg.Wait()
return res
}
// Apply applies fn to the VMs matching target, wrapping apply with proper
// locking. It collapses the error slice into a single error.
func (vms *VMs) Apply(target string, fn vmApplyFunc) error {
vms.mu.Lock()
defer vms.mu.Unlock()
return makeErrSlice(vms.apply(target, fn))
}
// apply is the fan-out/fan-in method to apply a function to a set of VMs
// specified by target. Specifically, it:
//
// 1. Expands target to a list of VM names and IDs (or wild)
// 2. Invokes fn on all the matching VMs
// 3. Collects all the errors from the invoked fns
// 4. Records in the log a list of VMs that were not found
//
// The fn that is passed in takes two arguments: the VM struct and a boolean
// specifying whether the invocation was wild or not. The fn returns a boolean
// that indicates whether the target was applicable (e.g. calling start on an
// already running VM would not be applicable) and an error.
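//
// Stop and Kill are examples of such fns: each returns false for VMs that
// are not in an applicable state, and otherwise returns the result of
// stopping or killing the VM.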
func (vms *VMs) apply(target string, fn vmApplyFunc) []error {
// Some callstack voodoo magic: record which caller is applying fn to target
if pc, _, _, ok := runtime.Caller(1); ok {
if fn := runtime.FuncForPC(pc); fn != nil {
log.Debug("applying %v to %v", fn.Name(), target)
}
}
names := map[string]bool{} // Names of VMs for which to apply fn
ids := map[int]bool{} // IDs of VMs for which to apply fn
vals, err := ranges.SplitList(target)
if err != nil {
return []error{err}
}
for _, v := range vals {
id, err := strconv.Atoi(v)
if err == nil {
ids[id] = true
} else {
names[v] = true
}
}
wild := hasWildcard(names)
delete(names, Wildcard)
// wg determines when it's okay to close errChan
var wg sync.WaitGroup
errChan := make(chan error)
// lock prevents concurrent writes to results
var lock sync.Mutex
results := map[string]bool{}
// Wrap fn so that errors are collected and results are recorded by name and ID
magicFn := func(vm VM) {
defer wg.Done()
ok, err := fn(vm, wild)
if err != nil {
errChan <- err
}
lock.Lock()
defer lock.Unlock()
results[vm.GetName()] = ok
results[strconv.Itoa(vm.GetID())] = ok
}
for _, vm := range vms.m {
if wild || names[vm.GetName()] || ids[vm.GetID()] {
delete(names, vm.GetName())
delete(ids, vm.GetID())
wg.Add(1)
go magicFn(vm)
}
}
go func() {
wg.Wait()
close(errChan)
}()
var errs []error
for err := range errChan {
errs = append(errs, err)
}
// Special cases: specified one VM and
// 1. it wasn't found
// 2. it wasn't a valid target (e.g. start already running VM)
if len(vals) == 1 && !wild {
if (len(names) + len(ids)) == 1 {
errs = append(errs, vmNotFound(vals[0]))
} else if !results[vals[0]] {
errs = append(errs, fmt.Errorf("VM state error: %v", vals[0]))
}
}
// Log the names/ids of the vms that weren't found
if (len(names) + len(ids)) > 0 {
vals := []string{}
for v := range names {
vals = append(vals, v)
}
for v := range ids {
vals = append(vals, strconv.Itoa(v))
}
log.Info("VMs not found: %v", vals)
}
return errs
}
// meshageVMLauncher handles VM launches sent by the scheduler
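// Each received message is handled in its own goroutine; launch errors are
// collected and sent back to the requesting node as a meshageVMResponse.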
func meshageVMLauncher() {
for m := range meshageVMLaunchChan {
go func(m *meshage.Message) {
cmd := m.Body.(meshageVMLaunch)
ns := GetOrCreateNamespace(cmd.Namespace)
errs := []string{}
for _, err := range ns.Launch(cmd.QueuedVMs) {
errs = append(errs, err.Error())
}
to := []string{m.Source}
msg := meshageVMResponse{Errors: errs, TID: cmd.TID}
if _, err := meshageNode.Set(to, msg); err != nil {
log.Errorln(err)
}
}(m)
}
}
// GlobalVMs gets the VMs from all hosts in the mesh, filtered to the
// provided namespace, if applicable. Note that the returned slice is not
// indexed by VM ID.
func GlobalVMs(ns *Namespace) []VM {
cmdLock.Lock()
defer cmdLock.Unlock()
return globalVMs(ns)
}
// globalVMs is GlobalVMs without locking cmdLock.
func globalVMs(ns *Namespace) []VM {
// run `vm info` across the namespace
cmds := namespaceCommands(ns, minicli.MustCompile("vm info"))
// Collected VMs
vms := []VM{}
// LOCK: see func description.
for resps := range runCommands(cmds...) {
for _, resp := range resps {
if resp.Error != "" {
log.Errorln(resp.Error)
continue
}
if vms2, ok := resp.Data.([]VM); ok {
for _, vm := range vms2 {
vms = append(vms, vm)
}
} else {
log.Error("unknown data field in `vm info` from %v", resp.Host)
}
}
}
return vms
}
// expandLaunchNames takes a VM name, range, or count and expands it into a
// list of names of VMs that should be launched. It performs several sanity
// checks to make sure the names aren't reserved words or bare integers.
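// For example, "3" expands to three unnamed VMs, while anything that is not a
// count (e.g. "vm[1-2]") is split into individual names by ranges.SplitList.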
func expandLaunchNames(arg string) ([]string, error) {
names := []string{}
count, err := strconv.ParseInt(arg, 10, 32)
if err != nil {
names, err = ranges.SplitList(arg)
} else if count <= 0 {
err = errors.New("invalid number of vms (must be > 0)")
} else {
names = make([]string, count)
}
if err != nil {
return nil, err
}
if len(names) == 0 {
return nil, errors.New("no VMs to launch")
}
for i, name := range names {
if isReserved(name) {
return nil, fmt.Errorf("invalid vm name, `%s` is a reserved word", name)
}
if _, err := strconv.Atoi(name); err == nil {
return nil, fmt.Errorf("invalid vm name, `%s` is an integer", name)
}
if name == "vince" {
log.Warn("vince is unstoppable")
}
// Check for conflicts within the provided names. Don't conflict with
// ourselves or if the name is unspecified.
for j, name2 := range names {
if i != j && name == name2 && name != "" {
return nil, fmt.Errorf("`%s` is specified twice in VMs to launch", name)
}
}
}
return names, nil
}