/
workerrunner.go
84 lines (72 loc) · 2.52 KB
/
workerrunner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package main
import (
"bytes"
"io"
"log"
"os"
"github.com/taskcluster/taskcluster/v29/internal/workerproto"
)
var (
// WorkerRunnerProtocol supports communication between this process and
// worker-runner. It is initialized early in the `generic-worker run`
// process and can be used by any component after that time. It is assigned
// by initializeWorkerRunnerProtocol and reset to nil by
// teardownWorkerRunnerProtocol.
WorkerRunnerProtocol *workerproto.Protocol
// workerRunnerTransport is the transport behind WorkerRunnerProtocol: a
// pipe transport when running under worker-runner, a null transport
// otherwise.
workerRunnerTransport workerproto.Transport
)
// loggingWriter implements io.Writer and is meant to be installed as the
// output of a `log` logger. Each message written to it is forwarded to
// worker-runner as a "log" protocol message or, if the protocol does not
// support that, printed through the backup logger instead.
type loggingWriter struct {
// backup receives messages when the protocol does not support logging;
// initializeWorkerRunnerProtocol builds it as a logger writing to stderr.
backup *log.Logger
}
// Write implements io.Writer for use as a log output. The log package makes a
// single Write call per logging operation (https://golang.org/pkg/log/), so p
// is treated as one complete message: trailing newlines are stripped and the
// text is either sent to worker-runner as a "log" message or, when the
// protocol is not capable of "log", printed through the backup logger.
//
// It always reports len(p) bytes written and a nil error.
func (w *loggingWriter) Write(p []byte) (n int, err error) {
	text := string(bytes.TrimRight(p, "\n"))

	if !WorkerRunnerProtocol.Capable("log") {
		// the protocol cannot carry log messages; use the backup logger
		w.backup.Println(text)
		return len(p), nil
	}

	body := map[string]interface{}{
		"textPayload": text,
	}
	WorkerRunnerProtocol.Send(workerproto.Message{
		Type:       "log",
		Properties: map[string]interface{}{"body": body},
	})
	return len(p), nil
}
// initializeWorkerRunnerProtocol populates the package-level
// workerRunnerTransport and WorkerRunnerProtocol and starts the protocol.
//
// When withWorkerRunner is true, the protocol runs over a pipe transport
// built from input and output, and the log package's default logger is
// redirected through a loggingWriter. When it is false, a "null" transport is
// used and the protocol is considered initialized with no capabilities.
func initializeWorkerRunnerProtocol(input io.Reader, output io.Writer, withWorkerRunner bool) {
	switch {
	case withWorkerRunner:
		workerRunnerTransport = workerproto.NewPipeTransport(input, output)

		// Send everything written to the log package's default logger through
		// the protocol. The backup logger writes to stderr using the flags
		// the default logger has right now; the default logger's own flags
		// are then cleared so no date/time prefix is added to the text
		// handed to loggingWriter.
		stderrLogger := log.New(os.Stderr, "", log.Flags())
		log.SetOutput(&loggingWriter{backup: stderrLogger})
		log.SetFlags(0)
	default:
		workerRunnerTransport = workerproto.NewNullTransport()
	}

	WorkerRunnerProtocol = workerproto.NewProtocol(workerRunnerTransport)
	for _, capability := range []string{"graceful-termination", "log"} {
		WorkerRunnerProtocol.AddCapability(capability)
	}
	WorkerRunnerProtocol.Start(true)

	if withWorkerRunner {
		return
	}
	// without worker-runner, consider the protocol initialized with no
	// capabilities
	WorkerRunnerProtocol.SetInitialized()
}
// teardownWorkerRunnerProtocol points the log package's default logger back
// at stderr with the standard flags, then clears the package-level protocol
// and transport.
func teardownWorkerRunnerProtocol() {
	// Reset the logger before dropping the protocol: loggingWriter.Write
	// calls methods on WorkerRunnerProtocol.
	log.SetOutput(os.Stderr)
	log.SetFlags(log.LstdFlags)

	WorkerRunnerProtocol, workerRunnerTransport = nil, nil
}