-
Notifications
You must be signed in to change notification settings - Fork 1
/
xCUTEr.go
132 lines (116 loc) · 2.97 KB
/
xCUTEr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright (c) 2016 Niklas Wolber
// This file is licensed under the MIT license.
// See the LICENSE file for more information.
package xCUTEr
import (
"context"
"fmt"
"io/ioutil"
"log"
"os"
"sync/atomic"
"time"
"github.com/fsnotify/fsnotify"
"github.com/nwolber/xCUTEr/job"
)
// XCUTEr is the control surface of a running xCUTEr instance. It exposes
// the main logic as a set of function values and a done channel; the
// descriptions below reflect how New populates them.
type XCUTEr struct {
	// Start is bound to the executor's Start method.
	Start func()
	// Stop is bound to the executor's Stop method.
	Stop func()
	// Cancel cancels the main context created by New.
	Cancel func()

	// Done is the Done channel of the main context; it is closed once that
	// context has been cancelled.
	Done <-chan struct{}

	// Inactive is bound to the executor's GetInactive method.
	Inactive func() []*schedInfo
	// Scheduled is bound to the executor's GetScheduled method.
	Scheduled func() []*schedInfo

	// Running is bound to the executor's GetRunning method.
	Running func() []*runInfo
	// Completed is bound to the executor's GetCompleted method.
	Completed func() []*runInfo

	// MaxCompleted returns the executor's current maxCompleted value.
	MaxCompleted func() uint32
	// SetMaxCompleted stores a new maxCompleted value on the executor.
	SetMaxCompleted func(uint32)
}
// outputKey is the context value key under which New stores the output
// writer (ioutil.Discard) when running in quiet mode.
const outputKey = "output"
// New creates a new xCUTEr with the given config options.
//
// Parameters:
//   - jobDir: directory handed to the watcher; only used when file is empty.
//   - sshTTL: passed to job.InitializeSSHClientStore.
//   - sshKeepAlive: assigned to job.KeepAliveInterval.
//   - file: when non-empty, only this job file is parsed and run and jobDir
//     is not watched.
//   - logFile: when non-empty (and quiet is false), the standard logger,
//     os.Stdout and os.Stderr are redirected to this file, which is created
//     if necessary and opened for appending.
//   - telemetryEndpoint: passed to newExecutor.
//   - once: passed to the executor's Run for the single-file case; it also
//     causes the main context to be cancelled after Run returns.
//   - quiet: discards log output and stores ioutil.Discard in the main
//     context under outputKey.
//
// New has process-wide side effects: it changes the flags (and possibly the
// output) of the standard logger, may replace os.Stdout/os.Stderr, and
// configures package-level state of the job package.
//
// An error is returned when the log file cannot be opened, when the
// executor cannot be created, or when the single job file cannot be parsed.
func New(jobDir string, sshTTL, sshKeepAlive time.Duration, file, logFile, telemetryEndpoint string, once, quiet bool) (*XCUTEr, error) {
	log.SetFlags(log.Flags() | log.Lshortfile)

	if logFile != "" && !quiet {
		f, err := os.OpenFile(logFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
		if err != nil {
			// Report the failure to the caller instead of terminating the
			// whole process from inside a constructor.
			return nil, fmt.Errorf("error opening log file %s: %s", logFile, err)
		}
		log.SetOutput(f)
		os.Stdout = f
		os.Stderr = f
	}

	mainCtx, mainCancel := context.WithCancel(context.Background())

	if quiet {
		log.SetOutput(ioutil.Discard)
		mainCtx = context.WithValue(mainCtx, outputKey, ioutil.Discard)
	}

	job.InitializeSSHClientStore(sshTTL)
	job.KeepAliveInterval = sshKeepAlive

	e, err := newExecutor(mainCtx, telemetryEndpoint)
	if err != nil {
		mainCancel()
		return nil, err
	}
	e.Start()

	// do we run only a single job file?
	if file != "" {
		j, err := e.parse(file)
		if err != nil {
			mainCancel()
			return nil, fmt.Errorf("error parsing %s: %s", file, err)
		}

		go func() {
			e.Run(j, once)
			// A one-shot job ends the whole instance once Run has returned.
			if j.c.Schedule == "once" || once {
				mainCancel()
			}
		}()
	} else {
		fsEvents := make(chan fsnotify.Event)
		w := &watcher{
			path: jobDir,
		}
		go w.watch(mainCtx, fsEvents)

		// main event loop
		go func() {
			for {
				select {
				case event := <-fsEvents:
					// Op is a bit mask; the first matching case wins, in the
					// order Create, Remove/Rename, Write.
					switch {
					case event.Op&fsnotify.Create == fsnotify.Create:
						j, err := e.parse(event.Name)
						if err != nil {
							log.Println("error parsing", event.Name, err)
							continue
						}
						e.Add(j)
					case event.Op&fsnotify.Remove == fsnotify.Remove,
						event.Op&fsnotify.Rename == fsnotify.Rename:
						e.Remove(event.Name)
					case event.Op&fsnotify.Write == fsnotify.Write:
						// Replace the job with the freshly parsed version; if
						// parsing fails the old job stays removed.
						e.Remove(event.Name)
						j, err := e.parse(event.Name)
						if err != nil {
							log.Println("error parsing", event.Name, err)
							continue
						}
						e.Add(j)
					}
				case <-mainCtx.Done():
					return
				}
			}
		}()
	}

	return &XCUTEr{
		Done:      mainCtx.Done(),
		Cancel:    mainCancel,
		Start:     e.Start,
		Stop:      e.Stop,
		Inactive:  e.GetInactive,
		Scheduled: e.GetScheduled,
		Running:   e.GetRunning,
		Completed: e.GetCompleted,
		// Read atomically to pair with the atomic store in SetMaxCompleted;
		// a plain read here would be a data race.
		MaxCompleted:    func() uint32 { return atomic.LoadUint32(&e.maxCompleted) },
		SetMaxCompleted: func(max uint32) { atomic.StoreUint32(&e.maxCompleted, max) },
	}, nil
}