/
probe.go
142 lines (125 loc) · 4.32 KB
/
probe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package cmd
import (
"fmt"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/pkg/errors"
"github.com/tilt-dev/probe/pkg/probe"
"github.com/tilt-dev/probe/pkg/prober"
"github.com/tilt-dev/tilt/pkg/apis/core/v1alpha1"
)
// ErrUnsupportedProbeType is returned by proberFromSpec when a non-nil probe
// spec has none of its Exec, HTTPGet, or TCPSocket actions set.
var ErrUnsupportedProbeType = errors.New("unsupported probe type")
// ProvideProberManager returns the value created by prober.NewManager,
// exposed through the ProberManager interface.
func ProvideProberManager() ProberManager {
	var manager ProberManager = prober.NewManager()
	return manager
}
// ProberManager is the set of prober constructors that proberFromSpec uses to
// turn a probe spec into a runnable prober. ProvideProberManager supplies the
// value returned by prober.NewManager as the implementation.
type ProberManager interface {
// HTTPGet returns a prober func for the URL u and the given headers
// (presumably the headers are sent with each request; confirm in the prober package).
HTTPGet(u *url.URL, headers http.Header) prober.ProberFunc
// TCPSocket returns a prober func for the given host and port.
TCPSocket(host string, port int) prober.ProberFunc
// Exec returns a prober func for the command name with the given args.
Exec(name string, args ...string) prober.ProberFunc
}
// probeWorkerFromSpec builds a probe.Worker for probeSpec.
//
// The prober is resolved via proberFromSpec using manager; an error from that
// step is returned unchanged together with a nil worker. Worker options are
// then derived from the spec:
//   - the initial delay is applied whenever it is non-negative (zero included);
//   - timeout, period, and the success/failure thresholds are applied only
//     when strictly positive, otherwise the worker's own defaults are left alone;
//   - a non-nil resultFunc is registered with probe.WorkerOnProbeResult.
//
// NOTE(review): probeSpec is dereferenced unconditionally once the prober has
// been resolved, so a nil probeSpec panics here even though proberFromSpec
// itself tolerates nil. Callers must pass a non-nil spec.
func probeWorkerFromSpec(manager ProberManager, probeSpec *v1alpha1.Probe, resultFunc probe.ResultFunc) (*probe.Worker, error) {
	fn, err := proberFromSpec(manager, probeSpec)
	if err != nil {
		return nil, err
	}

	var options []probe.WorkerOption

	// Durations in the spec are expressed in whole seconds.
	if delay := probeSpec.InitialDelaySeconds; delay >= 0 {
		options = append(options, probe.WorkerInitialDelay(time.Duration(delay)*time.Second))
	}
	if timeout := probeSpec.TimeoutSeconds; timeout > 0 {
		options = append(options, probe.WorkerTimeout(time.Duration(timeout)*time.Second))
	}
	if period := probeSpec.PeriodSeconds; period > 0 {
		options = append(options, probe.WorkerPeriod(time.Duration(period)*time.Second))
	}

	if successes := probeSpec.SuccessThreshold; successes > 0 {
		options = append(options, probe.WorkerSuccessThreshold(int(successes)))
	}
	if failures := probeSpec.FailureThreshold; failures > 0 {
		options = append(options, probe.WorkerFailureThreshold(int(failures)))
	}

	if resultFunc != nil {
		options = append(options, probe.WorkerOnProbeResult(resultFunc))
	}

	return probe.NewWorker(fn, options...), nil
}
// proberFromSpec converts a probe spec into a prober built by manager.
//
// Exactly one action is used, checked in this order: Exec, HTTPGet, TCPSocket.
//
// Returns:
//   - (nil, nil) when probeSpec is nil;
//   - an error when an Exec action has an empty command, when the HTTP GET
//     action cannot be turned into a URL, or when a port is out of range;
//   - ErrUnsupportedProbeType when probeSpec is non-nil but no action is set.
func proberFromSpec(manager ProberManager, probeSpec *v1alpha1.Probe) (prober.Prober, error) {
	if probeSpec == nil {
		return nil, nil
	}

	switch {
	case probeSpec.Exec != nil:
		command := probeSpec.Exec.Command
		// Indexing an empty command would panic; report it as an invalid
		// spec instead.
		if len(command) == 0 {
			return nil, errors.New("exec probe command must not be empty")
		}
		return manager.Exec(command[0], command[1:]...), nil

	case probeSpec.HTTPGet != nil:
		u, err := extractURL(probeSpec.HTTPGet)
		if err != nil {
			return nil, err
		}
		return manager.HTTPGet(u, convertHeaders(probeSpec.HTTPGet.HTTPHeaders)), nil

	case probeSpec.TCPSocket != nil:
		port, err := extractPort(probeSpec.TCPSocket.Port)
		if err != nil {
			return nil, err
		}
		host := probeSpec.TCPSocket.Host
		if host == "" {
			// K8s defaults to pod IP; since this is a local resource,
			// localhost is a sane default to somewhat mimic that
			// behavior and reduce the amount of boilerplate to define
			// a probe in most cases
			host = "localhost"
		}
		return manager.TCPSocket(host, port), nil
	}

	return nil, ErrUnsupportedProbeType
}
// extractURL builds the target URL for an HTTP GET probe action.
// adapted from https://github.com/kubernetes/kubernetes/blob/v1.20.2/pkg/kubelet/prober/prober.go#L163-L186
//
// The port is validated first, then the action's Path is parsed as the base
// URL. The scheme is lower-cased and falls back to "http" when empty; the host
// falls back to "localhost" when empty. Scheme and host:port from the action
// always overwrite whatever the parsed path contained.
func extractURL(httpGet *v1alpha1.HTTPGetAction) (*url.URL, error) {
	portNum, err := extractPort(httpGet.Port)
	if err != nil {
		return nil, err
	}

	target, err := url.Parse(httpGet.Path)
	if err != nil {
		return nil, err
	}

	scheme := strings.ToLower(string(httpGet.Scheme))
	if scheme == "" {
		// same default as K8s (plain http)
		scheme = "http"
	}
	target.Scheme = scheme

	hostname := httpGet.Host
	if hostname == "" {
		// K8s would use the pod IP here; for a local resource, localhost is
		// the closest equivalent and saves boilerplate in most probe specs.
		hostname = "localhost"
	}
	target.Host = net.JoinHostPort(hostname, strconv.Itoa(portNum))

	return target, nil
}
// extractPort validates a numeric probe port and returns it as an int.
// adapted from https://github.com/kubernetes/kubernetes/blob/v1.20.2/pkg/kubelet/prober/prober.go#L203-L223
//
// Unlike the K8s original, this version is deliberately simplified: "named"
// ports are not handled because they do not apply here.
//
// An error is returned when port is not in the range 1-65535.
func extractPort(port int32) (int, error) {
	const (
		lowest  = 1
		highest = 65535
	)
	if port >= lowest && port <= highest {
		return int(port), nil
	}
	return 0, fmt.Errorf("port number out of range: %d", port)
}
// convertHeaders turns a list of HTTP header name/value pairs into a stdlib
// http.Header.
// adapted from https://github.com/kubernetes/kubernetes/blob/v1.20.2/pkg/kubelet/prober/prober.go#L146-L154
//
// Values that share a name are accumulated in list order. Names are used as
// map keys exactly as given (no canonicalization is applied). The result is
// always a non-nil map, even for an empty list.
func convertHeaders(headerList []v1alpha1.HTTPHeader) http.Header {
	result := http.Header{}
	for i := range headerList {
		name, value := headerList[i].Name, headerList[i].Value
		result[name] = append(result[name], value)
	}
	return result
}