/
worker.go
376 lines (324 loc) · 11.6 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
package main
import (
"embed"
"fmt"
"html/template"
"io/ioutil"
"log"
"os"
"os/exec"
"os/user"
"runtime"
"strconv"
"sync"
"syscall"
"time"
)
// globalJobsQueue is the channel holding submitted jobs (*Job). It is created
// by initializeWorkerSettings as a buffered channel whose capacity is
// Config.MaxJobsQueueBuffer.
var globalJobsQueue chan *Job
// globalJobsResults stores all submitted jobs after they have been processed,
// with the job id as key. It is allocated by initializeWorkerSettings.
var globalJobsResults map[string]*Job
// mapLock is a read/write mutex allocated by initializeWorkerSettings.
// NOTE(review): presumably guards globalJobsResults - confirm against the map users.
var mapLock *sync.RWMutex
// streamPageFS embeds the assets/websocket.html file into the binary.
//
//go:embed assets/websocket.html
var streamPageFS embed.FS
// streamPageTemplate is the template parsed from streamPageFS by
// initializeWorkerSettings.
var streamPageTemplate *template.Template
// initializeWorkerSettings prepares the package-level state used by the worker:
// it allocates the lock, the jobs queue and the results map, parses the
// embedded streaming page template, ensures the jobs outputs folders exist,
// resolves the configured timezone into a time location and, on non-windows
// platforms, adopts the SHELL environment variable as default shell when set.
//
// The first failing step is logged and its error returned; nil is returned
// when every step succeeds.
func initializeWorkerSettings() error {
	log.Println("initializing default folders and worker settings ...")

	// allocate the shared in-memory structures.
	mapLock = new(sync.RWMutex)
	globalJobsQueue = make(chan *Job, Config.MaxJobsQueueBuffer)
	globalJobsResults = map[string]*Job{}

	var err error

	// compile the template from the embedded filesystem.
	if streamPageTemplate, err = template.ParseFS(streamPageFS, "assets/websocket.html"); err != nil {
		log.Printf("failed to initialize streaming web page template - errmsg: %v\n", err)
		return err
	}

	// the jobs outputs folder and its backups folder must both be present.
	if err = createFolder(Config.JobsOutputsFolder); err != nil {
		log.Printf("failed to ensure jobs outputs folder - errmsg: %v\n", err)
		return err
	}
	if err = createFolder(Config.JobsOutputsBackupsFolder); err != nil {
		log.Printf("failed to ensure jobs outputs backups folder - errmsg: %v\n", err)
		return err
	}

	// turn the configured timezone name into a usable time location.
	if Config.DefaultJobsInfosTimeLocation, err = time.LoadLocation(Config.DefaultJobsInfosTimezone); err != nil {
		log.Printf("failed to load and convert default timezone value to time location - errmsg: %v\n", err)
		return err
	}

	// outside windows, a non-empty SHELL environment variable overrides the
	// configured default shell path.
	if runtime.GOOS != "windows" {
		if shell := os.Getenv("SHELL"); shell != "" {
			Config.DefaultLinuxShell = shell
		}
	}

	// best effort to check & enforce if needed the max number of open files;
	// no result is consumed here, so this step never fails the initialization.
	enforceMaxOpenFiles()

	return nil
}
// savePID creates (or truncates) the file at Config.WorkerPidFilePath and
// writes the decimal representation of pid into it.
//
// It returns the error from creating or writing the file. A failure to flush
// the file to stable storage is logged but not returned: at that point the
// content has already been written, so the pid file is still usable.
func savePID(pid int) error {
	f, err := os.Create(Config.WorkerPidFilePath)
	if err != nil {
		log.Printf("failed to create pid file - errmsg: %v\n", err)
		return err
	}
	defer f.Close()

	if _, err := f.WriteString(strconv.Itoa(pid)); err != nil {
		// distinct message so a write failure is not mistaken for a create failure.
		log.Printf("failed to write pid file - errmsg: %v\n", err)
		return err
	}

	// best-effort flush; report the failure instead of dropping it silently.
	if err := f.Sync(); err != nil {
		log.Printf("failed to sync pid file - errmsg: %v\n", err)
	}

	return nil
}
// getPID read the stored PID value stored into the local file and return it.
func getPID() (pid int, err error) {
if _, err = os.Stat(Config.WorkerPidFilePath); err != nil {
return 0, err
}
// read the file content.
data, err := ioutil.ReadFile(Config.WorkerPidFilePath)
if err != nil {
return 0, err
}
// convert string-based value into integer.
pid, err = strconv.Atoi(string(data))
if err != nil {
return 0, fmt.Errorf("invalid PID value: %s", string(data))
}
return pid, nil
}
// runWorkerService is the core function to spin up the worker and all its
// background services. It reads the https server host and port from the third
// and fourth command-line arguments (the form startWorkerService launches:
// <program> run <host> <port>), initializes the worker settings, starts the
// signals handler, the results cleaner and the jobs monitor as goroutines,
// then blocks in startWebServer until it returns and waits for the goroutines.
//
// It returns an error when the arguments are missing or the current user
// cannot be resolved. After that point a deferred function removes the pid
// file and calls os.Exit(0), so the process terminates with status 0 and the
// returned error is never observed by the caller.
func runWorkerService() error {
	runtime.GOMAXPROCS(runtime.NumCPU())

	// guard the positional arguments instead of panicking on a short command line.
	if len(os.Args) < 4 {
		return fmt.Errorf("missing https server host and port arguments")
	}
	Config.HttpsServerHost = os.Args[2]
	Config.HttpsServerPort = os.Args[3]

	// named "owner" so the os/user package is not shadowed.
	owner, err := user.Current()
	if err != nil {
		log.Printf("failed to retrieve owner name of this worker process - errmsg: %v", err)
		return err
	}

	log.Printf("started worker service. [user: %s] - [pid: %d] - [ppid: %d]\n", owner.Username, os.Getpid(), os.Getppid())

	// silently remove pid file when the worker exits; note that os.Exit(0)
	// ends the process before the return value reaches the caller.
	defer func() {
		os.Remove(Config.WorkerPidFilePath)
		os.Exit(0)
	}()

	// init global variables and loads configs.
	err = initializeWorkerSettings()
	if err != nil {
		return err
	}

	// to make sure all goroutines exit before leaving the program.
	wg := &sync.WaitGroup{}
	// channel to instruct jobs monitor to exit.
	exit := make(chan struct{}, 1)

	// background handler for interruption signals.
	wg.Add(1)
	go handleSignals(exit, wg)

	// background jobs map cleaner.
	wg.Add(1)
	go cleanupMapResults(Config.JobsCleanupRunInterval, Config.JobsCleanupMaxFetch, Config.JobsCleanupMaxAge, exit, wg)

	// start the jobs queue monitor.
	wg.Add(1)
	go jobsMonitor(exit, wg)

	// setup logs files.
	setupLoggers()

	// run https web server and block until server exits (shutdown).
	err = startWebServer(exit)

	// once server stops - wait until all goroutines exit.
	wg.Wait()
	log.Printf("stopped all goroutines of worker service. [pid: %d] - [user: %s] - [ppid: %d]\n", os.Getpid(), owner.Username, os.Getppid())

	return err
}
// checkWorkerService reads the stored PID from the pid file, checks whether a
// process with that PID exists and prints the worker status on standard output.
//
// On windows the check runs tasklist filtered on the PID and piped into
// findstr. On other platforms it sends the non-impact signal 0 to the process.
// When the process is found to be gone, the stale pid file is removed.
//
// It returns nil when the pid file does not exist (status: not active), the
// getPID error when the pid cannot be loaded, the command error on windows
// when the lookup fails, or the os.FindProcess error.
func checkWorkerService() (err error) {
	var pid int

	// the final value of pid decides which status line is displayed:
	// zero means not active, anything else means active.
	defer func() {
		if pid == 0 {
			fmt.Println("status: not active")
			return
		}
		fmt.Printf("worker service [pid: %d]. status is active\n", pid)
	}()

	// read the pid from the file.
	pid, err = getPID()
	if err != nil {
		if os.IsNotExist(err) {
			// the file does not exist.
			return nil
		}
		return err
	}

	// on windows platform, tasklist and findstr are used to search for the pid.
	if runtime.GOOS == "windows" {
		query := fmt.Sprintf("tasklist /FI \"PID eq %d\" | findstr %d", pid, pid)
		// /C runs the command and terminates the interpreter (/K would keep it open).
		out, cmdErr := exec.Command("cmd", "/C", query).Output()
		if cmdErr != nil {
			fmt.Printf("\n[-] process [pid: %d] does not seem to exist. removing the pid file: <%s>\n", pid, Config.WorkerPidFilePath)
			os.Remove(Config.WorkerPidFilePath)
			// manual verification instructions - printed while pid still
			// holds the real value.
			fmt.Println("\n[1] Open the windows console with administrator privileges")
			fmt.Printf("[2] Check if the worker.exe is running with <tasklist /FI \"PID eq %v\">\n", pid)
			fmt.Printf("[3] If it is running, try to force stop it with <taskkill /F /PID %v /T>\n\n", pid)
			// set back to 0 so defer function can display inactive status.
			pid = 0
			return cmdErr
		}
		// command succeeded: an empty output means the pid was not found,
		// otherwise pid is left untouched so the defer prints active.
		if len(out) == 0 {
			pid = 0
		}
		return nil
	}

	// try to locate the process.
	process, err := os.FindProcess(pid)
	if err != nil {
		return err
	}

	// send the non-impact signal 0 to the process. The os package reports a
	// missing process as os.ErrProcessDone (it converts ESRCH itself), so that
	// is the value to test; the raw ESRCH comparison is kept as a fallback.
	// Any other outcome (success, permission denied, ...) leaves the status active.
	sigErr := process.Signal(syscall.Signal(0))
	if sigErr == os.ErrProcessDone || sigErr == syscall.ESRCH {
		fmt.Printf("process [pid: %d] is not running. removing pid file: %s\n", pid, Config.WorkerPidFilePath)
		os.Remove(Config.WorkerPidFilePath)
		// set back to 0 so defer function can display inactive status.
		pid = 0
	}

	return nil
}
// stopWorkerService loads the pid from the pid file and kills the associated
// process.
//
// It returns nil when there is no pid file, when the process was killed, or
// when the process had already finished (the stale pid file is removed in the
// last two cases). It returns the underlying error when the pid cannot be
// loaded, the process cannot be located, or the kill fails; in those cases
// the pid file is left in place so the worker is not lost track of.
func stopWorkerService() error {
	// read the pid from the file.
	pid, err := getPID()
	if err != nil {
		if os.IsNotExist(err) {
			// the file does not exist.
			log.Printf("cannot find pid file [%q] to load the worker service pid.\n", Config.WorkerPidFilePath)
			return nil
		}
		log.Printf("cannot retrieve the pid of the worker service. check if [%q] is present and there is worker process with the pid.\n", Config.WorkerPidFilePath)
		return err
	}

	// try to locate the process.
	process, err := os.FindProcess(pid)
	if err != nil {
		log.Printf("cannot find a running worker process [pid: %v]\n", pid)
		logManualStopTips(pid)
		return err
	}

	err = process.Kill()
	if err == os.ErrProcessDone {
		// nothing left to kill: only the stale pid file needs cleaning.
		log.Printf("worker service [pid: %v] is not running. removing pid file: %s\n", pid, Config.WorkerPidFilePath)
		os.Remove(Config.WorkerPidFilePath)
		return nil
	}
	if err != nil {
		log.Printf("failed to kill worker service [pid: %v]\n", pid)
		// some tips to check and stop the worker process manually.
		logManualStopTips(pid)
		return err
	}

	// remove the PID file only once the kill went through.
	os.Remove(Config.WorkerPidFilePath)
	log.Printf("successfully stopped worker service [pid: %v]\n", pid)
	return nil
}

// logManualStopTips logs the platform-specific steps to check for and stop
// the worker process with the given pid by hand.
func logManualStopTips(pid int) {
	if runtime.GOOS == "windows" {
		// on windows only platforms.
		log.Printf("1/ Open the windows console with administrator privileges")
		log.Printf("2/ check if the worker is running with <tasklist /FI \"PID eq %v\">", pid)
		log.Printf("3/ if it is running, try to force stop it with <taskkill /F /PID %v /T>", pid)
		return
	}
	// any other unix-like OSes.
	log.Printf("1/ Open the console with privileges to view & stop process")
	log.Printf("2/ check if the worker is running with <ps -p %v>", pid)
	log.Printf("3/ if it is running, try to stop it with <kill -9 %v>", pid)
}
// restartWorkerService restarts the worker service process: it kills the
// process recorded in the pid file, if any, removes the pid file and then
// calls startWorkerService.
//
// A missing, unreadable or invalid pid file, a process that cannot be located
// and a process that has already finished are all treated as "nothing to
// stop". It returns the kill error when the recorded process could not be
// killed (the pid file is then kept, since the worker may still be running),
// otherwise the result of startWorkerService.
func restartWorkerService() error {
	// read the pid from the file.
	pid, err := getPID()
	if err != nil {
		// no usable pid. remove the file in case it exists (removing a
		// missing file is harmless) and start.
		os.Remove(Config.WorkerPidFilePath)
		return startWorkerService()
	}

	process, err := os.FindProcess(pid)
	if err != nil {
		// did not find process - it may have exited. clean up and start.
		os.Remove(Config.WorkerPidFilePath)
		return startWorkerService()
	}

	// an already finished process is not a failure: there is nothing to kill.
	if err := process.Kill(); err != nil && err != os.ErrProcessDone {
		// failed to kill existing process.
		fmt.Printf("failed to kill worker service [pid: %v]\n", pid)
		return err
	}

	// existing process is gone - drop its pid file and start a new one.
	os.Remove(Config.WorkerPidFilePath)
	return startWorkerService()
}
// startWorkerService launches the worker as a separate process by executing
// the current program (os.Args[0]) with the arguments "run", the https server
// host and the https server port. The child runs in
// Config.WorkerWorkingDirectory with its standard output and error appended
// to the file at Config.WorkerLogFilePath, and its pid is saved with savePID.
//
// When the pid file already exists, a message is printed, nothing is started
// and nil is returned. It returns an error when the log file cannot be opened,
// the process cannot be started, or the pid cannot be saved (in which case a
// kill of the freshly started process is attempted).
func startWorkerService() error {
	// check if the worker is already running.
	if _, err := os.Stat(Config.WorkerPidFilePath); err == nil {
		fmt.Printf("worker service is already running or the pid file [%s] already exists. try to restart.\n", Config.WorkerPidFilePath)
		// not reported as an error: the existing instance is left untouched.
		return nil
	}

	// launch the program with "run" as argument.
	// this starts a new child process; it does not replace the current one.
	cmd := exec.Command(os.Args[0], "run", Config.HttpsServerHost, Config.HttpsServerPort)
	// setup the process working directory.
	cmd.Dir = Config.WorkerWorkingDirectory

	// single file to log output of worker - read by all and write only by the user.
	workerlog, err := os.OpenFile(Config.WorkerLogFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		fmt.Println("failed to create or open worker process log file", Config.WorkerLogFilePath)
		return err
	}
	// the child is connected directly to the file once started, so the handle
	// held by this process is released on return instead of being leaked.
	defer workerlog.Close()

	// send worker process output & error to its log file.
	cmd.Stderr, cmd.Stdout = workerlog, workerlog

	// start the process asynchronously.
	if err := cmd.Start(); err != nil {
		fmt.Println("failed to start the worker service process")
		return err
	}
	pid := cmd.Process.Pid

	// save the worker service pid to disk file.
	if err := savePID(pid); err != nil {
		// without a pid file the worker could not be managed: try to kill it.
		if killErr := cmd.Process.Kill(); killErr != nil {
			fmt.Printf("failed to kill the worker service process [pid: %d]\n", pid)
		}
		return err
	}

	// pseudo daemon started.
	log.Printf("started worker service [pid: %d] - [ppid: %d]\n", pid, os.Getpid())

	// exiting here would terminate the group leader (current process), so
	// once the terminal is closed the session is terminated, which forces the
	// pseudo daemon to terminate as well. that is why it is not a real daemon:
	// it does not survive its parent lifecycle here.
	// os.Exit(0)
	return nil
}